阅读 77

Netty系列(一):Springboot整合Netty,自定义协议实现

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程。

Springboot整合Netty

新建springboot项目,并在项目以来中导入netty包,用fastjson包处理jsonStr。

 <!-- netty -->         <dependency>             <groupId>io.netty</groupId>             <artifactId>netty-all</artifactId>             <version>4.1.42.Final</version>         </dependency>         <!-- Json处理 -->         <dependency>             <groupId>com.alibaba.fastjson2</groupId>             <artifactId>fastjson2</artifactId>             <version>2.0.16</version>         </dependency> 复制代码

创建netty相关配置信息文件

  1. yml配置文件——application.yml

# netty 配置 netty:   # boss线程数量   boss: 4   # worker线程数量   worker: 2   # 连接超时时间   timeout: 6000   # 服务器主端口   port: 18023   # 服务器备用端口   portSalve: 18026   # 服务器地址   host: 127.0.0.1 复制代码

  1. netty配置实体类——NettyProperties与yml配置文件绑定 通过@ConfigurationProperties(prefix = "netty")注解读取配置文件中的netty配置,通过反射注入值,需要在实体类中提供对应的setter和getter方法。

@ConfigurationProperties(prefix = "netty")对应的实体类属性名称不要求一定相同,只需保证“set”字符串拼接配置文件的属性和setter方法名相同即可。

@Configuration @ConfigurationProperties(prefix = "netty") public class NettyProperties {     /**      * boss线程数量      */     private Integer boss;     /**      * worker线程数量      */     private Integer worker;     /**      * 连接超时时间      */     private Integer timeout = 30000;     /**      * 服务器主端口      */     private Integer port = 18023;     /**      * 服务器备用端口      */     private Integer portSalve = 18026;     /**      * 服务器地址 默认为本地      */     private String host = "127.0.0.1"; // setter、getter 。。。。 } 复制代码

  1. 对netty进行配置,绑定netty相关配置设置 Netty通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。

@Configuration @EnableConfigurationProperties public class NettyConfig {     final NettyProperties nettyProperties;     public NettyConfig(NettyProperties nettyProperties) {         this.nettyProperties = nettyProperties;     }     /**      * boss线程池-进行客户端连接      *      * @return      */     @Bean     public NioEventLoopGroup boosGroup() {         return new NioEventLoopGroup(nettyProperties.getBoss());     }     /**      * worker线程池-进行业务处理      *      * @return      */     @Bean     public NioEventLoopGroup workerGroup() {         return new NioEventLoopGroup(nettyProperties.getWorker());     }     /**      * 服务端启动器,监听客户端连接      *      * @return      */     @Bean     public ServerBootstrap serverBootstrap() {         ServerBootstrap serverBootstrap = new ServerBootstrap()                 // 指定使用的线程组                 .group(boosGroup(), workerGroup())                 // 指定使用的通道                 .channel(NioServerSocketChannel.class)                 // 指定连接超时时间                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())                 // 指定worker处理器                 .childHandler(new NettyServerHandler());         return serverBootstrap;     } } 复制代码

  1. worker处理器,初始化通道以及配置对应管道的处理器 自定义了##@##分割符,通过DelimiterBasedFrameDecoder来处理拆包沾包问题; 通过MessageDecodeHandler将接收消息解码处理成对象实例; 通过MessageEncodeHandler将发送消息增加分割符后并编码; 最后通过ServerListenerHandler根据消息类型对应处理不同消息。

public class NettyServerHandler extends ChannelInitializer<SocketChannel> {     @Override     protected void initChannel(SocketChannel socketChannel) throws Exception {         // 数据分割符         String delimiterStr = "##@##";         ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());         ChannelPipeline pipeline = socketChannel.pipeline();         // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节         pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));         // 将上一步解码后的数据转码为Message实例         pipeline.addLast(new MessageDecodeHandler());         // 对发送客户端的数据进行编码,并添加数据分隔符         pipeline.addLast(new MessageEncodeHandler(delimiterStr));         // 对数据进行最终处理         pipeline.addLast(new ServerListenerHandler());     } } 复制代码

  1. 数据解码 数据解码和编码都采用UTF8格式

public class MessageDecodeHandler extends ByteToMessageDecoder {     @Override     protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {         ByteBuf frame = in.retainedDuplicate();         final String content = frame.toString(CharsetUtil.UTF_8);         Message message = new Message(content);         list.add(message);         in.skipBytes(in.readableBytes());     } } 复制代码

  1. 数据解码转换的实例 Message类用于承载消息、转JsonString

 public class Message {     /**      * 数据长度      */     private Integer len;     /**      * 接收的通讯数据body      */     private String content;     /**      * 消息类型      */     private Integer msgType;     public Message(Object object) {         String str = object.toString();         JSONObject jsonObject = JSONObject.parseObject(str);         msgType = Integer.valueOf(jsonObject.getString("msg_type"));         content = jsonObject.getString("body");         len = str.length();     }     public String toJsonString() {         return "{" +                 "\"msg_type\": " + msgType + ",\n" +                 "\"body\": " + content +                 "}";     } // setter、getter 。。。。 } 复制代码

  1. 数据编码 netty服务端回复消息时,对消息转JsonString增加分割符,并进行编码。

public class MessageEncodeHandler extends MessageToByteEncoder<Message> {     // 数据分割符     String delimiter;     public MessageEncodeHandler(String delimiter) {         this.delimiter = delimiter;     }     @Override     protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {         out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));     } } 复制代码

  1. 数据处理器,针对不同类型数据分类处理 在处理不同接收数据时使用了枚举类型,在使用switch时可以做下处理,具体参考代码,这里只演示如何操作,并没实现数据处理业务类。

 public class ServerListenerHandler extends SimpleChannelInboundHandler<Message> {     private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);     /**      * 设备接入连接时处理      *      * @param ctx      */     @Override     public void handlerAdded(ChannelHandlerContext ctx) {         log.info("有新的连接:[{}]", ctx.channel().id().asLongText());     }     /**      * 数据处理      *      * @param ctx      * @param msg      */     @Override     protected void channelRead0(ChannelHandlerContext ctx, Message msg) {         // 获取消息实例中的消息体         String content = msg.getContent();         // 对不同消息类型进行处理         MessageEnum type = MessageEnum.getStructureEnum(msg);         switch (type) {             case CONNECT:                 // TODO 心跳消息处理             case STATE:                 // TODO 设备状态             default:                 System.out.println(type.content + "消息内容" + content);         }     }     /**      * 设备下线处理      *      * @param ctx      */     @Override     public void handlerRemoved(ChannelHandlerContext ctx) {         log.info("设备下线了:{}", ctx.channel().id().asLongText());     }     /**      * 设备连接异常处理      *      * @param ctx      * @param cause      */     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         // 打印异常         log.info("异常:{}", cause.getMessage());         // 关闭连接         ctx.close();     } } 复制代码

  1. 数据类型枚举类

 public enum MessageEnum {     CONNECT(1, "心跳消息"),     STATE(2, "设备状态");     public final Integer type;     public final String content;     MessageEnum(Integer type, String content) {         this.type = type;         this.content = content;     }     // case中判断使用     public static MessageEnum getStructureEnum(Message msg) {         Integer type = Optional.ofNullable(msg)                 .map(Message::getMsgType)                 .orElse(0);         if (type == 0) {             return null;         } else {             List<MessageEnum> objectEnums = Arrays.stream(MessageEnum.values())                     .filter((item) -> item.getType() == type)                     .distinct()                     .collect(Collectors.toList());             if (objectEnums.size() > 0) {                 return objectEnums.get(0);             }             return null;         }     } // setter、getter。。。。 } 复制代码

到此Netty整个配置已经完成,但如果要跟随springboot一起启动,仍需要做一些配置。

  1. netty启动类配置

@Component public class NettyServerBoot {     private static final Logger log = LoggerFactory.getLogger(NettyServerBoot.class);     @Resource     NioEventLoopGroup boosGroup;     @Resource     NioEventLoopGroup workerGroup;     final ServerBootstrap serverBootstrap;     final NettyProperties nettyProperties;     public NettyServerBoot(ServerBootstrap serverBootstrap, NettyProperties nettyProperties) {         this.serverBootstrap = serverBootstrap;         this.nettyProperties = nettyProperties;     }     /**      * 启动netty      *      * @throws InterruptedException      */     @PostConstruct     public void start() throws InterruptedException {         // 绑定端口启动         serverBootstrap.bind(nettyProperties.getPort()).sync();         // 备用端口         serverBootstrap.bind(nettyProperties.getPortSalve()).sync();         log.info("启动Netty: {},{}", nettyProperties.getPort(), nettyProperties.getPortSalve());     }     /**      * 关闭netty      */     @PreDestroy     public void close() {         log.info("关闭Netty");         boosGroup.shutdownGracefully();         workerGroup.shutdownGracefully();     } } 复制代码

增加NettyServerBoot配置后,启动application时,netty服务端会跟随一起启动。 在这里插入图片描述 同时,在springboot关闭前,会先销毁netty服务。 在这里插入图片描述


作者:鳄鱼儿
链接:https://juejin.cn/post/7168862676683079694


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐