行业资讯 2025年08月6日
0 收藏 0 点赞 847 浏览 4507 个字
摘要 :

文章目录 一、告别ZooKeeper,开启KRaft 二、新消费者组协议 三、支持队列功能 四、Java版本要求升级 五、API更新 六、代码示例 使用Kafka 4.0的KRaft模式创建主题 使……




  • 一、告别ZooKeeper,开启KRaft
  • 二、新消费者组协议
  • 三、支持队列功能
  • 四、Java版本要求升级
  • 五、API更新
  • 六、代码示例
    • 使用Kafka 4.0的KRaft模式创建主题
    • 使用新消费者组协议
    • 队列功能示例

    Apache Kafka一直占据着消息队列技术方向重要地位。2025年3月18日,Kafka迎来了4.0版本的更新,这次更新带来了许多令人瞩目的新特性,对开发者和运维人员来说意义重大。下面咱们就详细聊聊这些新变化。

    一、告别ZooKeeper,开启KRaft

    在Kafka 4.0版本中,最大的亮点之一就是默认运行在KRaft模式下,这意味着它不再依赖Apache ZooKeeper。以前,ZooKeeper的存在虽然为Kafka提供了协调服务,但也增加了部署和管理的复杂性。现在,Kafka 4.0去掉了这一依赖,使得整个部署过程变得更加简单直接。运维人员不用再花费大量精力去维护ZooKeeper集群,降低了运营成本的同时,还提高了系统的可扩展性,管理任务也大大简化了。

    二、新消费者组协议

    Kafka 4.0引入了新的消费者组协议(KIP – 848),这个更新主要是为了优化重新平衡性能。在大规模部署场景下,消费者组的重新平衡操作经常会导致停机时间延长和延迟增加。而新协议将相关逻辑转移到了代理端,很好地解决了这些问题,提升了消费者组的可靠性和响应速度。简单来说,就是在处理大量消息时,系统能够更加稳定、高效地运行。

    三、支持队列功能

    Kafka 4.0新增的队列功能(KIP – 932),支持了传统队列语义。以前,Kafka在消息处理模式上有一定的局限性,现在通过这个功能,允许多个消费者协同处理同一个分区的消息。这就使得Kafka可以应用在更多场景中,尤其是那些需要点对点消息模式的场景,它变成了一个更通用的消息平台。

    四、Java版本要求升级

    随着Kafka 4.0的发布,对Java版本的要求也有所改变。Kafka客户端和Kafka Streams现在需要Java 11的支持,而Kafka代理、Connect和相关工具则需要Java 17。在项目升级到Kafka 4.0时,开发者需要注意确保Java环境符合要求,以免出现兼容性问题。

    五、API更新

    为了让平台更加简洁,同时鼓励开发者采用新功能,Kafka 4.0删除了至少12个月前被废弃的API。这一操作虽然可能会对部分依赖旧API的项目造成一定影响,但从长远来看,有助于Kafka生态系统的健康发展,让开发者能够更快地接触和使用到新的、更强大的功能。

    六、代码示例

    使用Kafka 4.0的KRaft模式创建主题

    下面这段代码展示了如何在KRaft模式下创建Kafka主题:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaTopicCreator {
        public static void main(String[] args) {
            // 配置Kafka连接属性
            Properties props = new Properties();
            // 设置Kafka服务器地址
            props.put(\"bootstrap.servers\", \"localhost:9092\");
            // 设置消息确认机制
            props.put(\"acks\", \"all\");
    
            // 创建AdminClient实例,用于管理Kafka集群
            AdminClient adminClient = AdminClient.create(props);
    
            // 创建一个新的主题,指定主题名为my-topic,分区数为1,副本因子为1
            NewTopic newTopic = new NewTopic(\"my-topic\", 1, (short) 1);
            // 创建主题并获取创建结果
            CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
    
            try {
                // 输出主题创建结果
                System.out.println(\"Topic created: \" + result.all().get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    使用新消费者组协议

    接下来的代码示例展示了如何使用新消费者组协议:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerExample {
        public static void main(String[] args) {
            // 配置消费者属性
            Properties props = new Properties();
            // 设置Kafka服务器地址
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, \"localhost:9092\");
            // 设置消费者组ID
            props.put(ConsumerConfig.GROUP_ID_CONFIG, \"my-group\");
            // 设置键的反序列化类
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 设置值的反序列化类
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 创建Kafka消费者实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 订阅主题my-topic
            consumer.subscribe(Collections.singleton(\"my-topic\"));
    
            while (true) {
                // 拉取消息,设置拉取超时时间为100毫秒
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // 输出消息内容
                    System.out.println(record.value());
                }
                // 同步提交消费位移,确保消息被正确处理
                consumer.commitSync();
            }
        }
    }
    

    队列功能示例

    下面的代码展示了如何在Kafka中实现类似队列的行为:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaQueueExample {
        public static void main(String[] args) {
            // 配置消费者属性
            Properties props = new Properties();
            // 设置Kafka服务器地址
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, \"localhost:9092\");
            // 设置消费者组ID
            props.put(ConsumerConfig.GROUP_ID_CONFIG, \"my-queue-group\");
            // 设置键的反序列化类
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 设置值的反序列化类
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 创建Kafka消费者实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 订阅主题my-queue-topic
            consumer.subscribe(Collections.singleton(\"my-queue-topic\"));
    
            while (true) {
                // 拉取消息,设置拉取超时时间为100毫秒
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // 输出消息内容
                    System.out.println(record.value());
                    // 处理消息后,手动确认以避免重复消费
                    consumer.commitSync(Collections.singleton(record));
                }
            }
        }
    }
    

    通过这些示例,希望能帮助大家更好地理解和使用Kafka 4.0的新特性。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/10347.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号