掘金 后端 ( ) • 2024-05-05 17:13

概述:

愉快的五一已经接近尾声了,休息的同时也即将迎来工作的时光,后续在学习Redission 同时也将学习其他的中间件-消息队列之RabbitMQ。

RabbitMQ 原理:

RabbitMQ 是一个开源的消息代理和队列服务器,它可以用来在不同的应用/服务之间传递消息。它使用 AMQP(高级消息队列协议)作为核心协议,具有以下主要组件和概念:

  1. Producer(生产者):发送消息的应用程序。
  2. Consumer(消费者):接收消息的应用程序。
  3. Queue(队列):存储消息的缓冲区,等待消费者来取走消息。
  4. Exchange(交换器):决定消息发送到哪个队列的实体。它根据消息的 routing key 和交换器类型将消息路由到一个或多个队列。
  5. Binding(绑定):是交换器和队列之间的链接,它告诉交换器如何根据 routing key 分发消息。
  6. Routing Key(路由键):消息的属性,交换器使用它来决定如何路由消息(发送给哪个队列)。
  7. Channel(通道):消息传递的通道,生产者、消费者和交换器都在通道中进行交互,以减少建立和关闭 TCP 连接的开销。

工作流程:

RabbitMQ 的工作流程通常如下:

  • 生产者发送消息到交换器,消息包含 routing key。
  • 交换器接收消息并根据类型、routing key 和绑定规则将消息路由到一个或多个队列。
  • 消息在队列中排队,等待消费者处理。
  • 消费者从队列中取出消息进行处理。

RabbitMQ 的优点:

  1. 灵活的路由:通过不同类型的交换器(direct, topic, fanout, headers)和绑定,RabbitMQ 可以灵活地将消息路由到指定的队列。
  2. 高可用性:支持集群配置,可以实现消息队列的高可用性。
  3. 可靠性:提供消息持久化、消息确认机制(acknowledgements)和事务支持,确保消息不会丢失。
  4. 多种协议支持:除了 AMQP,RabbitMQ 还支持 STOMP、MQTT 等多种消息传递协议。
  5. 插件系统:拥有丰富的插件系统,例如支持延迟消息、Shovel 插件可以将消息从一个 RabbitMQ 代理转移到另一个等。
  6. 管理界面:提供一个易用的管理界面,方便监控和管理消息、队列、交换器等。

RabbitMQ 的缺点:

  1. 性能开销:功能丰富但可能带来一定的性能开销,尤其是在消息持久化和事务处理时。
  2. 复杂性:虽然提供了灵活的路由和配置选项,但对于初学者来说可能会感到复杂。
  3. 内存和磁盘使用:在处理大量消息时,如果不适当管理,有可能导致内存和磁盘空间迅速增长。
  4. 集群模式限制:集群模式下,队列仅存在于创建它们的节点上,这可能会导致集群中的节点不均衡。

流程图:

下面给出简单的流程图来描述 RabbitMQ 的消息传递流程。 以下是一个例子:

    graph LR
    P(Producer) -- Message --> E(Exchange)
    E -- Route Message --> Q1(Queue 1)
    E -- Route Message --> Q2(Queue 2)
    E -- Route Message --> Q3(Queue 3)
    Q1 -- Message --> C1(Consumer 1)
    Q2 -- Message --> C2(Consumer 2)
    Q3 -- Message --> C3(Consumer 3)

在这个流程图中:

  • Producer 是消息的生产者,它发布消息到 Exchange
  • Exchange 是交换器,它根据提供的 routing key 和绑定规则将消息路由到一个或多个 Queue
  • Queue 1, Queue 2, Queue 3 是存储消息的队列。
  • Consumer 1, Consumer 2, Consumer 3 是消息的消费者,它们从对应的队列中取出消息并进行处理。

当然,这个图只是一个简化的表示,实际上 RabbitMQ 的交换器可以根据不同的类型(如 direct, topic, fanout, headers)和绑定规则进行更复杂的消息路由。此外,一个队列可以有多个消费者,消费者也可以订阅多个队列。

时序图:

创建一个时序图来展示基于 RabbitMQ 的消息传递过程。以下是一个简单的例子:

sequenceDiagram  
    participant P as Producer  
    participant R as RabbitMQ Exchange  
    participant Q as Queue  
    participant C as Consumer  
  
    P->>+R: Publish Message (with routing key)  
    R->>+Q: Route to Queue  
    Q->>-C: Deliver Message  
    C->>+Q: Acknowledge (optional)  

在这个时序图中:

Producer 发布消息到 RabbitMQ Exchange,消息包含路由键。
RabbitMQ Exchange 接收消息,并根据路由键和绑定规则将消息路由到 Queue
Queue 收到消息,并将其传递给 Consumer
Consumer 处理消息后,可以选择发送回执(Acknowledge)到 Queue 以确认消息已被成功处理。

Spring Boot 结合:

RabbitMQ 可以与 Spring Boot 应用程序很好地集成,主要通过 Spring AMQP 项目实现, 该项目提供了一个高级的抽象来与消息代理进行交互。Spring Boot 对 Spring AMQP 提供了自动配置和启动支持,使得集成变得相对简单。 以下是将 RabbitMQ 集成到 Spring Boot 应用程序的基本步骤:

1. 添加依赖

首先,需要在 pom.xml(Maven)或 build.gradle(Gradle)中添加 Spring Boot 的 AMQP 依赖。

Maven:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

Gradle:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
}

2. 配置连接

application.propertiesapplication.yml 文件中配置 RabbitMQ 的连接信息。

application.properties:

spring.rabbitmq.host=your-rabbitmq-host
spring.rabbitmq.port=5672
spring.rabbitmq.username=your-username
spring.rabbitmq.password=your-password

application.yml:

spring:
  rabbitmq:
    host: your-rabbitmq-host
    port: 5672
    username: your-username
    password: your-password

3. 创建队列、交换器和绑定

在 Spring Boot 应用程序中,可以使用 @Bean 注解来声明队列、交换器和绑定。

@Configuration
public class RabbitConfig {

    @Bean
    Queue queue() {
        return new Queue("myQueue", false);}

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("myExchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("routing.key.#");
    }
}

4. 发送消息

创建一个生产者(Producer)组件,使用 RabbitTemplate 发送消息。

@Service
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend("myExchange", "routing.key.specific", message);
    }
}

5. 接收消息

创建一个消费者(Consumer)组件,使用 @RabbitListener 注解来监听队列并处理消息。

@Service
public class MessageReceiver {

    @RabbitListener(queues = "myQueue")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 启动类

确保的 Spring Boot 应用程序有一个启动类,用于运行应用程序。

@SpringBootApplication
public class MyApp {
    public static void main(String[] args) {
        SpringApplication.run(MyApp.class, args);
    }
}

这些基本步骤展示了如何在 Spring Boot 应用程序中集成 RabbitMQ。可以根据自己的需求进行更多的配置和自定义,比如设置消息转换器、配置监听器容器、开启消息确认等。

类图:

RabbitMQ 集成的 Spring Boot 应用程序中的组件和它们之间的关系:

classDiagram  
    class Producer {  
        +RabbitTemplate rabbitTemplate  
        +send(message)  
    }  
    class RabbitTemplate  
    class RabbitMQ_Exchange  
    class Queue  
    class Consumer {  
        +receive(message)  
    }  
    class RabbitListener  
  
    Producer --> RabbitTemplate : uses  
    RabbitTemplate--> RabbitMQ_Exchange : sends message to  
    RabbitMQ_Exchange --> Queue : routes message to  
    Queue --> Consumer : delivers message to  
    Consumer --> RabbitListener : annotated with  

在这个图中,展示了以下组件:

Producer:生产者类,使用 RabbitTemplate 来发送消息。
RabbitTemplate:Spring AMQP 提供的模板类,用于发送和接收消息。
RabbitMQ_Exchange:RabbitMQ 中的交换器,负责根据路由键将消息路由到队列。
Queue:RabbitMQ 中的队列,存储消息等待消费者取出。
Consumer:消费者类,使用 @RabbitListener 注解来监听队列并接收消息。
RabbitListener:Spring AMQP 提供的注解,用于标记消息监听方法。

RabbitMQ 是一个成熟、稳定、广泛使用的消息队列系统,适用于需要复杂路由、消息持久化和高可靠性的场景。然而,它也需要适当的配置和管理以确保性能和资源使用的优化。