阅读 136

Netty编程(二)—— EventLoop

EventLoop和EventLoopGroup

事件循环对象 EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 IO 事件。

事件循环组   EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)。

EventLoop的使用

创建EventLoopGroup:

EventLoopGroup group = new NioEventLoopGroup(); EventLoopGroup group = new DefaultEventLoopGroup( ); 复制代码

第一种NioEventLoopGroup是用来处理io事件、普通任务和定时任务,第二种DefaultEventLoopGroup只能处理普通任务和定时任务,在新建时可以传入一个int数表示group中的线程数,否则使用默认的线程数。

此外可以使用group.next()来获取下一个LoopGroup对象,使用execute()方法执行普通任务,使用        scheduleAtFixedRate()方法来处理定时任务

使用 shutdownGracefully() 方法来关闭EventLoopGroup。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。

下面是一段操作 EventLoopGroup 的代码:

public static void main(String[] args) {         EventLoopGroup group1 = new NioEventLoopGroup(3);         EventLoopGroup group2 = new DefaultEventLoopGroup(2);         System.out.println(group1.next());         System.out.println(group2.next());         group1.next().execute(()->{             for (int i = 0; i < 10; i++) {                 System.out.println(Thread.currentThread().getName()+"hello "+i);             }         });         group2.next().scheduleAtFixedRate(()->{             System.out.println(Thread.currentThread().getName()+"  hello2");         },0,1, TimeUnit.SECONDS);         group1.shutdownGracefully();         group2.shutdownGracefully();     } 复制代码

代码结果:

在这里插入图片描述

处理IO事件

网络通信中最常见的还是要处理IO事件,可以使用EventLoopGroup来处理IO事件,代码与Netty编程(一)—— 初识Netty+超全注释 - 掘金 (juejin.cn)中的基础代码类似,重写读入函数完成

服务端代码:

public class HelloServer {     public static void main(String[] args) {         new ServerBootstrap()                 .group(new NioEventLoopGroup(3))                 .channel(NioServerSocketChannel.class)                 .childHandler(new ChannelInitializer<NioSocketChannel>() {                     @Override                     protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//添加handler                         nioSocketChannel.pipeline().addLast(new StringDecoder());                         nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {                             @Override                             protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception  {                                 System.out.println(Thread.currentThread().getName()+"  "+s);                             }                         });                     }                 }).bind(8080);     } } 复制代码

客户端代码:

public class HelloClient {     public static void main(String[] args) throws InterruptedException {         new Bootstrap()                 .group(new NioEventLoopGroup())                 .channel(NioSocketChannel.class)                 .handler(new ChannelInitializer<NioSocketChannel>() {                     @Override                     protected void initChannel(NioSocketChannel channel) throws Exception {                         channel.pipeline().addLast(new StringEncoder());                     }                 })                 .connect(new InetSocketAddress("localhost", 8080))                 .sync()                 .channel()                 .writeAndFlush("hello world"+"  "+Thread.currentThread().getName());     } } 复制代码

需要注意的是:一旦客户端和服务端建立连接,channel就和事件循环组中的某一个eventloop进行绑定了,即之后的该channel的读写事件都由这个eventloop负责,下面的图说明了这一过程,每个channel的所有事件都被同一个EventLoop处理。

在这里插入图片描述

分工细化

细化1

可以把事件循环组的EventLoop分工得更加细一些,即让一个EventLoop处理accept事件,其他的EventLoop处理读写事件。Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件。两个Group,分别为 Boss 负责serversocketchannel上的Accept事件,Worker 负责socketchannel上的读写事件

public class MyServer {         public static void main(String[] args) {             new ServerBootstrap()                     .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))              ...         }     } 复制代码

细化2

EventLoop虽然可以做到多路复用,但是如果有一个读写事件耗时过长,会影响这个EventLoop下的其他事件的进行,因此可以创建一个独立的EventLoopGroup处理耗时较长的事件,当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理。

那么就会有个问题,创建的事件循环组如何与耗时较长的handle联系起来?:在调用addLast()方法时可以传递进来,addLast()有三个参数:事件循环组(空则默认为上方建立的),循环组名称,处理函数。

下面以一个例子来说明,假设服务器端接收客户端的消息后需要6s去处理(休眠6s),那么这个休眠的事件可以放在新的EventLoopGroup中去处理,此外连续打开10个客户端连接服务端测试效果,客户端代码与上方的客户端类似,唯一不同的是使用for循环去连续连接服务端10次。

下面是服务端代码,加入了两个handler,第一个是默认的EventLoopGroup(当前ServerBootstrap的EventLoopGroup),并且使用ctx.fireChannelRead(msg)将msg传给第二个handler,第二个使用的是新建的EventLoopGroup去处理耗时较长的事件。

public class MyServer {     public static void main(String[] args) {         EventLoopGroup group = new DefaultEventLoopGroup(3);         new ServerBootstrap()                 .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))                 .channel(NioServerSocketChannel.class)                 .childHandler(new ChannelInitializer<SocketChannel>() {                     @Override                     protected void initChannel(SocketChannel socketChannel) throws Exception {                         socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter(){                             @Override                             public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                                 ByteBuf buf = (ByteBuf) msg;                                 System.out.println("nioHandler"+Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));                                 // 将消息传递给下一个handler                                 ctx.fireChannelRead(msg);                             }                         }).addLast(group,"myhandler",new ChannelInboundHandlerAdapter(){                             @Override                             public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                                 ByteBuf buf = (ByteBuf) msg;                                 Thread.sleep(5000);                                 System.out.println("myhandler"+Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));                             }                         });                     }                 })                 .bind(8080);     } } 复制代码

结果如下,可以看到第二个handler把并没有把第一个handler阻塞住:

在这里插入图片描述

如何切换

上面说到了在处理事件时可以从一个EventLoopGroup切换到另一个EventLoopGroup,另一个handler专门处理耗时较长的事件,降低对其他事件造成的影响,那么netty内部是怎么做到不同的EventLoopGroup切换呢?

在这里插入图片描述

上面的图描述的就是切换EventLoopGroup,当handler中绑定的EventLoopGroup不同时,需要切换EventLoopGroup来执行不同的任务,具体来说netty是使用下面这个方法进行切换:

  • 如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用;否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {     final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);          EventExecutor executor = next.executor(); // 获得下一个EventLoop, excutor 即为 EventLoopGroup          // 如果下一个EventLoop 在当前的 EventLoopGroup中     if (executor.inEventLoop()) {//当前handler中的线程是否和eventloop是同一个线程         // 使用当前 EventLoopGroup 中的 EventLoop 来处理任务         next.invokeChannelRead(m);     } else {         // 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行         executor.execute(new Runnable() {//此时需要在下一个线程中执行             public void run() {                 next.invokeChannelRead(m);             }         });     } }


作者:JAVAWarrior
链接:https://juejin.cn/post/7033267579955183646


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐