RabbitMQ(3) : 集成 Springboot 项目使用
前言
前两篇文章分别介绍了 RabbitMQ 的搭建与基础使用,哪些基础都是学习 MQ 的必要知识,而想要在项目中用到,则需要集成到我们的 Spring 项目中,本文只介绍 Springboot 如何集成 RabbitMQ,通过这个中间件给其他的微服务发送消息。
一、生产者服务搭建
1.1、建立springboot父子工程。
其中的xiaolei-server 是rabbitmq-producer 的父项目,在其下添加一个 rabbitmq的生产者服务。这样代码比较有结构。而middleware-rabbitmq 是后期对rabbitmq 封装的 SDK,里面封装一些方法和配置,后期需要,这里可以不用管他。
1.2、配置 application.yml 文件
# 服务端口 server: port: 8200 # 配置rabbitmq服务 spring: rabbitmq: username: test password: test virtual-host: test host: 192.168.81.102 port: 5672复制代码
1.3、在项目中导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>复制代码
1.4、定义生产者,发送消息
生产者,我们继续发送上一文中讲的影片的案例。
@RestController @RequestMapping("/firm") public class SendFirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; // 1、定义交换机 private String exchangeName = "exchange_firm"; // 2、定义路由key private String routeKey1 = "爱国.吴京"; private String routeKey2 = "爱国.沈腾"; private String routeKey3 = "动作.吴京"; private String routeKey4 = "喜剧.沈腾"; @PostMapping("/send") public void sendMsg(){ for (int i = 1; i <=40; i++) { // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 if(i%4==0){ rabbitTemplate.convertAndSend(exchangeName,routeKey1,("爱国.吴京,说第"+i+"遍。").getBytes()); }else if(i%4 ==1){ rabbitTemplate.convertAndSend(exchangeName,routeKey2,("爱国.沈腾,说第"+i+"遍。").getBytes()); }else if(i%4 ==2){ rabbitTemplate.convertAndSend(exchangeName,routeKey3,("动作.吴京,说第"+i+"遍。").getBytes()); }else if(i%4 ==3){ rabbitTemplate.convertAndSend(exchangeName,routeKey4,("喜剧.沈腾,说第"+i+"遍。").getBytes()); } System.out.println("发送第"+i); } } }复制代码
1.5 初始化队列和交换机关系
package com.xiaolei.rabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 主题消费类型配置 * @Author xiaolei * @Date 2021/10/29 11:03 **/ @Configuration public class TopicRabbitConfig { /** * 给队列取名字 * @return */ @Bean public Queue firstQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue ("queue1",true); } @Bean public Queue SecondQueue() { return new Queue ("queue2",true); } @Bean public Queue ThreeQueue() { return new Queue ("queue3",true); } @Bean public Queue FourQueue() { return new Queue ("queue4",true); } /** * 给交换机取名 * @return */ @Bean public TopicExchange topicExchange(){ return new TopicExchange("exchange_firm",true,false); } @Bean public Binding bindingTopic1(){ return BindingBuilder.bind(firstQueue()).to(topicExchange()).with("*.吴京"); } @Bean public Binding bindingTopic2(){ return BindingBuilder.bind(SecondQueue()).to(topicExchange()).with("爱国.吴京"); } @Bean public Binding bindingTopic3(){ return BindingBuilder.bind(ThreeQueue()).to(topicExchange()).with("爱国.*"); } @Bean public Binding bindingTopic4(){ return BindingBuilder.bind(FourQueue()).to(topicExchange()).with("#.沈腾"); } }复制代码
1.6 调用 postman 接口测试
发现此时在rabbitmq 上存在了新的 交换机和topci信息
二、消费者服务搭建
2.1 导入依赖
这过程我们在父项目中已经导入了,所以这里可以省略。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>复制代码
2.2 application.yml 配置
# 服务端口 server: port: 8201 # 配置rabbitmq服务 spring: rabbitmq: username: test password: test virtual-host: test host: 192.168.81.102 port: 5672复制代码
2.3 定义四个消费者
@RestController public class MsgController { @RabbitListener(bindings = @QueueBinding( // 指定队列名字 value = @Queue(value = "queue1",autoDelete = "false"), // 指定交换机的名字 exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC) )) @RabbitHandler public void consumrmsg1(String msg){ System.out.println("吴京粉丝-------------->" + msg); } @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue2",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC))) @RabbitHandler public void consumrmsg2(String msg){ System.out.println("爱国吴京粉丝-------------->" + msg); } @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue3",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC))) @RabbitHandler public void consumrmsg3(String msg){ System.out.println("爱国粉丝-------------->" + msg); } @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue4",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC))) @RabbitHandler public void consumrmsg4(String msg){ System.out.println("沈腾粉丝-------------->" + msg); } }复制代码
打印效果如下:
2.4 细节讲解
消费者类中通过 @RabbitListener 和 @RabbitHandler 注解将一个方法定义为消息监听的方法。
其他几种的类型都差不多,我们只要自己来配置对应的类就好了。目前基础配置已经完成。
三、小结
本文介绍了 RabbitMq 中与 Springboot 的集成,这是属于初级应用,后面的文章中,我们将再考虑其他的问题,研究 RabbitMq的相关特性。
作者:潇雷
链接:https://juejin.cn/post/7025769688302878756