,目前国内的公司里消息中间件(即MQ)这块用的最多 Kafka 和 RocketMQ了。
本文介绍一下 RocketMQ 与 SpringBoot 整合,该示例比网上那些简单例子会更加清晰明了,同时也更适合在公司的代码里书写。
完整代码地址:https://github.com/saysky/rocketmq-demo
关于 RocketMQ总结可以这篇文章
一、安装 RocketMQ
1、从官网下载RocketMQ代码
http://rocketmq.apache.org/docs/quick-start
自行下载最新版或者历史版本,我这里因为是好早之前下载的,所以还是 4.4,现在已经 4.7 了
2、启动 NameServer 和 Broker
可用根据官方页面的提示分别启动 NameServer 和 Broker
因为不同系统(Windows或Linux、Mac) 安装方式和启动方式都不太同
这里主要介绍一下我的 Mac 的操作
启动 NameServer
cd /Users/liuyanzhao/Documents/JavaStudy/rocketmq-all-4.4.0
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/namesrv.log
如果没有报错就说明启动成功了,可以通过 jps 命令查看是否有这两个 java 进程
二、安装 RockerMQ Console
rocketmq console 是一个用于可视化 RocketMQ 消息 的 springboot 项目,单纯就是可视化的操作,比如查询消息,添加 Topic 等。
界面如下
1、下载 RocketMQ Console
地址如下 https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
2、修改 rocketmq-console 里的 application.properties 配置文件
可以把端口号改成 9999 或其他不常用的端口号,默认好像是 8080
如果里面的 nameserver 地址不一样也要改,一般按默认就OK
3、启动 RocketMQ Console
可以直接用 IDEA 打开 rocketmq-console 项目进行启动
但是对于博主而言,需要打开一个额外 IDEA 窗口我是不愿意的
所以我进行 mvn clean install 打包,然后运行 target 里的 jar 文件
具体怎么运行,按照自己喜欢的来,不启动也OK
三、生产者代码
代码结构如下
1、pom.xml
我这里使用的 4.4 版本,大家可以改成自己的版本
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.liuyanzhao</groupId>
<artifactId>rocketmq-producer-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-producer-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、application.yml
server:
port: 8080
rocketmq:
producer:
namesrvAddr: 127.0.0.1:9876
group: PID_BLOG_PROJECT
3、DefaultProducerConfig
因为我们没有使用 starter 版本的 rocketmq 依赖,所以需要自己写配置文件里配置默认的Producer
package com.liuyanzhao.rocketmq.producer.demo.config;
import com.liuyanzhao.rocketmq.producer.demo.properties.MqProducerProperties;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* @author 言曌
* @date 2020/9/17 2:04 下午
*/
@Configuration
public class DefaultProducerConfig {
private static final Logger log = LoggerFactory.getLogger(DefaultProducerConfig.class);
@Autowired
private MqProducerProperties propertiesProperties;
/**
* 创建普通消息发送者实例
*
* @return
* @throws MQClientException
*/
@Bean
@Primary
public DefaultMQProducer defaultProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer(propertiesProperties.getGroup());
producer.setNamesrvAddr(propertiesProperties.getNamesrvAddr());
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendAsyncFailed(10);
producer.start();
log.info("default producer 创建成功, {}, {}", propertiesProperties.getNamesrvAddr(), propertiesProperties.getGroup());
return producer;
}
}
4、MqProducerProperties
读取 application.yml 配置里属性类
package com.liuyanzhao.rocketmq.producer.demo.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author 言曌
* @date 2020/9/17 2:51 下午
*/
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MqProducerProperties {
private String namesrvAddr;
private String group;
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
}
5、发送消息接口
package com.liuyanzhao.rocketmq.producer.demo.mq.service;
import com.liuyanzhao.rocketmq.producer.demo.dto.MqMessageDTO;
/**
* MQ生产者发送消息服务
*
* @author 言曌
* @date 2020/9/14 8:21 下午
*/
public interface MqProducerService {
/**
* 发送消息
*
* @param mqMessageDTO
* @return
*/
boolean sendMessage(MqMessageDTO mqMessageDTO);
}
6、发送消息接口实现
package com.liuyanzhao.rocketmq.producer.demo.mq.service.impl;
import com.alibaba.fastjson.JSON;
import com.liuyanzhao.rocketmq.producer.demo.dto.MqMessageDTO;
import com.liuyanzhao.rocketmq.producer.demo.mq.service.MqProducerService;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author 言曌
* @date 2020/9/14 8:24 下午
*/
@Service
public class MqProducerServiceImpl implements MqProducerService {
private static final Logger log = LoggerFactory.getLogger(MqProducerServiceImpl.class);
@Autowired
private DefaultMQProducer defaultMQProducer;
@Override
public boolean sendMessage(MqMessageDTO mqMessageDTO) {
log.info("开始发送消息, mqMessageDTO:{}", JSON.toJSONString(mqMessageDTO));
SendResult sendResult;
try {
Message message = new Message(mqMessageDTO.getTopic(), mqMessageDTO.getTag(), mqMessageDTO.getKey(), mqMessageDTO.getContent());
sendResult = defaultMQProducer.send(message);
} catch (Exception e) {
log.error("消息发送失败, mqMessageDTO={}, cause:{}", JSON.toJSONString(mqMessageDTO), e);
return false;
}
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("发送成功, sendResult:{}", JSON.toJSONString(sendResult));
mqMessageDTO.setMsgId(sendResult.getMsgId());
return true;
}
return false;
}
}
7、发送消息封装对象
package com.liuyanzhao.rocketmq.producer.demo.dto;
import java.io.Serializable;
/**
* @author 言曌
* @date 2020/9/14 8:22 下午
*/
public class MqMessageDTO implements Serializable {
/**
* 消息ID
*/
private String msgId;
/**
* 主题
*/
private String topic;
/**
* 消息标签体
*/
private String tag;
/**
* 消息业务主键
*/
private String key;
/**
* 消息体
*/
private byte[] content;
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
public MqMessageDTO() {
}
public MqMessageDTO(String msgId, String topic, String tag, String key, byte[] content) {
this.msgId = msgId;
this.topic = topic;
this.tag = tag;
this.key = key;
this.content = content;
}
}
8、调用发布消息
UserServiceImpl
package com.liuyanzhao.rocketmq.producer.demo.service.impl;
import com.alibaba.fastjson.JSON;
import com.liuyanzhao.rocketmq.producer.demo.dto.MqMessageDTO;
import com.liuyanzhao.rocketmq.producer.demo.dto.UserDTO;
import com.liuyanzhao.rocketmq.producer.demo.mq.service.MqProducerService;
import com.liuyanzhao.rocketmq.producer.demo.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author 言曌
* @date 2020/9/15 3:13 下午
*/
@Service
public class UserServiceImpl implements UserService {
private static final Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
@Autowired
private MqProducerService mqProducerService;
@Override
public void addUser(UserDTO user) {
// 1.模拟业务操作
// ....
// 2.向关联系统发送消息
MqMessageDTO mqMessageDTO = new MqMessageDTO();
mqMessageDTO.setTopic("T_DEMO_PROJECT");
mqMessageDTO.setTag("ADD_USER");
mqMessageDTO.setContent(JSON.toJSONString(user).getBytes());
boolean success = mqProducerService.sendMessage(mqMessageDTO);
if (success) {
log.info("消息发送成功, UserServiceImpl.addUser");
} else {
log.info("消息发送失败");
// TODO 记录错误记录
}
}
}
UserService
package com.liuyanzhao.rocketmq.producer.demo.service;
import com.liuyanzhao.rocketmq.producer.demo.dto.UserDTO;
/**
* @author 言曌
* @date 2020/9/15 3:11 下午
*/
public interface UserService {
/**
* 添加用户
*
* @param user
*/
void addUser(UserDTO user);
}
UserDTO
package com.liuyanzhao.rocketmq.producer.demo.dto;
import java.io.Serializable;
/**
* @author 言曌
* @date 2020/9/15 3:12 下午
*/
public class UserDTO implements Serializable {
private String id;
private String username;
private String password;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public UserDTO() {
}
public UserDTO(String id, String username, String password) {
this.id = id;
this.username = username;
this.password = password;
}
}
HelloController
package com.liuyanzhao.rocketmq.producer.demo.controller;
import com.liuyanzhao.rocketmq.producer.demo.dto.UserDTO;
import com.liuyanzhao.rocketmq.producer.demo.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* 测试发生消息控制器
*
* @author 言曌
* @date 2020/9/14 8:47 下午
*/
@RestController
public class HelloController {
@Autowired
private UserService userService;
@GetMapping("/testAddUser")
public String testAddUser() {
// 模拟发送数据
UserDTO user = new UserDTO(UUID.randomUUID().toString().replace("-", ""), "张三", "123456");
userService.addUser(user);
return "success";
}
}
访问 localhost:8080/testAddUser 会往 broker 发一条信息
四、消费者代码
代码结构如下
1、pom.xml
和生产者的一样
2、application.yml
server:
port: 8081
rocketmq:
consumer:
namesrvAddr: 127.0.0.1:9876
group: CID_BLOG_PROJECT
3、属性类
package com.example.rocketmq.consumer.demo.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author 言曌
* @date 2020/9/17 2:51 下午
*/
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MqConsumerProperties {
private String namesrvAddr;
private String group;
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
}
4、抽象消费消息监听类
package com.example.rocketmq.consumer.demo.mq.listener;
import com.example.rocketmq.consumer.demo.properties.MqConsumerProperties;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* @author 言曌
* @date 2020/9/15 7:15 下午
*/
@Configuration
public abstract class AbstractConsumerListener {
private static final Logger log = LoggerFactory.getLogger(AbstractConsumerListener.class);
@Autowired
private MqConsumerProperties consumerProperties;
/**
* 开启消费注册
*
* @param topic
* @param tags 支持多个tag, 如 tag1 || tag2 || tag3
* @throws MQClientException
*/
public void listener(String topic, String tags) throws MQClientException {
log.info("开启" + topic + ":" + tags + "消费者");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerProperties.getGroup());
consumer.setNamesrvAddr(consumerProperties.getNamesrvAddr());
consumer.subscribe(topic, tags);
// 开启内部类实现监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return AbstractConsumerListener.this.onMessage(msgs);
}
});
consumer.start();
}
/**
* 处理body的业务
*
* @param msgs
* @return
*/
public abstract ConsumeConcurrentlyStatus onMessage(List<MessageExt> msgs);
}
5、消费消息监听者实现类
package com.example.rocketmq.consumer.demo.mq.listener;
import com.example.rocketmq.consumer.demo.enums.MqConsumerBeanEnum;
import com.example.rocketmq.consumer.demo.mq.service.MqConsumerService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
/**
* @author 言曌
* @date 2020/9/15 7:22 下午
*/
@Configuration
public class DefaultConsumerListener extends AbstractConsumerListener implements ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
private static final Logger log = LoggerFactory.getLogger(DefaultConsumerListener.class);
/**
* 上下文
*/
private ApplicationContext applicationContext;
@Override
public ConsumeConcurrentlyStatus onMessage(List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
String topicTag = msg.getTopic() + ":" + msg.getTags();
MqConsumerBeanEnum mqConsumerBeanEnum = MqConsumerBeanEnum.getBeanByTopicTag(topicTag);
if (mqConsumerBeanEnum == null) {
break;
}
ConsumeConcurrentlyStatus consumerStatus = null;
try {
Object serviceBean = applicationContext.getBean(mqConsumerBeanEnum.getBeanName());
if (serviceBean == null) {
break;
}
if (serviceBean instanceof MqConsumerService) {
String message = new String(msg.getBody(), "utf-8");
MqConsumerService consumerService = (MqConsumerService) serviceBean;
// 预处理
consumerService.beforeHandler(message);
// 处理
consumerStatus = consumerService.handle(message);
// 处理之后
consumerService.afterHandler(message, new Date(), consumerStatus);
}
} catch (UnsupportedEncodingException e) {
log.error("body转字符串解析失败");
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
// 订阅同一个topic下的多个tag
super.listener("T_DEMO_PROJECT", "ADD_USER || ADD_ORDER");
} catch (MQClientException e) {
log.error("consumer error");
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
6、消费服务接口
package com.example.rocketmq.consumer.demo.mq.service;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import java.util.Date;
/**
* @author 言曌
* @date 2020/9/16 6:22 下午
*/
public interface MqConsumerService {
/**
* 前置
*
* @param message
* @return
*/
ConsumeConcurrentlyStatus beforeHandler(String message);
/**
* 消息处理
*
* @param message
* @return
*/
ConsumeConcurrentlyStatus handle(String message);
/**
* 后置
*
* @param message
* @param startHandlerTime
* @param status
*/
void afterHandler(String message, Date startHandlerTime, ConsumeConcurrentlyStatus status);
}
7、消费服务接口抽象实现类
package com.example.rocketmq.consumer.demo.mq.service.impl;
import com.example.rocketmq.consumer.demo.mq.service.MqConsumerService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* @author 言曌
* @date 2020/9/17 5:01 下午
*/
@Service("mqConsumerService")
public class AbstractMqConsumerServiceImpl implements MqConsumerService {
private static final Logger log = LoggerFactory.getLogger(AbstractMqConsumerServiceImpl.class);
@Override
public ConsumeConcurrentlyStatus beforeHandler(String message) {
log.info("mq消息前置处理: message={}", message);
return null;
}
@Override
public ConsumeConcurrentlyStatus handle(String message) {
log.info("mq消息处理业务逻辑");
return null;
}
@Override
public void afterHandler(String message, Date startHandlerTime, ConsumeConcurrentlyStatus status) {
log.info("mq消息后置处理, status", status);
}
}
8、消费消息业务处理类
package com.example.rocketmq.consumer.demo.handler;
import com.alibaba.fastjson.JSON;
import com.example.rocketmq.consumer.demo.dto.UserDTO;
import com.example.rocketmq.consumer.demo.mq.service.impl.AbstractMqConsumerServiceImpl;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* 处理MQ消息-添加用户
*
* @author 言曌
* @date 2020/9/15 11:33 下午
*/
@Service("addUserHandler")
public class AddUserHandler extends AbstractMqConsumerServiceImpl {
private static final Logger log = LoggerFactory.getLogger(AddUserHandler.class);
@Override
public ConsumeConcurrentlyStatus handle(String message) {
log.info("mq消息处理开始【添加用户】,message={}", message);
try {
UserDTO user = JSON.parseObject(message, UserDTO.class);
// 模拟做业务操作
// ....
System.out.println(user.toString());
} catch (Exception e) {
log.error("handler error:{}", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
UserDTO 同生产者
9、Topic和Tag 与处理类关系枚举
package com.example.rocketmq.consumer.demo.enums;
/**
* 维护topicTag和处理类的关系
*
* @author 言曌
* @date 2020/9/15 11:31 下午
*/
public enum MqConsumerBeanEnum {
ADD_USER("T_DEMO_PROJECT:ADD_USER", "addUserHandler"),
ADD_ORDER("T_DEMO_PROJECT:ADD_ORDER", "addOrderHandler"),
;
/**
* TOPIC:TAG 名称
*/
private String topicTag;
/**
* Handler bean名称
*/
private String beanName;
public String getTopicTag() {
return topicTag;
}
public void setTopicTag(String topicTag) {
this.topicTag = topicTag;
}
public String getBeanName() {
return beanName;
}
public void setBeanName(String beanName) {
this.beanName = beanName;
}
MqConsumerBeanEnum(String topicTag, String beanName) {
this.topicTag = topicTag;
this.beanName = beanName;
}
public static MqConsumerBeanEnum getBeanByTopicTag(String topicTag) {
for (MqConsumerBeanEnum beanEnum : MqConsumerBeanEnum.values()) {
if (beanEnum.getTopicTag().equals(topicTag)) {
return beanEnum;
}
}
return null;
}
}
五、MQ的一些常识
关于 RocketMQ总结可以这篇文章
1、名称解释
集群:即 NameServer 集群,集群里的节点是无状态的,不会互相同步信息,业务系统(生产者或消费者)和RocketMQ服务端(即Broker)需要连接到 NameServer 集群。
实际生产上,集群机器数量通常是2到3个,比如集群名叫 Cluster001, 地址为 10.20.30.31:9876;1020.30.32:9876。
Broker:可以理解为 RocketMQ 的服务端,通常一个云服务器里部署一个Broker 。可以配置单 Master、单Master单Slave、多Master多Slave模式。
实际生产上,我们用单Master单Slave就行, 如 Broker 名称都叫 RMQ_broker1, 地址为 Master 10.20.30.33:10911, Slave 10.20.30.34:10911.
Topic:创建 topic 需要指定集群如 Cluster001 和队列数(默认8,即8个读队列,8个写队列) , 每个队列的 buffer 长度是 “无限”的。如果后期该 Topic 消息量较大,可以扩大队列数,比如改成16读16写。通常为了方便项目管理,一个微服务的项目使用一个 Topic,防止 Topic 太多不好管理。不同的业务逻辑使用不同的 tag 来区分即可。
队列:上面也说了队列,每个Topic有多个队列,分布在不同的 Broker 上(如果是Broker集群)。顺序消息可以放到一个队列上保证消费顺序。
生产者:生产者可以理解为发布MQ消息的业务系统,为了方便在可视化页面管理,我们可以在生产者管理里新增生产者,如 PID_DEMO_PROJECT,然后申请发布里绑定 Topic。代码里发布消息的 defaultProducer 需要使用 PID_DEMO_PROJECT。
消费者:消费者可以理解为接受MQ消息的业务系统,同样,在消费者管理里新增消费者,如 CID_TEST_PROJECT,然后订阅 Topic。代码里 defaultConsumer 使用 CID_TEST_PROJECT,然后订阅指定 topic 和 tags,就可以收到该 topic 下的指定 tags 的消息了。
2、调用http接口和发MQ对比
两个项目需要通信,有两种常见方式,一种是调用 http 接口,一种就是发 MQ 了。
调用 http 接口的有点是可以获得返回值,操作简单方便;缺点就是会阻塞线程,不支持请求堆积,失败不能重试等。
MQ 的好处是异步,不阻塞,失败可以重试,可以堆积消息;缺点就是无法实时获得返回值,如果需要返回,需要对方再回调通知。
3、具体业务场景公式
A系统给B系统发消息,即A系统往 topic 里发,B系统去 topic 里拿
A系统往自己的 Topic (即 T_TOPIC_A )发消息
B系统需要接受消息,就需要在平台设置,让B系统自己的消费者(即 CID_PROJECT_C ) 去订阅 T_TOPIC_A
A系统开始发消息,在代码里指定 Topic= T_TOPIC_A, Tag=test1
B系统监听Topic= T_TOPIC_A, Tag=test1 的消息,并指定处理类为 test1Handler
原则就是每个 tag 对应一个方法处理类
4、关于消费的问题
(1) 如果 B 系统是多实例部署,消费是随机的。同样,如果 C 项目也订阅了那个 topic 和 tag,B和C也是随机消费。
(2) 如果消费消息后,没有返回消费成功,待会儿会重新消费,具体消费多少次,可以配置。
(3) 如果出现重复消费问题,可以通过幂等性解决,什么意思?即你假设会重复消费,即方法会重复调用,通过代码判断来操作。
比如当某个字段为某个值的时候才执行具体的逻辑,否则直接返回结束
后面我有时间会继续在下面补充
您可以选择一种方式赞助本站
支付宝扫一扫赞助
微信钱包扫描赞助
赏