阅读 132

MQ 入门实践

MQ 入门实践

MQ

Message Queue,消息队列,FIFO 结构。




例如电商平台,在用户支付订单后执行对应的操作;




优点:


异步

削峰

解耦

缺点


增加系统复杂性

数据一致性

可用性

JMS

Java Message Service,Java消息服务,类似 JDBC 提供了访问数据库的标准,JMS 也制定了一套系统间消息通信的规范;


区别于 JDBC,JDK 原生包中并未定义 JMS 相关接口。


ConnectionFactory


Connection


Destination


Session


MessageConsumer


MessageProducer


Message


协作方式图示为;




业界产品

ActiveMQ RabbitMQ RocketMQ kafka

单机吞吐量 万级 万级 10 万级 10 万级

可用性 非常高 非常高

可靠性 较低概率丢失消息 基本不丢 可以做到 0 丢失 可以做到 0 丢失

功能支持 较为完善 基于 erlang,并发强,性能好,延时低 分布式,拓展性好,支持分布式事务 较为简单,主要应用与大数据实时计算,日志采集等

社区活跃度

ActiveMQ

作为 Apache 下的开源项目,完全支持 JMS 规范。并且 Spring Boot 内置了 ActiveMQ 的自动化配置,作为入门再适合不过。


快速开始

添加依赖;


<dependency>

    <groupId>org.apache.activemq</groupId>

    <artifactId>activemq-core</artifactId>

    <version>5.7.0</version>

</dependency>

1

2

3

4

5

消息发送;


// 1. 创建连接工厂

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// 2. 工厂创建连接

Connection connection = factory.createConnection();

// 3. 启动连接

connection.start();

// 4. 创建连接会话session,第一个参数为是否在事务中处理,第二个参数为应答模式

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5. 根据session创建消息队列目的地

Destination queue = session.createQueue("test-queue");

// 6. 根据session和目的地queue创建生产者

MessageProducer producer = session.createProducer(queue);

// 7. 根据session创建消息实体

Message message = session.createTextMessage("hello world!");

// 8. 通过生产者producer发送消息实体

producer.send(message);

// 9. 关闭连接

connection.close();

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

Spring Boot 集成

自动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration


添加依赖;


<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

1

2

3

4

添加 yaml 配置;


spring:

  activemq:

    broker-url: tcp://localhost:61616

  jms:

    #消息模式 true:广播(Topic),false:队列(Queue),默认时false

    pub-sub-domain: true

1

2

3

4

5

6

收发消息;


@Autowired

private JmsTemplate jmsTemplate;


// 接收消息

@JmsListener(destination = "test")

public void receiveMsg(String msg) {

    System.out.println(msg);

}


// 发送消息

public void sendMsg(String destination, String msg) {

    jmsTemplate.convertAndSend(destination, msg);

}

1

2

3

4

5

6

7

8

9

10

11

12

13

高可用

基于 zookeeper 实现主从架构,修改 activemq.xml 节点 persistenceAdapter 配置;


<persistenceAdapter>

    <replicatedLevelDB

        directory="${activemq.data}/levelDB"

        replicas="3"

        bind="tcp://0.0.0.0:0"

        zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"

        zkPath="/activemq/leveldb-stores"

        hostname="localhost"

    />

</persistenceAdapter>

1

2

3

4

5

6

7

8

9

10

broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false


负载均衡

在高可用集群节点 activemq.xml 添加节点 networkConnectors;


<networkConnectors>

    <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>

</networkConnectors>

1

2

3

更多详细信息可参考:https://blog.csdn.net/haoyuyang/article/details/53931710


集群消费

由于发布订阅模式,所有订阅者都会接收到消息,在生产环境,消费者集群会产生消息重复消费问题。


ActiveMQ 提供 VirtualTopic 功能,解决多消费端接收同一条消息的问题。于生产者而言,VirtualTopic 就是一个 topic,对消费而言则是 queue。




在 activemq.xml 添加节点 destinationInterceptors;


<destinationInterceptors> 

    <virtualDestinationInterceptor> 

        <virtualDestinations> 

            <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>    

        </virtualDestinations>

    </virtualDestinationInterceptor> 

</destinationInterceptors>

1

2

3

4

5

6

7

生产者正常往 testTopic 中发送消息,订阅者可修改订阅主题为类似 consumer.A.testTopic 这样来消费。


更多详细信息可参考:https://blog.csdn.net/java_collect/article/details/82154829


RocketMQ

是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。


架构图示



Name Server


名称服务器,类似于 Zookeeper 注册中心,提供 Broker 发现;


Broker


RocketMQ 的核心组件,绝大部分工作都在 Broker 中完成,接收请求,处理消费,消息持久化等;


Producer


消息生产方;


Consumer


消息消费方;


快速开始

安装后,依次启动 nameserver 和 broker,可以用 mqadmin 管理主题、集群和 broker 等信息;


https://segmentfault.com/a/1190000017841402


添加依赖;


<dependency>

    <groupId>org.apache.rocketmq</groupId>

    <artifactId>rocketmq-client</artifactId>

    <version>4.5.2</version>

</dependency>

1

2

3

4

5

消息发送;


DefaultMQProducer producer = new DefaultMQProducer("producer-group");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.setInstanceName("producer");

producer.start();

Message msg = new Message(

    "producer-topic",

    "msg",

    "hello world".getBytes()

);

//msg.setDelayTimeLevel(1);

SendResult sendResult = producer.send(msg);

System.out.println(sendResult.toString());

producer.shutdown();

1

2

3

4

5

6

7

8

9

10

11

12

13

delayLevel 从 1 开始默认依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。


参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。


消息接收;


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

consumer.setNamesrvAddr("127.0.0.1:9876");

consumer.setInstanceName("consumer");

consumer.subscribe("producer-topic", "msg");

consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {

    for (MessageExt msg : list) {

        System.out.println(new String(msg.getBody()));

    }

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

consumer.start();

1

2

3

4

5

6

7

8

9

10

11

.\mqadmin.cmd sendMessage -t producer-topic -c msg -p “hello rocketmq” -n localhost:9876


Spring Boot 集成

添加依赖;


<dependency>

    <groupId>org.apache.rocketmq</groupId>

    <artifactId>rocketmq-spring-boot-starter</artifactId>

    <version>2.0.4</version>

</dependency>

1

2

3

4

5

添加 yaml 配置;


rocketmq:

  name-server: 127.0.0.1:9876

  producer:

    group: producer

1

2

3

4

发送消息;


@Autowired

private RocketMQTemplate mqTemplate;


public void sendMessage(String topic, String tag, String message) {

    SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);

    System.out.println(JSON.toJSONString(result));

}

1

2

3

4

5

6

7

接收消息;


@Component

@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")

public class MsgListener implements RocketMQListener<String> {


    @Override

    public void onMessage(String message) {

        System.out.println(message);

    }

}

1

2

3

4

5

6

7

8

9

Console 控制台

RocketMQ 拓展包提供了管理控制台;


https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console




重复消费

产生原因:


生产者重复投递;

消息队列异常;

消费者异常消费;

怎么解决重复消费的问题,换句话怎么保证消息消费的幂等性。


通常基于本地消息表的方案实现,消息处理过便不再处理。


顺序消息

消息错乱的原因:


一个消息队列 queue,多个 consumer 消费;

一个 queue 对应一个 consumer,但是 consumer 多线程消费;

要保证消息的顺序消费,有三个关键点:


消息顺序发送

消息顺序存储

消息顺序消费



参考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。


分布式事务

在分布式系统中,一个事务由多个本地事务组成。这里介绍一个基于 MQ 的分布式事务解决方案。




通过 broker 的 HA 高可用,和定时回查 prepare 消息的状态,来保证最终一致性。

————————————————

版权声明:本文为CSDN博主「叫我宫城大人」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/qq_15002323/article/details/115876335


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐