【RocketMQ】RocketMQ实例--定时/延时消息
1、应用场景
一、比如需要定时0点需要执行什么业务。传统基于数据库的定时调度方案在分布式场景下比较复杂,基于RocketMQ可以封装多种类型定时消息。
二、比如在电商系统里,订单下单后,可以定时半小时后查看订单状态。
2、功能原理
1、什么是定时消息
定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
2、定时时间设置原则
-
Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。
-
定时时间的格式为毫秒级的Unix时间戳,您需要将要设置的时刻转换成时间戳形式。具体方式,请参见官方文档 Unix时间戳转换工具。
-
定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。
-
定时时长最大值默认为24小时,不支持自定义修改,更多信息,请参见官方文档 参数限制。
-
定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。
3、生命周期
-
初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
-
定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
-
待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
-
消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见官方文档 消费重试。
-
消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
-
消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见官方文档 消息存储和清理机制。
3、使用限制
1、消息类型一致性
定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。
2、定时精度约束
RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。
RocketMQ 定时消息的状态支持持久化存储,系统由于故障重启后,仍支持按照原来设置的定时时间触发消息投递。若存储系统异常重启,可能会导致定时消息投递出现一定延迟。
4、代码实例
4.1 生产者
/**
* 延时消息的使用场景:
* 比如电商里,提交了一个订单就可以发送一个延时消息,在1h以后去查看这个订单的状态,如果是未支付就取消订单释放内存
*/
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(默认只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
延时等级
// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
level == 0,消息为非延迟消息
1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
level > maxLevel,则level== maxLevel,例如level==20,延迟2h
4.2 消费者
/**
* 延时消息--启动消费者等待传入订阅者
*/
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topics
consumer.subscribe("TestTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " +
(System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
5、使用建议
避免将在某一时刻进行大量定时
定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。