RabbitMQ 实现延迟通知的完整方案
一、延迟通知概述
延迟通知是指消息在发送后不会立即被消费,而是在指定的时间延迟后才被处理的消息传递机制。常见应用场景包括:
- 订单超时自动取消
- 定时任务调度
- 会议/活动前提醒
- 账单到期通知
二、RabbitMQ 实现延迟通知的两种方案
方案对比
| 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| TTL + 死信队列 | 无需安装插件,原生支持 | 1. 队列级TTL不支持动态延迟 2. 消息级TTL存在性能问题 |
延迟时间固定或较少变化的场景 |
| 延迟插件 | 1. 支持每条消息单独设置延迟时间 2. 性能更好 3. 配置简单 |
需要安装额外插件 | 延迟时间不固定,需要灵活设置的场景 |
三、方案一:基于TTL和死信队列实现
1. 原理
- 利用消息或队列的TTL(Time-To-Live)特性使消息过期
- 配置死信交换机(DLX)接收过期消息
- 将死信消息路由到实际处理队列
2. 代码实现
2.1 配置类
package com.example.delaynotify.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlDelayConfig {
// 普通交换机
public static final String DELAY_EXCHANGE = \"delay_exchange\";
// 普通队列
public static final String DELAY_QUEUE = \"delay_queue\";
// 死信交换机
public static final String DEAD_LETTER_EXCHANGE = \"dead_letter_exchange\";
// 死信队列
public static final String DEAD_LETTER_QUEUE = \"dead_letter_queue\";
// 路由键
public static final String DELAY_ROUTING_KEY = \"delay.key\";
public static final String DEAD_LETTER_ROUTING_KEY = \"dead.letter.key\";
// 声明死信交换机
@Bean
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();
}
// 声明死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
// 声明普通交换机
@Bean
public Exchange delayExchange() {
return ExchangeBuilder.directExchange(DELAY_EXCHANGE).durable(true).build();
}
// 声明延迟队列并绑定死信交换机
@Bean
public Queue delayQueue() {
Map args = new HashMap();
// 设置死信交换机
args.put(\"x-dead-letter-exchange\", DEAD_LETTER_EXCHANGE);
// 设置死信路由键
args.put(\"x-dead-letter-routing-key\", DEAD_LETTER_ROUTING_KEY);
// 队列级TTL (10秒) - 如果需要消息级TTL可以不设置此参数
args.put(\"x-message-ttl\", 10000);
return QueueBuilder.durable(DELAY_QUEUE)
.withArguments(args)
.build();
}
// 绑定普通队列和普通交换机
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with(DELAY_ROUTING_KEY)
.noargs();
}
// 绑定死信队列和死信交换机
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(DEAD_LETTER_ROUTING_KEY)
.noargs();
}
}
2.2 生产者 – 发送延迟消息
package com.example.delaynotify.service;
import com.example.delaynotify.config.TtlDelayConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class TtlDelayMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送固定延迟时间的消息(队列级TTL)
public void sendFixedDelayMessage(String message) {
System.out.println(\"发送固定延迟消息: \" + message + \", 时间: \" + System.currentTimeMillis());
rabbitTemplate.convertAndSend(
TtlDelayConfig.DELAY_EXCHANGE,
TtlDelayConfig.DELAY_ROUTING_KEY,
message
);
}
// 发送自定义延迟时间的消息(消息级TTL)
public void sendCustomDelayMessage(String message, long delayMillis) {
System.out.println(\"发送自定义延迟消息: \" + message + \", 延迟时间: \" + delayMillis + \"ms, 时间: \" + System.currentTimeMillis());
rabbitTemplate.convertAndSend(
TtlDelayConfig.DELAY_EXCHANGE,
TtlDelayConfig.DELAY_ROUTING_KEY,
message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息级TTL
message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
return message;
}
}
);
}
}
2.3 消费者 – 接收延迟消息
package com.example.delaynotify.consumer;
import com.example.delaynotify.config.TtlDelayConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class TtlDelayMessageConsumer {
@RabbitListener(queues = TtlDelayConfig.DEAD_LETTER_QUEUE)
public void receiveDelayMessage(String message, Channel channel, Message msg) throws IOException {
try {
System.out.println(\"接收到延迟消息: \" + message + \", 时间: \" + System.currentTimeMillis());
// 处理业务逻辑 - 例如发送通知、更新状态等
processDelayMessage(message);
// 手动确认消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println(\"消息处理失败: \" + e.getMessage());
// 拒绝消息并丢弃
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
}
}
private void processDelayMessage(String message) {
// 模拟发送通知的业务逻辑
System.out.println(\"执行通知业务: \" + message);
// 这里可以调用邮件、短信、推送等服务
}
}
四、方案二:基于延迟插件实现
1. 安装延迟插件
1.1 Docker环境安装
# 下载插件(根据RabbitMQ版本选择对应版本)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
# 复制插件到容器
docker cp rabbitmq_delayed_message_exchange-3.11.1.ez rabbitmq:/plugins
# 进入容器启用插件
docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
# 重启RabbitMQ容器
docker restart rabbitmq
1.2 验证安装
在RabbitMQ管理界面新建交换机时,如果能看到x-delayed-message类型,则表示插件安装成功。
2. 代码实现
2.1 配置类
package com.example.delaynotify.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class PluginDelayConfig {
// 延迟交换机
public static final String DELAY_PLUGIN_EXCHANGE = \"delay_plugin_exchange\";
// 延迟队列
public static final String DELAY_PLUGIN_QUEUE = \"delay_plugin_queue\";
// 路由键
public static final String DELAY_PLUGIN_ROUTING_KEY = \"delay.plugin.key\";
// 声明延迟交换机(类型为x-delayed-message)
@Bean
public CustomExchange delayPluginExchange() {
Map args = new HashMap();
// 设置底层路由模式为direct
args.put(\"x-delayed-type\", \"direct\");
return new CustomExchange(
DELAY_PLUGIN_EXCHANGE,
\"x-delayed-message\",
true, // 持久化
false, // 非自动删除
args
);
}
// 声明延迟队列
@Bean
public Queue delayPluginQueue() {
return QueueBuilder.durable(DELAY_PLUGIN_QUEUE).build();
}
// 绑定延迟交换机和延迟队列
@Bean
public Binding delayPluginBinding() {
return BindingBuilder.bind(delayPluginQueue())
.to(delayPluginExchange())
.with(DELAY_PLUGIN_ROUTING_KEY)
.noargs();
}
}
2.2 生产者 – 发送延迟消息
package com.example.delaynotify.service;
import com.example.delaynotify.config.PluginDelayConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class PluginDelayMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送延迟消息
public void sendDelayMessage(String message, long delayMillis) {
System.out.println(\"使用插件发送延迟消息: \" + message + \", 延迟时间: \" + delayMillis + \"ms, 时间: \" + System.currentTimeMillis());
rabbitTemplate.convertAndSend(
PluginDelayConfig.DELAY_PLUGIN_EXCHANGE,
PluginDelayConfig.DELAY_PLUGIN_ROUTING_KEY,
message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置延迟时间(毫秒)
message.getMessageProperties().setDelay((int) delayMillis);
return message;
}
}
);
}
}
2.3 消费者 – 接收延迟消息
package com.example.delaynotify.consumer;
import com.example.delaynotify.config.PluginDelayConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class PluginDelayMessageConsumer {
@RabbitListener(queues = PluginDelayConfig.DELAY_PLUGIN_QUEUE)
public void receiveDelayMessage(String message, Channel channel, Message msg) throws IOException {
try {
System.out.println(\"接收到插件延迟消息: \" + message + \", 时间: \" + System.currentTimeMillis());
// 处理业务逻辑 - 例如发送通知、更新状态等
processDelayMessage(message);
// 手动确认消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println(\"插件延迟消息处理失败: \" + e.getMessage());
// 拒绝消息并丢弃
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
}
}
private void processDelayMessage(String message) {
// 模拟发送通知的业务逻辑
System.out.println(\"执行通知业务: \" + message);
// 这里可以调用邮件、短信、推送等服务
}
}
五、Controller层实现
package com.example.delaynotify.controller;
import com.example.delaynotify.service.PluginDelayMessageService;
import com.example.delaynotify.service.TtlDelayMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DelayNotifyController {
@Autowired
private TtlDelayMessageService ttlDelayMessageService;
@Autowired
private PluginDelayMessageService pluginDelayMessageService;
// 基于TTL的固定延迟
@GetMapping(\"/ttl/fixed\")
public String sendFixedTtlDelay(@RequestParam String message) {
ttlDelayMessageService.sendFixedDelayMessage(message);
return \"固定延迟消息已发送 (10秒)\";
}
// 基于TTL的自定义延迟
@GetMapping(\"/ttl/custom\")
public String sendCustomTtlDelay(@RequestParam String message, @RequestParam long delayMillis) {
ttlDelayMessageService.sendCustomDelayMessage(message, delayMillis);
return \"自定义延迟消息已发送 (\" + delayMillis + \"ms)\";
}
// 基于插件的延迟
@GetMapping(\"/plugin/delay\")
public String sendPluginDelay(@RequestParam String message, @RequestParam long delayMillis) {
pluginDelayMessageService.sendDelayMessage(message, delayMillis);
return \"插件延迟消息已发送 (\" + delayMillis + \"ms)\";
}
}
六、application.yml配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: /
# 生产者确认配置
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
# 消费者配置
listener:
simple:
acknowledge-mode: manual
prefetch: 1
concurrency: 1
max-concurrency: 5
七、完整的通知场景实现示例
订单超时通知场景
package com.example.delaynotify.service;
import com.example.delaynotify.config.PluginDelayConfig;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Service
public class OrderNotifyService {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(\"yyyy-MM-dd HH:mm:ss\");
/**
* 创建订单并设置超时通知
* @param orderId 订单ID
* @param notifyDelaySeconds 超时时间(秒)
*/
public void createOrderAndSetTimeout(String orderId, int notifyDelaySeconds) {
// 1. 保存订单逻辑
System.out.println(\"创建订单: \" + orderId + \" 时间: \" + LocalDateTime.now().format(formatter));
// 2. 设置延迟通知
String notifyMessage = \"订单[\" + orderId + \"]已超时,需要取消处理\";
long delayMillis = notifyDelaySeconds * 1000L;
System.out.println(\"设置订单超时通知,延迟: \" + notifyDelaySeconds + \"秒\");
// 使用延迟插件发送通知消息
rabbitTemplate.convertAndSend(
PluginDelayConfig.DELAY_PLUGIN_EXCHANGE,
PluginDelayConfig.DELAY_PLUGIN_ROUTING_KEY,
notifyMessage,
message -> {
message.getMessageProperties().setDelay((int) delayMillis);
return message;
}
);
}
}
八、两种方案对比与选择建议
1. 性能对比
- TTL+死信队列:当使用消息级TTL时,RabbitMQ需要为每条消息设置过期时间,会造成额外的性能开销
- 延迟插件:插件内部使用优先队列实现,性能更优,特别适合大量不同延迟时间的消息场景
2. 灵活性对比
- TTL+死信队列:如果要支持不同的延迟时间,需要创建多个不同TTL的队列
- 延迟插件:每条消息都可以设置不同的延迟时间,更加灵活
3. 选择建议
- 如果延迟时间固定或种类较少,可以使用TTL+死信队列方案,无需安装插件
- 如果延迟时间不固定或种类较多,强烈建议使用延迟插件方案
- 对于生产环境,建议使用延迟插件方案,性能更好、配置更简洁
通过以上两种方案,您可以根据实际需求选择合适的方式实现RabbitMQ的延迟通知功能,满足订单超时、定时提醒等各种业务场景。
