阅读 327

Netty编程(八)—— 多线程优化服务端

多线程架构

之前说到的服务端程序都是在一个线程上进行的,这个线程不仅负责连接客户端发来的请求,同时还要处理读写事件,这样效率还是不够高。如今电脑都是多核处理器,这意味着可以同时进行多个线程,所以服务端应该充分利用这一点。

服务端线程可以建立多个线程,将这些线程分成两组:

  • 其中一个线程专门处理 accept 事件,称为Boss线程

  • 创建 cpu 核心数的线程,每个线程配一个Selector,轮流处理 read 事件,称为Worker线程

他们之间的的关系可以通过下面这张图进行理解:

在这里插入图片描述

Boss线程只负责Accept事件,Worker线程负责客户端与服务端之间的读写问题,他们都各自维护一个Selector负责监听通道的事件。当Boss线程检测到有客户端的连接请求,就会把这个连接返回的SocketChannel注册到某一个Worker线程上。当有读写事件发生时,其中一个Worker线程就会检测到事件,就会在该线程中进行处理,这样的设计做到了功能在线程上的分离。

Worker类的实现

上面分析了Worker类是一个监听并且处理读写事件的新线程,所以在Worker类中需要一个Thread对象用来启动线程,还需要一个Selector用来监听事件管理channel,此外为Worker设定一个name。这些可以在构造函数中进行初始化。

之后需要实现一个register函数,这个函数接收一个SocketChannel对象,Worker类的register函数将这个SocketChannel对象注册到Worker类的selector上。

下面就是完整的Worker类的代码:

static class Worker implements Runnable{         private String name;         private Thread thread;         private Selector selector;         private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();         public Worker(String name) throws IOException {             this.name = name;             thread = new Thread(this, name);             thread.start();             selector = Selector.open();         }         //初始化线程和selector         public void register(SocketChannel sc) throws IOException {             //将任务添加到队列中             queue.add(()->{                 try {                     sc.register(this.selector,SelectionKey.OP_READ,null);//boss                 } catch (ClosedChannelException e) {                     e.printStackTrace();                 }             });             selector.wakeup();         }         @Override         public void run() {             while(true){                 try{                     selector.select();//worker-0  一开始会阻塞,下面执行不了,所以要先wakeup一下来注册                     Runnable task = queue.poll();                     if(task!=null)                     {                         task.run();//执行了sc.register(this.selector,SelectionKey.OP_READ,null);//boss                     }                     Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();                     while (iter.hasNext()) {                         SelectionKey key = iter.next();                         iter.remove();                         if (key.isReadable()) {                             ByteBuffer buffer = ByteBuffer.allocate(16);                             SocketChannel channel = ((SocketChannel) key.channel());                             channel.read(buffer);                             buffer.flip();                             ByteBufferUtil.debugAll(buffer);//buffer可视化                             key.cancel();                         }                     }                 }catch (Exception e)                 {                 }             }         }     } 复制代码

在这段代码中,还有一个需要注意的问题:必须保证sc.register(this.selector,SelectionKey.OP_READ,null)不会因为selector.select()被阻塞,否则就不能给socketchannel注册上。所以这里采用的方法是使用消息队列,当有线程想要注册到selector上时,就先使用selector.wakeup()唤醒,紧接着在selector.select()下面进行注册,即拿出消息队列中的注册任务执行。这种方式就能够保证当注册通道时不会被select方法阻塞住。

多Worker

上面讲解了Worker类,在实际服务端代码里,肯定是使用多个Worker去管理多个channel,那么可以使用一个worker数组,每一个channel注册worker数组中的某一个worker,此外,可以使用一个变量去给channel轮流分配worker,具体是这样实现的:

  1. 创建worker数组,并对每一项进行初始化,此外使用一个workerindex表示给下一个通道分配的worker在数组的下标,需要注意的是,因为是多线程环境,所以不能使用int类型,使用的是AtomicInteger类型。

Worker[] workers = new Worker[5]; for (int i =0 ;i< workers.length;i++) {      workers[i] = new Worker("worker-"+i); } AtomicInteger workerindex = new AtomicInteger(); 复制代码

2、给通道注册worker,这里需要模数组长度以达到循环使用的目的,做到了负载均衡。

workers[workerindex.getAndIncrement()% workers.length].register(sc); 复制代码

服务端代码

下面给出服务端所有的代码:

@Slf4j public class ThreadNIOServer {     public static void main(String[] args) {         try(ServerSocketChannel server = ServerSocketChannel.open()) {             Thread.currentThread().setName("BOSS");             server.bind(new InetSocketAddress(8080));             server.configureBlocking(false);             Selector boss = Selector.open();             server.register(boss, SelectionKey.OP_ACCEPT);             //1、创建固定数量的worker             Worker[] workers = new Worker[5];             for (int i =0 ;i< workers.length;i++)             {                 workers[i] = new Worker("worker-"+i);             }             AtomicInteger workerindex = new AtomicInteger();             while(true)             {                 boss.select();                 Set<SelectionKey> selectionKeys = boss.selectedKeys();                 Iterator<SelectionKey> iter = selectionKeys.iterator();                 while (iter.hasNext())                 {                     SelectionKey key = iter.next();                     iter.remove();                     if(key.isAcceptable())                     {                         SocketChannel sc = server.accept();                         sc.configureBlocking(false);                         log.debug("connected...{}",sc.getRemoteAddress());                         //2、关联socket channel和worker                         log.debug("before connect...");                         workers[workerindex.getAndIncrement()% workers.length].register(sc);                         log.debug("after connect...");                     }                     else if(key.isReadable())                     {                     }                 }             }         }catch (Exception e){         }     }     static class Worker implements Runnable{         private String name;         private Thread thread;         private Selector selector;         private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();         public Worker(String name) throws IOException {             this.name = name;             thread = new Thread(this, name);             thread.start();             selector = Selector.open();         }         //初始化线程和selector         public void register(SocketChannel sc) throws IOException {             //将任务添加到队列中             queue.add(()->{                 try {                     sc.register(this.selector,SelectionKey.OP_READ,null);//boss                 } catch (ClosedChannelException e) {                     e.printStackTrace();                 }             });             selector.wakeup();         }         @Override         public void run() {             while(true){                 try{                     selector.select();//worker-0  一开始会阻塞,下面执行不了,所以要先wakeup一下来注册                     Runnable task = queue.poll();                     if(task!=null)                     {                         task.run();//执行了sc.register(this.selector,SelectionKey.OP_READ,null);//boss                     }                     Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();                     while (iter.hasNext()) {                         SelectionKey key = iter.next();                         iter.remove();                         if (key.isReadable()) {                             ByteBuffer buffer = ByteBuffer.allocate(16);                             SocketChannel channel = ((SocketChannel) key.channel());                             channel.read(buffer);                             buffer.flip();                             ByteBufferUtil.debugAll(buffer);//buffer可视化                             key.cancel();                         }                     }                 }catch (Exception e)                 {                 }             }         }     } }


作者:JAVAWarrior
链接:https://juejin.cn/post/7031129066203938830


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐