简介
使用RocketMQ有两种方式,一种是引入rocketmq-client需要自己创建生产者和消费者,相对来说比较繁琐;另一种是引入rocketmq-spring-boot-starter(对rocketmq-client进行了封装),发消息和消费消息都比较简洁。
官网: https://rocketmq.apache.org/
使用步骤
添加依赖
在pom.xml文件中,添加rocketmq-spring-boot-starter
依赖,如下:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
消息生产者
配置生产者信息
在application.yml文件中添加RocketMQ生产者的信息,如下:
rocketmq:
# nameServer地址
name-server: 127.0.0.1:9876
producer:
# 生产者的组名
group: my-product-group
# 同步消息在发送失败后最大重试次数
retry-times-when-send-failed: 3
# 异步消息在发送失败后最大重试次数
retry-times-when-send-async-failed: 3
retry-next-server: true
# 发送消息超时时间
send-message-timeout: 3000
# 发送消息的最大消息体
max-message-size: 4194304
顺序消息
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 使用场景:重要通知邮件、报名短信通知、营销短信系统等都可以使用这种方式。
@RestController
@RequestMapping("/mq")
public class RocketmqController {
// 注入RocketMQTemplate操作对象
@Resource
private RocketMQTemplate rocketMQTemplate;
// 目的地为:topic:tag,即同一主题下,可以有多个tag
private String syncTag = "demo-topic:sync";
/**
* 同步消息
* @param id
* @return
*/
@GetMapping("sync/{id}")
public String syncMessage(@PathVariable Integer id){
String messageStr = "order id: " + id;
// 构建消息对象
Message<String> message = MessageBuilder
.withPayload(messageStr)
.setHeader(RocketMQHeaders.KEYS, id)
.build();
SendResult sendResult = rocketMQTemplate.syncSend(syncTag, message);
if(sendResult.getSendStatus() == SendStatus.SEND_OK){
System.out.println("发送成功");
}
return "SUCCESS";
}
}
异步消息
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。
发送方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等,这种方式如果用的适当,可以提高系统响应速度,提高用户体验。
@RestController
@RequestMapping("/mq")
public class RocketmqController {
@Resource
private RocketMQTemplate rocketMQTemplate;
// 目的地为:topic:tag,即同一主题下,可以有多个tag
private String asyncTag = "demo-topic:async";
/**
* 发送异步,发送结果需要通过回调的方式进行获取
* @param id
* @return
*/
@GetMapping("async/{id}")
public String asyncMessage(@PathVariable Integer id){
String messageStr = "async order id: " + id;
Message<String> message = MessageBuilder
.withPayload(messageStr)
.setHeader(RocketMQHeaders.KEYS, id)
.build();
rocketMQTemplate.asyncSend(asyncTag, message, new SendCallback() {
// 通过回调接口获取发送结果
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("异步消息发送失败");
}
});
return "SUCCESS";
}
}
单向消息
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
使用场景:对数据的可靠性要求不高,丢失了也没关系,如日志收集
这种场景,可以采用这种方式发送消息,这种消息发送方式是速度最快的。
@RestController
@RequestMapping("/mq")
public class RocketmqController {
@Resource
private RocketMQTemplate rocketMQTemplate;
private String onewayTag = "demo-topic:oneway";
/**
* 单向消息: 发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发
* 即只发送请求不等待应答,此方式发送消息的过程耗时非常短,一般都是微秒级别。
* @param id
* @return
*/
@GetMapping("oneway/{id}")
public String oneWayMessage(@PathVariable Integer id){
String messageStr = "oneway order id: " + id;
Message<String> message = MessageBuilder
.withPayload(messageStr)
.setHeader(RocketMQHeaders.KEYS, id)
.build();
rocketMQTemplate.sendOneWay(onewayTag, message);
return "SUCCESS";
}
}
延时消息
对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。
使用场景:在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式去处理了。
延时等级
RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level(共18级),例如定时 5s, 10s, 1m 等。其中,level=0
级表示不延时,level=1
表示 1 级延时,level=2
表示 2 级延时,以此类推。
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
@RestController
@RequestMapping("/mq")
public class RocketmqController {
@Resource
private RocketMQTemplate rocketMQTemplate;
private String delayTag = "demo-topic:delay";
/**
* 延时消息
* @param id
* @return
*/
@GetMapping("delay/{id}")
public String delayMessage(@PathVariable Integer id){
String messageStr = "delay order id: " + id;
Message<String> message = MessageBuilder
.withPayload(messageStr)
.setHeader(RocketMQHeaders.KEYS, id)
.build();
// 延时级别(1-18),分别对应:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
System.out.println(System.currentTimeMillis());
SendResult sendResult = rocketMQTemplate.syncSend(delayTag, message, 100, 2);
if(sendResult.getSendStatus() == SendStatus.SEND_OK){
System.out.println(System.currentTimeMillis());
System.out.println("延时消息发送成功");
}
return "SUCCESS";
}
}
顺序消息
是一种严格按照顺序进行发布和消费的消息类型。要求消息的发布和消息消费都按照顺序进行。比如:通常创建订单后,会经历一系列的操作:【订单创建 -> 订单支付 -> 订单完成】。在创建完订单后,会发送3条消息到MQ Broker中,消费的时候要按照【订单创建 -> 订单支付 -> 订单完成】这个顺序去消费,这样的订单才是有效的。
RocketMQ采用局部顺序一致性
的机制,实现了单个队列中消息的有序性,使用FIFO顺序提供有序消息,简而言之,我们的消息要保证有序,就必须把一组消息存放在同一个队列,然后由Consumer进行逐一消费。
@RestController
@RequestMapping("/mq")
public class RocketmqController {
@Resource
private RocketMQTemplate rocketMQTemplate;
private String topic = "demo-topic";
/**
* 发送顺序消息
* @param id
* @return
*/
@GetMapping("seq/{id}")
public String seqMessage(@PathVariable Integer id){
String stepOne = "seq order id: " + id + ":1";
String stepTwo = "seq order id: " + id + ":2";
Message<String> messageOne = MessageBuilder
.withPayload(stepOne)
.setHeader(RocketMQHeaders.KEYS, id+":1")
.build();
Message<String> messageTwo = MessageBuilder
.withPayload(stepTwo)
.setHeader(RocketMQHeaders.KEYS, id+":2")
.build();
// 发送同步顺序消息
// 第一个参数:目地下,第二个参数:消息体,第三个消息:hashKey 使用此参数选择与识别同一个顺序队列
SendResult sendResult = rocketMQTemplate.syncSendOrderly(syncTag, messageOne, String.valueOf(id));
SendResult sendResultTwo = rocketMQTemplate.syncSendOrderly(syncTag, messageTwo, String.valueOf(id));
// 发送异步顺序消息
rocketMQTemplate.asyncSendOrderly(syncTag, messageOne, String.valueOf(id), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步顺序消息发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("异步顺序消息发送异常");
}
});
// 发送单向顺序消息
rocketMQTemplate.sendOneWayOrderly(syncTag, messageOne, String.valueOf(id));
rocketMQTemplate.sendOneWayOrderly(syncTag, messageTwo, String.valueOf(id));
return "SUCCESS";
}
}
事务消息
事务消息是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
RocketMQ的事务消息分为3种状态:提交状态,回滚状态,未知状态。
原理
1、生产者将半事务消息发送至消息队列RocketMQ版服务端。
2、消息队列RocketMQ版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
3、生产者开始执行本地事务逻辑。
4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
事务消息回查步骤如下:
1、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
2、生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
@RestController
@RequestMapping("/mq")
public class RocketmqController {
@Resource
private RocketMQTemplate rocketMQTemplate;
private String topic = "demo-topic";
// 目的地为:topic:tag,即同一主题下,可以有多个tag
private String syncTag = "demo-topic:sync";
/**
* 事务消息
* @param id
* @return
*/
@GetMapping("transaction/{id}")
public String transactionMessage(@PathVariable Integer id){
String messageStr = "transaction order id: 0 + id";
String transactionId = UUID.randomUUID().toString().replace("-","");
Message<String> message = MessageBuilder
.withPayload(messageStr)
.setHeader(RocketMQHeaders.KEYS, id)
// 设置事务ID
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build();
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(syncTag, message, null);
// 获取发送状态
SendStatus sendStatus = transactionSendResult.getSendStatus();
if(sendStatus == SendStatus.SEND_OK){
System.out.println("事务消息发送成功");
}
//本地事务执行状态
LocalTransactionState localState = transactionSendResult.getLocalTransactionState();
System.out.println("发送状态:" + sendStatus.name()+"; 本地事务执行状态:"+ localState.name());
return "success";
}
}
事务消息监听器
我们需要创建生产者的消息监听器,来监听本地事务执行的状态和检查本地事务状态。
@RocketMQTransactionListener
public class TransactionMessageListener implements RocketMQLocalTransactionListener {
/**
* 执行本地事务(在发送消息成功时执行)
* @param message
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.println(transactionId);
// 获取队列信息进行业务处理
String messageStr = new String((byte[])message.getPayload(), StandardCharsets.UTF_8);
System.out.println("处理事务消息为:" + messageStr);
// TODO 开启本地事务
// TODO 执行业务代码,例如插入订单数据库表等
// 业务代码正常执行,提交本地事务
return RocketMQLocalTransactionState.COMMIT;
// 回滚本地事务
// return RocketMQLocalTransactionState.ROLLBACK;
// 返回 UNKNOW状态的消息会等待broker进行事务状态回查
// return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 检查本地事务的状态,当本地事务为UNKNOW状态时回调执行
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String messageStr = new String((byte[])message.getPayload(), StandardCharsets.UTF_8);
// TODO 进行业务处理
System.out.println("检查本地事务,并进行COMMIT操作");
return RocketMQLocalTransactionState.COMMIT;
}
}
批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息、顺序消息。此外,这一批消息的总大小不应超过 4MB。对于超过4MB的消息,可以通过将消息进行切割成多个小于4MB的内容进行发送,下面的例子就是将消息进行切隔发送
1、定义一个集合的切割类,切割内容小于4MB
public class ListUtil <T> implements Iterator<List<T>> {
/**
* 所有内容
*/
private final List<T> ts;
/**
* 多少数据进行分割
*/
private int size;
/**
* 已经分割的索引
*/
private int currentIndex;
public ListUtil(List<T> ts, int size) {
this.ts = ts;
this.size = size;
}
@Override
public boolean hasNext() {
return currentIndex < ts.size();
}
// 对内容进行切隔,每次返回的数据都将小于4MB
@Override
public List<T> next() {
int nextIndex = currentIndex;
int totalSize = 0;
for (; nextIndex < ts.size(); nextIndex++){
T t = ts.get(nextIndex);
totalSize = totalSize + t.toString().length();
if(totalSize > size){
break;
}
}
List<T> subList = ts.subList(currentIndex, nextIndex);
currentIndex= nextIndex;
return subList;
}
}
@RestController
@RequestMapping("/mq")
public class RocketmqController {
@Resource
private RocketMQTemplate rocketMQTemplate;
private String topic = "demo-topic";
// 目的地为:topic:tag,即同一主题下,可以有多个tag
private String syncTag = "demo-topic:sync";
/**
* 批量消息,同时发送30个同步消息(真正的批量)
* @param id
* @return
*/
@GetMapping("batch/{id}")
public String batchMessage(@PathVariable Integer id){
List<Message> messages = new ArrayList<>();
for(int i=0;i<30;i++){
String myId = id + "" + i;
String messageStr = "batch order id: " + myId;
Message<String> message = MessageBuilder.withPayload(messageStr)
.setHeader(RocketMQHeaders.KEYS, myId)
.build();
messages.add(message);
}
// 批量下发消息到broker,不支持顺序消息,并且对消息体大小限制为不大于4M
ListUtil listUtil = new ListUtil(messages, 1024 * 1024 * 4);
while(listUtil.hasNext()){
List<Message> listItem = listUtil.next();
SendResult sendResult = rocketMQTemplate.syncSend(syncTag, listItem);
if(sendResult.getSendStatus() == SendStatus.SEND_OK){
System.out.println("批量消息发送成功");
}
}
return "SUCCESS";
}
}
消息消费者
配置消费者信息
在application.yml文件中添加RocketMQ消费者的信息,如下:
rocketmq:
# nameServer地址
name-server: 127.0.0.1:9876
consumer:
group: my-consumer-group
topic: demo-topic
消息消费
消息监听消费需要实现RocketMQListener
接口,同时在类上标注@RocketMQMessageListener
注解,注意该监听类必须注入到Spring容器中,即添加@Service
标注即可。
/**
* 消息监听消费
* consumerGroup -> 监听组
* topic -> 监听TOPIC
* selectorExpression -> 监听tag(默认为*,即监听topic下所有,监听多个tag时使用 || 进行分割,如:anran-sync-tags||anran-async-tags)
* messageModel -> 监听消费者模式
* 负载均衡模式:MessageModel.CLUSTERING(每一个消息只发给一个消费者),
* 广播模式:MessageModel.BROADCASTING(同一条消息发送给所有的消费者)
*/
@Service
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer.group}",
topic = "${rocketmq.consumer.topic}",
selectorExpression = "*",
messageModel = MessageModel.CLUSTERING)
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String o) {
System.out.println("消费消息,"+o);
}
}
原创文章,作者:jiafegn,如若转载,请注明出处:https://www.techlearn.cn/archives/622