RabbitMQ——实战篇1(原生API)
实战篇mu目录
其他文章地址
RabbitMQ实战
6.1、生产者
6.2、消费者
5.1、生产者
5.2、消费者
4.1、生产者
4.2、消费者
3.1、生产者
3.2、消费者
2.1、生产者
2.2、消费者
1.1、生成者代码
1.2、消费者代码
1、简单模式 HelloWorld
2、工作队列模式 Work Queue
3、发布订阅模式 Publish/subscribe
4、路由模式 Routing
5、通配符模式 Topic
6、confirm机制
其他文章地址
1、RabbitMQ——单机版安装(3.6.5)
2、RabbitMQ——入门篇
3、RabbitMQ——实战篇1(原生API)
4、RabbitMQ——实战篇2(Spring集成)
5、RabbitMQ——实战篇3(Spring集成高级特性:死信队列,消息丢失,延迟队列)
6、RabbitMQ——实战篇4(SpringBoot集成)
RabbitMQ实战
项目地址:https://gitee.com/zhouzhz/rabbitmq
注意:只要是exchange都需要自己在rabbitmq服务端配置,也就是界面上
依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency> <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version><optional>true</optional></dependency></dependencies>
公共代码
package com.zhz.utils;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 公共静态变量 **/public class RabbitConstant {public static final String QUEUE_HELLOWORLD = "helloworld";public static final String QUEUE_SMS = "sms";public static final String EXCHANGE_WEATHER = "weather";public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";public static final String QUEUE_BAIDU = "baidu";public static final String QUEUE_SINA = "sina";public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";}
package com.zhz.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description:公共类 **/public class RabbitUtils {private static ConnectionFactory connectionFactory = new ConnectionFactory();static {connectionFactory.setHost("192.168.0.66");connectionFactory.setPort(5672);connectionFactory.setUsername("zhzmq");connectionFactory.setPassword("zhzmq");connectionFactory.setVirtualHost("/zhztest");}public static Connection getConnection(){Connection conn = null;try {conn = connectionFactory.newConnection();return conn;} catch (Exception e) {throw new RuntimeException(e);}}}
1、简单模式 HelloWorld
1.1、生成者代码
package com.zhz.helloworld;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 生产者 **/public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//创建通信"通道",相当于TCP中的虚拟连接Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, nullchannel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);String message="zhz6dsad66";//四个参数//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到//队列名称//额外的设置属性//最后一个参数是要传递的消息字节数组channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORLD,null,message.getBytes());channel.close();connection.close();System.out.println("======数据发送成功===");}}
1.2、消费者代码
package com.zhz.helloworld;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class Consumer {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//创建通信"通道",相当于TCP中的虚拟连接Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, nullchannel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);//从MQ服务器中获取数据//创建一个消息消费者//第一个参数:队列名//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法//第三个参数要传入DefaultConsumer的实现类channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Receiver(channel));}}
package com.zhz.helloworld;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/31 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: **/public class Receiver extends DefaultConsumer {private Channel channel;/** * 重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到 */public Receiver(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message=new String(body);System.out.println("消费者接收到的消息:"+message);System.out.println("消息的TagId:"+envelope.getDeliveryTag());//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(),false);}}
2、工作队列模式 Work Queue
2.1、生产者
package com.zhz.workqueue;import com.google.gson.Gson;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author :zhz * @date :Created in 2021/01/31 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 发送方(服务端/生产者) **/public class OrderSystem {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS,false,false,false,null);for (int i = 1; i <= 100; i++) {Sms sms=new Sms("乘客"+i,"10086"+i,"您的车票已经预定");String json = new Gson().toJson(sms);channel.basicPublish("",RabbitConstant.QUEUE_SMS,null,json.getBytes());}System.out.println("发送数据成功");channel.close();connection.close();}}
2.2、消费者
package com.zhz.workqueue;import lombok.*;import lombok.experimental.Accessors;/** * @author :zhz * @date :Created in 2021/01/31 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 短信实体类 **/@Data@AllArgsConstructor@NoArgsConstructor@Accessors(chain = true)public class Sms {private String name;private String mobile;private String content;}
package com.zhz.workqueue;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/31 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class SmsSend1 {public static void main(String[] args) throws Exception {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS1-发送短信成功" + json);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}});}}
package com.zhz.workqueue;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/31 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class SmsSend2 {public static void main(String[] args) throws Exception {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS2-发送短信成功" + json);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}});}}
package com.zhz.workqueue;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/31 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class SmsSend3 {public static void main(String[] args) throws Exception {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS3-发送短信成功" + json);try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}});}}
3、发布订阅模式 Publish/subscribe
3.1、生产者
package com.zhz.pubsub;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.util.Scanner;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 生产者 **/public class WeatherBureau {public static void main(String[] args) throws Exception {Connection connection = RabbitUtils.getConnection();String input = new Scanner(System.in).next();Channel channel = connection.createChannel();//第一个参数交换机名字 其他参数和之前的一样channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());channel.close();connection.close();}}
3.2、消费者
package com.zhz.pubsub;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class BiaDu {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
package com.zhz.pubsub;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
4、路由模式 Routing
4.1、生产者
package com.zhz.routing;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;import java.util.Scanner;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 生产者 **/public class WeatherBureau {public static void main(String[] args) throws Exception {Map area = new LinkedHashMap<String, String>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//第一个参数交换机名字 第二个参数作为 消息的routing keychannel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());}channel.close();connection.close();}}
4.2、消费者
package com.zhz.routing;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//指定队列与交换机以及routing key之间的关系channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
package com.zhz.routing;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class BiaDu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
5、通配符模式 Topic
5.1、生产者
package com.zhz.topic;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 生产者 **/public class WeatherBureau {public static void main(String[] args) throws Exception {Map area = new LinkedHashMap<String, String>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//第一个参数交换机名字 第二个参数作为 消息的routing keychannel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());}channel.close();connection.close();}}
5.2、消费者
package com.zhz.topic;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class BiaDu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127"); // channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
package com.zhz.topic;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/01/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//指定队列与交换机以及routing key之间的关系channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
6、confirm机制
6.1、生产者
package com.zhz.confirm;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;/** * @author :zhz * @date :Created in 2021/02/03 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 生产者=>不需要关闭,因为要监听 **/public class WeatherBureau {public static void main(String[] args) throws IOException {Map<String, String> area = new LinkedHashMap<>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//开启confirm监听模式channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {//第二个参数代表接收的数据是否为批量接收,一般不用System.out.println("消息已被Broker接收,Tag="+l);}@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("消息已被Broker拒收,Tag:" + l);}});channel.addReturnListener(new ReturnCallback() {@Overridepublic void handle(Return aReturn) {System.err.println("===========================");System.err.println("Return编码:" + aReturn.getReplyCode() + "-Return描述:" + aReturn.getReplyText());System.err.println("交换机:" + aReturn.getExchange() + "-路由key:" + aReturn.getRoutingKey() );System.err.println("Return主题:" + new String(aReturn.getBody()));System.err.println("===========================");}});Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator();while (iterator.hasNext()){Map.Entry<String, String> map = iterator.next();channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,map.getKey(),true,null,map.getValue().getBytes());}}}
6.2、消费者
package com.zhz.confirm;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/02/04 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 消费者 **/public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA,false,false,false,null);channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到的天气预报:"+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}});}}
package com.zhz.confirm;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/** * @author :zhz * @date :Created in 2021/02/05 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: **/public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false,false,false,null);channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.*.20201127");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到的天气预报:"+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}});}}