rabbitmq 基础使用
yaml 配置
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin virtual-host: /dev publisher-confirm-type: correlated # 发送者开启 confirm 确认机制 publisher-returns: true # 发送者开启 return 确认机制 template: mandatory: true listener: simple: acknowledge-mode: manual # 开启手动Ack 复制代码
配置类
@Configuration public class RabbitMQConfig { @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; @Autowired RabbitAdmin rabbitAdmin; // 创建交换机 @Bean TopicExchange exchange() { return new TopicExchange(RabbitMQConst.EXCHANGE_NAME); } // 创建队列 @Bean public Queue queue() { return new Queue(RabbitMQConst.QUEUE_NAME); } // 创建延迟交换机 @Bean TopicExchange exchange() { TopicExchange topicExchange = new TopicExchange(DELAY_EXCHANGE_NAME, true, false, null); // 设置为延迟队列 topicExchange.setDelayed(true); return topicExchange; } /** * 绑定交换机和队列 * * @return */ @Bean Binding bindingStrategyHiQueue() { return BindingBuilder .bind(queue()) .to(exchange()) .with(RabbitMQConst.ROUTING_KEY_NAME); } // 定义消息模板用于发布消息,并且设置其消息转换器 @Bean RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); /** * 确保消息发送失败后可以重新返回到队列中 * 注意:yml需要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消费者确认收到消息后,手动ack回执回调处理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投递到队列失败回调处理 */ rabbitTemplate.setReturnCallback(returnCallbackService); return rabbitTemplate; } /** * 创建初始化RabbitAdmin对象 * * @param connectionFactory * @return */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类 rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 创建交换机和队列 */ @Bean public void createExchangeQueue() { // 策略交换机+队列 rabbitAdmin.declareExchange(exchange()); } } @Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.debug("消息发送成功!"); } else { log.error("[消息发送异常] correlationData={} , cause={}" , correlationData, cause); } } } 复制代码
消息发送者
@Service public class RabbitMQProducerService { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送普通消息 * * @param exchange 交换机 * @param routingKey 路由键 * @param content 消息内容 */ public void sendMsg(String exchange, String routingKey, String content) { rabbitTemplate.convertAndSend(exchange, routingKey, content); } /** * 发送延迟消息 * * @param content 内容 * @param delay 延时时间,秒 (视频的录制时间) */ public void sendGetVideoScoreMsg(String content,int delay){ rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.convertAndSend( RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY_NAME, content, message -> { message .getMessageProperties() .setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 毫秒为单位,指定此消息的延时时长 message.getMessageProperties().setDelay(delay * 1000); return message; } ); } } 复制代码
消息消费者
/** * 延迟队列消费者 */ @RabbitListener( bindings = @QueueBinding( value = @Queue(value = RabbitMQConfig.QUEUE_NAME, durable = "true"), exchange = @Exchange(value = RabbitMQConfig.EXCHANGE_NAME, type = "x-delayed-message"), key = {RabbitMQConfig.ROUTING_KEY_NAME} ), concurrency = "10" ) @Transactional(rollbackFor = Exception.class) public void updateRecordScore(Message message, Channel channel) throws IOException{ // long deliverTag = message.getMessageProperties().getDeliveryTag(); try{ String channelId = new String(message.getBody()); if (!StringUtils.isEmpty(channelId)){ voiceVideoRecordRepository.updateRecordScore (channelId,scoreRecordService.getAddScoreByChannelId(channelId).get()); } // 手动ack channel.basicAck(deliverTag,false); }catch (Exception e){ log.error(" get video score failed " ,e); channel.basicReject(deliverTag,false); } } // 普通队列 @RabbitListener( bindings = @QueueBinding( value = @Queue(value = RabbitMQConst.QUEUE_NAME, durable = "true"), exchange = @Exchange(value = RabbitMQConst.EXCHANGE_NAME, type = "topic"), key = {RabbitMQConst.ROUTING_KEY_NAME} ), // 多线程去消费 concurrency = "10" ) @Transactional(rollbackFor = Exception.class) public void consumeMessage(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { Person person = JSON.parseObject(new String(message.getBody()), Person.class); // 手动ack channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error(" save room duration failed ", e); // 手动ack channel.basicReject(deliveryTag, false); } }
作者:懒_羊羊
链接:https://juejin.cn/post/7025888556753256478