上文介绍了 RocketMQ 和可视化控制台的安装和启动,本文介绍通过 RocketMQ 官方的 api实现一个简单的生产者和消费者例子,因为是直接使用官方API,所以不需要 springboot 项目内容,直接在 main 方法里调用接口。
启动 NameServer 和 broker
控制台输出
控制台也可以看到有10条消息
例2:异步投递消息、延迟投递消息和自定义投递到指定队列
1.例1:消费上面生产者生产的10条消息
控制台输出
2.例2,模拟消费时发生异常
然后重新启动生产者生产10条消息,然后启动消费者
消费的顺序是随机的,当消费到 key 为 key5的消息,会抛一个异常,进入 catch 代码块
reconsumeTimes 是重试次数,mq自己记录,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; 表示需要重试
重试机制是每隔 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 重试一次
这里我们判断当重试第3次时,做补偿处理,记录日志,然后直接返回消费成功
控制台输出
一、准备
启动 NameServer 和 broker
nohup sh bin/mqnamesrv >mqnamesrv-log.txt &
nohup sh bin/mqbroker -n 127.0.0.1:9876 >mqbroker-log.txt &
启动控制台
mvn spring-boot:run
创建一个 topic名为 test_quick_topic
创建一个 Maven 项目,添加依赖
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.54</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.3.0</version>
- </dependency>
二、生产者例子
1.例1:同步投递10条消息
- package com.liuyanzhao.rocketmq.demo.test;
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- /**
- * @author 言曌
- * @date 2019-08-03 16:19
- */
- public class TestProducer {
- public static final String NAMESRV_ADDR = "127.0.0.1:9876";
- public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
- DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
- producer.setNamesrvAddr(NAMESRV_ADDR);
- producer.start();
- for (int i = 0; i <10; i++) {
- Message message = new Message("test_quick_topic",//主题
- "tagA", //标签
- "key" + i, //自定义key,唯一标识
- ("Hello RocketMQ=" + i).getBytes()); //消息内容实体 (byte[])
- SendResult result = producer.send(message);
- System.out.println("第" + i + "条消息发出,结果:" + result);
- }
- producer.shutdown();
- }
- }
控制台输出
控制台也可以看到有10条消息
例2:异步投递消息、延迟投递消息和自定义投递到指定队列
- package com.liuyanzhao.rocketmq.demo.test;
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.MessageQueueSelector;
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageQueue;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import java.util.List;
- /**
- * @author 言曌
- * @date 2019-08-03 16:19
- */
- public class TestProducer {
- public static final String NAMESRV_ADDR = "127.0.0.1:9876";
- public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
- DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
- producer.setNamesrvAddr(NAMESRV_ADDR);
- producer.start();
- for (int i = 0; i < 10; i++) {
- //1.创建消息
- Message message = new Message("test_quick_topic",//主题
- "tagA", //标签
- "key" + i, //自定义key,唯一标识
- ("Hello RocketMQ=" + i).getBytes()); //消息内容实体 (byte[])
- //3.延迟投递消息
- if (i == 2) {
- //延迟10s投递
- message.setDelayTimeLevel(3);
- }
- //4.自定义投递:将消息发送到指定的队列,如发到第2个队列
- producer.send(message, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> list, Message message, Object args) {
- Integer queueNumber = (Integer) args;
- return list.get(queueNumber);
- }
- }, 2); //这里的最后一个参数2就是传到前面匿名方法的 args 的值
- //2.1 同步发生消息
- // SendResult result = producer.send(message);
- // System.out.println("第" + i + "条消息发出,结果:" + result);
- //2.2 异步发送消息
- producer.send(message, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println("msgId:" + sendResult.getMsgId() + ", status:" + sendResult.getSendStatus());
- }
- @Override
- public void onException(Throwable throwable) {
- throwable.printStackTrace();
- System.out.println("--发送失败--");
- }
- });
- }
- //producer.shutdown();
- }
- }
二、消费者例子
1.例1:消费上面生产者生产的10条消息
- package com.liuyanzhao.rocketmq.demo.test;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import java.util.List;
- /**
- * @author 言曌
- * @date 2019-08-04 13:31
- */
- public class TestConsumer {
- public static final String NAMESRV_ADDR = "127.0.0.1:9876";
- public static void main(String[] args) throws MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
- consumer.setNamesrvAddr(NAMESRV_ADDR);
- //从最后开始消费
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- // consumer.subscribe("test_quick_topic","tagA"); //过滤:消费tag为tagA的消息
- consumer.subscribe("test_quick_topic", "*"); //消费所有的
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- MessageExt messageExt = list.get(0);
- try {
- String topic = messageExt.getTopic();
- String tags = messageExt.getTags();
- String keys = messageExt.getKeys();
- String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
- System.out.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ", body: " + msgBody);
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("comsumer start");
- }
- }
控制台输出
2.例2,模拟消费时发生异常
- package com.liuyanzhao.rocketmq.demo.test;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import java.util.List;
- /**
- * @author 言曌
- * @date 2019-08-04 13:31
- */
- public class TestConsumer {
- public static final String NAMESRV_ADDR = "127.0.0.1:9876";
- public static void main(String[] args) throws MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
- consumer.setNamesrvAddr(NAMESRV_ADDR);
- //从最后开始消费
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- // consumer.subscribe("test_quick_topic","tagA"); //过滤:消费tag为tagA的消息
- consumer.subscribe("test_quick_topic", "*"); //消费所有的
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- MessageExt messageExt = list.get(0);
- try {
- String id = messageExt.getMsgId();
- String topic = messageExt.getTopic();
- String tags = messageExt.getTags();
- String keys = messageExt.getKeys();
- String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
- //模拟失败
- if ("key5".equals(keys)) {
- System.out.println(1 / 0);
- }
- System.out.println("topic: " + topic + ",tags: " + tags + ", id:" + id + ",keys: " + keys + ", body: " + msgBody);
- } catch (Exception e) {
- //失败后,默认会隔 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 重试
- e.printStackTrace();
- int reconsumeTimes = messageExt.getReconsumeTimes();//失败重试次数
- if (reconsumeTimes == 3) {
- //记录日志 ...
- System.out.println("消息ID:" + messageExt.getMsgId() + "失败三次,执行补偿策略");
- //做补偿处理
- //TODO
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("comsumer start");
- }
- }
然后重新启动生产者生产10条消息,然后启动消费者
消费的顺序是随机的,当消费到 key 为 key5的消息,会抛一个异常,进入 catch 代码块
reconsumeTimes 是重试次数,mq自己记录,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; 表示需要重试
重试机制是每隔 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 重试一次
这里我们判断当重试第3次时,做补偿处理,记录日志,然后直接返回消费成功
控制台输出
您可以选择一种方式赞助本站
支付宝扫一扫赞助
微信钱包扫描赞助
赏