阅读 154

Netty编程(九)—— 协议设计与解析

TCP/IP 中消息传输基于流的方式,没有边界,而协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

Redis协议

如果我们要向Redis服务器发送一条set name Nyima的指令,需要遵守如下协议

// 该指令一共有3部分,每条指令之后都要添加回车与换行符 *3\r\n // 第一个指令的长度是3 $3\r\n // 第一个指令是set指令 set\r\n // 下面的指令以此类推 $4\r\n name\r\n $5\r\n Nyima\r\n 复制代码

下面是客户端代码

public class RedisClient {     static final Logger log = LoggerFactory.getLogger(StudyServer.class);     public static void main(String[] args) {         NioEventLoopGroup group =  new NioEventLoopGroup();         try {             ChannelFuture channelFuture = new Bootstrap()                     .group(group)                     .channel(NioSocketChannel.class)                     .handler(new ChannelInitializer<SocketChannel>() {                         @Override                         protected void initChannel(SocketChannel ch) {                             // 打印日志                             ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));                             ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {                                 @Override                                 public void channelActive(ChannelHandlerContext ctx) throws Exception {                                     // 回车与换行符                                     final byte[] LINE = {'\r','\n'};                                     // 获得ByteBuf                                     ByteBuf buffer = ctx.alloc().buffer();                                     // 连接建立后,向Redis中发送一条指令,注意添加回车与换行                                     // set name Nyima                                     buffer.writeBytes("*3".getBytes());                                     buffer.writeBytes(LINE);                                     buffer.writeBytes("$3".getBytes());                                     buffer.writeBytes(LINE);                                     buffer.writeBytes("set".getBytes());                                     buffer.writeBytes(LINE);                                     buffer.writeBytes("$4".getBytes());                                     buffer.writeBytes(LINE);                                     buffer.writeBytes("name".getBytes());                                     buffer.writeBytes(LINE);                                     buffer.writeBytes("$5".getBytes());                                     buffer.writeBytes(LINE);                                     buffer.writeBytes("Nyima".getBytes());                                     buffer.writeBytes(LINE);                                     ctx.writeAndFlush(buffer);                                 }                             });                         }                     })                     .connect(new InetSocketAddress("localhost", 6379));             channelFuture.sync();             // 关闭channel             channelFuture.channel().close().sync();         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             // 关闭group             group.shutdownGracefully();         }     } } 复制代码

控制台输出结果: 在这里插入图片描述

之后服务端接到了发送的数据包后就能够根据协议进行解析内容

HTTP协议

HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec作为服务器端的解码器与编码器,来处理HTTP请求

// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder // Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器 public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>         implements HttpServerUpgradeHandler.SourceCodecCopy 复制代码

使用这个方法可以发现会解析出两种类型的消息:请求行请求头、请求体

tips: 可以使用ch.pipeline().addLast(new SimpleChannelInboundHandler())指定单对一种类型的消息感兴趣,这个是只对HttpRequest感兴趣

下面是服务器代码, 服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可

public class HttpServer {     static final Logger log = LoggerFactory.getLogger(StudyServer.class);     public static void main(String[] args) {         NioEventLoopGroup group = new NioEventLoopGroup();         new ServerBootstrap()                 .group(group)                 .channel(NioServerSocketChannel.class)                 .childHandler(new ChannelInitializer<SocketChannel>() {                     @Override                     protected void initChannel(SocketChannel ch) {                         ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));                         // 作为服务器,使用 HttpServerCodec 作为编码器与解码器                         ch.pipeline().addLast(new HttpServerCodec());                         // 服务器只处理HTTPRequest                         ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {                             @Override                             protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {                                 // 获得请求uri                                 log.debug(msg.uri());                                 // 获得完整响应,设置版本号与状态码                                 DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);                                 // 设置响应内容                                 byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);                                 // 设置响应体长度,避免浏览器一直接收响应内容                                 response.headers().setInt(CONTENT_LENGTH, bytes.length);                                 // 设置响应体                                 response.content().writeBytes(bytes);                                 // 写回响应                                 ctx.writeAndFlush(response);                             }                         });                     }                 })                 .bind(8080);     } } 复制代码

浏览器结果:

在这里插入图片描述

控制台结果:

在这里插入图片描述

自定义协议

除了上面两种协议,我们自己也可以使用netty定义自己的协议

组成要素

  • 魔数:用来在第一时间判定接收的数据是否为无效数据包

  • 版本号:可以支持协议的升级

  • 序列化算法

    :消息正文到底采用哪种序列化反序列化方式

    • 如:json、protobuf、hessian、jdk

  • 指令类型:是登录、注册、单聊、群聊… 跟业务相关

  • 请求序号:为了双工通信,提供异步能力

  • 正文长度

  • 消息正文

编码器解码器

  • 编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。==此处使用了自定义类Message,代表消息==

    public class MessageCodec extends ByteToMessageCodec<Message> 复制代码

  • 编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。==正文内容如果为对象,需要通过序列化将其放入到ByteBuf中==

  • 解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler

public class MessageCodec extends ByteToMessageCodec<Message> {     @Override     protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {         // 设置魔数 4个字节         out.writeBytes(new byte[]{'N','Y','I','M'});         // 设置版本号 1个字节         out.writeByte(1);         // 设置序列化方式 1个字节         out.writeByte(1);         // 设置指令类型 1个字节         out.writeByte(msg.getMessageType());         // 设置请求序号 4个字节         out.writeInt(msg.getSequenceId());         // 为了补齐为16个字节,填充1个字节的数据,无意义         out.writeByte(0xff);         // 获得序列化后的msg         ByteArrayOutputStream bos = new ByteArrayOutputStream();         ObjectOutputStream oos = new ObjectOutputStream(bos);         oos.writeObject(msg);         byte[] bytes = bos.toByteArray();         // 获得并设置正文长度 长度用4个字节标识         out.writeInt(bytes.length);         // 设置消息正文         out.writeBytes(bytes);     }     @Override     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {         // 获取魔数         int magic = in.readInt();         // 获取版本号         byte version = in.readByte();         // 获得序列化方式         byte seqType = in.readByte();         // 获得指令类型         byte messageType = in.readByte();         // 获得请求序号         int sequenceId = in.readInt();         // 移除补齐字节         in.readByte();         // 获得正文长度         int length = in.readInt();         // 获得正文         byte[] bytes = new byte[length];         in.readBytes(bytes, 0, length);         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));         Message message = (Message) ois.readObject(); // 将信息放入List中,传递给下一个handler         out.add(message);                  // 打印获得的信息正文         System.out.println("===========魔数===========");         System.out.println(magic);         System.out.println("===========版本号===========");         System.out.println(version);         System.out.println("===========序列化方法===========");         System.out.println(seqType);         System.out.println("===========指令类型===========");         System.out.println(messageType);         System.out.println("===========请求序号===========");         System.out.println(sequenceId);         System.out.println("===========正文长度===========");         System.out.println(length);         System.out.println("===========正文===========");         System.out.println(message);     } } 复制代码

编写测试类

  • 测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题

  • 通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码

public class TestCodec {     static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);     public static void main(String[] args) throws Exception {         EmbeddedChannel channel = new EmbeddedChannel();         // 添加解码器,避免粘包半包问题         channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));         channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));         channel.pipeline().addLast(new MessageCodec());         // LoginRequestMessage 是自建类         LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");         // 测试编码与解码         ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();         new MessageCodec().encode(null, user, byteBuf);         channel.writeInbound(byteBuf);     } } 复制代码

结果:

在这里插入图片描述

在这里插入图片描述


作者:JAVAWarrior
链接:https://juejin.cn/post/7035447732584054820

 伪原创工具 SEO网站优化  https://www.237it.com/ 


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐