SpringBoot + RocketMQ 实现延迟消息案例详解

2025-12-12 0 919

SpringBoot + RocketMQ 实现延迟消息案例详解

下面将详细介绍如何在SpringBoot中使用RocketMQ实现延迟消息,包括基于延迟级别和基于具体时间两种方式的完整实现。

一、延迟消息概述

RocketMQ提供了两种类型的延迟消息机制:

  1. 延迟消息:消息发送后延迟指定的时间长度再被消费
  2. 定时消息:消息在指定的具体时间点被消费

这两种机制在订单超时取消、会议提醒、定时任务调度等场景中有广泛应用。

二、环境准备

1. 添加Maven依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. 配置文件设置

application.yml中配置RocketMQ连接信息:

rocketmq:
  name-server: localhost:9876
  producer:
    group: delay-message-producer-group

三、延迟级别机制实现

1. 默认延迟级别

RocketMQ默认提供18个延迟级别,定义在MessageStoreConfig类中:

messageDelayLevel = \"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h\"

对应关系:

  • level=1: 延迟1秒
  • level=2: 延迟5秒
  • level=3: 延迟10秒
  • level=4: 延迟30秒
  • level=5: 延迟1分钟
  • level=6: 延迟2分钟
  • …以此类推
  • level=18: 延迟2小时

2. 基于延迟级别的生产者实现

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class DelayLevelProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送基于延迟级别的消息
     * @param topic 主题
     * @param tag 标签
     * @param message 消息内容
     * @param delayLevel 延迟级别(1-18)
     */
    public void sendMessageByDelayLevel(String topic, String tag, String message, int delayLevel) {
        // 创建消息
        Message springMessage = MessageBuilder.withPayload(message).build();
        
        // 发送延迟消息
        SendResult sendResult = rocketMQTemplate.syncSend(
            topic + \":\" + tag, 
            springMessage, 
            3000, // 超时时间
            delayLevel // 延迟级别
        );
        
        System.out.println(\"延迟级别消息发送成功: \" + sendResult);
    }
    
    /**
     * 发送订单超时取消消息(延迟15分钟)
     */
    public void sendOrderTimeoutMessage(String orderId) {
        String message = \"订单超时取消: \" + orderId;
        // 15分钟对应level=14(根据默认配置)
        sendMessageByDelayLevel(\"OrderTopic\", \"Timeout\", message, 14);
    }
}

四、基于具体时间的延迟消息实现

1. 定时消息生产者

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ScheduledMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送延迟指定毫秒数的消息
     */
    public void sendMessageWithDelayMs(String topic, String message, long delayMs) {
        // 计算投递时间
        long deliverTimeMs = System.currentTimeMillis() + delayMs;
        
        // 创建消息并设置投递时间
        Message springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_DELAY_TIME_MS, String.valueOf(delayMs))
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println(\"延迟毫秒消息发送成功: \" + sendResult);
    }
    
    /**
     * 发送指定时间点投递的消息
     */
    public void sendMessageAtTime(String topic, String message, Date deliverTime) {
        long deliverTimeMs = deliverTime.getTime();
        
        // 创建消息并设置投递时间
        Message springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println(\"定时投递消息发送成功: \" + sendResult);
    }
    
    /**
     * 发送10秒后投递的消息
     */
    public void sendTenSecondsLaterMessage(String topic, String message) {
        sendMessageWithDelayMs(topic, message, 10000L);
    }
}

五、消费者实现

延迟消息的消费者与普通消息消费者相同,无需特殊配置:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
@RocketMQMessageListener(
    topic = \"OrderTopic\",
    consumerGroup = \"delay-message-consumer-group\",
    selectorExpression = \"Timeout\"
)
public class OrderTimeoutConsumer implements RocketMQListener {
    
    @Override
    public void onMessage(String message) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd HH:mm:ss\"));
        System.out.println(\"[\" + now + \"] 接收到订单超时消息: \" + message);
        
        // 处理订单取消逻辑
        processOrderCancellation(message);
    }
    
    private void processOrderCancellation(String message) {
        // 提取订单ID
        String orderId = message.substring(message.indexOf(\":\") + 2);
        System.out.println(\"执行订单取消操作,订单ID: \" + orderId);
        // 这里可以调用订单服务进行取消操作
    }
}

六、Controller层实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping(\"/api/delay\")
public class DelayMessageController {
    
    @Autowired
    private DelayLevelProducer delayLevelProducer;
    
    @Autowired
    private ScheduledMessageProducer scheduledMessageProducer;
    
    /**
     * 发送基于延迟级别的消息
     */
    @PostMapping(\"/level\")
    public String sendByDelayLevel(
            @RequestParam String topic,
            @RequestParam String tag,
            @RequestParam String message,
            @RequestParam(defaultValue = \"3\") int delayLevel) {
        
        delayLevelProducer.sendMessageByDelayLevel(topic, tag, message, delayLevel);
        return \"延迟级别消息发送成功,延迟级别: \" + delayLevel;
    }
    
    /**
     * 发送订单超时取消消息
     */
    @PostMapping(\"/order/timeout\")
    public String sendOrderTimeout(@RequestParam String orderId) {
        delayLevelProducer.sendOrderTimeoutMessage(orderId);
        return \"订单超时取消消息已发送,订单ID: \" + orderId;
    }
    
    /**
     * 发送延迟指定毫秒的消息
     */
    @PostMapping(\"/milliseconds\")
    public String sendByDelayMs(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam long delayMs) {
        
        scheduledMessageProducer.sendMessageWithDelayMs(topic, message, delayMs);
        return \"延迟毫秒消息发送成功,延迟: \" + delayMs + \"ms\";
    }
    
    /**
     * 发送指定时间点的消息
     */
    @PostMapping(\"/scheduled\")
    public String sendScheduled(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam @DateTimeFormat(pattern = \"yyyy-MM-dd HH:mm:ss\") Date deliverTime) {
        
        scheduledMessageProducer.sendMessageAtTime(topic, message, deliverTime);
        return \"定时消息发送成功,投递时间: \" + deliverTime;
    }
}

七、自定义延迟级别配置

在Broker的配置文件中可以自定义延迟级别:

# 在broker.conf文件中添加
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h

重启Broker使其生效。注意,修改延迟级别后,所有使用延迟级别的消息都会使用新的配置。

八、两种实现方式对比

特性 基于延迟级别 基于具体时间
灵活性 较低,只能使用预定义级别 高,可以精确到毫秒
适用版本 全版本支持 RocketMQ 5.x及以上版本完整支持
使用场景 固定延迟时间的场景 需要精确控制投递时间的场景
配置复杂度 简单,无需额外配置 可能需要在Broker端开启相关功能

九、使用注意事项

  1. 延迟精度

    • 延迟消息的投递时间不是完全精确的,有一定误差
    • 在高并发场景下,误差可能会增大
  2. 版本兼容性

    • 基于具体时间的延迟消息在RocketMQ 5.x版本支持更完善
    • 在低版本中可能需要使用延迟级别机制
  3. 性能考虑

    • 大量延迟消息可能会增加Broker的负担
    • 对于长时间延迟的消息,考虑使用其他方案(如定时任务+消息队列组合)
  4. 消息可靠性

    • 延迟消息同样支持持久化,确保Broker重启后不会丢失
    • 建议开启消息确认机制确保消息可靠投递

十、测试示例

  1. 发送订单超时取消消息(延迟15分钟):

    POST /api/delay/order/timeout?orderId=ORDER123456
    
  2. 发送10秒后投递的消息:

    POST /api/delay/milliseconds?topic=TestTopic&message=HelloDelay&delayMs=10000
    
  3. 发送指定时间点的消息:

    POST /api/delay/scheduled?topic=TestTopic&message=HelloScheduled&deliverTime=2024-12-25%2000:00:00
    

通过以上配置和代码,您可以在SpringBoot项目中轻松实现基于RocketMQ的延迟消息功能,满足各种定时任务和延迟处理的业务需求。

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

申明:本文由第三方发布,内容仅代表作者观点,与本网站无关。对本文以及其中全部或者部分内容的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。本网发布或转载文章出于传递更多信息之目的,并不意味着赞同其观点或证实其描述,也不代表本网对其真实性负责。

左子网 编程相关 SpringBoot + RocketMQ 实现延迟消息案例详解 https://www.zuozi.net/35638.html

常见问题
  • 1、自动:拍下后,点击(下载)链接即可下载;2、手动:拍下后,联系卖家发放即可或者联系官方找开发者发货。
查看详情
  • 1、源码默认交易周期:手动发货商品为1-3天,并且用户付款金额将会进入平台担保直到交易完成或者3-7天即可发放,如遇纠纷无限期延长收款金额直至纠纷解决或者退款!;
查看详情
  • 1、描述:源码描述(含标题)与实际源码不一致的(例:货不对板); 2、演示:有演示站时,与实际源码小于95%一致的(但描述中有”不保证完全一样、有变化的可能性”类似显著声明的除外); 3、发货:不发货可无理由退款; 4、安装:免费提供安装服务的源码但卖家不履行的; 5、收费:价格虚标,额外收取其他费用的(但描述中有显著声明或双方交易前有商定的除外); 6、其他:如质量方面的硬性常规问题BUG等。 注:经核实符合上述任一,均支持退款,但卖家予以积极解决问题则除外。
查看详情
  • 1、左子会对双方交易的过程及交易商品的快照进行永久存档,以确保交易的真实、有效、安全! 2、左子无法对如“永久包更新”、“永久技术支持”等类似交易之后的商家承诺做担保,请买家自行鉴别; 3、在源码同时有网站演示与图片演示,且站演与图演不一致时,默认按图演作为纠纷评判依据(特别声明或有商定除外); 4、在没有”无任何正当退款依据”的前提下,商品写有”一旦售出,概不支持退款”等类似的声明,视为无效声明; 5、在未拍下前,双方在QQ上所商定的交易内容,亦可成为纠纷评判依据(商定与描述冲突时,商定为准); 6、因聊天记录可作为纠纷评判依据,故双方联系时,只与对方在左子上所留的QQ、手机号沟通,以防对方不承认自我承诺。 7、虽然交易产生纠纷的几率很小,但一定要保留如聊天记录、手机短信等这样的重要信息,以防产生纠纷时便于左子介入快速处理。
查看详情

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务