阅读 70

「Netty源码分析二」服务端启动流程分析

先看源码

源码地址

public static void main(String[] args) throws Exception {     // Configure SSL.     //配置ssl     final SslContext sslCtx;     if (SSL) {         SelfSignedCertificate ssc = new SelfSignedCertificate();         sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();     } else {         sslCtx = null;     }     // Configure the server.     EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 创建 boss 线程组 用于服务端接受客户端的连接     EventLoopGroup workerGroup = new NioEventLoopGroup();// 创建 worker 线程组 用于进行 SocketChannel 的数据读写     // 创建 EchoServerHandler 对象     final EchoServerHandler serverHandler = new EchoServerHandler();     try {         ServerBootstrap b = new ServerBootstrap();         // 设置使用的 EventLoopGroup         b.group(bossGroup, workerGroup)          //获取对应的Channel工厂          .channel(NioServerSocketChannel.class)          // 设置 NioServerSocketChannel 的可选项          .option(ChannelOption.SO_BACKLOG, 100)          // 设置 NioServerSocketChannel 的处理器          .handler(new LoggingHandler(LogLevel.INFO))          //设置子 Channel 的可选项          .childOption(ChannelOption.SO_KEEPALIVE,true)          .childOption(NioChannelOption.SO_KEEPALIVE,true)          .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)          // 设置连入服务端的 Client 的 SocketChannel 的处理器          .childHandler(new ChannelInitializer<SocketChannel>() {              @Override              public void initChannel(SocketChannel ch) throws Exception {                  ChannelPipeline p = ch.pipeline();                  if (sslCtx != null) {                      p.addLast(sslCtx.newHandler(ch.alloc()));                  }                  //p.addLast(new LoggingHandler(LogLevel.INFO));                  p.addLast(serverHandler);              }          });         // 启动服务的地方         ChannelFuture f = b.bind(PORT).sync();         // Wait until the server socket is closed.         f.channel().closeFuture().sync();     } finally {         // Shut down all event loops to terminate all threads.         bossGroup.shutdownGracefully();         workerGroup.shutdownGracefully();     } } 复制代码

源码分析

配置

ServerBootstrap是一个帮助类,更好的使用Channel。 ServerBootstrap的继承关系如下 image.png 上面有一大堆配置,没啥好讲的,就是给各个成员变量赋值,这里帮大家更直观的看下,分别是对以下成员变量赋值,后面对源码解析的时候,也更容易看一点。

ServerBootstrap{ //bossGroup(AbstractBootstrap) volatile EventLoopGroup group; //workerGroup private volatile EventLoopGroup childGroup; //NioServerSocketChannel.class封装进ReflectiveChannelFactory private volatile ChannelFactory<? extends C> channelFactory; //option封装进map中(AbstractBootstrap) private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); //自定义的handler封装进handler中(AbstractBootstrap) private volatile ChannelHandler handler; //childOption放进map中 private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>(); //childHandler private volatile ChannelHandler childHandler; } 复制代码

bind()

创建channel,和各种对channel的配置都是在这个方法中完成的。 进入到AbstractBootstrap::doBind方法,可以粗略的看出

  1. 创建Channel是在initAndRegister或者更底层的方法中完成,只需要解析initAndRegister即可

  2. io.netty.channel.Channel内部必然维护了java.nio.channels.Channel,因为有绑定等操作。

  3. ChannelFuture是个可以监听事件完成并触发回调

private ChannelFuture doBind(final SocketAddress localAddress) {     //初始化并注册一个 Channel 对象,因为注册是异步的过程,所以返回一个 ChannelFuture 对象。     final ChannelFuture regFuture = initAndRegister();     final Channel channel = regFuture.channel();     if (regFuture.cause() != null) {         return regFuture;     }     //因为是异步,不能保证是否完成     //绑定Channel端口,并注册channel到selectionKey中     if (regFuture.isDone()) {         // 注册完成         ChannelPromise promise = channel.newPromise();         doBind0(regFuture, channel, localAddress, promise);         return promise;     } else {         // 注册还未完成         final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);         regFuture.addListener(new ChannelFutureListener() {             @Override             public void operationComplete(ChannelFuture future) throws Exception {                 Throwable cause = future.cause();                 if (cause != null) {                     // EventLoop 上的注册失败,因此一旦我们尝试访问 Channel 的 EventLoop,就直接使 ChannelPromise 失败,以免导致 IllegalStateException。                     promise.setFailure(cause);                 } else {                     // 注册成功,所以设置正确的执行器来使用。                     // See https://github.com/netty/netty/issues/2586                     promise.registered();                     doBind0(regFuture, channel, localAddress, promise);                 }             }         });         return promise;     } } 复制代码

创建并初始化channel

initAndRegister

首先我们来看下initAndRegister方法。初步有以下结论(对不对另说啊)

  1. Channel由反射创建,channelFactory为上面传入的NioServerSocketChannel.class

  2. init完成对channel的各项初始化

  3. ChannelFuture regFuture = config().group().register(channel);看上去和serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);(不清楚的看我上一篇取)类似

final ChannelFuture initAndRegister() {     Channel channel = null;     try {         //创建一个channel对象         channel = channelFactory.newChannel();         //初始化channel配置         init(channel);     } catch (Throwable t) {         if (channel != null) {             //如果 newChannel 崩溃,channel 可以为 null             channel.unsafe().closeForcibly();             // 由于通道尚未注册,我们需要强制使用 GlobalEventExecutor             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);         }         // 由于通道尚未注册,我们需要强制使用 GlobalEventExecutor         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);     }     //获取初始化设置的bossGroup,将channel绑定到     ChannelFuture regFuture = config().group().register(channel);     if (regFuture.cause() != null) {         if (channel.isRegistered()) {             channel.close();         } else {             channel.unsafe().closeForcibly();         }     }     /**      * 如果在这promise没有失败,则一定是以下原因      * 1.如果我们尝试从事件循环中注册,此时注册已经完成。因为channel注册完成,使用bind(),connect()是安全的      * 2.如果我们尝试从其他线程注册,则注册请求已经成功添加到事件循环的任务队列以供稍后执行      *  因为bind(),connect()将在定时任务后执行      *  应为register(), bind(), and connect()被绑定在相同的线程      */         return regFuture; } 复制代码

NioServerSocketChannel

看到这里就可以明白了,io.netty.channel.Channel就是对java.nio.channels.Channel的一层封装,证明了上面的猜想

NioServerSocketChannelConfig就是对io.netty.channel.ChannelServerSocket的增强类

public class NioServerSocketChannel extends AbstractNioMessageChannel                              implements io.netty.channel.socket.ServerSocketChannel {     private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();     private static ServerSocketChannel newSocket(SelectorProvider provider) {         try {             return provider.openServerSocketChannel();         } catch (IOException e) {             throw new ChannelException(                     "Failed to open a server socket.", e);         }     }     private final ServerSocketChannelConfig config;     /**      * Create a new instance      */     public NioServerSocketChannel() {         this(newSocket(DEFAULT_SELECTOR_PROVIDER));     }       public NioServerSocketChannel(ServerSocketChannel channel) {         super(null, channel, SelectionKey.OP_ACCEPT);         config = new NioServerSocketChannelConfig(this, javaChannel().socket());     }     private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {         private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {             super(channel, javaSocket);         }     }      } 复制代码

init

init中就是对channel的初始化,然后把ChannelInitializer添加进pipeline,ChannelInitializer中又把ServerBootstrapAcceptor添加进pipeline,肯定相当重要

void init(Channel channel) {     //将之前配置的options放进Channel的ChannelConfig里面     setChannelOptions(channel, newOptionsArray(), logger);     //将之前配置的attr放进Channel的ChannelConfig里面     setAttributes(channel, newAttributesArray());     //获取channel初始化的时候创建的pipeline     ChannelPipeline p = channel.pipeline();     //初始化socketChannel而使用的     //对应WorkerGroup     final EventLoopGroup currentChildGroup = childGroup;     final ChannelHandler currentChildHandler = childHandler;     //属性转换成这个     final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);     final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);     /**      * 在pipeline中封装成 DefaultChannelHandlerContext      * ChannelInitializer一次性初始化的handler;      * 负责添加一个ServerBootstrapAcceptor handler,添加完后,自己就移除      * ServerBootstrapAcceptor handler: 负责接收客户端连接创建后,对连接的初始化工作      */     p.addLast(new ChannelInitializer<Channel>() {         @Override         public void initChannel(final Channel ch) {             final ChannelPipeline pipeline = ch.pipeline();             ChannelHandler handler = config.handler();             if (handler != null) {                 pipeline.addLast(handler);             }             //等待执行             ch.eventLoop().execute(new Runnable() {                 @Override                 public void run() {                     pipeline.addLast(new ServerBootstrapAcceptor(                             ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                 }             });         }     }); } 复制代码

config().group().register(channel);

config().group()这个就是前面配置的bossGroup。主要说一下这个register方法

public ChannelFuture  register(final ChannelPromise promise) {     ObjectUtil.checkNotNull(promise, "promise");     promise.channel().unsafe().register(this, promise);     return promise; } 复制代码

AbstractChannel.AbstractUnsafe::register中回异步执行register0这个方法,核心方法都在这。 PS:所有的unsafe类中的方法都十分核心。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {         //省略无用代码               //channel的eventLoop绑定当前的eventloop     AbstractChannel.this.eventLoop = eventLoop;     if (eventLoop.inEventLoop()) {         register0(promise);     } else {         try {             eventLoop.execute(new Runnable() {                 @Override                 public void run() {                     register0(promise);                 }             });         } catch (Throwable t) {             //省略无用代码         }     } } 复制代码

register0

这个方法十分重要,有必要单独令出来说一下

private void  register0(ChannelPromise promise) {     try {         boolean firstRegistration = neverRegistered;         doRegister();         neverRegistered = false;         registered = true;         //执行外面往pipeline里面添加的ChannelInitializer         // 确保我们在实际通知承诺之前调用 handlerAdded(...)。         // 这是必需的,因为用户可能已经通过 ChannelFutureListener 中的管道触发事件         pipeline.invokeHandlerAddedIfNeeded();         //里面就判断下,是否要打印日志         safeSetSuccess(promise);         pipeline.fireChannelRegistered();         // 如果频道从未注册过,则仅触发 channelActive 。         // 如果通道被取消注册和重新注册,这可以防止触发多个通道活动.         if (isActive()) {             if (firstRegistration) {                 pipeline.fireChannelActive();             } else if (config().isAutoRead()) {                 // 此通道之前已注册并设置了 autoRead()。                 // 这意味着我们需要再次开始读取,以便我们处理入站数据.                 // See https://github.com/netty/netty/issues/4805                 beginRead();             }         }     } catch (Throwable t) {         // Close the channel directly to avoid FD leak.         closeForcibly();         closeFuture.setClosed();         safeSetFailure(promise, t);     } } 复制代码

doRegister

执行AbstractNioChannel::doRegister方法,可以看出是对EventLoopGroup是对Selector的一层封装,得到的SelectorKey交给Channel维护。 ops是0,但是SelectorKey的ops枚举分别是

  • public static final int OP_READ = 1 << 0;

  • public static final int OP_WRITE = 1 << 2;

  • public static final int OP_CONNECT = 1 << 3;

  • public static final int OP_ACCEPT = 1 << 4;

这个应该是没有注册任何事件。后面会和其他地方呼应。

protected void doRegister() throws Exception {     boolean selected = false;     for (;;) {         try {             //把channel注册到selector上             selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);             return;         } catch (CancelledKeyException e) {             if (!selected) {                 // Force the Selector to select now as the "canceled" SelectionKey may still be                 // cached and not removed because no Select.select(..) operation was called yet.                 eventLoop().selectNow();                 selected = true;             } else {                 // We forced a select operation on the selector before but the SelectionKey is still cached                 // for whatever reason. JDK bug ?                 throw e;             }         }     } } 复制代码

invokeHandlerAddedIfNeeded

这个方法最终会调用ChannelHandler.handlerAdded最终调用外层(ServerBootstrap)pipeline添加的ChannelInitializer执行initChannel方法。

p.addLast(new ChannelInitializer<Channel>() {         @Override         public void initChannel(final Channel ch) {             final ChannelPipeline pipeline = ch.pipeline();             ChannelHandler handler = config.handler();             if (handler != null) {                 pipeline.addLast(handler);             }             //等待执行             ch.eventLoop().execute(new Runnable() {                 @Override                 public void run() {                     pipeline.addLast(new ServerBootstrapAcceptor(                             ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                 }             });         }     }); 复制代码

safeSetSuccess

这个方法会执行外层添加的ChannelFutureListener,也就是AbstractBootstrap::doBind方法

regFuture.addListener(new ChannelFutureListener() {    @Override    public void operationComplete(ChannelFuture future) throws Exception {        Throwable cause = future.cause();        if (cause != null) {            // EventLoop 上的注册失败,因此一旦我们尝试访问 Channel 的 EventLoop,就直接使 ChannelPromise 失败,以免导致 IllegalStateException。            promise.setFailure(cause);        } else {            // 注册成功,所以设置正确的执行器来使用。            // See https://github.com/netty/netty/issues/2586            promise.registered();            doBind0(regFuture, channel, localAddress, promise);        }    } }); 复制代码

绑定端口

最后回进入到AbstractChannel::bind方法

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {     //省略无用代码     //Channel是否激活     boolean wasActive = isActive();     try {         //绑定channel端口         doBind(localAddress);     } catch (Throwable t) {         safeSetFailure(promise, t);         closeIfClosed();         return;     }     //绑定后,菜开始激活     if (!wasActive && isActive()) {         invokeLater(new Runnable() {             @Override             public void run() {                 pipeline.fireChannelActive();             }         });     }     safeSetSuccess(promise); } 复制代码

doBind0最终会调用,javaChannel()就是获取channel封装的ServerSocketChannel

@SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception {     if (PlatformDependent.javaVersion() >= 7) {         javaChannel().bind(localAddress, config.getBacklog());     } else {         javaChannel().socket().bind(localAddress, config.getBacklog());     } } 复制代码

绑定端口结束,pipeline.fireChannelActive();最终回执行到doBeginRead()方法给channel注册真正的监听事件。

protected void doBeginRead() throws Exception {     // Channel.read() or ChannelHandlerContext.read() was called     final SelectionKey selectionKey = this.selectionKey;     if (!selectionKey.isValid()) {         return;     }     readPending = true;     //0     final int interestOps = selectionKey.interestOps();     //是否监听了readInterestOp,则监听readInterestOp     if ((interestOps & readInterestOp) == 0) {         //OP_ACCEPT 1<<4 16         selectionKey.interestOps(interestOps | readInterestOp);     } } 复制代码

至此,Netty服务端启动成功。


总结

  1. io.netty.channel.Channel就是对java.nio.channels.Channel的一层封装

  2. io.netty.channel.Channel内部也维护了SelectionKey

  3. EventLoopGroup是对Selector的一层封装

  4. 初始化的时候只用到了bossGroup,用来监听连接事件


作者:山间小僧
链接:https://juejin.cn/post/7026389526134292516


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐