SpringBoot2.0 整合 RocketMQ 生产者和消费者示例

avatar 2020年09月17日20:38:56 0 271 views

,目前国内的公司里消息中间件(即MQ)这块用的最多 Kafka 和 RocketMQ了。

本文介绍一下 RocketMQ 与 SpringBoot 整合,该示例比网上那些简单例子会更加清晰明了,同时也更适合在公司的代码里书写。

完整代码地址:https://github.com/saysky/rocketmq-demo

关于 RocketMQ总结可以这篇文章

一、安装 RocketMQ

1、从官网下载RocketMQ代码

http://rocketmq.apache.org/docs/quick-start 

自行下载最新版或者历史版本,我这里因为是好早之前下载的,所以还是 4.4,现在已经 4.7 了

 

2、启动 NameServer 和 Broker

可用根据官方页面的提示分别启动 NameServer 和 Broker

因为不同系统(Windows或Linux、Mac) 安装方式和启动方式都不太同

这里主要介绍一下我的 Mac 的操作

 

启动 NameServer

cd /Users/liuyanzhao/Documents/JavaStudy/rocketmq-all-4.4.0
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

 

启动 Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/namesrv.log

 

如果没有报错就说明启动成功了,可以通过 jps 命令查看是否有这两个 java 进程

 

二、安装 RockerMQ Console

rocketmq console 是一个用于可视化 RocketMQ 消息 的 springboot 项目,单纯就是可视化的操作,比如查询消息,添加 Topic 等。

界面如下

 

1、下载 RocketMQ Console

地址如下 https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console 

 

2、修改 rocketmq-console 里的 application.properties 配置文件

可以把端口号改成 9999 或其他不常用的端口号,默认好像是 8080

如果里面的 nameserver 地址不一样也要改,一般按默认就OK

 

3、启动 RocketMQ Console

可以直接用 IDEA 打开 rocketmq-console 项目进行启动

但是对于博主而言,需要打开一个额外 IDEA 窗口我是不愿意的

所以我进行 mvn clean install 打包,然后运行 target 里的 jar 文件

具体怎么运行,按照自己喜欢的来,不启动也OK

 

三、生产者代码

代码结构如下

 

1、pom.xml 

我这里使用的 4.4 版本,大家可以改成自己的版本

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.liuyanzhao</groupId>
    <artifactId>rocketmq-producer-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-producer-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

2、application.yml

server:
  port: 8080

rocketmq:
  producer:
    namesrvAddr: 127.0.0.1:9876
    group: PID_BLOG_PROJECT

 

3、DefaultProducerConfig

因为我们没有使用 starter 版本的 rocketmq 依赖,所以需要自己写配置文件里配置默认的Producer

package com.liuyanzhao.rocketmq.producer.demo.config;

import com.liuyanzhao.rocketmq.producer.demo.properties.MqProducerProperties;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @author 言曌
 * @date 2020/9/17 2:04 下午
 */
@Configuration
public class DefaultProducerConfig {

    private static final Logger log = LoggerFactory.getLogger(DefaultProducerConfig.class);

    @Autowired
    private MqProducerProperties propertiesProperties;

    /**
     * 创建普通消息发送者实例
     *
     * @return
     * @throws MQClientException
     */
    @Bean
    @Primary
    public DefaultMQProducer defaultProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(propertiesProperties.getGroup());
        producer.setNamesrvAddr(propertiesProperties.getNamesrvAddr());
        producer.setVipChannelEnabled(false);
        producer.setRetryTimesWhenSendAsyncFailed(10);
        producer.start();
        log.info("default producer 创建成功, {}, {}", propertiesProperties.getNamesrvAddr(), propertiesProperties.getGroup());
        return producer;
    }
}

 

4、MqProducerProperties

读取 application.yml 配置里属性类

package com.liuyanzhao.rocketmq.producer.demo.properties;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author 言曌
 * @date 2020/9/17 2:51 下午
 */
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MqProducerProperties {

    private String namesrvAddr;
    private String group;

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }
}

 

5、发送消息接口

package com.liuyanzhao.rocketmq.producer.demo.mq.service;

import com.liuyanzhao.rocketmq.producer.demo.dto.MqMessageDTO;

/**
 * MQ生产者发送消息服务
 *
 * @author 言曌
 * @date 2020/9/14 8:21 下午
 */
public interface MqProducerService {

    /**
     * 发送消息
     *
     * @param mqMessageDTO
     * @return
     */
    boolean sendMessage(MqMessageDTO mqMessageDTO);
}

 

6、发送消息接口实现

package com.liuyanzhao.rocketmq.producer.demo.mq.service.impl;

import com.alibaba.fastjson.JSON;
import com.liuyanzhao.rocketmq.producer.demo.dto.MqMessageDTO;
import com.liuyanzhao.rocketmq.producer.demo.mq.service.MqProducerService;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


/**
 * @author 言曌
 * @date 2020/9/14 8:24 下午
 */
@Service
public class MqProducerServiceImpl implements MqProducerService {

    private static final Logger log = LoggerFactory.getLogger(MqProducerServiceImpl.class);

    @Autowired
    private DefaultMQProducer defaultMQProducer;

    @Override
    public boolean sendMessage(MqMessageDTO mqMessageDTO) {
        log.info("开始发送消息, mqMessageDTO:{}", JSON.toJSONString(mqMessageDTO));
        SendResult sendResult;
        try {
            Message message = new Message(mqMessageDTO.getTopic(), mqMessageDTO.getTag(), mqMessageDTO.getKey(), mqMessageDTO.getContent());
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("消息发送失败, mqMessageDTO={}, cause:{}", JSON.toJSONString(mqMessageDTO), e);
            return false;
        }
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("发送成功, sendResult:{}", JSON.toJSONString(sendResult));
            mqMessageDTO.setMsgId(sendResult.getMsgId());
            return true;
        }

        return false;
    }
}

 

7、发送消息封装对象

package com.liuyanzhao.rocketmq.producer.demo.dto;


import java.io.Serializable;

/**
 * @author 言曌
 * @date 2020/9/14 8:22 下午
 */
public class MqMessageDTO implements Serializable {

    /**
     * 消息ID
     */
    private String msgId;

    /**
     * 主题
     */
    private String topic;

    /**
     * 消息标签体
     */
    private String tag;

    /**
     * 消息业务主键
     */
    private String key;

    /**
     * 消息体
     */
    private byte[] content;

    public String getMsgId() {
        return msgId;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }


    public MqMessageDTO() {
    }

    public MqMessageDTO(String msgId, String topic, String tag, String key, byte[] content) {
        this.msgId = msgId;
        this.topic = topic;
        this.tag = tag;
        this.key = key;
        this.content = content;
    }
}

 

8、调用发布消息

UserServiceImpl

package com.liuyanzhao.rocketmq.producer.demo.service.impl;

import com.alibaba.fastjson.JSON;
import com.liuyanzhao.rocketmq.producer.demo.dto.MqMessageDTO;
import com.liuyanzhao.rocketmq.producer.demo.dto.UserDTO;
import com.liuyanzhao.rocketmq.producer.demo.mq.service.MqProducerService;
import com.liuyanzhao.rocketmq.producer.demo.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author 言曌
 * @date 2020/9/15 3:13 下午
 */
@Service
public class UserServiceImpl implements UserService {


    private static final Logger log = LoggerFactory.getLogger(UserServiceImpl.class);


    @Autowired
    private MqProducerService mqProducerService;


    @Override
    public void addUser(UserDTO user) {
        // 1.模拟业务操作
        // ....

        // 2.向关联系统发送消息
        MqMessageDTO mqMessageDTO = new MqMessageDTO();
        mqMessageDTO.setTopic("T_DEMO_PROJECT");
        mqMessageDTO.setTag("ADD_USER");
        mqMessageDTO.setContent(JSON.toJSONString(user).getBytes());
        boolean success = mqProducerService.sendMessage(mqMessageDTO);
        if (success) {
            log.info("消息发送成功, UserServiceImpl.addUser");
        } else {
            log.info("消息发送失败");
            // TODO 记录错误记录
        }
    }
}

 

UserService

package com.liuyanzhao.rocketmq.producer.demo.service;

import com.liuyanzhao.rocketmq.producer.demo.dto.UserDTO;

/**
 * @author 言曌
 * @date 2020/9/15 3:11 下午
 */

public interface UserService {

    /**
     * 添加用户
     *
     * @param user
     */
    void addUser(UserDTO user);
}

 

UserDTO

package com.liuyanzhao.rocketmq.producer.demo.dto;

import java.io.Serializable;

/**
 * @author 言曌
 * @date 2020/9/15 3:12 下午
 */
public class UserDTO implements Serializable {
    private String id;
    private String username;
    private String password;


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public UserDTO() {
    }

    public UserDTO(String id, String username, String password) {
        this.id = id;
        this.username = username;
        this.password = password;
    }
}

 

HelloController

package com.liuyanzhao.rocketmq.producer.demo.controller;

import com.liuyanzhao.rocketmq.producer.demo.dto.UserDTO;
import com.liuyanzhao.rocketmq.producer.demo.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * 测试发生消息控制器
 *
 * @author 言曌
 * @date 2020/9/14 8:47 下午
 */
@RestController
public class HelloController {


    @Autowired
    private UserService userService;


    @GetMapping("/testAddUser")
    public String testAddUser() {
        // 模拟发送数据
        UserDTO user = new UserDTO(UUID.randomUUID().toString().replace("-", ""), "张三", "123456");
        userService.addUser(user);
        return "success";
    }


}


 

访问 localhost:8080/testAddUser 会往 broker 发一条信息

 

四、消费者代码

代码结构如下

 

1、pom.xml

和生产者的一样

 

2、application.yml

server:
  port: 8081

rocketmq:
  consumer:
    namesrvAddr: 127.0.0.1:9876
    group: CID_BLOG_PROJECT


 

3、属性类

package com.example.rocketmq.consumer.demo.properties;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author 言曌
 * @date 2020/9/17 2:51 下午
 */
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MqConsumerProperties {

    private String namesrvAddr;
    private String group;

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }
}

 

4、抽象消费消息监听类

package com.example.rocketmq.consumer.demo.mq.listener;

import com.example.rocketmq.consumer.demo.properties.MqConsumerProperties;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @author 言曌
 * @date 2020/9/15 7:15 下午
 */
@Configuration
public abstract class AbstractConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(AbstractConsumerListener.class);

    @Autowired
    private MqConsumerProperties consumerProperties;

    /**
     * 开启消费注册
     *
     * @param topic
     * @param tags  支持多个tag, 如 tag1 || tag2 || tag3
     * @throws MQClientException
     */
    public void listener(String topic, String tags) throws MQClientException {
        log.info("开启" + topic + ":" + tags + "消费者");

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerProperties.getGroup());
        consumer.setNamesrvAddr(consumerProperties.getNamesrvAddr());
        consumer.subscribe(topic, tags);

        // 开启内部类实现监听
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                return AbstractConsumerListener.this.onMessage(msgs);
            }
        });

        consumer.start();

    }

    /**
     * 处理body的业务
     *
     * @param msgs
     * @return
     */
    public abstract ConsumeConcurrentlyStatus onMessage(List<MessageExt> msgs);

}

 

5、消费消息监听者实现类

package com.example.rocketmq.consumer.demo.mq.listener;

import com.example.rocketmq.consumer.demo.enums.MqConsumerBeanEnum;
import com.example.rocketmq.consumer.demo.mq.service.MqConsumerService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;

/**
 * @author 言曌
 * @date 2020/9/15 7:22 下午
 */
@Configuration
public class DefaultConsumerListener extends AbstractConsumerListener implements ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {

    private static final Logger log = LoggerFactory.getLogger(DefaultConsumerListener.class);

    /**
     * 上下文
     */
    private ApplicationContext applicationContext;

    @Override
    public ConsumeConcurrentlyStatus onMessage(List<MessageExt> msgs) {
        for (MessageExt msg : msgs) {
            String topicTag = msg.getTopic() + ":" + msg.getTags();
            MqConsumerBeanEnum mqConsumerBeanEnum = MqConsumerBeanEnum.getBeanByTopicTag(topicTag);
            if (mqConsumerBeanEnum == null) {
                break;
            }
            ConsumeConcurrentlyStatus consumerStatus = null;
            try {
                Object serviceBean = applicationContext.getBean(mqConsumerBeanEnum.getBeanName());
                if (serviceBean == null) {
                    break;
                }
                if (serviceBean instanceof MqConsumerService) {
                    String message = new String(msg.getBody(), "utf-8");
                    MqConsumerService consumerService = (MqConsumerService) serviceBean;
                    // 预处理
                    consumerService.beforeHandler(message);
                    // 处理
                    consumerStatus = consumerService.handle(message);
                    // 处理之后
                    consumerService.afterHandler(message, new Date(), consumerStatus);
                }
            } catch (UnsupportedEncodingException e) {
                log.error("body转字符串解析失败");
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            // 订阅同一个topic下的多个tag
            super.listener("T_DEMO_PROJECT", "ADD_USER || ADD_ORDER");
        } catch (MQClientException e) {
            log.error("consumer error");
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

 

6、消费服务接口

package com.example.rocketmq.consumer.demo.mq.service;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import java.util.Date;

/**
 * @author 言曌
 * @date 2020/9/16 6:22 下午
 */

public interface MqConsumerService {

    /**
     * 前置
     *
     * @param message
     * @return
     */
    ConsumeConcurrentlyStatus beforeHandler(String message);

    /**
     * 消息处理
     *
     * @param message
     * @return
     */
    ConsumeConcurrentlyStatus handle(String message);

    /**
     * 后置
     *
     * @param message
     * @param startHandlerTime
     * @param status
     */
    void afterHandler(String message, Date startHandlerTime, ConsumeConcurrentlyStatus status);
}

 

7、消费服务接口抽象实现类

package com.example.rocketmq.consumer.demo.mq.service.impl;

import com.example.rocketmq.consumer.demo.mq.service.MqConsumerService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * @author 言曌
 * @date 2020/9/17 5:01 下午
 */

@Service("mqConsumerService")
public class AbstractMqConsumerServiceImpl implements MqConsumerService {

    private static final Logger log = LoggerFactory.getLogger(AbstractMqConsumerServiceImpl.class);


    @Override
    public ConsumeConcurrentlyStatus beforeHandler(String message) {
        log.info("mq消息前置处理: message={}", message);
        return null;
    }

    @Override
    public ConsumeConcurrentlyStatus handle(String message) {
        log.info("mq消息处理业务逻辑");
        return null;
    }

    @Override
    public void afterHandler(String message, Date startHandlerTime, ConsumeConcurrentlyStatus status) {
        log.info("mq消息后置处理, status", status);

    }
}

 

8、消费消息业务处理类

package com.example.rocketmq.consumer.demo.handler;

import com.alibaba.fastjson.JSON;
import com.example.rocketmq.consumer.demo.dto.UserDTO;
import com.example.rocketmq.consumer.demo.mq.service.impl.AbstractMqConsumerServiceImpl;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;


/**
 * 处理MQ消息-添加用户
 *
 * @author 言曌
 * @date 2020/9/15 11:33 下午
 */
@Service("addUserHandler")
public class AddUserHandler extends AbstractMqConsumerServiceImpl {

    private static final Logger log = LoggerFactory.getLogger(AddUserHandler.class);

    @Override
    public ConsumeConcurrentlyStatus handle(String message) {
        log.info("mq消息处理开始【添加用户】,message={}", message);
        try {
            UserDTO user = JSON.parseObject(message, UserDTO.class);
            // 模拟做业务操作
            // ....
            System.out.println(user.toString());
        } catch (Exception e) {
            log.error("handler error:{}", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }


}

UserDTO   同生产者

 

9、Topic和Tag 与处理类关系枚举

package com.example.rocketmq.consumer.demo.enums;

/**
 * 维护topicTag和处理类的关系
 *
 * @author 言曌
 * @date 2020/9/15 11:31 下午
 */

public enum MqConsumerBeanEnum {


    ADD_USER("T_DEMO_PROJECT:ADD_USER", "addUserHandler"),
    ADD_ORDER("T_DEMO_PROJECT:ADD_ORDER", "addOrderHandler"),


    ;

    /**
     * TOPIC:TAG 名称
     */
    private String topicTag;

    /**
     * Handler bean名称
     */
    private String beanName;

    public String getTopicTag() {
        return topicTag;
    }

    public void setTopicTag(String topicTag) {
        this.topicTag = topicTag;
    }

    public String getBeanName() {
        return beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    MqConsumerBeanEnum(String topicTag, String beanName) {
        this.topicTag = topicTag;
        this.beanName = beanName;
    }

    public static MqConsumerBeanEnum getBeanByTopicTag(String topicTag) {
        for (MqConsumerBeanEnum beanEnum : MqConsumerBeanEnum.values()) {
            if (beanEnum.getTopicTag().equals(topicTag)) {
                return beanEnum;
            }
        }
        return null;
    }
}

 

五、MQ的一些常识

关于 RocketMQ总结可以这篇文章

1、名称解释

集群:即 NameServer 集群,集群里的节点是无状态的,不会互相同步信息,业务系统(生产者或消费者)和RocketMQ服务端(即Broker)需要连接到 NameServer 集群。

实际生产上,集群机器数量通常是2到3个,比如集群名叫 Cluster001, 地址为  10.20.30.31:9876;1020.30.32:9876。

Broker:可以理解为 RocketMQ 的服务端,通常一个云服务器里部署一个Broker 。可以配置单 Master、单Master单Slave、多Master多Slave模式。

实际生产上,我们用单Master单Slave就行,  如 Broker 名称都叫 RMQ_broker1, 地址为 Master 10.20.30.33:10911, Slave 10.20.30.34:10911.

Topic:创建 topic 需要指定集群如 Cluster001 和队列数(默认8,即8个读队列,8个写队列) , 每个队列的 buffer 长度是 “无限”的。如果后期该 Topic 消息量较大,可以扩大队列数,比如改成16读16写。通常为了方便项目管理,一个微服务的项目使用一个 Topic,防止 Topic 太多不好管理。不同的业务逻辑使用不同的 tag 来区分即可。

队列:上面也说了队列,每个Topic有多个队列,分布在不同的 Broker 上(如果是Broker集群)。顺序消息可以放到一个队列上保证消费顺序。

生产者:生产者可以理解为发布MQ消息的业务系统,为了方便在可视化页面管理,我们可以在生产者管理里新增生产者,如 PID_DEMO_PROJECT,然后申请发布里绑定 Topic。代码里发布消息的 defaultProducer 需要使用 PID_DEMO_PROJECT。

消费者:消费者可以理解为接受MQ消息的业务系统,同样,在消费者管理里新增消费者,如 CID_TEST_PROJECT,然后订阅 Topic。代码里 defaultConsumer 使用 CID_TEST_PROJECT,然后订阅指定 topic 和 tags,就可以收到该 topic 下的指定 tags 的消息了。

 

2、调用http接口和发MQ对比

两个项目需要通信,有两种常见方式,一种是调用 http 接口,一种就是发 MQ 了。

调用 http 接口的有点是可以获得返回值,操作简单方便;缺点就是会阻塞线程,不支持请求堆积,失败不能重试等。

MQ 的好处是异步,不阻塞,失败可以重试,可以堆积消息;缺点就是无法实时获得返回值,如果需要返回,需要对方再回调通知。

 

3、具体业务场景公式

A系统给B系统发消息,即A系统往 topic 里发,B系统去 topic 里拿

A系统往自己的 Topic (即 T_TOPIC_A )发消息

B系统需要接受消息,就需要在平台设置,让B系统自己的消费者(即 CID_PROJECT_C ) 去订阅 T_TOPIC_A 

A系统开始发消息,在代码里指定 Topic= T_TOPIC_A, Tag=test1

B系统监听Topic= T_TOPIC_A, Tag=test1 的消息,并指定处理类为 test1Handler 

 

原则就是每个 tag 对应一个方法处理类

 

4、关于消费的问题

(1) 如果 B 系统是多实例部署,消费是随机的。同样,如果 C 项目也订阅了那个 topic 和 tag,B和C也是随机消费。

(2) 如果消费消息后,没有返回消费成功,待会儿会重新消费,具体消费多少次,可以配置。

(3) 如果出现重复消费问题,可以通过幂等性解决,什么意思?即你假设会重复消费,即方法会重复调用,通过代码判断来操作。

比如当某个字段为某个值的时候才执行具体的逻辑,否则直接返回结束

 

后面我有时间会继续在下面补充

  • 微信
  • 交流学习,有偿服务
  • weinxin
  • 博客/Java交流群
  • 资源分享,问题解决,技术交流。群号:590480292
  • weinxin
avatar

发表评论

avatar 登录者:匿名
您需要登录才能评论,可以选择注册或者QQ快速登录

     

已通过评论:0   待审核评论数:0