Netty编程(九)—— 协议设计与解析
TCP/IP 中消息传输基于流的方式,没有边界,而协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
Redis协议
如果我们要向Redis服务器发送一条set name Nyima
的指令,需要遵守如下协议
// 该指令一共有3部分,每条指令之后都要添加回车与换行符 *3\r\n // 第一个指令的长度是3 $3\r\n // 第一个指令是set指令 set\r\n // 下面的指令以此类推 $4\r\n name\r\n $5\r\n Nyima\r\n 复制代码
下面是客户端代码
public class RedisClient { static final Logger log = LoggerFactory.getLogger(StudyServer.class); public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); try { ChannelFuture channelFuture = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { // 打印日志 ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 回车与换行符 final byte[] LINE = {'\r','\n'}; // 获得ByteBuf ByteBuf buffer = ctx.alloc().buffer(); // 连接建立后,向Redis中发送一条指令,注意添加回车与换行 // set name Nyima buffer.writeBytes("*3".getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$3".getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("set".getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$4".getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("name".getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$5".getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("Nyima".getBytes()); buffer.writeBytes(LINE); ctx.writeAndFlush(buffer); } }); } }) .connect(new InetSocketAddress("localhost", 6379)); channelFuture.sync(); // 关闭channel channelFuture.channel().close().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 关闭group group.shutdownGracefully(); } } } 复制代码
控制台输出结果:
之后服务端接到了发送的数据包后就能够根据协议进行解析内容
HTTP协议
HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec
作为服务器端的解码器与编码器,来处理HTTP请求
// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder // Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器 public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder> implements HttpServerUpgradeHandler.SourceCodecCopy 复制代码
使用这个方法可以发现会解析出两种类型的消息:请求行请求头、请求体
tips: 可以使用ch.pipeline().addLast(new SimpleChannelInboundHandler())指定单对一种类型的消息感兴趣,这个是只对HttpRequest感兴趣
下面是服务器代码, 服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可
public class HttpServer { static final Logger log = LoggerFactory.getLogger(StudyServer.class); public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); new ServerBootstrap() .group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); // 作为服务器,使用 HttpServerCodec 作为编码器与解码器 ch.pipeline().addLast(new HttpServerCodec()); // 服务器只处理HTTPRequest ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) { // 获得请求uri log.debug(msg.uri()); // 获得完整响应,设置版本号与状态码 DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK); // 设置响应内容 byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8); // 设置响应体长度,避免浏览器一直接收响应内容 response.headers().setInt(CONTENT_LENGTH, bytes.length); // 设置响应体 response.content().writeBytes(bytes); // 写回响应 ctx.writeAndFlush(response); } }); } }) .bind(8080); } } 复制代码
浏览器结果:
控制台结果:
自定义协议
除了上面两种协议,我们自己也可以使用netty定义自己的协议
组成要素
魔数:用来在第一时间判定接收的数据是否为无效数据包
版本号:可以支持协议的升级
序列化算法
:消息正文到底采用哪种序列化反序列化方式
如:json、protobuf、hessian、jdk
指令类型:是登录、注册、单聊、群聊… 跟业务相关
请求序号:为了双工通信,提供异步能力
正文长度
消息正文
编码器解码器
编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。==此处使用了自定义类Message,代表消息==
public class MessageCodec extends ByteToMessageCodec<Message> 复制代码
编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。==正文内容如果为对象,需要通过序列化将其放入到ByteBuf中==
解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler
public class MessageCodec extends ByteToMessageCodec<Message> { @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { // 设置魔数 4个字节 out.writeBytes(new byte[]{'N','Y','I','M'}); // 设置版本号 1个字节 out.writeByte(1); // 设置序列化方式 1个字节 out.writeByte(1); // 设置指令类型 1个字节 out.writeByte(msg.getMessageType()); // 设置请求序号 4个字节 out.writeInt(msg.getSequenceId()); // 为了补齐为16个字节,填充1个字节的数据,无意义 out.writeByte(0xff); // 获得序列化后的msg ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(msg); byte[] bytes = bos.toByteArray(); // 获得并设置正文长度 长度用4个字节标识 out.writeInt(bytes.length); // 设置消息正文 out.writeBytes(bytes); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 获取魔数 int magic = in.readInt(); // 获取版本号 byte version = in.readByte(); // 获得序列化方式 byte seqType = in.readByte(); // 获得指令类型 byte messageType = in.readByte(); // 获得请求序号 int sequenceId = in.readInt(); // 移除补齐字节 in.readByte(); // 获得正文长度 int length = in.readInt(); // 获得正文 byte[] bytes = new byte[length]; in.readBytes(bytes, 0, length); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); Message message = (Message) ois.readObject(); // 将信息放入List中,传递给下一个handler out.add(message); // 打印获得的信息正文 System.out.println("===========魔数==========="); System.out.println(magic); System.out.println("===========版本号==========="); System.out.println(version); System.out.println("===========序列化方法==========="); System.out.println(seqType); System.out.println("===========指令类型==========="); System.out.println(messageType); System.out.println("===========请求序号==========="); System.out.println(sequenceId); System.out.println("===========正文长度==========="); System.out.println(length); System.out.println("===========正文==========="); System.out.println(message); } } 复制代码
编写测试类
测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题
通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码
public class TestCodec { static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class); public static void main(String[] args) throws Exception { EmbeddedChannel channel = new EmbeddedChannel(); // 添加解码器,避免粘包半包问题 channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0)); channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); channel.pipeline().addLast(new MessageCodec()); // LoginRequestMessage 是自建类 LoginRequestMessage user = new LoginRequestMessage("Nyima", "123"); // 测试编码与解码 ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec().encode(null, user, byteBuf); channel.writeInbound(byteBuf); } } 复制代码
结果:
作者:JAVAWarrior
链接:https://juejin.cn/post/7035447732584054820
伪原创工具 SEO网站优化 https://www.237it.com/