阅读 449

netty服务端启动流程解析(netty服务端主动关闭连接)

JDK的nio服务端启动

因为netty底层使用的是nio,所以先回顾nio代码的流程,然后再看netty的启动的流程。 JDK的NIO代码启动主要步骤

  1. 创建ServerSocketChannel,channel配置为非阻塞,并且绑定端口

  2. 把channel关注的accept事件注册到select

  3. 循环调用select,拿到就绪的key

  4. 如果是accept事件,建立SocketChannel,并且把这个chanel关注的读写事件

  5. 如果是可写或者可写事件,通过key拿到channel进行相应的读写操作。

public class WriteServer {
    public static void main(String[] args) throws IOException {
        // 1. channel配置为非阻塞,并且绑定端口
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));
        
        // 2. 把channel关注的accept时间注册到select
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            // 3. 拿到就绪的key 
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                
                if (key.isAcceptable()) {
                    // 4. 如果是accept事件,建立socketChannel,并且把这个chanel关注的读写事件注册到select
                } else if (key.isWritable() || key.isReadable()) {
                    // 5. 如果是可写或者可写事件,通过key拿到channel进行相应的读写操作。
                }
            }
        }
    }
}复制代码

使用netty的主干代码

//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open(); 

//2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();

//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
serverSocketChannel.configureBlocking(false);

//4 启动 nio boss 线程执行接下来的操作

//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor

//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));

//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);复制代码

netty的启动流程

服务端启动demo

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.group(boss, worker);
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
        }
    });
    serverSocketChannel = serverBootstrap.bind(port).sync().channel();
    serverSocketChannel.closeFuture().sync();
} catch (InterruptedException e) {
} finally {
}复制代码

NioServerSocketChannel

netty的NioServerSocketChannel封装了JDK的ServerSocketChannel,NioServerSocketChannel底层还是会open这个JDK的channel。serverBootstrap.channel(NioServerSocketChannel.class) 指定了bootStrap类中的channelFactory 返回NioServerSocketChannel这个类型的channel。

bind()方法 开始启动

主要的三个步骤

  1. 初始化channel

  2. 注册channel

  3. 执行dobind0回调

// 主干代码
final ChannelFuture initAndRegister() {
    Channel channel = null;
    channel = channelFactory.newChannel();
    // 初始化channel
    init(channel);

    // 注册channel到select
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}复制代码

初始化channel

channelFactory.newChannel() 通过channelFactory会返回NioServerSocketChannel

init()中向流水线添加了一个channel初始化处理器,这个处理器只会被执行一次,那么什么时候被执行?这个放到后面讲

channel.pipeline().addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});复制代码

注册channel

channel首先会进行一次线程切换。当前运行的是主线程,但是注册和初始化channel操作是由eventGroup中的nio线程来执行。返回的是一个promise对象用来异步拿到结果。 eventLoop.inEventLoop() 是判断当前线程是不是nio线程,显然不是,那么主线程就会封装一个任务提交给nio线程来执行。

if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }
}复制代码

接下来继续看 register0(promise)。JavaChannel() 拿到 NioServerSocketChannel创建的JDK ServerSocketChannel。把这个原生channel注册到eventLoop的select上,绑定附件是NioServerSocketChannel,之后可以通过附件拿到这个channel,并且这个附件应该是唯一的。

image.png

注册完毕之后通过pipeline.invokeHandlerAddedIfNeeded(); 调用init中注册的初始化处理器,向nioChannel中添加了一个handler,在accept事件发生后执行这个handler建立连接。ServerBootstrapAcceptor继承自chnnelInboundHandlerAdapter

pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));复制代码

执行doBind0回调

前面提到过注册和初始化操作是主线程提交一个任务由nio线程异步执行,提交之后返回一个promise对象。在初始化channel完成、注册channel完成之后,nio线程通过safeSetSuccess();向promise中设置一个成功值,使得该promise的回调被执行。

image.png

regFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
            // IllegalStateException once we try to access the EventLoop of the Channel.
            promise.setFailure(cause);
        } else {
            // Registration was successful, so set the correct executor to use.
            // See https://github.com/netty/netty/issues/2586
            promise.registered();

            doBind0(regFuture, channel, localAddress, promise);
        }
    }
});复制代码

回调中执行doBind0()方法,调用链最后就是调用原生channel进行端口绑定

protected void doBind(SocketAddress localAddress) throws Exception {
   if (PlatformDependent.javaVersion() >= 7) {
       javaChannel().bind(localAddress, config.getBacklog());
   } else {
       javaChannel().socket().bind(localAddress, config.getBacklog());
   }
}复制代码

在执行完doBind方法后,向select注册channel的accept事件,这里看上去是read事件呢,其实做完位运算得到的是16对应accept事件。

image.png

image.png

image.png

然后到accept时间侦听到连接之后,会执行ServerBootstrapAcceptor这个handler这个处理器。

至此启动流程结束

总结

总结一下启动流程

  1. 初始化channel

  2. 把这个channel注册到select上,这个注册关注的事件是0

  3. 前面两步是异步操作,当执行完之后执行回调

  4. 在回调方法中 绑定指定的端口

  5. 注册accept事件

  6. 当由accept事件之后调用ServerBootstrapAcceptor handler建立连接


作者:进击的阿尉
链接:https://juejin.cn/post/7032868709320523783


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐