阅读 188

RabbitMQ——实战篇1(原生API)

实战篇mu目录

  • 其他文章地址

  • RabbitMQ实战

    • 6.1、生产者

    • 6.2、消费者

    • 5.1、生产者

    • 5.2、消费者

    • 4.1、生产者

    • 4.2、消费者

    • 3.1、生产者

    • 3.2、消费者

    • 2.1、生产者

    • 2.2、消费者

    • 1.1、生成者代码

    • 1.2、消费者代码

    • 1、简单模式 HelloWorld

    • 2、工作队列模式 Work Queue

    • 3、发布订阅模式 Publish/subscribe

    • 4、路由模式 Routing

    • 5、通配符模式 Topic

    • 6、confirm机制


其他文章地址

1、RabbitMQ——单机版安装(3.6.5)
2、RabbitMQ——入门篇
3、RabbitMQ——实战篇1(原生API)
4、RabbitMQ——实战篇2(Spring集成)
5、RabbitMQ——实战篇3(Spring集成高级特性:死信队列,消息丢失,延迟队列)
6、RabbitMQ——实战篇4(SpringBoot集成)

RabbitMQ实战

项目地址:https://gitee.com/zhouzhz/rabbitmq
注意:只要是exchange都需要自己在rabbitmq服务端配置,也就是界面上
依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency> <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version><optional>true</optional></dependency></dependencies>


公共代码

package com.zhz.utils;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 公共静态变量
 **/public class RabbitConstant {public static final String QUEUE_HELLOWORLD = "helloworld";public static final String QUEUE_SMS = "sms";public static final String EXCHANGE_WEATHER = "weather";public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";public static final String QUEUE_BAIDU = "baidu";public static final String QUEUE_SINA = "sina";public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";}


package com.zhz.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description:公共类
 **/public class RabbitUtils {private static ConnectionFactory connectionFactory = new ConnectionFactory();static {connectionFactory.setHost("192.168.0.66");connectionFactory.setPort(5672);connectionFactory.setUsername("zhzmq");connectionFactory.setPassword("zhzmq");connectionFactory.setVirtualHost("/zhztest");}public static Connection getConnection(){Connection conn = null;try {conn = connectionFactory.newConnection();return conn;} catch (Exception e) {throw new RuntimeException(e);}}}


1、简单模式 HelloWorld

1.1、生成者代码

package com.zhz.helloworld;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 生产者
 **/public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//创建通信"通道",相当于TCP中的虚拟连接Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, nullchannel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);String message="zhz6dsad66";//四个参数//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到//队列名称//额外的设置属性//最后一个参数是要传递的消息字节数组channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORLD,null,message.getBytes());channel.close();connection.close();System.out.println("======数据发送成功===");}}


1.2、消费者代码

package com.zhz.helloworld;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class Consumer {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//创建通信"通道",相当于TCP中的虚拟连接Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, nullchannel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);//从MQ服务器中获取数据//创建一个消息消费者//第一个参数:队列名//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法//第三个参数要传入DefaultConsumer的实现类channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Receiver(channel));}}


package com.zhz.helloworld;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/31
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 
 **/public class Receiver extends DefaultConsumer {private Channel channel;/**
     *   重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
     */public Receiver(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message=new String(body);System.out.println("消费者接收到的消息:"+message);System.out.println("消息的TagId:"+envelope.getDeliveryTag());//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(),false);}}


2、工作队列模式 Work Queue

2.1、生产者

package com.zhz.workqueue;import com.google.gson.Gson;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/**
 * @author :zhz
 * @date :Created in 2021/01/31
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 发送方(服务端/生产者)
 **/public class OrderSystem {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS,false,false,false,null);for (int i = 1; i <= 100; i++) {Sms sms=new Sms("乘客"+i,"10086"+i,"您的车票已经预定");String json = new Gson().toJson(sms);channel.basicPublish("",RabbitConstant.QUEUE_SMS,null,json.getBytes());}System.out.println("发送数据成功");channel.close();connection.close();}}


2.2、消费者

package com.zhz.workqueue;import lombok.*;import lombok.experimental.Accessors;/**
 * @author :zhz
 * @date :Created in 2021/01/31
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 短信实体类
 **/@Data@AllArgsConstructor@NoArgsConstructor@Accessors(chain = true)public class Sms {private String name;private String mobile;private String content;}


package com.zhz.workqueue;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/31
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class SmsSend1 {public static void main(String[] args) throws Exception {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS1-发送短信成功" + json);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}});}}


package com.zhz.workqueue;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/31
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class SmsSend2 {public static void main(String[] args) throws Exception {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS2-发送短信成功" + json);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}});}}


package com.zhz.workqueue;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/31
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class SmsSend3 {public static void main(String[] args) throws Exception {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS3-发送短信成功" + json);try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}});}}


3、发布订阅模式 Publish/subscribe

3.1、生产者

package com.zhz.pubsub;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.util.Scanner;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 生产者
 **/public class WeatherBureau {public static void main(String[] args) throws Exception {Connection connection = RabbitUtils.getConnection();String input = new Scanner(System.in).next();Channel channel = connection.createChannel();//第一个参数交换机名字   其他参数和之前的一样channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());channel.close();connection.close();}}


3.2、消费者

package com.zhz.pubsub;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class BiaDu {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}


package com.zhz.pubsub;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}


4、路由模式 Routing

4.1、生产者

package com.zhz.routing;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;import java.util.Scanner;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 生产者
 **/public class WeatherBureau {public static void main(String[] args) throws Exception {Map area = new LinkedHashMap<String, String>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//第一个参数交换机名字   第二个参数作为 消息的routing keychannel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());}channel.close();connection.close();}}


4.2、消费者

package com.zhz.routing;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//指定队列与交换机以及routing key之间的关系channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}


package com.zhz.routing;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class BiaDu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}


5、通配符模式 Topic

5.1、生产者

package com.zhz.topic;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 生产者
 **/public class WeatherBureau {public static void main(String[] args) throws Exception {Map area = new LinkedHashMap<String, String>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//第一个参数交换机名字   第二个参数作为 消息的routing keychannel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());}channel.close();connection.close();}}


5.2、消费者

package com.zhz.topic;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class BiaDu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");   // channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}


package com.zhz.topic;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/01/30
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//指定队列与交换机以及routing key之间的关系channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}


6、confirm机制

6.1、生产者

package com.zhz.confirm;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;/**
 * @author :zhz
 * @date :Created in 2021/02/03
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 生产者=>不需要关闭,因为要监听
 **/public class WeatherBureau {public static void main(String[] args) throws IOException {Map<String, String> area = new LinkedHashMap<>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//开启confirm监听模式channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {//第二个参数代表接收的数据是否为批量接收,一般不用System.out.println("消息已被Broker接收,Tag="+l);}@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("消息已被Broker拒收,Tag:" + l);}});channel.addReturnListener(new ReturnCallback() {@Overridepublic void handle(Return aReturn) {System.err.println("===========================");System.err.println("Return编码:" + aReturn.getReplyCode() + "-Return描述:" + aReturn.getReplyText());System.err.println("交换机:" + aReturn.getExchange() + "-路由key:" + aReturn.getRoutingKey() );System.err.println("Return主题:" + new String(aReturn.getBody()));System.err.println("===========================");}});Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator();while (iterator.hasNext()){Map.Entry<String, String> map = iterator.next();channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,map.getKey(),true,null,map.getValue().getBytes());}}}


6.2、消费者

package com.zhz.confirm;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/02/04
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description: 消费者
 **/public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA,false,false,false,null);channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到的天气预报:"+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}});}}


package com.zhz.confirm;import com.rabbitmq.client.*;import com.zhz.utils.RabbitConstant;import com.zhz.utils.RabbitUtils;import java.io.IOException;/**
 * @author :zhz
 * @date :Created in 2021/02/05
 * @version: V1.0
 * @slogan: 天下风云出我辈,一入代码岁月催
 * @description:
 **/public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false,false,false,null);channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.*.20201127");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到的天气预报:"+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}});}}




文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐