阅读 86

JAVA:多线程处理,实现MQ中同类型消息保序消费

最近项目中有一个关于MQ消息消费和java线程池的应用场景,如下:

基本场景图

业务进程:实现基础业务功能,将租户业务拆分成设备配置,通过MQ与设备适配层进行消息通知。

MQ:使用主流MQ中间件实现。保存的消息为租户业务拆分后的设备配置信息。

设备适配层:消息消费者,消费消息(单线程消费),将消息中的设备配置信息下发到对应的设备上(多线程处理配置下发)。


消息实例

        基本的消息体如上图所示,业务进程根据租户业务拆分成一个(或多个)设备的多个配置项,每台设备的配置项之间有配置顺序要求:config1->config2->config3->config4。业务进程按照配置顺序将配置信息写入MQ中,此时MQ中的消息是有序的。

        设备适配层作为消息消费者,读取MQ中的设备配置消息,使用线程池来进行设备配置处理,提高消息处理速度。此时,如何在多线程并行的情况下保证同一台设备的配置下发顺序,即在多线程并行处理时如何保证MQ中同类型消息(设备ip相同)的保序消费呢???


        因为消息可以根据设备ip进行区分,这时我想到如果可以把包含同一个设备ip的消息放入到一个线程中处理就可以解决该问题了。- _ -

        那么如何做呢???或者说如何设计Runnable的接口实现??(Thread执行的Task)

        上面写了一大堆文字,作为码农的我真心感觉有点累,下面先上代码吧:

package thread;

import java.util.ArrayList;

import java.util.LinkedList;

import java.util.List;

import java.util.concurrent.ScheduledThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class T004_TestThreadRef {

public static void main(String[] args) {

//初始化Scheduled线程池,设置核心线程数为5

ScheduledThreadPoolExecutor executorService =new ScheduledThreadPoolExecutor(5);

//初始化5个task

OperationTask task1 =new OperationTask("taskList1");

OperationTask task2 =new OperationTask("taskList2");

OperationTask task3 =new OperationTask("taskList3");

OperationTask task4 =new OperationTask("taskList4");

OperationTask task5 =new OperationTask("taskList5");

//使用线程池执行task

executorService.scheduleAtFixedRate(task1,0,5000,TimeUnit.MILLISECONDS);

executorService.scheduleAtFixedRate(task2,0,5000,TimeUnit.MILLISECONDS);

executorService.scheduleAtFixedRate(task3,0,5000,TimeUnit.MILLISECONDS);

executorService.scheduleAtFixedRate(task4,0,5000,TimeUnit.MILLISECONDS);

executorService.scheduleAtFixedRate(task5,0,5000,TimeUnit.MILLISECONDS);

//构建消息list

ListipList =new ArrayList<>();

ipList.add("1.1.1.1");

ipList.add("2.2.2.2");

ipList.add("3.3.3.3");

ipList.add("4.4.4.4");

ipList.add("5.5.5.5");

ipList.add("6.6.6.6");

ipList.add("7.7.7.7");

ipList.add("1.1.1.1");

ipList.add("1.1.1.1");

ipList.add("1.1.1.1");

ipList.add("1.1.1.1");

ipList.add("1.1.1.1");

ipList.add("1.1.1.1");

ipList.add("1.1.1.1");

//将消息Hash到对应的task中

dispatchThread(task1,task2,task3,task4,task5,ipList);

try {

Thread.sleep(1000);

}catch (InterruptedException e) {

e.printStackTrace();

}

// 模拟第二次的消息消费

dispatchThread(task1,task2,task3,task4,task5,ipList);

//关闭线程池

//executorService.shutdown();

}

private static void dispatchThread(OperationTask task1,OperationTask task2,OperationTask task3,OperationTask task4,OperationTask task5,List ipList) {

for (String s : ipList) {

//使用设备ip的hash进行取模运算

int mod =Math.floorMod(s.hashCode(),5);

switch (mod) {

case 1:

task1.addTask(s);

break;

case 2:

task2.addTask(s);

break;

case 3:

task3.addTask(s);

break;

case 4:

task4.addTask(s);

break;

case 0:

task5.addTask(s);

break;

default:

task1.addTask(s);

break;

}

}

}

static class OperationTask implements Runnable {

public OperationTask(String name) {

this.name = name;

}

private final String name;

//配置下发任务队列

//        private final Queue tasksQueue = new LinkedList<>();

        private final LinkedListtasks =new LinkedList<>();

//添加配置下发任务

        public void addTask(String task) {

this.tasks.add(task);

}

@Override

        public void run() {

//执行配置下发操作

System.out.println(name + " start.");

            int count =0;

while (!tasks.isEmpty()) {

System.out.println("TaskOperator: " +name +" operate task : " +tasks.pop());

try {

Thread.sleep(500);

}catch (InterruptedException e) {

e.printStackTrace();

}

count++;

}

/*            Iterator iterable = tasks.iterator();

while (iterable.hasNext()) {

System.out.println("TaskOperator: " + name + " operate task : " + tasks.pop());

try {

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

iterable.remove();

count++;

}*/

            System.out.println(name +" end with " + count +" completed");

}

}

}



        还是代码看着亲切啊!!上面的代码就是我模拟的设备适配层中消费消息的功能:使用ScheduledThreadPoolExecutor这个线程池来处理设备配置下发操作。以5s周期执行OperationTask任务。

Task

        OperationTask实现了Runnable接口,它的run方法执行的就是配置下发操作,使用while循环判断tasks中是否存在配置下发任务,使用tasks.pop()方法获取配置下发消息。注意,这里使用了LinkedList来保存设备配置任务。LinkedListFIFO特性,可以保证配置任务的顺序。

        为什么使用while+pop来处理任务,不使用for/Iterator+remove来处理任务呢??

        消息消费使用的是线程池,由一个主线程将消息写入到对应的worker线程的队列中,worker线程又要从队列中取数据进行处理,此时使用for/Iterator+remove操作进行任务处理时会因为主线程写入数据到队列,导致List中length和index的变化,导致循环失败。

Iterator-remove

        我这里使用Iterator进行循环,使用remove清除队列中的已完成消息,会导致线程池中线程挂掉。。。具体原因还没有找到,后面找到再更新到这里。

dispatchThread

        将设备ip进行hash+mod计算,分配task到对应的OperationTask的任务队列中,调用task的addTask方法。


模拟消息消费

        ipList模拟设备配置消息,包含设备ip。通过调用dispatchThread方法模拟消息消费,每次消费ipList.length个消息,然后进行运算写入到各task的队列中。Schedule线程池执行task任务,周期性消费task的队列(LinkedList)中的消息。

        结果:

taskList4 start.

taskList5 start.

taskList3 start.

taskList2 start.

TaskOperator: taskList2 operate task : 2.2.2.2

taskList1 start.

TaskOperator: taskList3 operate task : 6.6.6.6

TaskOperator: taskList5 operate task : 4.4.4.4

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList1 operate task : 3.3.3.3

TaskOperator: taskList1 operate task : 7.7.7.7

taskList3 end with 1 completed

TaskOperator: taskList4 operate task : 5.5.5.5

taskList2 end with 1 completed

taskList5 end with 1 completed

TaskOperator: taskList1 operate task : 3.3.3.3

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList1 operate task : 7.7.7.7

TaskOperator: taskList4 operate task : 1.1.1.1

taskList1 end with 4 completed

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

taskList1 start.

taskList1 end with 0 completed

taskList2 start.

taskList5 start.

TaskOperator: taskList5 operate task : 4.4.4.4

taskList3 start.

TaskOperator: taskList2 operate task : 2.2.2.2

TaskOperator: taskList3 operate task : 6.6.6.6

TaskOperator: taskList4 operate task : 5.5.5.5

taskList5 end with 1 completed

taskList2 end with 1 completed

taskList3 end with 1 completed

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

TaskOperator: taskList4 operate task : 1.1.1.1

taskList4 end with 18 completed

taskList4 start.

taskList4 end with 0 completed

taskList1 start.

taskList2 start.

taskList2 end with 0 completed

taskList1 end with 0 completed

taskList3 start.

taskList3 end with 0 completed

taskList4 start.

taskList4 end with 0 completed

taskList5 start.

taskList5 end with 0 completed

        通过结果看到,总共执行了28个task,与dispatchThread分配的task个数相同。

        第一次通过简书记录自己的coding生活,有不到位的地方欢迎大家指正,同时有更好的解决方式希望大佬们不吝赐教,多谢!

作者:浮物乱炖

原文链接:https://www.jianshu.com/p/08a0e1264d8a

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐