05.Netty服务启动 之前讲过的 EchoServer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port" , "8007" )); public static void main (String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1 ); EventLoopGroup workerGroup = new NioEventLoopGroup(); EchoServerHandler echoServerHandler = new EchoServerHandler(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100 ) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(echoServerHandler); } }); ChannelFuture f = serverBootstrap.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
问题 通过前面使用案例的学习,我们知道,Netty 启动的时候主要是下面这行代码:
1 ChannelFuture f = serverBootstrap.bind(PORT).sync();
这里主要有两个方法,一个是 bind (),一个是 sync (),sync () 属于 ChannelFuture 的范畴,我们之后再说:
Netty 的 Channel 跟 Java 原生的 Channel 是否有某种关系?
bind () 是否调用了 Java 底层的 Socket 相关的操作?
Netty 服务启动之后 ChannelPipeline 里面长什么样?
好了,让我们带着这几个问题探索吧。
服务启动过程剖析 让我们将断点打在 ChannelFuture f = serverBootstrap.bind(PORT).sync();
这行,以 Debug 模式启动程序,程序会停在此处,按 F7 或者 Shirft+F7 进入 bind () 方法内部:
1 2 3 public ChannelFuture bind (int inetPort) { return bind(new InetSocketAddress(inetPort)); }
可以看到,我们只传进来了一个端口,而使用 InetSocketAddress
类构造了一个地址,默认的,会生成一个 0.0.0.0:8007
的地址。
接着往下走,会来到一个叫 doBind()
的方法,一般地,在开源框架中带 doXxx
开头的方法都是干正事的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
关键方法,我前面加上一个 key
表示。
doBind()
主要干了两件事:
initAndRegister (),初始化并注册什么呢?
doBind0 (),到底绑定的是什么?
让我们继续跟进到 initAndRegister()
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null ) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null ) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
initAndRegister
主要干了三个事:
channelFactory.newChannel (),通过反射的形式创建 Channel,而且是无参构造方法,new 的时候做了哪些事儿?
init(channel)
,初始化 Channel 的什么?
register(channel)
,注册 Channel 到哪里?
因为我们这里使用的是 NioServerSocketChannel,所以,直接查看它的无参构造方法即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public NioServerSocketChannel () { this (newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static ServerSocketChannel newSocket (SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket." , e); } } public NioServerSocketChannel (ServerSocketChannel channel) { super (null , channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this , javaChannel().socket()); } protected AbstractNioMessageChannel (Channel parent, SelectableChannel ch, int readInterestOp) { super (parent, ch, readInterestOp); } protected AbstractNioChannel (Channel parent, SelectableChannel ch, int readInterestOp) { super (parent); this .ch = ch; this .readInterestOp = readInterestOp; try { ch.configureBlocking(false ); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially initialized socket." , e2); } throw new ChannelException("Failed to enter non-blocking mode." , e); } } protected AbstractChannel (Channel parent) { this .parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline (Channel channel) { this .channel = ObjectUtil.checkNotNull(channel, "channel" ); succeededFuture = new SucceededChannelFuture(channel, null ); voidPromise = new VoidChannelPromise(channel, true ); tail = new TailContext(this ); head = new HeadContext(this ); head.next = tail; tail.prev = head; }
到这里 NioServerSocketChannel 的创建过程就完毕了,我们简单总结一下:
Netty 的 ServerSocketChannel 会与 Java 原生的 ServerSocketChannel 绑定在一起;
会注册 Accept 事件;
会为每一个 Channel 分配一个 id;
会为每一个 Channel 创建一个叫作 unsafe 的东西;
会为每一个 Channel 分配一个 ChannelPipeline;
ChannelPipeline 中默认存在一个双向链表 head<=>tail
;
好了,再来看 init(channel)
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Override void init (Channel channel) { setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel (final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
init(channel)
方法整体来说还是比较简单的,就是把 ServerBootstrap 中的配置设置到 Channel 中,不过依然有几处我们现在可能还不太理解的地方:
ChannelInitializer 的实现原理是什么?
ch.eventLoop().execute()
这是什么写法?
ServerBootstrapAcceptor 是干什么?
这三个问题,我们留到后面的章节中再解答。
好了,我们再来看看 initAndRegister()
方法的最后一个关键步骤,ChannelFuture regFuture = config().group().register(channel);
注册 Channel 到什么地方?
查看源码,可以发现,这里的 group 就是我们的 bossGroup,所以这里就是调用 bossGroup 的 register(channel)
方法。
1 2 3 4 @Override public ChannelFuture register (Channel channel) { return next().register(channel); }
这里会调用 next()
方法选择出来一个 EventLoop 来注册 Channel,里面实际上使用的是一个叫做 EventExecutorChooser
的东西来选择,它实际上又有两种实现方式 ——PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
,本质上就是从 EventExecutor 数组中选择一个 EventExecutor,我们这里就是 NioEventLoop,那么,它们有什么区别呢?有兴趣的可以点开它们的源码看看,我简单地提一下,本质都是按数组长度取余数 ,不过,2 的 N 次方的形式更高效。
最后,来到了 EventLoop 的 register(channel)
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public ChannelFuture register (Channel channel) { return register(new DefaultChannelPromise(channel, this )); } @Override public ChannelFuture register (final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise" ); promise.channel().unsafe().register(this , promise); return promise; }
可以看到,先创建了一个叫做 ChannelPromise
的东西,它是 ChannelFuture 的子类,暂时先把它当作 ChannelFuture 来看待。最后,又调回了 Channel 的 Unsafe 的 register () 方法,这里第一个参数是 this,也就是 NioEventLoop,第二个参数是刚创建的 ChannelPromise。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public final void register (EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop" ); if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already" )); return ; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return ; } AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run () { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}" , AbstractChannel.this , t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
这个方法主要干了两件事:
把 EventLoop 与 Channel 绑定在一起;
调用 register0 () 方法;
这里又出现了 eventLoop.execute () 这种写法,先忽略它,专注于主要逻辑。
接着,跟踪到 register0()
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
这里有两个个非常重要的方法:
doRegister (),一看就是干正事的方法
pipeline.invokeHandlerAddedIfNeeded (),触发添加 Handler 的回调,其中 pineline.addLast (ChannelInitializer) 的处理就是在这一步完成的,有兴趣的同学可以跟踪看一下,这一块我们本节不详细展开
先来看 doRegister()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0 , this ); return ; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true ; } else { throw e; } } } }
这里其实就一行关键代码,将 Selector 与 Java 原生 Channel 绑定在一起,并将当前 Netty 的 Channel 通过 attachment 的形式绑定到 SelectionKey 上,到这里,你可能会有疑问:为什么要把 Netty 的 Channel 当作附件放到 SelectionKey 中呢?后面你会知道的,相信我。
所以,整个注册的过程主要就干了三个事:
把 Channel 绑定到一个 EventLoop 上;
把 Java 原生 Channel、Netty 的 Channel、Selector 绑定到 SelectionKey 中;
触发 Register 相关的事件;
至此,initAndRegister()
方法内部就分析完成了,我们再来看看另一个重要方法 doBind0()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 private static void doBind0 ( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run () { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } @Override public ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } @Override public final ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } @Override public void bind (ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND" , localAddress)); } ctx.bind(localAddress, promise); } @Override public void bind ( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { unsafe.bind(localAddress, promise); } @Override public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return ; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } @SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
可以看到,doBind0()
最后也是通过 Java 原生 Channel 的 bind () 方法来实现的。
最后,我们来总结一下整个服务启动的过程,服务启动主要是通过两个主要的大方法来完成的:
initAndRegister (),初始化并注册什么呢?
channelFactory.newChannel()
通过反射创建一个 NioServerSocketChannel
将 Java 原生 Channel 绑定到 NettyChannel 中
注册 Accept 事件
为 Channel 分配 id
为 Channel 创建 unsafe
为 Channel 创建 ChannelPipeline(默认是 head<=>tail 的双向链表)
init(channel)
把 ServerBootstrap 中的配置设置到 Channel 中
添加 ServerBootstrapAcceptor 这个 Handler
register(channel)
把 Channel 绑定到一个 EventLoop 上
把 Java 原生 Channel、Netty 的 Channel、Selector 绑定到 SelectionKey 中
触发 Register 相关的事件
doBind0 (),到底绑定的是什么?
通过 Java 原生 Channel 绑定到一个本地地址上
思维导图