Featured image of post RabbitMQ 快速入门

RabbitMQ 快速入门

RabbitMQ 对于 Java 程序员的快速上手使用。

RabbitMQ 核心概念及整体架构

  • virtual-host:虚拟主机,起到数据隔离的作用。
  • publisher:消息发送者。
  • sconsumer:消息的消费者。
  • queue:队列,存储消息。
  • exchange:交换机,负责路由消息。

Springboot 项目使用 RabbitMQ

依赖引入

pom.xml:

1
2
3
4
5
6
7
8
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>

配置

application.yaml:

1
2
3
4
5
6
7
spring:
  rabbitmq:
    host: localhost	
    port: 5672	
    username: guest	
    password: guest	
    virtual-host: /	

configuration 类配置序列化:

1
2
3
4
5
6
7
8
9
@Configuration
public class RabbitMQMessageConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

测试

publisher 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void testConnection() {
        // 队列名称
        String queueName = "test.queue";
        // 消息
        String message = "test";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }

}

consumer 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Component
@Slf4j
public class TestListener {

    @RabbitListener(queues = "test.queue")
    public void listenTestQueue(String message) {
        System.out.println(message);
    }

}

声明队列和交换机的方式

配置类

示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.mincai.study.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author limincai
 */
@Configuration
public class FanoutConfig {

    /**
     * 声明 Fanout 交换机
     */
    @Bean
    public FanoutExchange fanoutExchange1() {
        // return ExchangeBuilder.fanoutExchange("exchange.fanout").build();
        return new FanoutExchange("exchange.fanout");
    }


    /**
     * 声明一个队列
     */
    @Bean
    public Queue fanoutQueue1() {
        // return QueueBuilder.durable("fanout.queue1").build();
        // 用下面这种方式,默认持久化
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列1和交换机1
     */
    @Bean
    public Binding fanoutBuilding1(Queue fanoutQueue1, FanoutExchange fanoutExchange1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
    }

}

注解

在监听的方法上使用 @RabbitListener 注解

示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Component
public class TestListener {

    @RabbitListener(
            bindings = @QueueBinding(
            // 绑定的队列
            value = @Queue(value = "fanout.queue1", durable = "true"),
            // 绑定的交换机
            exchange = @Exchange(value = "fanout.exchange1", type = ExchangeTypes.FANOUT),
            // Binging Key
            key = "fanoutkey"))
    public void listenFanoutQueue1(String message) {
        System.err.println("消费者1收到了 fanout queue1 的消息:" + message);
    }
}

Work Queues

让多个消费者绑定到一个队列,共同消费队列中的消息。

测试

publisher 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "work.queue";
        for (int i = 0; i < 50; i++) {
            // 消息
            String message = "hello,message_" + i;
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message);
            Thread.sleep(20);
        }
    }

}

consumer 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Component
@Slf4j
public class TestListener {
  
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String message) {
        System.out.println("消费者1收到了 work queue 的消息:" + message);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String message) {
        System.err.println("消费者2收到了 work queue 的消息:" + message);
    }
}

消费者1和消费者2轮询消费了 work.queue 中的消息。

默认情况下,RabbitMQ 会将消息轮询投递给绑定在队列上的每一个消费者。可以修改 consumer 的配置,将 prefetch 设置为1,确保同一时刻最多投递给消费者1条消息:

1
2
3
4
5
6
spring:
  rabbitmq:
    listener:
      simple:
      	# 每次只能获取一条消息,处理完成才能获取下一条消息
        prefetch: 1 

Fanout 交换机

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的 queue。

测试

publisher 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;
  
    @Test
    void testFanoutExchange() {
        // 交换机名称
        String exchangeName = "fanout.exchange";
        // 消息
        String message = "hello,fanout exchange";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

}

consumer 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Component
@Slf4j
public class TestListener {

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String message) {
        System.out.println("消费者1收到了 fanout queue1 的消息:" + message);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String message) {
        System.out.println("消费者2收到了 fanout queue2 的消息:" + message);
    }
}

Direct 交换机

Direct Exchange 会将收到的消息根据规则路由到指定的 queue。

  • 每一个 queue 都与 Exchange 绑定一个 Binging Key。
  • 发布者发送消息时,指定消息的 Routing Key。
  • Exchange 将消息路由到 Binging Key 与消息 Routing Key 一致的队列。

测试

publisher 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void testDirectExchange() {
        // 交换机名称
        String exchangeName = "direct.exchange";
        // routing key
        String routingKey = "blue";
        // 消息
        String message = "hello,direct exchange";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
}

consumer 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
@Slf4j
public class TestListener {

    @RabbitListener(
            bindings = 
            @QueueBinding(
            value = @Queue(
            value = "direct.queue1", durable = "true"),
            exchange = @Exchange(value = "direct.exchange", type = "direct"),
            key = {"red"}))
    public void listenDirectQueue1(String message) {
        System.out.println("消费者1收到了 direct queue1 的消息:" + message);
    }

    @RabbitListener(
            bindings = 
            @QueueBinding(
            value = @Queue(
            value = "direct.queue2", durable = "true"),
            exchange = @Exchange(value = "direct.exchange", type = "direct"),
            key = {"yellow"}))
    public void listenDirectQueue2(String message) {
        System.err.println("消费者2收到了 direct queue2 的消息:" + message);
    }
  
}

Topic 交换机

Topic Exchange 与 Direct Exchange 类似,区别在于 Binging Key 可以有更多规则。

  • #:代指0个单词。
  • *:代指1个单词。

测试

publisher 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;
  
    @Test
    void testTopicExchange() {
        // 交换机名称
        String exchangeName = "topic.exchange";
        // routing key
        String routingKey = "china.news";
        // 消息
        String message = "hello,topic exchange";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
}

consumer 端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
@Slf4j
public class TestListener {

    @RabbitListener(
      bindings = @QueueBinding(
      value = @Queue(value = "topic.queue1", durable = "true"),
      exchange = @Exchange(value = "direct.exchange", type = ExchangeTypes.TOPIC),
      key = "#.news"))
    public void listenTopicQueue1(String message) {
        System.out.println("消费者1收到了 topic queue1 的消息:" + message);
    }

    @RabbitListener(
            bindings = @QueueBinding(
            value = @Queue(value = "topic.queue2", durable = "true"),
            exchange = @Exchange(value = "direct.exchange", type = ExchangeTypes.TOPIC),
            key = "china.#"))
    public void listenTopicQueue2(String message) {
        System.err.println("消费者2收到了 topic queue2 的消息:" + message);
    }
}