「Netty源码分析二」服务端启动流程分析
先看源码
源码地址
public static void main(String[] args) throws Exception { // Configure SSL. //配置ssl final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 创建 boss 线程组 用于服务端接受客户端的连接 EventLoopGroup workerGroup = new NioEventLoopGroup();// 创建 worker 线程组 用于进行 SocketChannel 的数据读写 // 创建 EchoServerHandler 对象 final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); // 设置使用的 EventLoopGroup b.group(bossGroup, workerGroup) //获取对应的Channel工厂 .channel(NioServerSocketChannel.class) // 设置 NioServerSocketChannel 的可选项 .option(ChannelOption.SO_BACKLOG, 100) // 设置 NioServerSocketChannel 的处理器 .handler(new LoggingHandler(LogLevel.INFO)) //设置子 Channel 的可选项 .childOption(ChannelOption.SO_KEEPALIVE,true) .childOption(NioChannelOption.SO_KEEPALIVE,true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 设置连入服务端的 Client 的 SocketChannel 的处理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // 启动服务的地方 ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } 复制代码
源码分析
配置
ServerBootstrap是一个帮助类,更好的使用Channel。 ServerBootstrap的继承关系如下 上面有一大堆配置,没啥好讲的,就是给各个成员变量赋值,这里帮大家更直观的看下,分别是对以下成员变量赋值,后面对源码解析的时候,也更容易看一点。
ServerBootstrap{ //bossGroup(AbstractBootstrap) volatile EventLoopGroup group; //workerGroup private volatile EventLoopGroup childGroup; //NioServerSocketChannel.class封装进ReflectiveChannelFactory private volatile ChannelFactory<? extends C> channelFactory; //option封装进map中(AbstractBootstrap) private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); //自定义的handler封装进handler中(AbstractBootstrap) private volatile ChannelHandler handler; //childOption放进map中 private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>(); //childHandler private volatile ChannelHandler childHandler; } 复制代码
bind()
创建channel,和各种对channel的配置都是在这个方法中完成的。 进入到AbstractBootstrap::doBind
方法,可以粗略的看出
创建
Channel
是在initAndRegister
或者更底层的方法中完成,只需要解析initAndRegister
即可io.netty.channel.Channel
内部必然维护了java.nio.channels.Channel
,因为有绑定等操作。ChannelFuture
是个可以监听事件完成并触发回调
private ChannelFuture doBind(final SocketAddress localAddress) { //初始化并注册一个 Channel 对象,因为注册是异步的过程,所以返回一个 ChannelFuture 对象。 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //因为是异步,不能保证是否完成 //绑定Channel端口,并注册channel到selectionKey中 if (regFuture.isDone()) { // 注册完成 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // 注册还未完成 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // EventLoop 上的注册失败,因此一旦我们尝试访问 Channel 的 EventLoop,就直接使 ChannelPromise 失败,以免导致 IllegalStateException。 promise.setFailure(cause); } else { // 注册成功,所以设置正确的执行器来使用。 // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } 复制代码
创建并初始化channel
initAndRegister
首先我们来看下initAndRegister
方法。初步有以下结论(对不对另说啊)
Channel由反射创建,channelFactory为上面传入的
NioServerSocketChannel.class
init完成对channel的各项初始化
ChannelFuture regFuture = config().group().register(channel);
看上去和serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
(不清楚的看我上一篇取)类似
final ChannelFuture initAndRegister() { Channel channel = null; try { //创建一个channel对象 channel = channelFactory.newChannel(); //初始化channel配置 init(channel); } catch (Throwable t) { if (channel != null) { //如果 newChannel 崩溃,channel 可以为 null channel.unsafe().closeForcibly(); // 由于通道尚未注册,我们需要强制使用 GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // 由于通道尚未注册,我们需要强制使用 GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } //获取初始化设置的bossGroup,将channel绑定到 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } /** * 如果在这promise没有失败,则一定是以下原因 * 1.如果我们尝试从事件循环中注册,此时注册已经完成。因为channel注册完成,使用bind(),connect()是安全的 * 2.如果我们尝试从其他线程注册,则注册请求已经成功添加到事件循环的任务队列以供稍后执行 * 因为bind(),connect()将在定时任务后执行 * 应为register(), bind(), and connect()被绑定在相同的线程 */ return regFuture; } 复制代码
NioServerSocketChannel
看到这里就可以明白了,io.netty.channel.Channel
就是对java.nio.channels.Channel
的一层封装,证明了上面的猜想
NioServerSocketChannelConfig
就是对io.netty.channel.Channel
和ServerSocket
的增强类
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } private final ServerSocketChannelConfig config; /** * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig { private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) { super(channel, javaSocket); } } } 复制代码
init
init中就是对channel的初始化,然后把ChannelInitializer
添加进pipeline,ChannelInitializer
中又把ServerBootstrapAcceptor
添加进pipeline,肯定相当重要
void init(Channel channel) { //将之前配置的options放进Channel的ChannelConfig里面 setChannelOptions(channel, newOptionsArray(), logger); //将之前配置的attr放进Channel的ChannelConfig里面 setAttributes(channel, newAttributesArray()); //获取channel初始化的时候创建的pipeline ChannelPipeline p = channel.pipeline(); //初始化socketChannel而使用的 //对应WorkerGroup final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; //属性转换成这个 final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions); final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs); /** * 在pipeline中封装成 DefaultChannelHandlerContext * ChannelInitializer一次性初始化的handler; * 负责添加一个ServerBootstrapAcceptor handler,添加完后,自己就移除 * ServerBootstrapAcceptor handler: 负责接收客户端连接创建后,对连接的初始化工作 */ p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } //等待执行 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } 复制代码
config().group().register(channel);
config().group()
这个就是前面配置的bossGroup。主要说一下这个register
方法
public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; } 复制代码
在AbstractChannel.AbstractUnsafe::register
中回异步执行register0
这个方法,核心方法都在这。 PS:所有的unsafe
类中的方法都十分核心。
public final void register(EventLoop eventLoop, final ChannelPromise promise) { //省略无用代码 //channel的eventLoop绑定当前的eventloop AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { //省略无用代码 } } } 复制代码
register0
这个方法十分重要,有必要单独令出来说一下
private void register0(ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; //执行外面往pipeline里面添加的ChannelInitializer // 确保我们在实际通知承诺之前调用 handlerAdded(...)。 // 这是必需的,因为用户可能已经通过 ChannelFutureListener 中的管道触发事件 pipeline.invokeHandlerAddedIfNeeded(); //里面就判断下,是否要打印日志 safeSetSuccess(promise); pipeline.fireChannelRegistered(); // 如果频道从未注册过,则仅触发 channelActive 。 // 如果通道被取消注册和重新注册,这可以防止触发多个通道活动. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // 此通道之前已注册并设置了 autoRead()。 // 这意味着我们需要再次开始读取,以便我们处理入站数据. // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } 复制代码
doRegister
执行AbstractNioChannel::doRegister
方法,可以看出是对EventLoopGroup
是对Selector的一层封装,得到的SelectorKey
交给Channel
维护。 ops是0,但是SelectorKey的ops枚举分别是
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
这个应该是没有注册任何事件。后面会和其他地方呼应。
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //把channel注册到selector上 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } 复制代码
invokeHandlerAddedIfNeeded
这个方法最终会调用ChannelHandler.handlerAdded
最终调用外层(ServerBootstrap)pipeline添加的ChannelInitializer
执行initChannel
方法。
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } //等待执行 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); 复制代码
safeSetSuccess
这个方法会执行外层添加的ChannelFutureListener
,也就是AbstractBootstrap::doBind
方法
regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // EventLoop 上的注册失败,因此一旦我们尝试访问 Channel 的 EventLoop,就直接使 ChannelPromise 失败,以免导致 IllegalStateException。 promise.setFailure(cause); } else { // 注册成功,所以设置正确的执行器来使用。 // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); 复制代码
绑定端口
最后回进入到AbstractChannel::bind
方法
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //省略无用代码 //Channel是否激活 boolean wasActive = isActive(); try { //绑定channel端口 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } //绑定后,菜开始激活 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } 复制代码
doBind0
最终会调用,javaChannel()
就是获取channel封装的ServerSocketChannel
@SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } 复制代码
绑定端口结束,pipeline.fireChannelActive();
最终回执行到doBeginRead()
方法给channel注册真正的监听事件。
protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; //0 final int interestOps = selectionKey.interestOps(); //是否监听了readInterestOp,则监听readInterestOp if ((interestOps & readInterestOp) == 0) { //OP_ACCEPT 1<<4 16 selectionKey.interestOps(interestOps | readInterestOp); } } 复制代码
至此,Netty服务端启动成功。
总结
io.netty.channel.Channel
就是对java.nio.channels.Channel
的一层封装io.netty.channel.Channel
内部也维护了SelectionKey
EventLoopGroup
是对Selector的一层封装初始化的时候只用到了bossGroup,用来监听连接事件
作者:山间小僧
链接:https://juejin.cn/post/7026389526134292516