RocketMQ ⽀持定时消息,但是不⽀持任意时间精度,仅⽀持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表⽰不延时,level=1 表⽰ 1 级延时,level=2 表⽰ 2 级延时,以此类推。
如何配置:
在服务器端(rocketmq-broker端)的属性配置⽂件中加⼊以下⾏:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h描述了各级别与延时时间的对应映射关系。
这个配置项配置了从1级开始各级延时的时间,如1表⽰延时1s,2表⽰延时5s,14表⽰延时10m,可以修改这个指定级别的延时时间;时间单位⽀持:s、m、h、d,分别表⽰秒、分、时、天;默认值就是上⾯声明的,可⼿⼯调整;
默认值已经够⽤,不建议调整【仅供参考,还是根据实际需要调整。调整默认值时注意同时要修改时间对应的level级别的值】
如何发送延时消息:
发送延时消息只需要在客户端(rocketmq-client端)待发送的消息( com.alibaba.rocketmq.common.message.Message )中设置延时级别delayLevel即可。
Message msg = new Message(topicName,\"\msg.setDelayTimeLevel(delayLevel);
SendResult sendResult = getMQProducer.send(msg);
RocketMQ定时(延迟)消息
RocketMQ 不⽀持任意时间⾃定义的延迟消息,仅⽀持内置预设值的延迟时间间隔的延迟消息。预设值的延迟时间间隔为:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
延时消息的使⽤场景
⽐如电商⾥,提交了⼀个订单就可以发送⼀个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。⽣产
package com.xin.rocketmq.demo.testrun;
import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;public class ProducerDelay {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(\"please_rename_unique_group_name\"); producer.setNamesrvAddr(\"192.168.10.11:9876\"); producer.start();
Message msg1 = new Message( JmsConfig.TOPIC, \"订单001\".getBytes());
msg1.setDelayTimeLevel(2);//延迟5秒 Message msg2 = new Message( JmsConfig.TOPIC, \"订单001\".getBytes());
msg2.setDelayTimeLevel(4);//延迟30秒
SendResult sendResult1 = producer.send(msg1); SendResult sendResult2 = producer.send(msg2);
System.out.println(\"Product1-同步发送-Product信息={}\" + sendResult1); System.out.println(\"Product2-同步发送-Product信息={}\" + sendResult2); producer.shutdown(); }}
消费
package com.xin.rocketmq.demo.testrun;
import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;
public class ConsumerDelay {
public static void main(String[] args) throws Exception { // 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(\"please_rename_unique_group_name\"); // 设置NameServer的地址
consumer.setNamesrvAddr(\"192.168.10.11:9876\");
// 订阅⼀个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe(JmsConfig.TOPIC, \"*\"); // 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override
public ConsumeConcurrentlyStatus consumeMessage(List System.out.println(\"Receive message[msgId=\" + message.getMsgId() + \"] \" + (System.currentTimeMillis() - message.getStoreTimestamp()) + \"ms later\"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); }} 以上为个⼈经验,希望能给⼤家⼀个参考,也希望⼤家多多⽀持。 因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- stra.cn 版权所有 赣ICP备2024042791号-4
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务