当前位置: 首页 > news >正文

【RocketMQ】RocketMQ实例--定时/延时消息

1、应用场景

 一、比如需要定时0点需要执行什么业务。传统基于数据库的定时调度方案在分布式场景下比较复杂,基于RocketMQ可以封装多种类型定时消息。

二、比如在电商系统里,订单下单后,可以定时半小时后查看订单状态。

2、功能原理

 1、什么是定时消息

定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。 

2、定时时间设置原则

  • Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。

  • 定时时间的格式为毫秒级的Unix时间戳,您需要将要设置的时刻转换成时间戳形式。具体方式,请参见官方文档  Unix时间戳转换工具。

  • 定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。

  • 定时时长最大值默认为24小时,不支持自定义修改,更多信息,请参见官方文档 参数限制。

  • 定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。

3、生命周期

 

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。

  • 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见官方文档 消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见官方文档 消息存储和清理机制。

3、使用限制

1、消息类型一致性

定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。

2、定时精度约束

RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。

RocketMQ 定时消息的状态支持持久化存储,系统由于故障重启后,仍支持按照原来设置的定时时间触发消息投递。若存储系统异常重启,可能会导致定时消息投递出现一定延迟。

4、代码实例

4.1 生产者

/**
 * 延时消息的使用场景:
 * 比如电商里,提交了一个订单就可以发送一个延时消息,在1h以后去查看这个订单的状态,如果是未支付就取消订单释放内存
 */
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 实例化一个生产者来产生延时消息
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // 设置延时等级3,这个消息将在10s之后发送(默认只支持固定的几个时间,详看delayTimeLevel)
            message.setDelayTimeLevel(3);
            // 发送消息
            producer.send(message);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

 延时等级

// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
 
//发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
level == 0,消息为非延迟消息
1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
level > maxLevel,则level== maxLevel,例如level==20,延迟2h

4.2 消费者

 

/**
 * 延时消息--启动消费者等待传入订阅者
 */
public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 订阅Topics
        consumer.subscribe("TestTopic", "*");
        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, 
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + 
                            (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

5、使用建议

避免将在某一时刻进行大量定时

定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。

相关文章:

  • Electron 图标修改
  • 大数据框架常用端口号总结
  • 剃(磨)前插齿刀设计计算开发第一步
  • 持续集成与持续交付CI/CD
  • 2024年9月HarmonyOS鸿蒙应用开发者高级认证全新题库(覆盖99%考题)
  • 【HCIA-Datacom】网络参考模型
  • android中常见的面试题,讲的太透彻了
  • HarmonyOS | 状态管理(五) | @Observed装饰器和@ObjectLink装饰器
  • Docker基础(一)
  • React回顾
  • Qt RGB三色灯上位机
  • docker学习快速入门
  • Redis 中主从、哨兵和集群这三种模式有什么区别 ?
  • 数据库MYSQL及MYSQL ODBC
  • 华为机试 - 猜字谜
  • Windows下的通用进程守护程序(持续更新中),高仿supervisor。
  • 认证AAA的好处及必要性
  • Chrome谷歌浏览器清空缓存并强制刷新页面
  • 有了 HTTP,为什么还要 RPC?
  • 蓝桥杯:跳越
  • 05第二章:04_使用通用 Mapper
  • 二苯并环辛炔-聚乙二醇-CY5.5;DBCO-PEG-CY5.5简介;DBCO-PEG-Cyanine5.5 激发/发射波长为 675 nm/694 nm
  • 【关于eps8266自动重启 Soft WDT reset】
  • QEMU环境搭建
  • SetWindowLongPtr之GWLP_USERDATA
  • 华为OD机试真题 Python 实现【去除多余空格】【2022.11 Q4 新题】
  • LeetCode 8. 字符串转换整数 (atoi)(C++)
  • 作为程序员的你,常用的软件有哪些?
  • 当谈论 React hook,我们究竟说的是什么?
  • RocketMQ疑难杂症之No route info of this topic解决方案
  • 国产CAE的涅槃-岩土行业高性能离散元软件MatDEM
  • 我国登山鞋行业参与者越发广泛带来广阔潜在需求 女性市场值得期待