行业资讯 2025年08月6日
0 收藏 0 点赞 599 浏览 7668 个字
摘要 :

文章目录 一、添加依赖与配置连接信息 二、发布订阅模式(Publish/Subscribe) (一)模式原理 (二)代码实现 (三)基于配置类的发布订阅模式实现 三、路由模式(Ro……




  • 一、添加依赖与配置连接信息
  • 二、发布订阅模式(Publish/Subscribe)
    • (一)模式原理
    • (二)代码实现
    • (三)基于配置类的发布订阅模式实现
  • 三、路由模式(Routing)
    • (一)模式原理
    • (二)代码实现
  • 四、通配符模式(Topics)
    • (一)模式原理
    • (二)代码实现

    RabbitMQ作为一款功能强大的消息队列中间件,与Spring Boot框架的整合相当常见。本文将详细介绍如何在Spring Boot项目中集成RabbitMQ,并深入探讨发布订阅、路由、通配符等多种工作模式的实现方式

    一、添加依赖与配置连接信息

    在开始整合之前,首先要在Spring Boot项目的pom.xml文件中添加RabbitMQ相关依赖,引入spring-boot-starter-amqp依赖后,项目便能使用RabbitMQ的各项功能。具体依赖配置如下:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    接着,在application.properties文件里配置RabbitMQ的连接信息,包括服务器地址、端口、用户名、密码、虚拟主机等。同时,还可以设置一些与消息处理相关的参数:

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    # 配置消息确认模式为简单模式,消息成功处理后,RabbitMQ会发送确认信号
    spring.rabbitmq.publisher-confirm-type=simple 
    # 配置监听器的最小并发数,启动时至少有3个消费者线程处理消息
    spring.rabbitmq.listener.simple.concurrency=3 
    # 配置监听器的最大并发数,负载高时最多启动10个消费者线程
    spring.rabbitmq.listener.simple.max-concurrency=10 
    

    二、发布订阅模式(Publish/Subscribe)

    (一)模式原理

    发布订阅模式下,需配置一个fanout类型的交换器(Exchange)。这种交换器的特点是不依赖路由键(Routing key),它会将接收到的消息广播到与其绑定的每一个消息队列上。每个消息队列都能接收并存储相同的消息,然后由各自关联的消费者进行消费。

    (二)代码实现

    1. 定义交换机、队列与绑定关系
      可以借助AmqpAdmin管理类来定义交换机、队列以及它们之间的绑定关系。以下是示例代码:
    package com.xyu;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    
    @SpringBootTest
    class SpringbootXyuProjectApplicationTests {
        @Autowired
        public AmqpAdmin amqpAdmin;
    
        @Test
        void contextLoads() {
        }
    
        @Test
        public void amqpAdmin() {
            // 定义一个fanout类型的交换机
            amqpAdmin.declareExchange(new FanoutExchange(\"fanout_exchange\")); 
            // 声明两个队列
            amqpAdmin.declareQueue(new Queue(\"fanout_queue_email\")); 
            amqpAdmin.declareQueue(new Queue(\"fanout_queue_sms\")); 
            // 将队列与交换机进行绑定
            amqpAdmin.declareBinding(new Binding(
                    \"fanout_queue_email\",
                    Binding.DestinationType.QUEUE,
                    \"fanout_exchange\",
                    \"\",
                    null
            ));
            amqpAdmin.declareBinding(new Binding(
                    \"fanout_queue_sms\",
                    Binding.DestinationType.QUEUE,
                    \"fanout_exchange\",
                    \"\",
                    null
            ));
        }
    }
    
    1. 消息发送
      创建实体类并进行序列化处理,使用RabbitTemplateconvertAndSend方法发送消息到指定的交换机。示例代码如下:
    // 定义实体类User
    package com.xyu.po;
    public class User {
        private String name;
        private Integer age;
    
        @Override
        public String toString() {
            return \"User{\" +
                    \"name=\'\" + name + \'\\\'\' +
                    \", age=\" + age +
                    \'}\';
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public Integer getAge() {
            return age;
        }
    
        public void setAge(Integer age) {
            this.age = age;
        }
    }
    
    // 配置消息转换器,这里使用Jackson2JsonMessageConverter将Java对象转换为JSON字符串
    package com.xyu.Config;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    
    // 发送消息
    @Test
    public void Publisher() {
        User user = new User();
        user.setAge(18);
        user.setName(\"天天\");
        // 将消息发送到fanout_exchange交换机,路由键为空
        rabbitTemplate.convertAndSend(\"fanout_exchange\", \"\", user); 
    }
    
    1. 消息监听
      使用@RabbitListener注解标记消息接收方法,Spring Boot会自动监听指定队列中的消息并调用相应方法进行处理。示例代码如下:
    package com.xyu.Service;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service
    public class RabbitMqListenerService {
        @RabbitListener(queues = \"fanout_queue_email\")
        public void psubConsumerEmail(Message message) {
            byte[] body = message.getBody();
            String s = new String(body);
            System.out.println(\"邮件业务接收到消息: \" + s);
        }
    
        @RabbitListener(queues = \"fanout_queue_sms\")
        public void psubConsumerSms(Message message) {
            byte[] body = message.getBody();
            String s = new String(body);
            System.out.println(\"短信业务接收到消息: \" + s);
        }
    }
    

    (三)基于配置类的发布订阅模式实现

    除了上述方式,还可以通过配置类来创建交换机、队列和绑定关系,同时启用RabbitMQ的注解功能。示例代码如下:

    package com.xyu.Config;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableRabbit
    // @EnableRabbit注解用于启用基于注解的RabbitMQ消息处理功能
    public class RabbitMQConfig {
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(\"exchange_my\");
        }
    
        @Bean
        public Queue smsQueue() {
            return new Queue(\"fanout_queue_tem\", true); // 队列持久化
        }
    
        @Bean
        public Queue emailQueue() {
            return new Queue(\"fanout_queue_num\", true); // 队列持久化
        }
    
        @Bean
        public Binding smsBinding() {
            return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
        }
    
        @Bean
        public Binding emailBinding() {
            return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
        }
    
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    
    // 接收消息
    @RabbitListener(bindings = @QueueBinding(value =
            @Queue(\"fanout_queue_tem\"), exchange =
            @Exchange(value = \"exchange_my\", type = \"fanout\")))
    public void psubConsumerEmailAno(User user) {
        System.out.println(\"1邮件业务接收到消息: \" + user);
    }
    
    @RabbitListener(bindings = @QueueBinding(value =
            @Queue(\"fanout_queue_num\"), exchange =
            @Exchange(value = \"exchange_my\", type = \"fanout\")))
    public void psubConsumerSmsAno(User user) {
        System.out.println(\"1短信业务接收到消息: \" + user);
    }
    
    // 发送消息
    @Test
    public void Publisher() {
        User user = new User();
        user.setAge(18);
        user.setName(\"天天1\");
        // 向两个交换机发送相同消息
        rabbitTemplate.convertAndSend(\"fanout_exchange\", \"\", user); 
        rabbitTemplate.convertAndSend(\"exchange_my\", \"\", user);
    }
    

    三、路由模式(Routing)

    (一)模式原理

    路由模式中,需配置一个direct类型的交换器,并为不同的消息指定特定的路由键。交换器会根据路由键将消息准确地路由到与之匹配的消息队列中,消费者从各自对应的队列中获取消息进行消费。

    (二)代码实现

    1. 定义交换机、队列与绑定关系
    // 定义direct类型的交换机、队列和绑定关系
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(\"routing_exchange\");
    }
    
    @Bean
    public Queue routeQueue() {
        return new Queue(\"routing_queue_all\", true);
    }
    
    @Bean
    public Binding routeBinding() {
        return BindingBuilder.bind(routeQueue()).to(directExchange()).withQueueName();
    }
    
    1. 消息发送与接收
    // 发送消息
    @Test
    public void routingPublisher() {
        rabbitTemplate.convertAndSend(\"routing_exchange\",
                \"error_routing_key\",
                \"routing send error message\");
    }
    
    // 接收消息
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(name = \"routing_queue_all\"),
                    exchange = @Exchange(value = \"routing_exchange\", type = \"direct\"),
                    key = \"error_routing_key\")
    )
    public void routingConsumerError(String message) {
        System.out.println(\"接收到error级别日志消息: \" + message);
    }
    

    四、通配符模式(Topics)

    (一)模式原理

    通配符模式同样使用topic类型的交换器,它在路由键匹配上更加灵活,支持使用通配符(#*)。#代表零个或多个单词,*代表一个单词,通过这种方式可以实现更细粒度的消息路由。

    (二)代码实现

    1. 定义交换机、队列与绑定关系
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(\"topic_exchange\");
    }
    
    @Bean
    public Queue topic_queue_email() {
        return new Queue(\"topic_queue_email\", true);
    }
    
    @Bean
    public BindingBuilder.TopicExchangeRoutingKeyConfigurer topicBinding() {
        return BindingBuilder.bind(topic_queue_email()).to(topicExchange());
    }
    
    1. 消息发送与接收
    // 发送消息
    @Test
    public void topicPublisher() {
        rabbitTemplate.convertAndSend(\"topic_exchange\",
                \"info.email\",
                \"topics send email message\");
        rabbitTemplate.convertAndSend(\"topic_exchange\",
                \"info.sms\",
                \"topics send sms message\");
        rabbitTemplate.convertAndSend(\"topic_exchange\",
                \"info.email.sms\",
                \"topics send email and sms message\");
    }
    
    // 接收消息
    @RabbitListener(bindings = @QueueBinding(value =
            @Queue(\"topic_queue_email\"), exchange =
            @Exchange(value = \"topic_exchange\", type = \"topic\"),
            key = \"info.#.sms.#\"))
    public void topicConsumerSms(String message) {
        System.out.println(\"接收到短信订阅需求处理消息: \" + message);
    }
    

    通过以上详细的步骤和代码示例,我们可以在Spring Boot项目中顺利集成RabbitMQ,并灵活运用多种工作模式满足不同的业务需求。希望本文能帮助大家更好地理解和掌握Spring Boot与RabbitMQ的整合步骤。

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/10363.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号