掘金 后端 ( ) • 2024-04-27 17:51

大致的流程

1、延迟消息队列.png

导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>

yml配置

server:
  port: 8081
spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq: ## 访问地址:http://127.0.0.1:15672
    username: guest
    password: guest
    host: 127.0.0.1
    port: 5672
    virtual-host: / ## 设置虚拟主机路径
    listener:
      simple:
        acknowledge-mode: manual ##设置了消息确认模式为手动

创建交换机、队列以及他们之间的绑定关系

以下代码中创建了两个队列和一个死信队列,其中两个队列一个指定了队列的过期时间,另一个未指定队列的过期时间,但是会在后续发送消息到该队列时指定消息的过期时间。 注:如果队列的过期时间和消息的过期时间都指定了,则取小的那个值

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class RabbitMQConfig {

    //正常消息交换机x-change
    public static final String X_CHANGE = "x-change";
    //死信交换机x-dead-letter-exchange
    public static final String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";

    //队列A
    public static final String QUEUE_A = "queue-a";
    //路由keyA
    public static final String ROUTING_KEY_A = "routing-key-a";
    //队列B
    public static final String QUEUE_B = "queue-b";
    //路由keyB
    public static final String ROUTING_KEY_B = "routing-key-b";
    //死信队列
    public static final String DEAD_LETTER_QUEUE = "dead-letter-queue";
    //死信路由key
    public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter-routing-key";

    /**
     * 创建两个交换机
     */
    //正常消息交换机x-change
    @Bean
    public DirectExchange xChange() {
        return new DirectExchange(X_CHANGE);
    }
    //死信交换机x-dead-letter-exchange
    @Bean
    public DirectExchange xDeadLetterExchange() {
        return new DirectExchange(X_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明队列
     */
    //队列A
    @Bean
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", X_DEAD_LETTER_EXCHANGE); //到期后转发的死信交换机
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); //到期后转发路由键
        args.put("x-message-ttl", 2000); // 设置队列过期时间,即超过该时间就会被转发到死信交换机
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }
    //队列B
    @Bean
    public Queue queueB() {
        HashMap<String, Object> args =  new HashMap<>();
        args.put("x-dead-letter-exchange", X_DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
//        args.put("x-message-ttl", 5000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }
    //死信队列
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /**
     * 将队列绑定至对应交换机
     * @return
     */
    //队列A绑定交换机
    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(queueA()).to(xChange()).with(ROUTING_KEY_A);
    }
    //队列B绑定交换机
    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind(queueB()).to(xChange()).with(ROUTING_KEY_B);
    }
    //死信队列绑定死信交换机
    @Bean
    public Binding bindingDeadLetterQueue() {
        return BindingBuilder.bind(deadLetterQueue()).to(xDeadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);
    }

}

模拟消息发送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.rabbitmqdemo.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.TimeoutException;

@Slf4j
@RestController
@RequestMapping("/mq")
public class MQTestController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConnectionFactory connectionFactory;

    @GetMapping
    public void send() {
        log.info("发送消息时间:{}", LocalDateTime.now());
        rabbitTemplate.convertAndSend(RabbitMQConfig.X_CHANGE, RabbitMQConfig.ROUTING_KEY_A, "队列A:消息。。。。");
        rabbitTemplate.convertAndSend(RabbitMQConfig.X_CHANGE, RabbitMQConfig.ROUTING_KEY_B, "队列B:消息。。。。", message -> {
            message.getMessageProperties().setExpiration("10000"); //设置此条消息的过期时间
            return message;
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.X_CHANGE, RabbitMQConfig.ROUTING_KEY_B, "队列C:消息。。。。", message -> {
            message.getMessageProperties().setExpiration("10000"); //设置此条消息的过期时间
            return message;
        });
    }

    //此接口会取消掉queue-b队列中消息字符串包含B字符的消息,即他不会进入死信队列,所以后续会接收不到此队列
    @GetMapping("/cancel")
    public void cancel() throws IOException, TimeoutException {
        log.info("取消消息");
        Channel channel = connectionFactory.createConnection().createChannel(false);

        GetResponse response = channel.basicGet(RabbitMQConfig.QUEUE_B, false);
        boolean isDelete = false;
        while (!isDelete){
            if (response == null) {
                break;
            }
            String messageValue = new String(response.getBody(), StandardCharsets.UTF_8);
            if (messageValue.contains("B")) {
                isDelete = true;
                channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
                continue;
            }
            channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true);
        }
        channel.close();
    }


}

创建队列的监听器

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Slf4j
@Component
public class RabbitMQListener {

    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
    public void deadLetterQueue(Message message, Channel channel){
        log.info("死信队列接收数据时间:{}" , LocalDateTime.now());
        String s = new String(message.getBody());
        System.out.println("死信队列收到消息:" + s);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

执行结果

image.png 可以看到中间那条被取消的消息并没有被监听到,从而实现某个任务或者订单在一段时间内未执行指定操作则进入死信队列执行逻辑,若执行了指定操作则不让他执行那段逻辑。