RocketMQ API: Producer and Consumer Examples

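The previous article covered installing and starting RocketMQ and its visual console. This article uses the official RocketMQ API to build a simple producer and consumer. Because the official API is called directly, no Spring Boot project is needed; everything runs from a main method.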

I. Preparation

Start the NameServer and the broker:

nohup sh bin/mqnamesrv >mqnamesrv-log.txt &

nohup sh bin/mqbroker -n 127.0.0.1:9876 >mqbroker-log.txt &

Start the console:

mvn spring-boot:run

Create a topic named test_quick_topic.

Create a Maven project and add the dependencies:

  <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.54</version>
  </dependency>
  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.3.0</version>
  </dependency>

II. Producer Examples

1. Example 1: Sending 10 messages synchronously

  package com.liuyanzhao.rocketmq.demo.test;

  import org.apache.rocketmq.client.exception.MQBrokerException;
  import org.apache.rocketmq.client.exception.MQClientException;
  import org.apache.rocketmq.client.producer.DefaultMQProducer;
  import org.apache.rocketmq.client.producer.SendResult;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.remoting.exception.RemotingException;

  /**
   * @author 言曌
   * @date 2019-08-03 16:19
   */
  public class TestProducer {
      public static final String NAMESRV_ADDR = "127.0.0.1:9876";

      public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
          DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
          producer.setNamesrvAddr(NAMESRV_ADDR);
          producer.start();
          for (int i = 0; i < 10; i++) {
              Message message = new Message("test_quick_topic", // topic
                      "tagA", // tag
                      "key" + i, // custom key that uniquely identifies the message
                      ("Hello RocketMQ=" + i).getBytes()); // message body (byte[])
              // Synchronous send: blocks until the broker responds
              SendResult result = producer.send(message);
              System.out.println("Message " + i + " sent, result: " + result);
          }
          producer.shutdown();
      }
  }

Console output:

[Screenshot: producer console output]

The RocketMQ console also shows the 10 messages:

[Screenshot: the 10 messages listed in the RocketMQ console]
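Since the message body travels as a plain byte[], the producer and the consumer have to agree on a charset when converting to and from String. Calling getBytes() without an argument uses the platform default charset, which may differ between machines on older JDKs. A minimal sketch of an explicit UTF-8 round trip follows; BodyCodec is a hypothetical helper name used only for illustration and is not part of the RocketMQ API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of RocketMQ): keeps the charset used for
// message bodies in one place so both sides encode and decode the same way.
final class BodyCodec {
    private BodyCodec() {
    }

    // Producer side: turn the text into the byte[] passed to the Message constructor.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received body back into text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello RocketMQ=0");
        System.out.println(body.length + " bytes -> " + decode(body));
    }
}
```

With such a helper, the body argument of the Message constructor would be BodyCodec.encode("Hello RocketMQ=" + i) rather than a bare getBytes() call.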

 

2. Example 2: Asynchronous send, delayed delivery, and sending to a specific queue

  package com.liuyanzhao.rocketmq.demo.test;

  import org.apache.rocketmq.client.exception.MQBrokerException;
  import org.apache.rocketmq.client.exception.MQClientException;
  import org.apache.rocketmq.client.producer.DefaultMQProducer;
  import org.apache.rocketmq.client.producer.MessageQueueSelector;
  import org.apache.rocketmq.client.producer.SendCallback;
  import org.apache.rocketmq.client.producer.SendResult;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.common.message.MessageQueue;
  import org.apache.rocketmq.remoting.exception.RemotingException;

  import java.util.List;

  /**
   * @author 言曌
   * @date 2019-08-03 16:19
   */
  public class TestProducer {
      public static final String NAMESRV_ADDR = "127.0.0.1:9876";

      public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
          DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
          producer.setNamesrvAddr(NAMESRV_ADDR);
          producer.start();
          // Note: this demo sends every message twice, once through the
          // queue selector (step 4) and once asynchronously (step 2.2).
          for (int i = 0; i < 10; i++) {
              // 1. Create the message
              Message message = new Message("test_quick_topic", // topic
                      "tagA", // tag
                      "key" + i, // custom key that uniquely identifies the message
                      ("Hello RocketMQ=" + i).getBytes()); // message body (byte[])
              // 3. Delayed delivery
              if (i == 2) {
                  // Delay level 3 means a 10s delay
                  message.setDelayTimeLevel(3);
              }
              // 4. Custom routing: send the message to a specific queue, here the queue at index 2
              producer.send(message, new MessageQueueSelector() {
                  @Override
                  public MessageQueue select(List<MessageQueue> list, Message message, Object args) {
                      Integer queueNumber = (Integer) args;
                      return list.get(queueNumber);
                  }
              }, 2); // the last argument, 2, is passed to the selector above as args
              // 2.1 Synchronous send
              // SendResult result = producer.send(message);
              // System.out.println("Message " + i + " sent, result: " + result);
              // 2.2 Asynchronous send
              producer.send(message, new SendCallback() {
                  @Override
                  public void onSuccess(SendResult sendResult) {
                      System.out.println("msgId:" + sendResult.getMsgId() + ", status:" + sendResult.getSendStatus());
                  }

                  @Override
                  public void onException(Throwable throwable) {
                      throwable.printStackTrace();
                      System.out.println("--send failed--");
                  }
              });
          }
          // Left commented out so the producer stays alive until the async callbacks have run
          //producer.shutdown();
      }
  }
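The selector in the example returns list.get(queueNumber) for a fixed index, which throws if the topic has fewer queues than the index assumes. The sketch below shows the selection logic on its own, in plain Java without the RocketMQ types: one variant wraps the index into range, the other derives the index from a key so that messages with the same key always land in the same queue. QueuePicker, the queue names, and the key "order-42" are hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the logic one might put inside
// MessageQueueSelector.select(...); it works on any List, not on MessageQueue.
final class QueuePicker {
    private QueuePicker() {
    }

    // Pick by index, wrapping around so the index is always in range.
    static <T> T pickByIndex(List<T> queues, int index) {
        return queues.get(Math.floorMod(index, queues.size()));
    }

    // Pick by key: the same key always maps to the same queue
    // as long as the number of queues does not change.
    static <T> T pickByKey(List<T> queues, String key) {
        return queues.get(Math.floorMod(key.hashCode(), queues.size()));
    }

    public static void main(String[] args) {
        // Assumption: a topic with 4 queues, represented here by plain strings.
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        System.out.println(pickByIndex(queues, 2));
        System.out.println(pickByKey(queues, "order-42"));
    }
}
```

Math.floorMod is used instead of % because hashCode() can be negative, and a negative remainder would be an invalid list index.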

 

 

III. Consumer Examples

1. Example 1: Consuming the 10 messages produced above

  package com.liuyanzhao.rocketmq.demo.test;

  import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  import org.apache.rocketmq.client.exception.MQClientException;
  import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  import org.apache.rocketmq.common.message.MessageExt;
  import org.apache.rocketmq.remoting.common.RemotingHelper;

  import java.util.List;

  /**
   * @author 言曌
   * @date 2019-08-04 13:31
   */
  public class TestConsumer {
      public static final String NAMESRV_ADDR = "127.0.0.1:9876";

      public static void main(String[] args) throws MQClientException {
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
          consumer.setNamesrvAddr(NAMESRV_ADDR);
          // Start consuming from the last offset
          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
          // consumer.subscribe("test_quick_topic", "tagA"); // filter: consume only messages tagged tagA
          consumer.subscribe("test_quick_topic", "*"); // consume all messages
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                  // Only the first message in the batch is handled here
                  MessageExt messageExt = list.get(0);
                  try {
                      String topic = messageExt.getTopic();
                      String tags = messageExt.getTags();
                      String keys = messageExt.getKeys();
                      String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                      System.out.println("topic: " + topic + ", tags: " + tags + ", keys: " + keys + ", body: " + msgBody);
                  } catch (Exception e) {
                      e.printStackTrace();
                      // Ask the broker to redeliver the message later
                      return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                  }
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });
          consumer.start();
          System.out.println("consumer start");
      }
  }

Console output:

[Screenshot: consumer console output]
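The second argument of subscribe is a tag expression: "*" consumes every message on the topic, a single tag such as "tagA" consumes only that tag, and several tags can be combined with "||". The sketch below models that matching rule in plain Java to make the semantics concrete. It is an illustration only and not how RocketMQ implements filtering; TagFilterSketch is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical model of the tag-expression matching rule; RocketMQ's real
// filtering is done by the broker and client and is not shown here.
final class TagFilterSketch {
    private TagFilterSketch() {
    }

    static boolean matches(String subExpression, String tag) {
        // A null, empty, or "*" expression subscribes to all tags.
        if (subExpression == null) {
            return true;
        }
        String expr = subExpression.trim();
        if (expr.isEmpty() || expr.equals("*")) {
            return true;
        }
        if (tag == null) {
            return false;
        }
        // "tagA || tagB" matches a message tagged with either tag.
        return Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "tagA"));
        System.out.println(matches("tagA || tagB", "tagB"));
        System.out.println(matches("tagA", "tagB"));
    }
}
```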

 

2. Example 2: Simulating an exception during consumption

  package com.liuyanzhao.rocketmq.demo.test;

  import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  import org.apache.rocketmq.client.exception.MQClientException;
  import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  import org.apache.rocketmq.common.message.MessageExt;
  import org.apache.rocketmq.remoting.common.RemotingHelper;

  import java.util.List;

  /**
   * @author 言曌
   * @date 2019-08-04 13:31
   */
  public class TestConsumer {
      public static final String NAMESRV_ADDR = "127.0.0.1:9876";

      public static void main(String[] args) throws MQClientException {
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
          consumer.setNamesrvAddr(NAMESRV_ADDR);
          // Start consuming from the last offset
          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
          // consumer.subscribe("test_quick_topic", "tagA"); // filter: consume only messages tagged tagA
          consumer.subscribe("test_quick_topic", "*"); // consume all messages
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                  MessageExt messageExt = list.get(0);
                  try {
                      String id = messageExt.getMsgId();
                      String topic = messageExt.getTopic();
                      String tags = messageExt.getTags();
                      String keys = messageExt.getKeys();
                      String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                      // Simulate a failure for the message with key "key5"
                      if ("key5".equals(keys)) {
                          System.out.println(1 / 0);
                      }
                      System.out.println("topic: " + topic + ", tags: " + tags + ", id: " + id + ", keys: " + keys + ", body: " + msgBody);
                  } catch (Exception e) {
                      // On failure the broker redelivers the message later, with delays drawn from
                      // the default delay levels: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                      e.printStackTrace();
                      int reconsumeTimes = messageExt.getReconsumeTimes(); // number of retries so far
                      if (reconsumeTimes == 3) {
                          // Log the failure ...
                          System.out.println("Message ID: " + messageExt.getMsgId() + " failed three times, running compensation");
                          // Compensation handling
                          // TODO
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }
                      return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                  }
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });
          consumer.start();
          System.out.println("consumer start");
      }
  }

 

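Next, restart the producer to send 10 messages, then start the consumer.

The order of consumption is not guaranteed. When the message with key key5 is consumed, an exception is thrown and execution enters the catch block.

reconsumeTimes is the retry count, which RocketMQ tracks itself. Returning ConsumeConcurrentlyStatus.RECONSUME_LATER tells RocketMQ that the message needs to be retried.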

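Retries are spaced using the broker's delay levels, which default to 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. For a concurrent consumer, the retries start at the third level, so the intervals are 10s, 30s, 1m, 2m and so on, up to 2h.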

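Here, when reconsumeTimes reaches 3, we run the compensation logic, log the failure, and return CONSUME_SUCCESS directly so the message is not retried again.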

Console output:

[Screenshot: consumer console output showing the retries for key5]
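The delay levels used by setDelayTimeLevel and by the retry mechanism can be read as a simple lookup table. The sketch below makes that explicit. It rests on two assumptions that should be checked against the broker in use: that the broker keeps the default level list quoted in this article, and that a concurrent consumer's retry number n (the value of reconsumeTimes at the time of failure) is scheduled with level 3 + n. DelayLevels is a hypothetical name, not a RocketMQ class.

```java
// Hypothetical lookup table for the default delay levels; a broker can be
// configured with a different list, so treat these values as an assumption.
final class DelayLevels {
    static final String[] LEVELS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
        "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    private DelayLevels() {
    }

    // Levels are 1-based: level 1 is "1s", level 3 is "10s".
    static String delayForLevel(int level) {
        if (level < 1 || level > LEVELS.length) {
            throw new IllegalArgumentException("level out of range: " + level);
        }
        return LEVELS[level - 1];
    }

    // Assumption: a message that has already been retried reconsumeTimes
    // times is redelivered with level 3 + reconsumeTimes, capped at the top level.
    static String retryDelay(int reconsumeTimes) {
        if (reconsumeTimes < 0) {
            throw new IllegalArgumentException("reconsumeTimes must be >= 0");
        }
        return delayForLevel(Math.min(3 + reconsumeTimes, LEVELS.length));
    }

    public static void main(String[] args) {
        System.out.println("setDelayTimeLevel(3) -> " + delayForLevel(3));
        for (int n = 0; n < 3; n++) {
            System.out.println("failure with reconsumeTimes=" + n + " -> next retry after " + retryDelay(n));
        }
    }
}
```

Under these assumptions, the example consumer would see the key5 message again after roughly 10s, 30s, and 1m before the compensation branch runs.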

 

言曌
