阅读 185

消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)

1、实现功能

希望使用一套API,实现两种模式下的消息发送和接收功能,方便业务程序调用

1、发送Topic

2、发送Queue

3、接收Topic

4、接收Queue

2、接口设计

根据功能设计公共调用接口

  1. /**

  2. * 数据分发接口(用于发送、接收消息队列数据)

  3. *

  4. * @author eguid

  5. *

  6. */

  7. public interface MsgDistributeInterface {

  8.  

  9. /**

  10. * 发送到主题

  11. *

  12. * @param topicName -主题

  13. * @param data -数据

  14. * @return

  15. */

  16. public boolean sendTopic(String topicName, byte[] data);

  17.  

  18. /**

  19. * 发送到主题

  20. * @param topicName -主题

  21. * @param data-数据

  22. * @param offset -偏移量

  23. * @param length -长度

  24. * @return

  25. */

  26. boolean sendTopic(String topicName, byte[] data, int offset, int length);

  27.  

  28. /**

  29. * 发送到队列

  30. *

  31. * @param queueName -队列名称

  32. * @param data -数据

  33. * @return

  34. */

  35. public boolean sendQueue(String queueName, byte[] data);

  36.  

  37. /**

  38. * 发送到队列

  39. * @param queueName -队列名称

  40. * @param data -数据

  41. * @param offset

  42. * @param length

  43. * @return

  44. */

  45. public boolean sendQueue(String queueName, byte[] data,int offset, int length);

  46.  

  47. /**

  48. * 接收队列消息

  49. * @param queueName 队列名称

  50. * @param listener

  51. * @throws JMSException

  52. */

  53. void receiveQueue(String queueName, MessageListener listener) throws JMSException;

  54.  

  55. /**

  56. * 订阅主题

  57. * @param topicName -主题名称

  58. * @param listener

  59. * @throws JMSException

  60. */

  61. void receiveTopic(String topicName, MessageListener listener) throws JMSException;

  62. }

3、基于ActiveMQ的接口实现

  1. /**

  2. * 基于activeMQ的消息生产者/消费者实现(初始化该对象时即初始化连接消息队列,如果无法连接到消息队列,立即抛出异常)

  3. *

  4. * @author eguid

  5. *

  6. */

  7. public class ActiveMQImpl implements MsgDistributeInterface {

  8.  

  9. private String userName;

  10. private String password;

  11. private String brokerURL;

  12. private boolean persistentMode;//持久化模式

  13. //连接工厂

  14. ConnectionFactory connectionFactory;

  15. //发送消息的线程

  16. Connection connection;

  17. // 事务管理

  18. Session session;

  19.  

  20. //存放各个线程订阅模式生产者

  21. ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();

  22. //存放各个线程队列模式生产者

  23. ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();

  24.  

  25. public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {

  26. this(userName, password, brokerURL, true);

  27. }

  28.  

  29. public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {

  30. this.userName = userName;

  31. this.password = password;

  32. this.brokerURL = brokerURL;

  33. this.persistentMode=persistentMode;

  34. init();

  35. }

  36.  

  37. public void init() throws JMSException {

  38. try {

  39. // 创建一个链接工厂

  40. connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);

  41. // 从工厂中创建一个链接

  42. connection = connectionFactory.createConnection();

  43. // 开启链接

  44. connection.start();

  45. // 创建一个事务(订阅模式,事务采用自动确认方式)

  46. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  47. } catch (JMSException e) {

  48. throw e;

  49. }

  50. }

  51.  

  52. @Override

  53. public boolean sendTopic(String topicName, byte[] data) {

  54. return sendTopic(topicName, data, 0, data.length);

  55. }

  56.  

  57. @Override

  58. public boolean sendTopic(String topicName, byte[] data, int offset, int length) {

  59. return send(true, topicName, data, offset, length);

  60. }

  61.  

  62. @Override

  63. public boolean sendQueue(String queueName, byte[] data) {

  64. return sendQueue(queueName, data, 0, data.length);

  65. }

  66.  

  67. @Override

  68. public boolean sendQueue(String queueName, byte[] data, int offset, int length) {

  69. return send(false, queueName, data, offset, length);

  70. }

  71.  

  72. /**

  73. * 发送数据

  74. *

  75. * @param name

  76. * @param data

  77. * @param offset

  78. * @param length

  79. * @param type

  80. * -类型

  81. * @return

  82. */

  83. private boolean send(boolean type, String name, byte[] data, int offset, int length) {

  84. try {

  85. MessageProducer messageProducer = getMessageProducer(name, type);

  86.  

  87. BytesMessage msg = createBytesMsg(data, offset, length);

  88. System.err.println(Thread.currentThread().getName()+"发送消息");

  89. // 发送消息

  90. messageProducer.send(msg);

  91. } catch (JMSException e) {

  92. return false;

  93. }

  94. return false;

  95. }

  96.  

  97. public void receive(String topicName) throws JMSException {

  98. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

  99. Topic topic =session.createTopic(topicName);

  100. MessageConsumer consumer=session.createConsumer(topic);

  101. consumer.setMessageListener(new MessageListener() {

  102. @Override

  103. public void onMessage(Message message) {

  104. BytesMessage msg=(BytesMessage) message;

  105. System.err.println(Thread.currentThread().getName()+"收到消息:"+msg.toString());

  106. }

  107. });

  108.  

  109. }

  110. /**

  111. * 创建字节数组消息

  112. *

  113. * @param data

  114. * @param offset

  115. * @param length

  116. * @return

  117. * @throws JMSException

  118. */

  119. private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {

  120. BytesMessage msg = session.createBytesMessage();

  121. msg.writeBytes(data, offset, length);

  122. return msg;

  123. }

  124.  

  125. /**

  126. * 创建对象序列化消息

  127. * @param obj

  128. * @return

  129. * @throws JMSException

  130. */

  131. private ObjectMessage createMapMsg(Serializable obj) throws JMSException {

  132. // MapMessage msg = session.createMapMessage();//key-value形式的消息

  133. ObjectMessage msg = session.createObjectMessage(obj);

  134. return msg;

  135. }

  136.  

  137. /**

  138. * 创建字符串消息

  139. * @param text

  140. * @return

  141. * @throws JMSException

  142. */

  143. private TextMessage createTextMsg(String text) throws JMSException {

  144. TextMessage msg = session.createTextMessage(text);

  145. return msg;

  146. }

  147.  

  148.  

  149. /**

  150. * 获取创建者

  151. *

  152. * @param name -名称(主题名称和队列名称)

  153. * @param type -类型(true:topic,false:queue)

  154. * @return

  155. * @throws JMSException

  156. */

  157. private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {

  158. return type?getTopicProducer(name):getQueueProducer(name);

  159. }

  160.  

  161. /**

  162. * 创建或获取队列

  163. * @param queueName

  164. * @return

  165. * @throws JMSException

  166. */

  167. private MessageProducer getQueueProducer(String queueName) throws JMSException {

  168. MessageProducer messageProducer = null;

  169. if ((messageProducer = queueThreadLocal.get()) == null) {

  170. Queue queue = session.createQueue(queueName);

  171. messageProducer = session.createProducer(queue);

  172. //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))

  173. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);

  174. queueThreadLocal.set(messageProducer);

  175. }

  176. return messageProducer;

  177. }

  178.  

  179. /**

  180. * 创建或获取主题

  181. * @param topicName

  182. * @return

  183. * @throws JMSException

  184. */

  185. private MessageProducer getTopicProducer(String topicName) throws JMSException {

  186. MessageProducer messageProducer = null;

  187. if ((messageProducer = topicThreadLocal.get()) == null) {

  188. Topic topic = session.createTopic(topicName);

  189. messageProducer = session.createProducer(topic);

  190. //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))

  191. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);

  192. topicThreadLocal.set(messageProducer);

  193. }

  194. return messageProducer;

  195. }

  196.  

  197. public String getPassword() {

  198. return password;

  199. }

  200.  

  201. public void setPassword(String password) {

  202. this.password = password;

  203. }

  204.  

  205. @Override

  206. public void receiveQueue(String queueName,MessageListener listener) throws JMSException {

  207. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

  208. Queue topic =session.createQueue(queueName);

  209. MessageConsumer consumer=session.createConsumer(topic);

  210. consumer.setMessageListener(listener);

  211.  

  212. }

  213.  

  214. @Override

  215. public void receiveTopic(String topicName,MessageListener listener) throws JMSException {

  216. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

  217. Topic topic =session.createTopic(topicName);

  218. MessageConsumer consumer=session.createConsumer(topic);

  219. consumer.setMessageListener(listener);

  220. }

4、测试一下Topic和Queue

  1. public static void main(String[] args) throws JMSException{

  2. //如果创建失败会立即抛出异常

  3. MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");

  4. Test testMq = new Test();

  5. try {

  6. Thread.sleep(1000);

  7. } catch (InterruptedException e) {

  8. e.printStackTrace();

  9. }

  10. //Thread 1

  11. new Thread(testMq.new ProductorMq(producter)).start();

  12. //Thread 2

  13. new Thread(testMq.new ProductorMq(producter)).start();

  14. //Thread 3

  15. new Thread(testMq.new ProductorMq(producter)).start();

  16. //Thread 4

  17. new Thread(testMq.new ProductorMq(producter)).start();

  18. //Thread 5

  19. new Thread(testMq.new ProductorMq(producter)).start();

  20. //Thread 6

  21. new Thread(testMq.new ProductorMq(producter)).start();

  22.  

  23. //订阅接收线程Thread 1

  24. new Thread(new Runnable() {

  25. @Override

  26. public void run() {

  27. try {

  28. producter.receiveTopic("eguid-topic",new MessageListener() {

  29. @Override

  30. public void onMessage(Message message) {

  31. BytesMessage msg=(BytesMessage) message;

  32. System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());

  33. }

  34. });

  35. } catch (JMSException e) {

  36. // TODO Auto-generated catch block

  37. e.printStackTrace();

  38. }

  39. }

  40. }).start();

  41. //订阅接收线程Thread 2

  42. new Thread(new Runnable() {

  43. @Override

  44. public void run() {

  45. try {

  46. producter.receiveTopic("eguid-topic",new MessageListener() {

  47. @Override

  48. public void onMessage(Message message) {

  49. BytesMessage msg=(BytesMessage) message;

  50. System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());

  51. }

  52. });

  53. } catch (JMSException e) {

  54. // TODO Auto-generated catch block

  55. e.printStackTrace();

  56. }

  57. }

  58. }).start();

  59. //队列消息生产线程Thread-1

  60. new Thread(testMq.new QueueProductor(producter)).start();

  61. //队列消息生产线程Thread-2

  62. new Thread(testMq.new QueueProductor(producter)).start();

  63. //队列接收线程Thread 1

  64. new Thread(new Runnable() {

  65. @Override

  66. public void run() {

  67. try {

  68. producter.receiveQueue("eguid-queue",new MessageListener() {

  69. @Override

  70. public void onMessage(Message message) {

  71. BytesMessage msg=(BytesMessage) message;

  72. System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());

  73. }

  74. });

  75. } catch (JMSException e) {

  76. // TODO Auto-generated catch block

  77. e.printStackTrace();

  78. }

  79. }

  80. }).start();

  81. //队列接收线程Thread2

  82. new Thread(new Runnable() {

  83. @Override

  84. public void run() {

  85. try {

  86. producter.receiveQueue("eguid-queue",new MessageListener() {

  87. @Override

  88. public void onMessage(Message message) {

  89. BytesMessage msg=(BytesMessage) message;

  90. System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());

  91. }

  92. });

  93. } catch (JMSException e) {

  94. // TODO Auto-generated catch block

  95. e.printStackTrace();

  96. }

  97. }

  98. }).start();

  99. }

  100.  

  101. private class ProductorMq implements Runnable{

  102. Jtt809MsgProducter producter;

  103. public ProductorMq(Jtt809MsgProducter producter){

  104. this.producter = producter;

  105. }

  106.  

  107. @Override

  108. public void run() {

  109. while(true){

  110. try {

  111. String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";

  112. producter.sendTopic("eguid-topic",wang.getBytes());

  113.  

  114. Thread.sleep(2000);

  115. } catch (InterruptedException e) {

  116. e.printStackTrace();

  117. }

  118. }

  119. }

  120. }

  121.  

  122. private class QueueProductor implements Runnable{

  123. Jtt809MsgProducter producter;

  124. public QueueProductor(Jtt809MsgProducter producter){

  125. this.producter = producter;

  126. }

  127.  

  128. @Override

  129. public void run() {

  130. while(true){

  131. try {

  132. String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";

  133. producter.sendQueue("eguid-queue",eguid.getBytes());

  134. Thread.sleep(2000);

  135. } catch (InterruptedException e) {

  136. e.printStackTrace();

  137. }

  138. }

  139. }

  140. }

-------------------End--------------------


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐