阅读 79

Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践

前一篇文章写了第一款Netty入门的应用程序,本文主要就是从上文的代码结合本文的流程图进一步分析Netty的工作流程和核心组件。

最后再进一步举一个实例来让大家进一步理解。

希望能够让你有所收获!!????

一、Netty 工作流程

我们先来看看Netty的工作原理图,简单说一下工作流程,然后通过这张图来一一分析Netty的核心组件。

1.1、Server工作流程图:

image-20210825155927290

1.2、Server工作流程分析:

  1. server端启动时绑定本地某个端口,初始化NioServerSocketChannel.

  2. 将自己NioServerSocketChannel注册到某个BossNioEventLoopGroup的selector上。

    • server端包含1个Boss NioEventLoopGroup和1个Worker NioEventLoopGroup

    • Boss NioEventLoopGroup专门负责接收客户端的连接,Worker NioEventLoopGroup专门负责网络的读写

    • NioEventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环NioEventLoop,每个NioEventLoop包含1个selector和1个事件循环线程。

  3. BossNioEventLoopGroup循环执行的任务:

    1、轮询accept事件;

    2、处理accept事件,将生成的NioSocketChannel注册到某一个WorkNioEventLoopGroup的Selector上。

    3、处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。

  4. WorkNioEventLoopGroup循环执行的任务:

    • 轮询read和Write事件

    • 处理IO事件,在NioSocketChannel可读、可写事件发生时,回调(触发)ChannelHandler进行处理。

    • 处理任务队列的任务,即 runAllTasks

1.3、Client工作流程图

image-20210825231934496

流程就不重复概述啦????

二、核心模块组件

Netty的核心组件大致是以下几个:

  1. Channel 接口

  2. EventLoopGroup 接口

  3. ChannelFuture 接口

  4. ChannelHandler 接口

  5. ChannelPipeline 接口

  6. ChannelHandlerContext 接口

  7. SimpleChannelInboundHandler 抽象类

  8. Bootstrap、ServerBootstrap 类

  9. ChannelFuture 接口

  10. ChannelOption 类

2.1、Channel 接口

我们平常用到基本的 I/O 操作(bind()、connect()、read()和 write()),其本质都依赖于底层网络传输所提供的原语,在Java中就是Socket类。

Netty 的 Channel 接 口所提供的 API,大大地降低了直接使用Socket 类的复杂性。另外Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。

在调用结束后立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,支持 在I/O 操作成功、失败或取消时立马回调通知调用方。

此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根,比如:

  • LocalServerChannel:用于本地传输的ServerChannel ,允许 VM 通信。

  • EmbeddedChannel:以嵌入式方式使用的 Channel 实现的基类。

  • NioSocketChannel:异步的客户端  TCP 、Socket 连接。

  • NioServerSocketChannel:异步的服务器端  TCP、Socket 连接。

  • NioDatagramChannel: 异步的  UDP 连接。

  • NioSctpChannel:异步的客户端 Sctp 连接,它使用非阻塞模式并允许将 SctpMessage 读/写到底层 SctpChannel。

  • NioSctpServerChannel:异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

2.2、EventLoopGroup接口

EventLoop 定义了Netty的核心抽象,用于处理连接的生命周期中所发生的事件。

Netty 通过触发事件将 Selector 从应用程序中抽象出来,消除了所有本来将需要手动编写 的派发代码。在内部,将会为每个 Channel 分配一个 EventLoop,用以处理所有事件,包括:

  • 注册事件;

  • 将事件派发给 ChannelHandler;

  • 安排进一步的动作。

不过在这里我们不深究它,针对 Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系做一个简单说明。

  • 一个EventLoopGroup 包含一个或者多个 EventLoop

  • 每个 EventLoop 维护着一个 Selector 实例,所以一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;

  • 因此所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,实际上消除了对于同步的需要;

  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop

  • 一个 EventLoop 可能会被分配给一个或多个Channel

  • 通常一个服务端口即一个 ServerSocketChannel 对应一个 Selector 和一个 EventLoop 线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理,就如上文中的流程图一样。

2.3、ChannelFuture 接口

Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会 立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。具体的实现就是通过 FutureChannelFutures,其 addListener()方法注册了一个 ChannelFutureListener,以便在某个操作完成时(无论是否成功)自动触发注册的监听事件。

常见的方法有

  • Channel channel(),返回当前正在进行 IO 操作的通道

  • ChannelFuture sync(),等待异步操作执行完毕

2.4、ChannelHandler 接口

从之前的入门程序中,我们可以看到ChannelHandler在Netty中的重要性,它充当了所有处理入站和出站数据的应用程序逻辑的容器。 我们的业务逻辑也大都写在实现的字类中,另外ChannelHandler 的方法是由事件自动触发的,并不需要我们自己派发。

ChannelHandler的实现类或者实现子接口有很多。平时我们就是去继承或子接口,然后重写里面的方法。

image-20210825214929693

最常见的几种Handler:

  • ChannelInboundHandler :接收入站事件和数据

  • ChannelOutboundHandler:用于处理出站事件和数据。

常见的适配器:

  • ChannelInboundHandlerAdapter:用于处理入站IO事件

    ChannelInboundHandler实现的抽象基类,它提供了所有方法的实现。 这个实现只是将操作转发到ChannelPipeline的下一个ChannelHandler 。 子类可以覆盖方法实现来改变这一

  • ChannelOutboundHandlerAdapter: 用于处理出站IO事件

我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们来看看有哪些方法可以重写:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { //注册事件     public void channelRegistered(ChannelHandlerContext ctx) ; //      public void channelUnregistered(ChannelHandlerContext ctx); //通道已经就绪     public void channelActive(ChannelHandlerContext ctx);     public void channelInactive(ChannelHandlerContext ctx) ;     //通道读取数据事件     public void channelRead(ChannelHandlerContext ctx, Object msg) ; //通道读取数据事件完毕     public void channelReadComplete(ChannelHandlerContext ctx) ;     public void userEventTriggered(ChannelHandlerContext ctx, Object evt); //通道可写性已更改     public void channelWritabilityChanged(ChannelHandlerContext ctx); //异常处理     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) } 复制代码

2.5、ChannelPipeline 接口

ChannelPipeline 提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站和出站事件流的 API。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。他们的组成关系如下:

image-20210825223614701

一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由ChannelHandlerContext 组成的双向链表,并且每个ChanneHandlerContext中又关联着一个ChannelHandler

ChannelHandler 安装到 ChannelPipeline 中的过程:

  1. 一个ChannelInitializer的实现被注册到了ServerBootstrap中 ;

  2. ChannelInitializer.initChannel()方法被调用时,ChannelInitializer将在 ChannelPipeline 中安装一组自定义的 ChannelHandler

  3. ChannelInitializer 将它自己从 ChannelPipeline 中移除。

从一个客户端应用程序 的角度来看,如果事件的运动方向是从客户端到服务器端,那么我们称这些事件为出站的,反之 则称为入站的。服务端反之。

如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部 开始流动,并被传递给第一个 ChannelInboundHandler。次此handler处理完后,数据将会被传递给链中的下一个 ChannelInboundHandler。最终,数据将会到达 ChannelPipeline 的尾端,至此,所有处理就结束了。

出站事件会从尾端往前传递到最前一个出站的 handler。出站和入站两种类型的 handler互不干扰。

2.6、ChannelHandlerContext 接口

作用就是使ChannelHandler能够与其ChannelPipeline和其他处理程序交互。因为 ChannelHandlerContext保存channel相关的所有上下文信息,同时关联一个 ChannelHandler 对象, 另外,ChannelHandlerContext 可以通知ChannelPipeline的下一个ChannelHandler以及动态修改它所属的ChannelPipeline

2.7、SimpleChannelInboundHandler 抽象类

我们常常能够遇到应用程序会利用一个  ChannelHandler 来接收解码消息,并在这个Handler中实现业务逻辑,要写一个这样的 ChannelHandler ,我们只需要扩展抽象类 SimpleChannelInboundHandler< T > 即可, 其中T类型是我们要处理的消息的Java类型。

SimpleChannelInboundHandler 中最重要的方法就是void channelRead0(ChannelHandlerContext ctx, T msg)

我们自己实现了这个方法之后,接收到的消息就已经被解码完的消息啦。

举个例子:

image-20210825230526952

2.8、Bootstrap、ServerBootstrap 类

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件。

类别BootstrapServerBootstrap
引导用于引导客户端用于引导服务端
在网络编程中作用用于连接到远程主机和端口用于绑定到一个本地端口
EventLoopGroup 的数目12

我想大家对于最后一点可能会存有疑惑,为什么一个是1一个是2呢?

因为服务器需要两组不同的 Channel

第一组将只包含一个 ServerChannel,代表服务 器自身的已绑定到某个本地端口的正在监听的套接字。

而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel

这一点可以上文中的流程图。

2.9、ChannelFuture  接口

异步 Channel I/O 操作的结果。 Netty 中的所有 I/O 操作都是异步的。 这意味着任何 I/O 调用将立即返回,但不保证在调用结束时请求的 I/O 操作已完成。 相反,您将返回一个 ChannelFuture 实例,该实例为您提供有关 I/O 操作的结果或状态的信息。 ChannelFuture 要么未完成,要么已完成。 当 I/O 操作开始时,会创建一个新的未来对象。 新的未来最初是未完成的——它既没有成功,也没有失败,也没有取消,因为 I/O 操作还没有完成。 如果 I/O 操作成功完成、失败或取消,则使用更具体的信息(例如失败原因)将未来标记为已完成。 请注意,即使失败和取消也属于完成状态。

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 FutureChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

常见的方法有

  • Channel channel(),返回当前正在进行 IO 操作的通道

  • ChannelFuture sync(),等待异步操作执行完毕

2.10、ChannelOption 类

  1. Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。

  2. ChannelOption 参数如下:

    • ChannelOption.SO_KEEPALIVE :一直保持连接状态

    • ChannelOption.SO_BACKLOG:对应TCP/IP协议listen 函数中的backlog参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理内,所N博求放在队刚中等待处理,backilog参数指定端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理, backlog参数指定了队列的大小。

三、应用实例

【案例】:

写一个服务端,两个或多个客户端,客户端可以相互通信。

3.1、服务端 Handler

ChannelHandler的实现类或者实现子接口有很多。平时我们就是去继承或子接口,然后重写里面的方法。

在这里我们就是继承了 SimpleChannelInboundHandler< T > ,这里面许多方法都是大都只要我们重写一下业务逻辑,触发大都是在事件发生时自动调用的,无需我们手动调用。

package com.crush.atguigu.group_chat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /**  * @author crush  */ public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {     /**      * 定义一个channle 组,管理所有的channel      * GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例      */     private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");     /**      * handlerAdded 表示连接建立,一旦连接,第一个被执行      *              将当前channel 加入到  channelGroup      * @param ctx      * @throws Exception      */     @Override     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {         Channel channel = ctx.channel();         //将该客户加入聊天的信息推送给其它在线的客户端         /*         该方法会将 channelGroup 中所有的channel 遍历,并发送 消息,         我们不需要自己遍历          */         channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");         channelGroup.add(channel);     }     /**      * 断开连接, 将xx客户离开信息推送给当前在线的客户      * @param ctx      * @throws Exception      */     @Override     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {         Channel channel = ctx.channel();         channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");         System.out.println("channelGroup size" + channelGroup.size());     }     /**      * 表示channel 处于活动状态, 既刚出生 提示 xx上线      * @param ctx      * @throws Exception      */     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {         System.out.println(ctx.channel().remoteAddress() + " 上线了~");     }     /**      * 表示channel 处于不活动状态, 既死亡状态 提示 xx离线了      * @param ctx      * @throws Exception      */     @Override     public void channelInactive(ChannelHandlerContext ctx) throws Exception {         System.out.println(ctx.channel().remoteAddress() + " 离线了~");     }     //读取数据     @Override     protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {         //获取到当前channel         Channel channel = ctx.channel();         //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息         channelGroup.forEach(ch -> {             if (channel != ch) { //不是当前的channel,转发消息                 ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");             } else {//回显自己发送的消息给自己                 ch.writeAndFlush("[自己]发送了消息" + msg + "\n");             }         });     }     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {         //关闭通道         ctx.close();     } } 复制代码

3.2、服务端 Server 启动

package com.crush.atguigu.group_chat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /**  * @author crush  */ public class GroupChatServer {     /**      * //监听端口      */     private int port;     public GroupChatServer(int port) {         this.port = port;     }     /**      * 编写run方法 处理请求      * @throws Exception      */     public void run() throws Exception {         //创建两个线程组         EventLoopGroup bossGroup = new NioEventLoopGroup(1);         //8个NioEventLoop         EventLoopGroup workerGroup = new NioEventLoopGroup();         try {             ServerBootstrap b = new ServerBootstrap();             b.group(bossGroup, workerGroup)                     .channel(NioServerSocketChannel.class)                     .option(ChannelOption.SO_BACKLOG, 128)                     .childOption(ChannelOption.SO_KEEPALIVE, true)                     .childHandler(new ChannelInitializer<SocketChannel>() {                         @Override                         protected void initChannel(SocketChannel ch) throws Exception {                             //获取到pipeline                             ChannelPipeline pipeline = ch.pipeline();                             //向pipeline加入解码器                             pipeline.addLast("decoder", new StringDecoder());                             //向pipeline加入编码器                             pipeline.addLast("encoder", new StringEncoder());                             //加入自己的业务处理handler                             pipeline.addLast(new GroupChatServerHandler());                         }                     });             System.out.println("netty 服务器启动");             ChannelFuture channelFuture = b.bind(port).sync();             //监听关闭             channelFuture.channel().closeFuture().sync();         } finally {             bossGroup.shutdownGracefully();             workerGroup.shutdownGracefully();         }     }     public static void main(String[] args) throws Exception {         new GroupChatServer(7000).run();     } } 复制代码

3.3、客户端 Handler

package com.crush.atguigu.group_chat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /**  * @author crush  */ public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {          //当前Channel 已从对方读取消息时调用。     @Override     protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {         System.out.println(msg.trim());     } } 复制代码

3.4、客户端 Server

package com.crush.atguigu.group_chat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; /**  * @author crush  */ public class GroupChatClient {          private final String host;     private final int port;     public GroupChatClient(String host, int port) {         this.host = host;         this.port = port;     }     public void run() throws Exception {         EventLoopGroup group = new NioEventLoopGroup();         try {                          Bootstrap bootstrap = new Bootstrap()                     .group(group)                     .channel(NioSocketChannel.class)                     .handler(new ChannelInitializer<SocketChannel>() {                         @Override                         protected void initChannel(SocketChannel ch) throws Exception {                             //得到pipeline                             ChannelPipeline pipeline = ch.pipeline();                             //加入相关handler                             pipeline.addLast("decoder", new StringDecoder());                             pipeline.addLast("encoder", new StringEncoder());                             //加入自定义的handler                             pipeline.addLast(new GroupChatClientHandler());                         }                     });             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();             //得到channel             Channel channel = channelFuture.channel();             System.out.println("-------" + channel.localAddress() + "--------");             //客户端需要输入信息             Scanner scanner = new Scanner(System.in);             while (scanner.hasNextLine()) {                 String msg = scanner.nextLine();                 //通过channel 发送到服务器端                 channel.writeAndFlush(msg + "\r\n");             }         } finally {             group.shutdownGracefully();         }     }     public static void main(String[] args) throws Exception {         new GroupChatClient("127.0.0.1", 7000).run();     } } 复制代码

多个客户端,cv一下即可。

3.5、测试:

测试流程是先启动 服务端 Server,再启动客户端 。

image-20211010092350555

image-20211010092356763

image-20211010092402794

四、自言自语

这篇文章应该算是个存稿了,之前忙其他事情去了????。

今天的文章就到这里了。

你好,我是博主宁在春:主页

如若在文章中遇到疑惑,请留言或私信,或者加主页联系方式,都会尽快回复。

如若发现文章中存在问题,望你能够指正,不胜感谢。

如果觉得对你有所帮助的话,请点个赞再走吧!

欢迎大家一起在讨论区讨论哦,增加增加自己的幸运,蹭一蹭掘金官方发送的周边哦!!!

评论越走心越有可能拿奖哦!!!


作者:宁在春
链接:https://juejin.cn/post/7017602386747195429


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐