首页 开发教程 SpringBoot + RocketMQ 实现延迟消息案例详解

SpringBoot + RocketMQ 实现延迟消息案例详解

开发教程 2025年12月4日
720 浏览

SpringBoot + RocketMQ 实现延迟消息案例详解

下面将详细介绍如何在SpringBoot中使用RocketMQ实现延迟消息,包括基于延迟级别和基于具体时间两种方式的完整实现。

一、延迟消息概述

RocketMQ提供了两种类型的延迟消息机制:

  1. 延迟消息:消息发送后延迟指定的时间长度再被消费
  2. 定时消息:消息在指定的具体时间点被消费

这两种机制在订单超时取消、会议提醒、定时任务调度等场景中有广泛应用。

二、环境准备

1. 添加Maven依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. 配置文件设置

application.yml中配置RocketMQ连接信息:

rocketmq:
  name-server: localhost:9876
  producer:
    group: delay-message-producer-group

三、延迟级别机制实现

1. 默认延迟级别

RocketMQ默认提供18个延迟级别,定义在MessageStoreConfig类中:

messageDelayLevel = \"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h\"

对应关系:

  • level=1: 延迟1秒
  • level=2: 延迟5秒
  • level=3: 延迟10秒
  • level=4: 延迟30秒
  • level=5: 延迟1分钟
  • level=6: 延迟2分钟
  • …以此类推
  • level=18: 延迟2小时

2. 基于延迟级别的生产者实现

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class DelayLevelProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送基于延迟级别的消息
     * @param topic 主题
     * @param tag 标签
     * @param message 消息内容
     * @param delayLevel 延迟级别(1-18)
     */
    public void sendMessageByDelayLevel(String topic, String tag, String message, int delayLevel) {
        // 创建消息
        Message springMessage = MessageBuilder.withPayload(message).build();
        
        // 发送延迟消息
        SendResult sendResult = rocketMQTemplate.syncSend(
            topic + \":\" + tag, 
            springMessage, 
            3000, // 超时时间
            delayLevel // 延迟级别
        );
        
        System.out.println(\"延迟级别消息发送成功: \" + sendResult);
    }
    
    /**
     * 发送订单超时取消消息(延迟15分钟)
     */
    public void sendOrderTimeoutMessage(String orderId) {
        String message = \"订单超时取消: \" + orderId;
        // 15分钟对应level=14(根据默认配置)
        sendMessageByDelayLevel(\"OrderTopic\", \"Timeout\", message, 14);
    }
}

四、基于具体时间的延迟消息实现

1. 定时消息生产者

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ScheduledMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送延迟指定毫秒数的消息
     */
    public void sendMessageWithDelayMs(String topic, String message, long delayMs) {
        // 计算投递时间
        long deliverTimeMs = System.currentTimeMillis() + delayMs;
        
        // 创建消息并设置投递时间
        Message springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_DELAY_TIME_MS, String.valueOf(delayMs))
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println(\"延迟毫秒消息发送成功: \" + sendResult);
    }
    
    /**
     * 发送指定时间点投递的消息
     */
    public void sendMessageAtTime(String topic, String message, Date deliverTime) {
        long deliverTimeMs = deliverTime.getTime();
        
        // 创建消息并设置投递时间
        Message springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println(\"定时投递消息发送成功: \" + sendResult);
    }
    
    /**
     * 发送10秒后投递的消息
     */
    public void sendTenSecondsLaterMessage(String topic, String message) {
        sendMessageWithDelayMs(topic, message, 10000L);
    }
}

五、消费者实现

延迟消息的消费者与普通消息消费者相同,无需特殊配置:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
@RocketMQMessageListener(
    topic = \"OrderTopic\",
    consumerGroup = \"delay-message-consumer-group\",
    selectorExpression = \"Timeout\"
)
public class OrderTimeoutConsumer implements RocketMQListener {
    
    @Override
    public void onMessage(String message) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd HH:mm:ss\"));
        System.out.println(\"[\" + now + \"] 接收到订单超时消息: \" + message);
        
        // 处理订单取消逻辑
        processOrderCancellation(message);
    }
    
    private void processOrderCancellation(String message) {
        // 提取订单ID
        String orderId = message.substring(message.indexOf(\":\") + 2);
        System.out.println(\"执行订单取消操作,订单ID: \" + orderId);
        // 这里可以调用订单服务进行取消操作
    }
}

六、Controller层实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping(\"/api/delay\")
public class DelayMessageController {
    
    @Autowired
    private DelayLevelProducer delayLevelProducer;
    
    @Autowired
    private ScheduledMessageProducer scheduledMessageProducer;
    
    /**
     * 发送基于延迟级别的消息
     */
    @PostMapping(\"/level\")
    public String sendByDelayLevel(
            @RequestParam String topic,
            @RequestParam String tag,
            @RequestParam String message,
            @RequestParam(defaultValue = \"3\") int delayLevel) {
        
        delayLevelProducer.sendMessageByDelayLevel(topic, tag, message, delayLevel);
        return \"延迟级别消息发送成功,延迟级别: \" + delayLevel;
    }
    
    /**
     * 发送订单超时取消消息
     */
    @PostMapping(\"/order/timeout\")
    public String sendOrderTimeout(@RequestParam String orderId) {
        delayLevelProducer.sendOrderTimeoutMessage(orderId);
        return \"订单超时取消消息已发送,订单ID: \" + orderId;
    }
    
    /**
     * 发送延迟指定毫秒的消息
     */
    @PostMapping(\"/milliseconds\")
    public String sendByDelayMs(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam long delayMs) {
        
        scheduledMessageProducer.sendMessageWithDelayMs(topic, message, delayMs);
        return \"延迟毫秒消息发送成功,延迟: \" + delayMs + \"ms\";
    }
    
    /**
     * 发送指定时间点的消息
     */
    @PostMapping(\"/scheduled\")
    public String sendScheduled(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam @DateTimeFormat(pattern = \"yyyy-MM-dd HH:mm:ss\") Date deliverTime) {
        
        scheduledMessageProducer.sendMessageAtTime(topic, message, deliverTime);
        return \"定时消息发送成功,投递时间: \" + deliverTime;
    }
}

七、自定义延迟级别配置

在Broker的配置文件中可以自定义延迟级别:

# 在broker.conf文件中添加
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h

重启Broker使其生效。注意,修改延迟级别后,所有使用延迟级别的消息都会使用新的配置。

八、两种实现方式对比

特性 基于延迟级别 基于具体时间
灵活性 较低,只能使用预定义级别 高,可以精确到毫秒
适用版本 全版本支持 RocketMQ 5.x及以上版本完整支持
使用场景 固定延迟时间的场景 需要精确控制投递时间的场景
配置复杂度 简单,无需额外配置 可能需要在Broker端开启相关功能

九、使用注意事项

  1. 延迟精度

    • 延迟消息的投递时间不是完全精确的,有一定误差
    • 在高并发场景下,误差可能会增大
  2. 版本兼容性

    • 基于具体时间的延迟消息在RocketMQ 5.x版本支持更完善
    • 在低版本中可能需要使用延迟级别机制
  3. 性能考虑

    • 大量延迟消息可能会增加Broker的负担
    • 对于长时间延迟的消息,考虑使用其他方案(如定时任务+消息队列组合)
  4. 消息可靠性

    • 延迟消息同样支持持久化,确保Broker重启后不会丢失
    • 建议开启消息确认机制确保消息可靠投递

十、测试示例

  1. 发送订单超时取消消息(延迟15分钟):

    POST /api/delay/order/timeout?orderId=ORDER123456
    
  2. 发送10秒后投递的消息:

    POST /api/delay/milliseconds?topic=TestTopic&message=HelloDelay&delayMs=10000
    
  3. 发送指定时间点的消息:

    POST /api/delay/scheduled?topic=TestTopic&message=HelloScheduled&deliverTime=2024-12-25%2000:00:00
    

通过以上配置和代码,您可以在SpringBoot项目中轻松实现基于RocketMQ的延迟消息功能,满足各种定时任务和延迟处理的业务需求。

发表评论
暂无评论

还没有评论呢,快来抢沙发~

客服

点击联系客服 点击联系客服

在线时间:09:00-18:00

关注微信公众号

关注微信公众号
客服电话

400-888-8888

客服邮箱 122325244@qq.com

手机

扫描二维码

手机访问本站

扫描二维码
搜索