行业资讯 2025年08月6日
0 收藏 0 点赞 443 浏览 2358 个字
摘要 :

文章目录 一、消息发送端保障顺序性 二、消息消费端保障顺序性 三、加锁机制的扩展知识:第三把锁的作用 四、顺序消费存在的问题 RocketMQ作为一款高性能的消息队列,……




  • 一、消息发送端保障顺序
  • 二、消息消费端保障顺序性
  • 三、加锁机制的扩展知识:第三把锁的作用
  • 四、顺序消费存在的问题

RocketMQ作为一款高性能的消息队列,和Kafka类似,它支持基于队列(分区)的顺序消费,即同一队列内的消息能保证有序,不同队列间的消息则无序。下面,咱们就深入探讨下在RocketMQ中如何保证消息的顺序性。

一、消息发送端保障顺序性

当我们以RocketMQ生产者的身份发送顺序消息时,需要在send方法里传入一个MessageQueueSelector。这个MessageQueueSelector里有个select方法,它的作用是决定消息该被发送到哪个MessageQueue。通常,我们会用取模法来进行路由,具体代码如下:

SendResult sendResult = producer.send(
        msg,
        new MessageQueueSelector() {
            @Override
            // mqs:该Topic下所有可选的MessageQueue
            // msg:待发送的消息
            // arg:发送消息时传递的参数
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                //根据参数,计算出一个要接收消息的MessageQueue的下标
                int index = id % mqs.size();
                //返回这个MessageQueue
                return mqs.get(index);
            }
        },
        orderId);

这段代码里,arg参数会被转化为整数id,然后通过idmqs(该Topic下所有可选的MessageQueue列表)的大小取模,得到一个下标,进而确定消息要发送到的MessageQueue。需要注意的是,这里建议使用同步发送的方式,这样才能保证消息按顺序发送到指定队列。

二、消息消费端保障顺序性

消息按顺序发送到消息队列后,消费者要如何按发送顺序消费呢?RocketMQ的MessageListener回调函数提供了两种消费模式:有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently。若要实现顺序消费,得采用有序消费模式,示例代码如下:

consumer.registerMessageListener(
    new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) {
            System.out.printf(\"Receive order msg:\" + new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
});

当我们用这种方式注册一个消费者后,为确保同一队列中的有序消息能顺序消费,还得保证RocketMQ的Broker只会把消息发送到同一个消费者上,这就需要加锁了。

ConsumeMessageOrderlyService初始化时,会启动一个定时任务,该任务会尝试向Broker为当前消费者客户端申请分布式锁。如果成功获取锁,后续消息就会只发给这个Consumer。

消息拉取过程中,消费者会一次性拉取多条消息,并将这些消息放入ProcessQueue,同时提交到消费线程池执行。那拉取后的消费过程如何保证顺序性呢?这就涉及到更多的锁机制了。

RocketMQ在消费时,需要申请MessageQueue锁,确保同一时间只有一个线程能处理队列中的消息。获取MessageQueue锁后,就可以从ProcessQueue中依次拉取一批消息处理。但在这个过程中,为避免消息重复消费,还得对ProcessQueue加锁,这部分内容在后面扩展知识中会详细展开。之后,就可以开始处理业务逻辑了。

总的来说,为保障消息顺序性,RocketMQ在消费过程中进行了三次加锁:首先锁定Broker上的MessageQueue,确保消息只投递到唯一的消费者;接着对本地的MessageQueue加锁,保证只有一个线程能处理该消息队列;最后对存储消息的ProcessQueue加锁,防止在重平衡过程中出现消息重复消费。RocketMQ消息如何实现顺序性:原理、实现与问题解析

三、加锁机制的扩展知识:第三把锁的作用

前面提到客户端加锁过程中有三把锁,可能有人会疑惑:第三把锁如果不加,是不是也没问题呢?毕竟我们已经对MessageQueue加锁了,为什么还需要对ProcessQueue再次加锁?

这主要是考虑到重平衡的情况。当消费者集群新增消费者,发生重平衡时,原本由客户端A消费的某个队列,可能会重新分配给客户端B。这时,客户端A需要把加在Broker上的锁解掉。但在解锁过程中,要确保消息不会在消费过程中被移除。因为如果客户端A正在处理一部分消息,而位点信息还未提交,此时若客户端B立马去消费队列中的消息,就可能导致部分数据被重复消费。

那如何判断消息是否正在消费中呢?这就要通过ProcessQueue上的锁来判断。也就是说,解锁的线程也需要尝试对ProcessQueue加锁,只有加锁成功才能进行解锁操作,以此避免在解锁过程中有消息被消费。

四、顺序消费存在的问题

通过上述介绍可知,RocketMQ的顺序消费是通过在消费者上多次加锁实现的。这种方式虽然保证了消息顺序性,但也带来了一些问题。比如,加锁操作会降低系统的吞吐量,而且如果前面的消息出现阻塞,后续更多消息也会跟着阻塞。所以在实际应用中,顺序消息要谨慎使用,需根据具体业务场景权衡利弊。

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/10497.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号