阅读 206

SpringBoot RabbitMQ 七种工作模式入门

RabbitMQ 工作模式

简单模式
  • 简单:一个生产者、一个队列和一个消费者,生产者发送消息至队列,消费者监听队列并消费消息
Work 模式
  • Work:一个生产者、一个队列和多个消费者,生产者发送消息至队列,多个消费者监听同一队列消费消息
发布/订阅模式
  • 发布/订阅:publish/subscribe 模式包含一个生产者、一个交换机、多个队列及多个消费者,交换机(Exchange)和队列直接绑定,生产者通过交换机(Exchange)将消息存储在与交换机绑定的队列中,消费者监听队列并进行消费
路由模式
  • 路由:routing 模式可以根据 routing key 将消息发送给指定队列,交换机(Exchange)和队列通过routing key 进行绑定,生产者通过交换机(Exchange)和 routing key 将消息精准发送至队列,消费者监听队列并消费消息
主题模式
  • 主题:Topics 模式在路由模式的基础上支持通配符操作,交换机会根据通配符将消息存储在匹配成功的队列中,消费者监听队列并进行消费
Header 模式
  • Header:header 模式取消了 routing key,而是使用 header 中的 key/value 键值对来进行匹配,匹配成功后消息会通过交换机发送给队列,消息者才能获取到消息并消费
RPC 模式
  • RPC:RPC 模式主要针对需要获取消费者处理结果的情况,通常是生产者将消息发送给了消费者,消费者接收到消息并进行消费后返回给生产者处理结果

SpringBoot 集成 RabbitMQ

  • 首先创建一个SpringBoot 项目,pom.xml 文件加入以下依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
  • 配置文件修改,加入以下 RabbitMQ 配置
server:
  port: 8888  # 设置端口号

spring:
  rabbitmq:
    host: 127.0.0.1  # 设置RabbitMQ所在主机
    port: 5672       # 设置RabbitMQ服务端口
    username: guest  # 设置RabbitMQ用户名
    password: guest  # 设置RabbitMQ密码
  • 新增公共常量类
public interface RabbitConstant {

    /**
     * 简单模式
     */
    String SIMPLE_QUEUE_NAME = "simple_queue";

    /**
     * work 模式
     */
    String WORK_QUEUE_NAME = "work_queue";

    /**
     * 发布/订阅(publish/subscribe)模式
     */
    String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange";
    String PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME = "publish_subscribe_first_queue";
    String PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME = "publish_subscribe_second_queue";

    /**
     * 路由(routing)模式
     */
    String ROUTING_EXCHANGE_NAME = "routing_exchange";
    String ROUTING_FIRST_QUEUE_NAME = "routing_first_queue";
    String ROUTING_SECOND_QUEUE_NAME = "routing_second_queue";
    String ROUTING_THIRD_QUEUE_NAME = "routing_third_queue";
    String ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME = "routing_first_queue_routing_key";
    String ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME = "routing_second_queue_routing_key";
    String ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME = "routing_third_queue_routing_key";

    /**
     * 主题(topics)模式
     */
    String TOPICS_EXCHANGE_NAME = "topics_exchange";
    String TOPICS_FIRST_QUEUE_NAME = "topics_first_queue";
    String TOPICS_SECOND_QUEUE_NAME = "topics_second_queue";
    String TOPICS_THIRD_QUEUE_NAME = "topics_third_queue";
    String TOPICS_FIRST_QUEUE_ROUTING_KEY = "topics.first.routing.key";
    String TOPICS_SECOND_QUEUE_ROUTING_KEY = "topics.second.routing.key";
    String TOPICS_THIRD_QUEUE_ROUTING_KEY = "topics.third.routing.key";
    String TOPICS_ROUTING_KEY_FIRST_WILDCARD = "#.first.#";
    String TOPICS_ROUTING_KEY_SECOND_WILDCARD = "*.second.#";
    String TOPICS_ROUTING_KEY_THRID_WILDCARD = "*.third.*";

    /**
     * header 模式
     */
    String HEADER_EXCHANGE_NAME = "header_exchange";
    String HEADER_FIRST_QUEUE_NAME = "header_first_queue";
    String HEADER_SECOND_QUEUE_NAME = "header_second_queue";

    /**
     * rpc 模式
     */
    String RPC_QUEUE_NAME = "rpc_queue";

}
  • 新增 Controller 请求类(用于验证结果,可最后新增)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RestController
public class RabbitController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/simple")
    public void simple() {
        rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!");
    }

    @GetMapping(value = "/work")
    public void work() {
        rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!");
    }

    @GetMapping(value = "/pubsub")
    public void pubsub() {
        rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "publish/subscribe hello");
    }

    @GetMapping(value = "/routing")
    public void routing() {
        // 给第一个队列发送消息
        rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello");
    }

    @GetMapping(value = "/topics")
    public void topics() {
        // 给第一个队列发送消息,此时队列能接受到消息,因为队列通配符为 #.first.#,而 routing_key 为 topics.first.routing.key,匹配成功
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
        // 给第二个队列发送消息,此时队列也能接受到消息,因为队列通配符为 *.second.#,而 routing_key 为 topics.second.routing.key,匹配成功
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
        // 给第三个队列发送消息,此时队列无法接受到消息,因为队列通配符为 *.third.*,而 routing_key 为 topics.third.routing.key,匹配失败
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
    }

    @GetMapping(value = "/header")
    public void header() {
        // 这条消息应该能被两个队列都接收到,第一个队列 all 匹配成功,第二个队列 hello-value any匹配成功
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("matchAll", "YES");
        messageProperties.setHeader("hello", "world");
        Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message);

        // 这条消息应该只被第二个队列接受,第一个队列 all 匹配失败,第二个队列 matchAll-NO any匹配成功
        MessageProperties messagePropertiesSecond = new MessageProperties();
        messagePropertiesSecond.setHeader("matchAll", "NO");
        Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond);
        rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond);
    }

    @GetMapping(value = "/rpc")
    public void rpc() {
        Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
        System.out.println("rabbit rpc response message: " + responseMsg);
    }

}
SpringBoot RabbitMQ 简单模式
  • 生产者声明队列,并向队列发送消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitSimpleProvider {

    @Bean
    public Queue simpleQueue() {
        return new Queue(RabbitConstant.SIMPLE_QUEUE_NAME);
    }

}
  • 消费者监听队列,并消费消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitSimpleConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_NAME)
    public void simpleListener(String context) {
        System.out.println("rabbit receiver: " + context);
    }

}
  • 单元测试
@Test
public void simple() {
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!");
}
  • 响应结果
SpringBoot RabbitMQ Work 模式
  • 生产者声明队列,并向队列生产消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitWorkProvider {

    @Bean
    public Queue workQueue() {
        return new Queue(RabbitConstant.WORK_QUEUE_NAME);
    }

}
  • 消费者监听队列,并消费消息(这里有两个消费者监听同一队列)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitWorkConsumer {

    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    @RabbitHandler
    public void workQueueListenerFirst(String context) {
        System.out.println("rabbit workQueue listener first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    @RabbitHandler
    public void workQueueListenerSecond(String context) {
        System.out.println("rabbit workQueue listener second receiver: " + context);
    }

}
  • 单元测试
@Test
public void work() {
    rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!");
}
  • 响应结果(由于有两个消费者监听同一队列,消息只能被其中一者进行消费,默认是负载均衡的将消息发送给所有消费者)
SpringBoot RabbitMQ 发布/订阅模式
  • 生产者声明两个队列和一个 fanout 交换机,并将这两个队列和交换机进行绑定
  • 交换机种类一共有四种 fanout、direct、topic、header(文末介绍)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitPublishSubscribeProvider {

    @Bean
    public Queue pubsubQueueFirst() {
        return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue pubsubQueueSecond() {
        return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 创建fanout类型交换机,表示与此交换机会将消息发送给所有绑定的队列
        return new FanoutExchange(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME);
    }

    @Bean
    public Binding pubsubQueueFirstBindFanoutExchange() {
        // 队列一绑定交换机
        return BindingBuilder.bind(pubsubQueueFirst()).to(fanoutExchange());
    }

    @Bean
    public Binding pubsubQueueSecondBindFanoutExchange() {
        // 队列二绑定交换机
        return BindingBuilder.bind(pubsubQueueSecond()).to(fanoutExchange());
    }

}
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitPublishSubscribeConsumer {

    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void pubsubQueueFirst(String context) {
        System.out.println("rabbit pubsub queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void pubsubQueueSecond(String context) {
        System.out.println("rabbit pubsub queue second receiver: " + context);
    }

}
  • 单元测试
@Test
public void pubsub() {
    rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "publish/subscribe hello");
}
  • 响应结果
SpringBoot RabbitMQ 路由模式
  • 生产者声明三个队列和一个 direct 交换机,将这三个队列和交换机进行绑定并设定交换机与队列之间的路由
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRoutingProvider {

    @Bean
    public Queue rabbitRoutingFirstQueue() {
        return new Queue(RabbitConstant.ROUTING_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue rabbitRoutingSecondQueue() {
        return new Queue(RabbitConstant.ROUTING_SECOND_QUEUE_NAME);
    }

    @Bean
    public Queue rabbitRoutingThirdQueue() {
        return new Queue(RabbitConstant.ROUTING_THIRD_QUEUE_NAME);
    }

    @Bean
    public DirectExchange directExchange() {
        // 创建direct类型交换机,表示与此交换机会将消息发送给 routing_key 完全相同的队列
        return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE_NAME);
    }

    @Bean
    public Binding routingFirstQueueBindDirectExchange() {
        // 队列一绑定direct交换机,并设置 routing_key 为 routing_first_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingFirstQueue()).to(directExchange()).with(RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME);
    }

    @Bean
    public Binding routingSecondQueueBindDirectExchange() {
        // 队列二绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingSecondQueue()).to(directExchange()).with(RabbitConstant.ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME);
    }

    @Bean
    public Binding routingThirdQueueBindDirectExchange() {
        // 队列三绑定direct交换机,并设置 routing_key 为 routing_third_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingThirdQueue()).to(directExchange()).with(RabbitConstant.ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME);
    }

}
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitRoutingConsumer {

    @RabbitListener(queues = RabbitConstant.ROUTING_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void routingFirstQueueListener(String context) {
        System.out.println("rabbit routing queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.ROUTING_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void routingSecondQueueListener(String context) {
        System.out.println("rabbit pubsub queue second receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.ROUTING_THIRD_QUEUE_NAME)
    @RabbitHandler
    public void routingThirdQueueListener(String context) {
        System.out.println("rabbit pubsub queue third receiver: " + context);
    }

}
  • 单元测试
@Test
public void routing() {
    // 给第一个队列发送消息
    rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello");
}
  • 响应结果
SpringBoot RabbitMQ 主题模式
  • 生产者声明三个队列和一个 topic 交换机,队列分别与 topic 交换机绑定并设置 routing key 统配符,若 routing key 满足交换机与队列间通配符要求则将消息存储至队列
  • # 通配符可以匹配一个或多个单词,* 通配符可以匹配一个单词;假如交换机(Exchange)与队列之间的 routing key 通配符为 #.hello.#,则代表 routing key 中间带有 hello 单词的都满足条件,消息将存储至队列
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopicProvider {

    @Bean
    public Queue topicFirstQueue() {
        return new Queue(RabbitConstant.TOPICS_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue topicSecondQueue() {
        return new Queue(RabbitConstant.TOPICS_SECOND_QUEUE_NAME);
    }

    @Bean
    public Queue topicThirdQueue() {
        return new Queue(RabbitConstant.TOPICS_THIRD_QUEUE_NAME);
    }

    @Bean
    public TopicExchange topicExchange() {
        // 创建topic类型交换机,表示与此交换机会将消息发送给 routing_key 通配符匹配成功的队列
        return new TopicExchange(RabbitConstant.TOPICS_EXCHANGE_NAME);
    }

    @Bean
    public Binding topicFirstQueueBindExchange() {
        // 队列一绑定topic类型交换机,并设置 routing_key 通配符为 #.first.#
        return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_FIRST_WILDCARD);
    }

    @Bean
    public Binding topicSecondQueueBindExchange() {
        // 队列二绑定topic类型交换机,并设置 routing_key 通配符为 *.second.#
        return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_SECOND_WILDCARD);
    }

    @Bean
    public Binding topicThirdQueueBindExchange() {
        // 队列三绑定topic类型交换机,并设置 routing_key 通配符为 *.third.*
        return BindingBuilder.bind(topicThirdQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_THRID_WILDCARD);
    }

}
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitTopicsConsumer {

    @RabbitListener(queues = RabbitConstant.TOPICS_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void topicFirstQueue(String context) {
        System.out.println("rabbit topics queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.TOPICS_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void topicSecondQueue(String context) {
        System.out.println("rabbit topics queue second receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.TOPICS_THIRD_QUEUE_NAME)
    @RabbitHandler
    public void topicThirdQueue(String context) {
        System.out.println("rabbit topics queue third receiver: " + context);
    }

}
  • 单元测试
@Test
public void topics() {
    // 给第一个队列发送消息,此时队列能接受到消息,因为队列通配符为 #.first.#,而 routing_key 为 topics.first.routing.key,匹配成功
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
    // 给第二个队列发送消息,此时队列也能接受到消息,因为队列通配符为 *.second.#,而 routing_key 为 topics.second.routing.key,匹配成功
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
    // 给第三个队列发送消息,此时队列无法接受到消息,因为队列通配符为 *.third.*,而 routing_key 为 topics.third.routing.key,匹配失败
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
    }
  • 响应结果
SpringBoot RabbitMQ header模式
  • 生产者声明队列并创建 HeaderExchange 交换机,将队列分别与交换机通过 header 进行绑定
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitHeaderProvider {

    @Bean
    public Queue headerFirstQueue() {
        return new Queue(RabbitConstant.HEADER_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue headerSecondQueue() {
        return new Queue(RabbitConstant.HEADER_SECOND_QUEUE_NAME);
    }

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(RabbitConstant.HEADER_EXCHANGE_NAME);
    }

    @Bean
    public Binding headerFirstQueueBindExchange() {
        Map<String, Object> headersMap = new HashMap<>(8);
        headersMap.put("matchAll", "YES");
        headersMap.put("hello", "world");

        return BindingBuilder.bind(headerFirstQueue()).to(headersExchange()).whereAll(headersMap).match();
    }

    @Bean
    public Binding headerSecondQueueBindExchange() {
        Map<String, Object> headersMap = new HashMap<>(8);
        headersMap.put("matchAll", "NO");
        headersMap.put("hello", "world");

        return BindingBuilder.bind(headerSecondQueue()).to(headersExchange()).whereAny(headersMap).match();
    }

}
  • 交换机与队列绑定详情
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitHeaderConsumer {

    @RabbitListener(queues = RabbitConstant.HEADER_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void headerFirstQueue(String context) {
        System.out.println("rabbit header queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.HEADER_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void headerSecondQueue(String context) {
        System.out.println("rabbit header queue second receiver: " + context);
    }

}
  • 单元测试
@Test
public void header() {
    // 这条消息应该能被两个队列都接收到,第一个队列 all 匹配成功,第二个队列 hello-value any匹配成功
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setHeader("matchAll", "YES");
    messageProperties.setHeader("hello", "world");
    Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties);
    rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message);

    // 这条消息应该只被第二个队列接受,第一个队列 all 匹配失败,第二个队列 matchAll-NO any匹配成功
    MessageProperties messagePropertiesSecond = new MessageProperties();
    messagePropertiesSecond.setHeader("matchAll", "NO");
    Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond);
    rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond);
}
  • 响应结果
SpringBoot RabbitMQ RPC模式
  • 生产者声明队列
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRpcProvider {

    @Bean
    public Queue rpcQueue() {
        return new Queue(RabbitConstant.RPC_QUEUE_NAME);
    }

}
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitRpcConsumer {

    @RabbitListener(queues = RabbitConstant.RPC_QUEUE_NAME)
    @RabbitHandler
    public String rpcQueue(String context) {
        System.out.println("rabbit rpc queue receiver: " + context);
        return "copy that!";
    }

}
  • 单元测试
@Test
public void rpc() {
    Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
    System.out.println("rabbit rpc response message: " + responseMsg);
}
  • 响应结果

Exchange 交换机

  • fanout 交换机:通过fanout交换机发送消息,则与fanout交换机绑定的所有队列都能接收到该消息
  • direct 交换机:通过direct交换机发送消息,则与direct交换机绑定队列中routing key完全一致的队列能接收到消息
  • topic 交换机:通过topic交换机发送消息,则与topic交换机绑定队列中routing key经过通配符匹配成功的队列能接收到消息

文章还有很多不足之处,欢迎各位兄弟姐妹批评指正,代码仓库已存放至gitee [SpringBoot RabbitMQ 工作模式仓库链接](https://gitee.com/BarryMan/spring-boot-rabbitmq.git)

作者:HanlinBubble

原文链接:https://www.jianshu.com/p/a26742325c0a

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐