SpringBoot整合RabbitMQ(基础)

一.环境准备

1、在pom文件中引入对应的依赖:

        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-web
        

2、在application.yml配置文件中配置RabbitMQ:

spring:
    #rabbitmq配置
  rabbitmq:
    host: 192.168.150.152#rabbitMq的服务器地址
    port: 5672
    username: root #用户名
    password: 123456 #密码
    virtual-host: /hjl #虚拟主机
二、整合
  1. 点对点,简单模式

    在这里插入图片描述

    ①配置文件中声明队列

@SpringBootConfigurationpublic class RabbitMqConfig {    /**     * hello队列名称     */    public static final String HELLO_MSG_QUEUE = "hello.msg.queue";       /**     * 声明hello队列     *     * @return     */    @Bean    public Queue getHelloQueue() {    	//参数一:队列名;参数二:是否持久化队列        return new Queue(HELLO_MSG_QUEUE, true);    }}

②创建生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendHello() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.HELLO_MSG_QUEUE, "hello world" + i);
        }
    }
}

消息发送成功后,在web管理页面查看:在这里插入图片描述

可以看到对应队列中产生了消息

③创建消费者

@Component
public class RabbitMqConsumer {
    @RabbitListener(queues = RabbitMqConfig.HELLO_MSG_QUEUE)
    public void listenHelloMsg(String message) {
        System.out.println("接受时间:"+System.currentTimeMillis());
        System.out.println("转发消息是:" + message);
    }
}

启动项目,可以看到消息成功消费:

在这里插入图片描述

  1. 工作队列(多个消费者对应一个队列)

    在这里插入图片描述

    ①声明队列

@SpringBootConfigurationpublic class RabbitMqConfig {    /**     * work队列名称     */    public static final String WORK_MSG_QUEUE = "work.msg.queue";    /**     * 声明work队列     *     * @return     */    @Bean    public Queue getWorkQueue() {        //参数一:队列名;参数二:是否持久化队列        return new Queue(WORK_MSG_QUEUE, true);    }}

②创建生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class WorkMqTest {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void send() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.WORK_MSG_QUEUE, "这是一条工作队列消息" + i);
        }
    }
}

③创建消费者(多个)

@Component
public class RabbitMqConsumer {
    @RabbitListener(queues = RabbitMqConfig.WORK_MSG_QUEUE)
    public void listenWork1(String message) {
        System.out.println("消费者一转发消息是:" + message);
    }
    @RabbitListener(queues = RabbitMqConfig.WORK_MSG_QUEUE)
    public void listenWork2(String message) {
        System.out.println("消费者二转发消息是:" + message);
    }
}

在这里插入图片描述

可以看到两个消费者都成功消费量word队列中的消息

  1. 发布订阅模式

    在这里插入图片描述

    ①声明队列,交换机并绑定

@SpringBootConfigurationpublic class RabbitMqConfig {   /**     * publish队列1     */    public static final String PUBLISH_MSG_QUEUE1 = "publish.msg.queue1";    /**     * publish队列2     */    public static final String PUBLISH_MSG_QUEUE2 = "publish.msg.queue2";    /**     * publish交换机     */    public static final String PUBLISH_EXCHANGE = "publish.exchange";       /**     * Publish队列     *     * @return     */    @Bean    public Queue getPublishQueue1() {        return new Queue(PUBLISH_MSG_QUEUE1, true);    }    /**     * Publish队列     *     * @return     */    @Bean    public Queue getPublishQueue2() {        return new Queue(PUBLISH_MSG_QUEUE2, true);    }    /**     * Publish交换机     *     * @return     */    @Bean    public FanoutExchange publishExchange() {        FanoutExchange exchange = new FanoutExchange(PUBLISH_EXCHANGE, true, false);        return exchange;    }    /**     * 绑定队列和交换机     *     * @return     */    @Bean    public Binding bindPublishExchangeQueue1() {        Binding binding = BindingBuilder.bind(getPublishQueue1()).to(publishExchange());        return binding;    }    /**     * 绑定队列和交换机     *     * @return     */    @Bean    public Binding bindPublishExchangeQueue2() {        Binding binding = BindingBuilder.bind(getPublishQueue2()).to(publishExchange());        return binding;    }

②创建生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class PublishMqTest {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void send() {
        for (int i = 0; i < 10; i++) {
            //参数一:交换机名称;参数二:routingKey(广播模式不传);参数三:消息体
            rabbitTemplate.convertAndSend(RabbitMqConfig.PUBLISH_EXCHANGE, null, "这是一条工作队列消息" + i);
        }
    }
}

③创建消费者(多个)

@Component
public class PublishConsumer {
    @RabbitListener(queues = RabbitMqConfig.PUBLISH_MSG_QUEUE1)
    public void listenDead1(String message) {
        System.out.println("消费者一接收消息:" + message);
    }
    @RabbitListener(queues = RabbitMqConfig.PUBLISH_MSG_QUEUE2)
    public void listenDead2(String message) {
        System.out.println("消费者二接收消息:" + message);
    }
}

在这里插入图片描述

可以看到两个消费之都接收了生产者所有的消息;与工作队列不同的是,工作队列的消费者只消费部分消息,而此模式是消费所有。

  1. 路由模式

在这里插入图片描述

①声明队列

@SpringBootConfiguration
public class RabbitMqConfig {
   
    /**
     * routing交换机
     */
    public static final String ROUTING_EXCHANGE = "routing.exchange";
    /**
     * routing队列1
     */
    public static final String ROUTING_MSG_QUEUE1 = "routing.msg.queue1";
    /**
     * routing队列2
     */
    public static final String ROUTING_MSG_QUEUE2 = "routing.msg.queue2";

    /**
     * routing队列
     *
     * @return
     */
    @Bean
    public Queue getRoutingQueue1() {
        return new Queue(ROUTING_MSG_QUEUE1, true);
    }

    /**
     * routing队列
     *
     * @return
     */
    @Bean
    public Queue getRoutingQueue2() {
        return new Queue(ROUTING_MSG_QUEUE2, true);
    }

    /**
     * Publish交换机
     *
     * @return
     */
    @Bean
    public DirectExchange routingExchange() {
        DirectExchange exchange = new DirectExchange(ROUTING_EXCHANGE, true, false);
        return exchange;
    }

    /**
     * 绑定队列和交换机
     *
     * @return
     */
    @Bean
    public Binding bindRoutingExchangeQueue1() {
        Binding binding = BindingBuilder.bind(getRoutingQueue1()).to(routingExchange()).with("routingKey1");
        return binding;
    }

    /**
     * 绑定队列和交换机
     *
     * @return
     */
    @Bean
    public Binding bindRoutingExchangeQueue2() {
        Binding binding = BindingBuilder.bind(getRoutingQueue2()).to(routingExchange()).with("routingKey2");
        return binding;
    }
}

②创建生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class RoutingMqTest {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void send() {
            //参数一:交换机名称;参数二:routingKey(交换机与队列绑定的key);参数三:消息体
            rabbitTemplate.convertAndSend(RabbitMqConfig.ROUTING_EXCHANGE, "routingKey1", "队列一:这是一条路由消息消息");
            rabbitTemplate.convertAndSend(RabbitMqConfig.ROUTING_EXCHANGE, "routingKey2", "队列二:这是一条路由消息消息");
    }
}

③创建消费者

@Component
public class RoutingConsumer {
    @RabbitListener(queues = RabbitMqConfig.ROUTING_MSG_QUEUE1)
    public void listenDead1(String message) {
        System.out.println("消费者一接收消息:" + message);
    }

    @RabbitListener(queues = RabbitMqConfig.ROUTING_MSG_QUEUE2)
    public void listenDead2(String message) {
        System.out.println("消费者二接收消息:" + message);
    }
}

运行结果:

在这里插入图片描述

x显而易见:与发布订阅模式不同的是,此模式需要将交换机与队列通过routingKey绑定,并且生产者可以通过指定routingKey,可以将消息发送到指定队列中

  1. 通配符模式

    在这里插入图片描述

    ①声明队列,交换机

@SpringBootConfigurationpublic class RabbitConfig {    /**     * topic交换机     */    public static final String TOPIC_EXCHANGE = "topic.exchange";    /**     * topic队列1     */    public static final String TOPIC_MSG_QUEUE1 = "topic.msg.queue1";    /**     *topic队列2     */    public static final String TOPIC_MSG_QUEUE2 = "topic.msg.queue2";    /**     * routing队列     *     * @return     */    @Bean    public Queue getTopicQueue1() {        return new Queue(TOPIC_MSG_QUEUE1, true);    }    /**     * routing队列     *     * @return     */    @Bean    public Queue getTopicQueue2() {        return new Queue(TOPIC_MSG_QUEUE2, true);    }    /**     * Publish交换机     *     * @return     */    @Bean    public TopicExchange topIcExchange() {        TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE, true, false);        return exchange;    }    /**     * 绑定队列和交换机     *     * @return     */    @Bean    public Binding bindTopicExchangeQueue1() {        Binding binding = BindingBuilder.bind(getTopicQueue1()).to(topIcExchange()).with("topKey.*");        return binding;    }    /**     * 绑定队列和交换机     *     * @return     */    @Bean    public Binding bindTopicExchangeQueue2() {        Binding binding = BindingBuilder.bind(getTopicQueue2()).to(topIcExchange()).with("topKey.#");        return binding;    }}

②创建生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class TopicMqTest {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void send() {
        //参数一:交换机名称;参数二:routingKey(广播模式不传);参数三:消息体
        rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "topic.key1", "这是一条通配符模式消息一");
        rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "topic.key1.key2", "这是一条通配符模式消息二");
    }
}

③创建消费者

@Component
public class TopicConsumer {
    @RabbitListener(queues = RabbitConfig.TOPIC_MSG_QUEUE1)
    public void listenDead1(String message) {
        System.out.println("消费者一接收消息:" + message);
    }

    @RabbitListener(queues = RabbitConfig.TOPIC_MSG_QUEUE2)
    public void listenDead2(String message) {
        System.out.println("消费者二接收消息:" + message);
    }
}

运行结果如下:

在这里插入图片描述

可以看到:与路由模式不同的是topic支持通配符模式的路由key;特别的是”*“只能代替一个单词;而”#”可以代替多个;

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/fc1936ab1b.html