阅读 144

rabbitmq 基础使用

yaml 配置

spring:    rabbitmq:       host: 127.0.0.1       port: 5672       username: admin       password: admin       virtual-host: /dev       publisher-confirm-type: correlated # 发送者开启 confirm 确认机制       publisher-returns: true # 发送者开启 return 确认机制       template:         mandatory: true       listener:         simple:           acknowledge-mode: manual # 开启手动Ack 复制代码

配置类

@Configuration public class RabbitMQConfig {     @Autowired     private ConfirmCallbackService confirmCallbackService;     @Autowired     private ReturnCallbackService returnCallbackService;     @Autowired     RabbitAdmin rabbitAdmin;     // 创建交换机     @Bean     TopicExchange exchange() {         return new TopicExchange(RabbitMQConst.EXCHANGE_NAME);     }     // 创建队列     @Bean     public Queue queue() {         return new Queue(RabbitMQConst.QUEUE_NAME);     }               // 创建延迟交换机     @Bean     TopicExchange exchange() {         TopicExchange topicExchange =               new TopicExchange(DELAY_EXCHANGE_NAME, true, false, null);         // 设置为延迟队列         topicExchange.setDelayed(true);         return topicExchange;     }     /**      * 绑定交换机和队列      *      * @return      */     @Bean     Binding bindingStrategyHiQueue() {         return BindingBuilder             .bind(queue())             .to(exchange())             .with(RabbitMQConst.ROUTING_KEY_NAME);     }     // 定义消息模板用于发布消息,并且设置其消息转换器     @Bean     RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {         final RabbitTemplate rabbitTemplate =              new RabbitTemplate(connectionFactory);         /**          * 确保消息发送失败后可以重新返回到队列中          * 注意:yml需要配置 publisher-returns: true          */         rabbitTemplate.setMandatory(true);         /**          * 消费者确认收到消息后,手动ack回执回调处理          */         rabbitTemplate.setConfirmCallback(confirmCallbackService);         /**          * 消息投递到队列失败回调处理          */         rabbitTemplate.setReturnCallback(returnCallbackService);         return rabbitTemplate;     }     /**      * 创建初始化RabbitAdmin对象      *      * @param connectionFactory      * @return      */     @Bean     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);         // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类         rabbitAdmin.setAutoStartup(true);         return rabbitAdmin;     }     /**      * 创建交换机和队列      */     @Bean     public void createExchangeQueue() {         // 策略交换机+队列         rabbitAdmin.declareExchange(exchange());     } } @Slf4j @Component public class ConfirmCallbackService      implements RabbitTemplate.ConfirmCallback {     @Override     public void confirm         (CorrelationData correlationData, boolean ack, String cause) {                  if (ack) {             log.debug("消息发送成功!");         } else {             log.error("[消息发送异常] correlationData={} , cause={}"                 , correlationData, cause);         }     } } 复制代码

消息发送者

@Service public class RabbitMQProducerService {     @Autowired     private RabbitTemplate rabbitTemplate;     /**      * 发送普通消息      *      * @param exchange   交换机      * @param routingKey 路由键      * @param content    消息内容      */     public void sendMsg(String exchange, String routingKey, String content) {         rabbitTemplate.convertAndSend(exchange, routingKey, content);     }               /**      * 发送延迟消息      *      * @param content    内容      * @param delay      延时时间,秒 (视频的录制时间)      */     public void sendGetVideoScoreMsg(String content,int delay){         rabbitTemplate.setConfirmCallback(confirmCallback);         rabbitTemplate.convertAndSend(             RabbitMQConfig.EXCHANGE_NAME,             RabbitMQConfig.ROUTING_KEY_NAME,             content,              message ->             {                 message                     .getMessageProperties()                     .setDeliveryMode(MessageDeliveryMode.PERSISTENT);                 // 毫秒为单位,指定此消息的延时时长                 message.getMessageProperties().setDelay(delay * 1000);                 return message;             }          );     } } 复制代码

消息消费者

/**  * 延迟队列消费者  */ @RabbitListener(     bindings = @QueueBinding(             value = @Queue(value = RabbitMQConfig.QUEUE_NAME, durable = "true"),             exchange = @Exchange(value = RabbitMQConfig.EXCHANGE_NAME,                                   type = "x-delayed-message"),             key = {RabbitMQConfig.ROUTING_KEY_NAME}     ),     concurrency = "10" ) @Transactional(rollbackFor = Exception.class) public void updateRecordScore(Message message, Channel channel) throws IOException{     //      long deliverTag = message.getMessageProperties().getDeliveryTag();          try{         String channelId = new String(message.getBody());         if (!StringUtils.isEmpty(channelId)){             voiceVideoRecordRepository.updateRecordScore                 (channelId,scoreRecordService.getAddScoreByChannelId(channelId).get());         }                  // 手动ack         channel.basicAck(deliverTag,false);     }catch (Exception e){         log.error(" get video score failed " ,e);         channel.basicReject(deliverTag,false);     } } // 普通队列 @RabbitListener(     bindings = @QueueBinding(             value = @Queue(value = RabbitMQConst.QUEUE_NAME, durable = "true"),             exchange = @Exchange(value = RabbitMQConst.EXCHANGE_NAME,                                   type = "topic"),             key = {RabbitMQConst.ROUTING_KEY_NAME}     ),     // 多线程去消费     concurrency = "10" ) @Transactional(rollbackFor = Exception.class) public void consumeMessage(Message message, Channel channel) throws IOException {     long deliveryTag = message.getMessageProperties().getDeliveryTag();     try {         Person person = JSON.parseObject(new String(message.getBody()), Person.class);                  // 手动ack         channel.basicAck(deliveryTag, false);     } catch (Exception e) {         log.error(" save room duration failed ", e);         // 手动ack         channel.basicReject(deliveryTag, false);     } }


作者:懒_羊羊
链接:https://juejin.cn/post/7025888556753256478


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐