掘金 后端 ( ) • 2024-05-02 14:00

一、RabbitMQ介绍

1. 同步调用与异步调用

微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步调用的方式。

程序里所有的通信,有两种形式:同步通讯和异步通讯。

image-20240501172132163.png

解读:

  • 同步调用:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。
  • 异步调用:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。

两种调用方式各有优缺点

同步调用

优点:时效性强,等待到结果才返回

缺点:

  • 拓展性差
  • 性能差
  • 级联失败

异步调用

优点:

  • 耦合度低,扩展性强
  • 异步调用,无需等待,性能好
  • 故障隔离,下游服务故障不影响上有业务
  • 缓存消息,流量削峰填谷

缺点:

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖Broker的可靠性
  • 业务复杂度增加:防止消息丢失、消息重复,要保证数据的一致性等等问题
  • 系统架构复杂度增加:必须保证MQ的高可用

2. MQ介绍

Message Queue,消息对列

​ 消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

MQ的作用:服务之间数据交互时的一种方式。和Feign对比

  • 异步:实现服务之间异步通信
  • 削峰:MQ可以堆积消息,可以应对流量洪峰,实现流量的削峰填谷
  • 解耦:实现服务之间的耦合性降低

注意:MQ消息队列和即时通信是两种不同的技术

  • MQ:用于服务之间的异步数据交互。和Feign对比,Feign是同步通信,MQ是异步通信
  • 即时通信:用于聊天的,单聊、群聊等等。通常要借助于第三方服务,比如:环信云,融联云等等

常见的MQ

  • RabbitMQ:性能好,延时低
  • RocketMQ:稳定可靠,可以做到消息0丢失
  • kafka:吞吐量大,可以实现海量数据交互传输,通常用于大数据领域

3. RabbitMQ介绍

AMQP,Advanced Message Queuing Protocol,高级消息队列,是一种网络协议。它是应用层协议的一个开发标准,为面向消息的中间件而设计。

基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品、不同编程语言的限制。

Rabbit公司基于AMQP协议标准,开发了RabbitMQ1.0。RabbitMQ采用Erlang语言开发,Erlang是专门为开发高并发和分布式系统的一种语言,在电信领域广泛使用。

image-20240501174959684.png

RabbitMQ的几个概念:

  • Producer:生产者,是发送消息的代码
  • Consumer:消费者,是接收消息的代码
  • Broker:中间件,指的就是RabbitMQ
  • Connection:消息生产者、消费者 与 RabbitMQ之间建立的TCP连接
  • Channel:Channel是在Connection内部建立的逻辑连接
  • Exchange:交换机,作用是路由消息到队列
  • Queue:队列,真正存储消息的队列
  • Binding:exchange和queue之间的虚拟连接
  • VirtualHost:虚拟主机。每个虚拟主机里可以有多个交换机和队列, 不同虚拟主机之间互相隔离

二、RabbitMQ安装

1. 拉取RabbitMQ镜像

  • 方式一:在线拉取镜像
docker pull rabbitmq:3.8-management
  • 方式二:从本地加载镜像

    把准备好的MQ压缩包《mq.tar》上传到虚拟机CentOS里

    在CentOS里执行命令加载镜像:

#先切换到mq.tar所在的目录

#再执行命令加载镜像:已经加载过了,不需要重复加载
docker load -i mq.tar

#加载后,查看一下镜像。找一下有没有rabbitmq这个镜像
docker images

2. 安装RabbitMQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=XXXXXX \
 -e RABBITMQ_DEFAULT_PASS=XXXXXX \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 --restart=always \
 rabbitmq:3-management
  • Java程序连接RabbitMQ:使用端口5672
  • RabbitMQ控制台页面: http://ip:15672, 登录帐号:XXXXXX, 密码:XXXXXX

3. RabbitMQ控制台

打开浏览器输入地址:http://ip:15672, 登录帐号:XXXXXX, 密码:XXXXXX

在控制台里,可以查看、管理 交换机、队列等等

三、SpringAMOP

1. SpringAMQP介绍

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

SpringAMQP使用步骤

  1. 添加依赖
  2. 配置RabbitMQ连接信息
  3. 收发消息

使用示例

添加依赖(每个服务模块都需要加,如果有父工程,加在父工程,让子工程继承)

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置RabbitMQ连接信息

spring:
  application:
    name: demo-producer
  rabbitmq:
    host: 192.168.119.129 #RabbitMQ服务的ip
    port: 5672            #RabbitMQ服务的端口
    username: xxxxxx 	  #RabbitMQ的帐号
    password: xxxxxx 	  #RabbitMQ的密码

2. RabbitMQ工作模式

RabbitMQ提供了6种工作模式,参考:https://www.rabbitmq.com/getstarted.html

  • basic queue:简单模式
  • work queues:工作队列集群消费
  • Publish/Subscribe:发布订阅模式,也称为Fanout,是一种消息广播模式
  • Routing:路由模式,也称为Direct模式
  • Topics:主题模式
  • RPC远程调用模式:远程调用,其实算不上MQ,这里不做介绍

2.1 basic queue简单队列

模式说明

basic queue是RabbitMQ中最简单的一种队列模式:生产者把消息直接发送到队列queue,消费者从queue里直接接收消息。

image-20240501183742496.png

使用示例

生产者

@SpringBootTest
public class DemoProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 简单模式:发送消息示例
     */
    @Test
    public void test01Simple(){
        rabbitTemplate.convertAndSend("demo01.simple.queue", "hello,simple queue");
    }
}

消费者

  1. 创建一个类,用于监听消息。类上需要添加@Component注解

  2. 类里定义一个方法,用于处理消息。

    方法上需要添加注解 @RabbitListener(queuesToDeclare = @Queue("队列名称"))

    方法上需要添加一个String类型的形参:是接收到的消息内容

@Component
public class Demo01SimpleListener {

    @RabbitListener(queuesToDeclare = @Queue("demo01.simple.queue"))
    public void listen(String msg){
        System.out.println("msg = " + msg);
    }
}

注意事项

要想使用RabbitMQ收发消息,必须要保证已经有队列和交换机已经存在,才可以正常收发。

  • 可以直接在RabbitMQ控制台里创建队列、交换机和绑定关系。 然后再启动代码收发消息,先启动生产者或先消费者都行
  • 可以在生产者一方使用@Bean声明队列、交换机和绑定关系。然后必须先启动生产者服务发送消息,再启动消费者监听消息
  • 可以在消费者一方使用@RabbitListener声明队列、交换机和绑定关系。然后必须先启动消费者监听消息,再运行生产者发送消息

2.2 work queues工作队列

假如只有一个消费者处理消息,那么处理消息的速度就有可能赶不上发送消息的速度。该如何同时处理更多的消息呢?

可以在同一个队列上创建多个竞争的消费者,以便消费者可以同时处理更多的消息

模式说明

多个消费者相互竞争,从同一个队列里获取消息。生产者发送的消息将被所有消费者分摊消费

注意: 一个队列里的一条消息,只能被消费一次,不可能多个消费者同时消费处理

image-20240501184514649.png

对于任务过重,或者任务较多的情况,使用工作队列可以提高任务处理的速度

例如:短信通知服务。 订单完成后要发短信通知

使用示例

生产者

在测试类里发送消息

    @Test
    public void test02WorkQueue(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("demo02.work.queue","hello, 这是消息"+i);
        }
    }

消费者

@Component
public class Demo02WorkQueueListener {

    @RabbitListener(queuesToDeclare = @Queue("demo02.work.queue"))
    public void listener1(String msg){
        System.out.println("消费者1收到消息msg = " + msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("demo02.work.queue"))
    public void listener2(String msg){
        System.out.println("消费者2收到消息msg = " + msg);
    }
}

注意事项

在WorkQueues模式的默认情况下,一个队列里的所有消息,将平均分配给每个消费者。这种情况并没有考虑到消费者的实际处理能力,显然是有问题的。

例如:生产者发送了50条消息,有两个消费者,各接收到了25条消息。假如

  • 消费者1,每秒能处理100条消息。 很快就能处理完消息
  • 消费者2,每秒能处理10条消息。 消息堆积越来越多

要解决这个问题其实非常简单:让每个消费者一次性只拉取1条消息

修改消费者的配置文件application.yaml:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #消费者一次抓取几条消息

2.3 Publish/Subscribe发布订阅(Fanout)

工作队列背后的假设是,每个任务只传递给一个消费者。如果向多个消费者传递一条消息。这种模式称为“发布/订阅”。

模式说明

image-20240501184743493.png

使用示例

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:

  • 生产者程序将发出日志消息
  • 消费者程序将接收日志消息
    • 第一组消费者,接收到日志消息并保存到磁盘上
    • 第二组消费者,接收到日志消息并打印到控制台

生产者

    @Test
    public void test03Fanout(){
        //参数1:交换机名。参数2:路由key。参数3:消息内容
        rabbitTemplate.convertAndSend("demo03.fanout.exchange","demo03.key", "这是一条广播消息");
    }

消费者

@Component
public class Demo03FanoutListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("demo03.queue1"),
            exchange = @Exchange(value = "demo03.fanout.exchange", type = ExchangeTypes.FANOUT)
    ))
    public void listener1(String msg){
        System.out.println("消费者1收到消息msg = " + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("demo03.queue2"),
            exchange = @Exchange(value = "demo03.fanout.exchange", type = ExchangeTypes.FANOUT)
    ))
    public void listener2(String msg){
        System.out.println("消费者2收到消息msg = " + msg);
    }
}

2.4 Direct(Routing)

我们能够向许多消费者广播日志消息。

我们将向其添加一个功能:我们将使消费者能够仅订阅消息的子集。例如:

  • 只能将关键错误消息定向到日志文件(以节省磁盘空间)
  • 同时仍然能够在控制台上打印所有日志消息。

模式说明

image-20240501185423938.png

  • 队列在绑定交换机时,需要给队列指定一个Routing Key(路由key)
  • 生产者在发送消息时,必须指定消息的Routing Key
  • 交换机根据消息的RoutingKey进行判断:只有队列的RoutingKey 与 消息的RoutingKey完全相同,才会收到消息

使用示例

生产者

    @Test
    public void test04Direct(){
        rabbitTemplate.convertAndSend("demo04.direct.exchange","demo04.error", "这是error消息");
        
        rabbitTemplate.convertAndSend("demo04.direct.exchange","demo04.info", "这是info消息");
    }

消费者

@Component
public class Demo04DirectListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("demo04.direct.queue2"),
            exchange = @Exchange(value = "demo04.direct.exchange", type = ExchangeTypes.DIRECT),
            key = {"demo04.info", "demo04.error"}
    ))
    public void listener1(String msg){
        System.out.println("消费者1(监听demo04.erorr和demo04.info)收到消息msg = " + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("demo04.direct.queue1"),
            exchange = @Exchange(value = "demo04.direct.exchange", type = ExchangeTypes.DIRECT),
            key = {"demo04.error"}
    ))
    public void listener2(String msg){
        System.out.println("消费者2(监听demo04.erorr)收到消息msg = " + msg);
    }
}

2.5 Topic ★★★★★

我们没有使用仅能进行消息广播的FANOUT,而是使用了DIRECT,实现了了有选择地接收日志。

虽然使用DIRECT改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由,例如:

  • 第一组消费者,要接收所有系统的所有日志消息,打印到控制台
  • 第二组消费者,要接收所有系统的错误日志消息,和订单系统的所有日志消息,保存到磁盘

为了在日志系统中实现这一点,我们需要了解更复杂的TOPIC交换机。

模式说明

image-20240501185520294.png

  • RoutingKey:发送到TOPIC的消息不能有任意的routing键,它:
    • 必须是由点分隔的单词列表
    • 可以有任意多个单词,最多255个字节
    • 可使用* 星号,匹配一个单词
    • 可使用#,匹配0个或多个单词
  • 使用特定RoutingKey发送的消息,将被传递到使用匹配Key绑定的所有队列。

使用示例

生产者

    @Test
    public void test05Topic(){
        rabbitTemplate.convertAndSend("demo05.topic.exchange","order.info", "这是一条订单普通消息");
        rabbitTemplate.convertAndSend("demo05.topic.exchange","order.error", "这是一条订单错误消息");
    }

消费者

@Component
public class Demo05TopicListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("demo05.queue1"),
            exchange = @Exchange(value = "demo05.topic.exchange", type = ExchangeTypes.TOPIC),
            key = "order.*"
    ))
    public void listener1(String msg){
        System.out.println("消费者1(监听order.*)收到消息msg = " + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("demo05.queue2"),
            exchange = @Exchange(value = "demo05.topic.exchange", type = ExchangeTypes.TOPIC),
            key = "*.error"
    ))
    public void listener2(String msg){
        System.out.println("消费者2(监听*.error)收到消息msg = " + msg);
    }
}

四、@Bean方式声明队列和交换机

队列和交换机的声明方式

要使用RabbitMQ发送消息的话,就必须提前声明好队列和交换机。

而声明队列和交换机的方式是多种多样的:

  • 手动创建:在RabbitMQ控制台页面上,直接手动创建队列和交换机,并进行绑定

    这种方式需要在控制台上页面创建并绑定,然后再编写程序,不太方便

  • @Bean方式:使用@Bean的方式声明交换机和队列,在程序启动运行时,由代码进行声明

    这种方式配置比较麻烦

  • 注解方式:使用注解方式声明交换机和队列,在监听消息时,由代码进行声明

    使用相对简单,在监听消息时,一个注解综合性配置消息队列、交换机并进行绑定

使用@Bean的方式

声明交换机、队列以及绑定关系

//声明交换机
@Bean
public XxxExchange exchange(){
    return ExchangeBuilder.xxxExchange("交换机名").build();
}

//声明队列
@Bean
public Queue queue(){
    return QueueBuilder.durable("队列名称").build();
}

//声明队列和交换机的绑定关系
@Bean
public Binding queueBinding(XxxExchange exchange, Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("路由key通配符");
}

发消息

rabbitTemplate.convertAndSend("交换机名", "消息的路由key", 消息内容);

收消息

@RabbitListener(queueToDeclare=@Queue("队列名"))
public void listener(String msg){
    
}

使用示例

生产者声明队列和交换机

@Configuration
public class DemoRabbitConfig {

    /**
     * 声明一个名称为demo.topic.exchange的交换机
     */
    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange("demo.topic.exchange").build();
    }

    /**
     * 声明一个名称为demo.topic.queue1的队列
     */
    @Bean
    public Queue topicQueue1(){
        return QueueBuilder.durable("demo.topic.queue1").build();
    }

    /**
     * 声明一个名称为demo.topic.queue2的队列
     */
    @Bean
    public Queue topicQueue2(){
        return QueueBuilder.durable("demo.topic.queue2").build();
    }

    /**
     * 将交换机demo.topic.exchange
     * 和队列demo.topic.queue1
     * 绑定起来,路由key是:demo.*
     */
    @Bean
    public Binding topicQueue1Binding(Queue topicQueue1, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("demo.*");
    }

    /**
     * 将交换机demo.topic.exchange
     * 和队列demo.topic.queue2
     * 绑定起来,路由key是:#.key
     */
    @Bean
    public Binding topicQueue2Binding(Queue topicQueue2, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.key");
    }
}

生产者发送消息

    @Test
    public void test05(){
        rabbitTemplate.convertAndSend("demo.topic.exchange", "demo.1", "消息demo.1");
        rabbitTemplate.convertAndSend("demo.topic.exchange", "xxx.key", "消息xxx.key");
    }

消费者监听消息

@Component
public class Demo06Listener {

    @RabbitListener(queues = "demo.topic.queue1")
    public void listener1(String msg){
        System.out.println("消费者1(从队列demo.topic.queue1)收到消息msg = " + msg);
    }

    @RabbitListener(queues = "demo.topic.queue2")
    public void listener2(String msg){
        System.out.println("消费者2(从队列demo.topic.queue2)收到消息msg = " + msg);
    }
}

测试

先运行消费者代码,开始监听队列。如果队列不存在,会自动创建并进行绑定

再运行生产者代码,发送消息

五、消息json格式转换器

说明

使用RabbitTemplate发送消息时,Spring会采用JDK的序列化技术对消息内容进行序列化:

  • 生产者发出的消息内容会被序列化成字节数组,再发送出去。
  • 消费者收到的消息,也是字节数组,Spring会进行反序列化还原

而JDK的序列化技术存在一些问题:

  • 序列化的字节数组,通常体积比较大
  • 序列化和反序列化容易产生安全漏洞
  • 可读性差

image-20230515203355922.png 我们可以配置一个消息转换器:把消息转换成json格式字符串发送出去,消费者接收到json

使用步骤

  1. 添加json转换的依赖坐标
  2. 配置消息转换器

使用示例

  1. 添加json转换的依赖坐标

    注意:需要在生产者和消费者双方都添加。我们这里可以直接添加到父工程的pom.xml里

    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
    
  2. 配置消息转换器

    注意:需要在**生产者和消费者双方都添加**。我们在两个模块的引导类里添加:

    /**
     * 配置json消息转换器,不要导错了:
     *  org.springframework.amqp.support.converter.MessageConverter
     *  org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
     */
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
  3. 测试

    1. 关闭消费者:暂时不接收消息,等发送消息后,我们要先去控制台上查看消息内容的格式
    2. 生产者发送消息
    @SpringBootTest
    public class Demo07SerializeTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void test(){
            Map<String, Object> msg = new HashMap<>();
            msg.put("id", 2);
            msg.put("title", "山东要热成灿东了 高温黄色预警高挂局部可达39℃");
            rabbitTemplate.convertAndSend("serialize.queue", msg);
        }
    }
    

    在RabbitMQ控制台上查看消息:查看收到的消息,是json格式的

image-20230515222824281.png

启动消费者,接收到消息内容

@Slf4j
@Component
public class Demo07SerializeListener {

    @RabbitListener(queuesToDeclare = @Queue("serialize.queue"))
    public void handleSerializeQueue1(Map<String,Object> msg){
        log.info("从{}接收到消息:{}", "serialize.queue1", msg);
    }
}