SpringBoot 整合RocketMQ

简介

使用RocketMQ有两种方式,一种是引入rocketmq-client需要自己创建生产者和消费者,相对来说比较繁琐;另一种是引入rocketmq-spring-boot-starter(对rocketmq-client进行了封装),发消息和消费消息都比较简洁。
官网: https://rocketmq.apache.org/

使用步骤

SpringBoot 整合RocketMQ

添加依赖

在pom.xml文件中,添加rocketmq-spring-boot-starter依赖,如下:

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.2.2</version>
</dependency>

消息生产者

配置生产者信息

在application.yml文件中添加RocketMQ生产者的信息,如下:

rocketmq:
  # nameServer地址
  name-server: 127.0.0.1:9876
  producer:
    # 生产者的组名
    group: my-product-group
    # 同步消息在发送失败后最大重试次数
    retry-times-when-send-failed: 3
    # 异步消息在发送失败后最大重试次数
    retry-times-when-send-async-failed: 3
    retry-next-server: true
    # 发送消息超时时间
    send-message-timeout: 3000
    # 发送消息的最大消息体
    max-message-size: 4194304

顺序消息

同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 使用场景:重要通知邮件、报名短信通知、营销短信系统等都可以使用这种方式。

@RestController
@RequestMapping("/mq")
public class RocketmqController {
  	// 注入RocketMQTemplate操作对象
  	@Resource
    private RocketMQTemplate rocketMQTemplate;
  	
  	// 目的地为:topic:tag,即同一主题下,可以有多个tag
  	private String syncTag = "demo-topic:sync";
  
  	/**
     * 同步消息
     * @param id
     * @return
     */
    @GetMapping("sync/{id}")
    public String syncMessage(@PathVariable Integer id){
        String messageStr = "order id: " + id;
      	// 构建消息对象
        Message<String> message = MessageBuilder
                .withPayload(messageStr)
                .setHeader(RocketMQHeaders.KEYS, id)
                .build();
        SendResult sendResult = rocketMQTemplate.syncSend(syncTag, message);
        if(sendResult.getSendStatus() == SendStatus.SEND_OK){
            System.out.println("发送成功");
        }
        return "SUCCESS";
    }
}

异步消息

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。

发送方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等,这种方式如果用的适当,可以提高系统响应速度,提高用户体验。

@RestController
@RequestMapping("/mq")
public class RocketmqController {
  
  	@Resource
    private RocketMQTemplate rocketMQTemplate;
  
  	// 目的地为:topic:tag,即同一主题下,可以有多个tag
 		private String asyncTag = "demo-topic:async";
  
  	/**
     * 发送异步,发送结果需要通过回调的方式进行获取
     * @param id
     * @return
     */
    @GetMapping("async/{id}")
    public String asyncMessage(@PathVariable Integer id){
        String messageStr = "async order id: " + id;
        Message<String> message = MessageBuilder
                .withPayload(messageStr)
                .setHeader(RocketMQHeaders.KEYS, id)
                .build();
        rocketMQTemplate.asyncSend(asyncTag, message, new SendCallback() {
            // 通过回调接口获取发送结果
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步消息发送失败");
            }
        });
        return "SUCCESS";
    }    
}

单向消息

单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。

使用场景:对数据的可靠性要求不高,丢失了也没关系,如日志收集这种场景,可以采用这种方式发送消息,这种消息发送方式是速度最快的。

@RestController
@RequestMapping("/mq")
public class RocketmqController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
  
  	private String onewayTag = "demo-topic:oneway";
  
  	/**
     * 单向消息: 发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发
     * 即只发送请求不等待应答,此方式发送消息的过程耗时非常短,一般都是微秒级别。
     * @param id
     * @return
     */
    @GetMapping("oneway/{id}")
    public String oneWayMessage(@PathVariable Integer id){
        String messageStr = "oneway order id: " + id;
        Message<String> message = MessageBuilder
                .withPayload(messageStr)
                .setHeader(RocketMQHeaders.KEYS, id)
                .build();
        rocketMQTemplate.sendOneWay(onewayTag, message);
        return "SUCCESS";
    }
}

延时消息

对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。

使用场景:在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式去处理了。

延时等级

RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level(共18级),例如定时 5s, 10s, 1m 等。其中,level=0级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
@RestController
@RequestMapping("/mq")
public class RocketmqController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
  
  	private String delayTag = "demo-topic:delay";
  
  	/**
     * 延时消息
     * @param id
     * @return
     */
    @GetMapping("delay/{id}")
    public String delayMessage(@PathVariable Integer id){
        String messageStr = "delay order id: " + id;
        Message<String> message = MessageBuilder
                .withPayload(messageStr)
                .setHeader(RocketMQHeaders.KEYS, id)
                .build();
        // 延时级别(1-18),分别对应:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        System.out.println(System.currentTimeMillis());
        SendResult sendResult = rocketMQTemplate.syncSend(delayTag, message, 100, 2);
        if(sendResult.getSendStatus() == SendStatus.SEND_OK){
            System.out.println(System.currentTimeMillis());
            System.out.println("延时消息发送成功");
        }
        return "SUCCESS";
    }
}

顺序消息

是一种严格按照顺序进行发布和消费的消息类型。要求消息的发布和消息消费都按照顺序进行。比如:通常创建订单后,会经历一系列的操作:【订单创建 -> 订单支付 -> 订单完成】。在创建完订单后,会发送3条消息到MQ Broker中,消费的时候要按照【订单创建 -> 订单支付 -> 订单完成】这个顺序去消费,这样的订单才是有效的。

RocketMQ采用局部顺序一致性的机制,实现了单个队列中消息的有序性,使用FIFO顺序提供有序消息,简而言之,我们的消息要保证有序,就必须把一组消息存放在同一个队列,然后由Consumer进行逐一消费。

@RestController
@RequestMapping("/mq")
public class RocketmqController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
  
  	private String topic = "demo-topic";
  
  	/**
     * 发送顺序消息
     * @param id
     * @return
     */
    @GetMapping("seq/{id}")
    public String seqMessage(@PathVariable Integer id){
        String stepOne = "seq order id: " + id + ":1";
        String stepTwo = "seq order id: " + id + ":2";
        Message<String> messageOne = MessageBuilder
                .withPayload(stepOne)
                .setHeader(RocketMQHeaders.KEYS, id+":1")
                .build();
        Message<String> messageTwo = MessageBuilder
                .withPayload(stepTwo)
                .setHeader(RocketMQHeaders.KEYS, id+":2")
                .build();
      	// 发送同步顺序消息
      	// 第一个参数:目地下,第二个参数:消息体,第三个消息:hashKey 使用此参数选择与识别同一个顺序队列
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(syncTag, messageOne, String.valueOf(id));        
        SendResult sendResultTwo = rocketMQTemplate.syncSendOrderly(syncTag, messageTwo, String.valueOf(id));        
      
        // 发送异步顺序消息
        rocketMQTemplate.asyncSendOrderly(syncTag, messageOne, String.valueOf(id), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步顺序消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步顺序消息发送异常");
            }
        });

        // 发送单向顺序消息
        rocketMQTemplate.sendOneWayOrderly(syncTag, messageOne, String.valueOf(id));
        rocketMQTemplate.sendOneWayOrderly(syncTag, messageTwo, String.valueOf(id));
        return "SUCCESS";
    }
}

事务消息

事务消息是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。

RocketMQ的事务消息分为3种状态:提交状态,回滚状态,未知状态。

原理

SpringBoot 整合RocketMQ

1、生产者将半事务消息发送至消息队列RocketMQ版服务端。
2、消息队列RocketMQ版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
3、生产者开始执行本地事务逻辑。
4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

事务消息回查步骤如下:

1、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
2、生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

@RestController
@RequestMapping("/mq")
public class RocketmqController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    private String topic = "demo-topic";

    // 目的地为:topic:tag,即同一主题下,可以有多个tag
    private String syncTag = "demo-topic:sync";
  
  
  	/**
     * 事务消息
     * @param id
     * @return
     */
    @GetMapping("transaction/{id}")
    public String transactionMessage(@PathVariable Integer id){
        String messageStr = "transaction order id: 0 + id";
      	String transactionId = UUID.randomUUID().toString().replace("-","");
        Message<String> message = MessageBuilder
                .withPayload(messageStr)
                .setHeader(RocketMQHeaders.KEYS, id)
          			// 设置事务ID
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .build();
        TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(syncTag, message, null);
        // 获取发送状态
        SendStatus sendStatus = transactionSendResult.getSendStatus();
        if(sendStatus == SendStatus.SEND_OK){
            System.out.println("事务消息发送成功");
        }
        //本地事务执行状态
        LocalTransactionState localState = transactionSendResult.getLocalTransactionState();
        System.out.println("发送状态:" + sendStatus.name()+"; 本地事务执行状态:"+ localState.name());

        return "success";
    }
}

事务消息监听器

我们需要创建生产者的消息监听器,来监听本地事务执行的状态和检查本地事务状态。

@RocketMQTransactionListener
public class TransactionMessageListener implements RocketMQLocalTransactionListener {

    /**
     * 执行本地事务(在发送消息成功时执行)
     * @param message
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        System.out.println(transactionId);

        // 获取队列信息进行业务处理
        String messageStr = new String((byte[])message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("处理事务消息为:" + messageStr);

        // TODO 开启本地事务
        // TODO 执行业务代码,例如插入订单数据库表等

        // 业务代码正常执行,提交本地事务
         return RocketMQLocalTransactionState.COMMIT;

        // 回滚本地事务
        // return RocketMQLocalTransactionState.ROLLBACK;

        // 返回 UNKNOW状态的消息会等待broker进行事务状态回查
        // return RocketMQLocalTransactionState.UNKNOWN;
    }

    /**
     * 检查本地事务的状态,当本地事务为UNKNOW状态时回调执行
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String messageStr = new String((byte[])message.getPayload(), StandardCharsets.UTF_8);
        // TODO 进行业务处理
        System.out.println("检查本地事务,并进行COMMIT操作");
        return RocketMQLocalTransactionState.COMMIT;
    }
}

批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息顺序消息。此外,这一批消息的总大小不应超过 4MB。对于超过4MB的消息,可以通过将消息进行切割成多个小于4MB的内容进行发送,下面的例子就是将消息进行切隔发送

1、定义一个集合的切割类,切割内容小于4MB

public class ListUtil <T> implements Iterator<List<T>> {

    /**
     * 所有内容
     */
    private final List<T> ts;

    /**
     * 多少数据进行分割
     */
    private int size;

    /**
     * 已经分割的索引
     */
    private int currentIndex;

    public ListUtil(List<T> ts, int size) {
        this.ts = ts;
        this.size = size;
    }

    @Override
    public boolean hasNext() {
        return currentIndex < ts.size();
    }

    // 对内容进行切隔,每次返回的数据都将小于4MB
    @Override
    public List<T> next() {
        int nextIndex = currentIndex;
        int totalSize = 0;
        for (; nextIndex < ts.size(); nextIndex++){
            T t = ts.get(nextIndex);
            totalSize = totalSize + t.toString().length();
            if(totalSize > size){
                break;
            }
        }
        List<T> subList = ts.subList(currentIndex, nextIndex);
        currentIndex= nextIndex;
        return subList;
    }
}
@RestController
@RequestMapping("/mq")
public class RocketmqController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    private String topic = "demo-topic";

    // 目的地为:topic:tag,即同一主题下,可以有多个tag
    private String syncTag = "demo-topic:sync";
  
    /**
     * 批量消息,同时发送30个同步消息(真正的批量)
     * @param id
     * @return
     */
    @GetMapping("batch/{id}")
    public String batchMessage(@PathVariable Integer id){
        List<Message> messages = new ArrayList<>();
        for(int i=0;i<30;i++){
            String myId = id + "" + i;
            String messageStr = "batch order id: " + myId;
            Message<String> message = MessageBuilder.withPayload(messageStr)
                    .setHeader(RocketMQHeaders.KEYS, myId)
                    .build();
            messages.add(message);
        }
        // 批量下发消息到broker,不支持顺序消息,并且对消息体大小限制为不大于4M
        ListUtil listUtil = new ListUtil(messages, 1024 * 1024 * 4);
        while(listUtil.hasNext()){
            List<Message> listItem = listUtil.next();
            SendResult sendResult = rocketMQTemplate.syncSend(syncTag, listItem);
            if(sendResult.getSendStatus() == SendStatus.SEND_OK){
                System.out.println("批量消息发送成功");
            }
        }
        return "SUCCESS";
    }
  	
}

消息消费者

配置消费者信息

在application.yml文件中添加RocketMQ消费者的信息,如下:

rocketmq:
  # nameServer地址
  name-server: 127.0.0.1:9876
  consumer:
    group: my-consumer-group
    topic: demo-topic

消息消费

消息监听消费需要实现RocketMQListener接口,同时在类上标注@RocketMQMessageListener注解,注意该监听类必须注入到Spring容器中,即添加@Service标注即可。

/**
 * 消息监听消费
 * consumerGroup  ->  监听组
 * topic  ->  监听TOPIC
 * selectorExpression  -> 监听tag(默认为*,即监听topic下所有,监听多个tag时使用 || 进行分割,如:anran-sync-tags||anran-async-tags)
 * messageModel  -> 监听消费者模式
 *                  负载均衡模式:MessageModel.CLUSTERING(每一个消息只发给一个消费者),
 *                  广播模式:MessageModel.BROADCASTING(同一条消息发送给所有的消费者)
 */
@Service
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer.group}",
        topic = "${rocketmq.consumer.topic}",
        selectorExpression = "*",
        messageModel = MessageModel.CLUSTERING)
public class ConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String o) {
        System.out.println("消费消息,"+o);
    }
}

原创文章,作者:jiafegn,如若转载,请注明出处:https://www.techlearn.cn/archives/622

Previous 2023年4月30日
Next 2023年4月30日

相关推荐

  • Springboot 注解 @Resource

    作用 和@Autowired注解类似,也是用来注入依赖对象的,spring容器会对bean中所有的字段、方法进行遍历,标注有@Resouce注解的,都会进行注入。@Autowire…

    springboot 2022年9月14日
    270
  • SpringBoot 分布式Session

    简介 现在随着分布式,微服务架构的日益成熟,越来越多的企业将传统的单体服务改造为微服务或是分布式架构,在分布式中就遇到session管理的问题,微服务架构中一个服务为了实现高可用至…

    springboot 2023年4月30日
    176
  • SpringBoot 整合Log4j2日志框架

    简介 Apache Log4j 2是日志框架Log4j的升级,它比其前身Log4j 1.x提供了重要的改进, 并且参考了Logback中许多有用的改进,同时修复了Logback的一…

    springboot 2023年3月26日
    153
  • springboot 注解 @ComponentScan

    作用 @ComponentScan用于批量注册bean,这个注解会让spring去扫描某些包及其子包中所有的类,然后将满足一定条件的类作为bean 注册到spring容器中。主要使…

    springboot 2022年11月8日
    219
  • Springboot 注解 @Configuration

    作用 @Configuration用于定义配置类,标注在类上,配置spring容器(应用上下文)。相当于把该类作为spring的xml配置文件中的beans。@Configurat…

    springboot 2022年9月2日
    224
  • SpringBoot 过滤器

    简介 SpringBoot过滤器在web开发中可以过滤指定的URL,比如拦截掉不需要的接口请求,同时也可以对request和response的内容进行修改。 使用场景 Spring…

    springboot 2023年4月30日
    241
  • SpringBoot Spring Event 业务解耦神器

    介绍 Spring Event是Spring框架中的一个事件机制,主要用于实现应用程序内部的事件传递与处理,它允许不同组件之间通过发布-订阅机制进行解耦通信,比如用户注册,订单创建…

    springboot 2024年1月11日
    301
  • SpringBoot 整合Mybatis-Plus

    简介 MyBatis-Plus是一个MyBatis的增强工具,在Mybatis的基础上只做增强不做改变,为简化开发,提高效率而生。 添加依赖 注意:添加Mybatis-Plus即可…

    springboot 2023年3月26日
    131
  • SpringBoot 依赖注入

    前言 SpringBoot 中通过注解实现依赖注入主要有以下几种: @Autowired注解 @Qualifier注解 @Resource注解 @Primary注解 @Autowi…

    springboot 2024年1月15日
    632
  • Springboot 注解 @ConfigurationProperties

    作用 @ConfigurationProperties注解主要作用就是将prefix属性指定的前缀配置项的值绑定到这个Bean上,默认情况下需要和@Component或者@Conf…

    springboot 2022年9月2日
    181