行业资讯 2025年08月6日
0 收藏 0 点赞 668 浏览 4794 个字
摘要 :

文章目录 一、死信消息与死信队列的概念 (一)死信消息 (二)死信队列 二、死信消息产生的常见原因 三、死信策略的核心内容 四、死信队列的作用 五、死信队列的简单……




  • 一、死信消息与死信队列概念
    • (一)死信消息
    • (二)死信队列
  • 二、死信消息产生的常见原因
  • 三、死信策略的核心内容
  • 四、死信队列的作用
  • 五、死信队列的简单实现示例(RabbitMQ)
  • 六、消息队列的其他基础知识
    • (一)消息
    • (二)主题
    • (三)队列
    • (四)消费模式
    • (五)消息模型
    • (六)可靠性保障
    • (七)异步处理与削峰
    • (八)延时和优先级
    • (九)性能监控与优化
    • (十)流控与幂等性
    • (十一)架构设计
  • 七、典型死信策略配置示例(RabbitMQ)
  • 八、总结

现代分布式系统里,消息队列是个极为关键的组件,它能实现系统之间的异步通信,还能起到解耦的作用。本文会用通俗易懂的语言,为大家介绍消息队列中的死信策略,同时拓展相关基础知识,文中还配有示例代码,方便国内开发者学习和理解。

一、死信消息与死信队列的概念

(一)死信消息

死信消息,简单来说,就是那些消费者没办法正常处理的消息。像消息格式不对、数据出现异常、消费失败即便重试多次也没成功,或者消息过了有效期等情况,这些消息就会成为死信消息。

(二)死信队列

死信队列(Dead Letter Queue,简称DLQ),是专门用来存放死信消息的队列。一旦死信消息产生,就会被转移到死信队列里,这样就不会影响正常的业务流程了。

二、死信消息产生的常见原因

  1. 消息被拒绝:当消费者调用拒绝接口,并且选择不把消息重新放回队列时,这条消息就会变成死信消息。
  2. 消息过期:如果消息设置了存活时间(TTL),在这个时间内一直没被消费,时间一到,消息就过期了,也就成了死信消息。
  3. 队列满了:队列都有最大长度限制,当达到这个上限后,新的消息就无法再进入队列,这些无法入队的消息也可能成为死信消息。
  4. 消息格式或内容异常:消息的格式或者内容出现问题,导致消费者没办法处理,这样的消息也会变成死信消息。

三、死信策略的核心内容

  1. 消息转为死信条件:常见的情况有消费失败且重试次数达到上限、被消费者拒绝且不重试、消息过期、消息格式异常等。
  2. 死信消息处理方式:死信消息不会再被正常的消费者消费,而是会被转移到死信队列中,这样后续可以进行人工处理或者通过程序自动处理。
  3. 死信消息保存时间:一般来说,死信消息的保存时间和正常消息是一样的,比如设置为3天。而且消息进入死信队列后,保存时间会重新开始计算。
  4. 配置建议:为了避免死信消息循环,死信队列最好和业务队列分开设置。同时,要注意监控死信消息的相关指标,这样便于及时发现问题。

四、死信队列的作用

  1. 提高系统稳定性:死信队列可以把异常消息隔离出去,防止它们阻塞正常的业务流程,让系统运行更加稳定。
  2. 方便问题排查:所有的异常消息都集中存放在死信队列里,方便开发人员去分析问题,还能对出现问题的业务进行补偿操作。
  3. 保证消息完整性:通过死信队列,避免了消息丢失的情况,保证了业务数据的一致性。

五、死信队列的简单实现示例(RabbitMQ)

下面基于RabbitMQ,给大家展示一个死信队列的配置和使用示例,帮助大家理解它的工作流程。

import com.rabbitmq.client.*;

public class DeadLetterExample {
    // 定义业务交换机名称
    private static final String EXCHANGE_NAME = \"business_exchange\";
    // 定义业务队列名称
    private static final String QUEUE_NAME = \"business_queue\";
    // 定义死信交换机名称
    private static final String DLX_EXCHANGE = \"dead_letter_exchange\";
    // 定义死信队列名称
    private static final String DLQ_NAME = \"dead_letter_queue\";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(\"localhost\");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明死信交换机和死信队列
            channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DLQ_NAME, true, false, false, null);
            channel.queueBind(DLQ_NAME, DLX_EXCHANGE, \"dlx_routing_key\");

            // 业务队列参数,绑定死信交换机和路由键
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put(\"x-dead-letter-exchange\", DLX_EXCHANGE);
            argsMap.put(\"x-dead-letter-routing-key\", \"dlx_routing_key\");
            argsMap.put(\"x-message-ttl\", 10000); // 消息10秒后过期

            // 声明业务交换机和业务队列
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, \"business_key\");

            // 发送一条消息
            String message = \"Hello, RabbitMQ with DLQ!\";
            channel.basicPublish(EXCHANGE_NAME, \"business_key\", null, message.getBytes(\"UTF-8\"));
            System.out.println(\"Sent message: \" + message);

            // 消费业务队列,模拟拒绝消息,触发死信
            channel.basicConsume(QUEUE_NAME, false, (consumerTag, delivery) -> {
                String msg = new String(delivery.getBody(), \"UTF-8\");
                System.out.println(\"Received message: \" + msg);
                // 拒绝消息且不重新入队,消息进入死信队列
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }, consumerTag -> {});

            // 消费死信队列
            channel.basicConsume(DLQ_NAME, true, (consumerTag, delivery) -> {
                String deadMsg = new String(delivery.getBody(), \"UTF-8\");
                System.out.println(\"Dead letter received: \" + deadMsg);
            }, consumerTag -> {});

            // 保持程序运行,观察输出
            Thread.sleep(20000);
        }
    }
}

代码说明

  • 业务队列设置了x-dead-letter-exchangex-dead-letter-routing-key,这样当消息被拒绝或者过期时,就会自动转发到死信交换机,再由死信交换机根据路由键把消息路由到死信队列。
  • 消费者在处理业务队列消息时,调用basicReject方法拒绝消息,并且第二个参数设置为false,表示不把消息重新放回队列,这样消息就会进入死信队列。
  • 通过消费死信队列,可以查看进入死信队列的消息,方便后续处理。

六、消息队列的其他基础知识

(一)消息

消息是传递数据的载体,它不仅包含实际的数据内容(消息体),还带有一些属性。

(二)主题

主题是消息的分类标识。生产者把消息发送到特定的主题,消费者通过订阅感兴趣的主题来获取消息。

(三)队列

队列是用来存储消息的容器,一般按照先进先出(FIFO)的顺序处理消息。

(四)消费模式

  1. 集群消费:这种模式下,多个消费者共同处理消息,实现负载均衡,提高消费效率。
  2. 广播消费:所有的消费者都会接收到每一条消息。

(五)消息模型

  1. 点对点模型:一个消息只会被一个消费者消费。
  2. 发布/订阅模型:多个消费者可以订阅同一个消息,都能接收到这条消息。

(六)可靠性保障

  1. 消息确认(ACK):消费者处理完消息后,向消息队列发送确认消息,确保消息不会丢失。
  2. 重试机制:当消息消费失败时,会按照一定的策略进行重试。
  3. 顺序消息保证:确保消息按照特定的顺序被消费。

(七)异步处理与削峰

生产者发送消息后不需要等待处理结果,直接返回。在流量高峰时,消息队列可以缓冲大量消息,平滑系统负载。

(八)延时和优先级

  1. 延时消息:可以设置消息在指定时间后才被消费。
  2. 优先级消息:重要的消息可以设置更高的优先级,优先被处理。

(九)性能监控与优化

通过监控消息队列的响应时间、传输速率、积压数量等指标,采取增加消费者、批量处理消息等方式,对消息队列的性能进行优化。

(十)流控与幂等性

  1. 流控:防止消费者因为消息过多而导致过载。
  2. 幂等性:保证消息无论被消费多少次,对业务的影响都是一样的,避免重复消费带来的问题,确保业务的一致性。

(十一)架构设计

采用分布式部署和多级队列设计,可以提升消息队列的容错能力和扩展能力。

七、典型死信策略配置示例(RabbitMQ)

# 声明死信交换机
rabbitmqadmin declare exchange name=dlx_exchange type=direct durable=true

# 声明死信队列
rabbitmqadmin declare queue name=dlq durable=true

# 绑定死信队列到死信交换机
rabbitmqadmin declare binding source=dlx_exchange destination=dlq routing_key=dlx_key

# 声明业务队列,绑定死信交换机和路由键
rabbitmqadmin declare queue name=business_queue durable=true arguments=\'{\"x-dead-letter-exchange\":\"dlx_exchange\",\"x-dead-letter-routing-key\":\"dlx_key\",\"x-message-ttl\":60000}\'

# 绑定业务队列到业务交换机
rabbitmqadmin declare binding source=business_exchange destination=business_queue routing_key=business_key

上述命令分别完成了声明死信交换机、死信队列,绑定死信队列到死信交换机,声明业务队列并绑定死信交换机和路由键,以及绑定业务队列到业务交换机的操作。

八、总结

死信策略是消息队列处理异常消息的重要手段,能把无法正常消费的消息隔离到死信队列,保证正常业务不受影响。死信消息产生的原因多样,通过合理配置死信交换机和死信队列,系统可以自动转移死信消息,便于后续处理。监控死信消息指标,对于保障消息系统的稳定运行至关重要。除了死信策略,消息队列还有很多其他的基础知识点,如消息模型、消费模式等。希望通过本文的介绍和示例代码,能帮助大家快速掌握死信策略的核心概念和实现方法。

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/10529.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号