掘金 后端 ( ) • 2024-04-06 16:24

theme: juejin

Redisson 实现发布/订阅(Pub/Sub)的原理是基于 Redis 的发布/订阅消息模式。在这种模式下,发布者将消息发送到一个频道(channel),而订阅者监听这个频道并接收消息。

Redisson 对这个模式进行了封装,使其更加易于在 Java 应用程序中使用。

发布/订阅原理:

  1. 发布者:发布者是发送消息到特定频道的客户端。在 Redis 中,这通过 PUBLISH 命令实现。

  2. 订阅者:订阅者是监听频道并接收消息的客户端。Redis 提供了 SUBSCRIBE 命令来实现订阅功能。

  3. 频道:频道是消息传递的媒介。发布者和订阅者通过频道进行通信。

  4. 消息:消息是通过频道从发布者传递到订阅者的数据。

Redisson 使用这些概念并提供了一套 Java API,允许你在 Java 程序中方便地实现发布和订阅功能。

优点:

  1. 易用性:Redisson 提供了简单直观的 API,使得在 Java 应用中实现发布/订阅变得非常容易。

  2. 高性能:Redis 本身是一个内存数据结构存储,因此 Redisson 的发布/订阅特性能够提供高性能和低延迟的消息传递。

  3. 可扩展性:Redis 可以通过集群模式来提供更高的可用性和分区容错性,Redisson 作为客户端能够很好地与 Redis 集群协同工作。

  4. 支持多种数据结构:Redisson 不仅支持发布/订阅,还支持丰富的数据结构和同步器,如分布式锁、原子变量、计数器、队列等。

  5. 语言无关性:由于 Redis 支持多种编程语言的客户端,不同语言写的服务可以通过 Redis 进行交互。

缺点:

  1. 消息持久性:Redis 的发布/订阅模式不保证消息的持久性。如果没有订阅者在线,消息将会丢失。如果需要持久性,可能需要结合其他消息队列系统。

  2. 消息传递保证:Redis 提供的是最基本的消息传递保证,不支持事务或消息队列的高级特性,如死信队列、消息确认机制等。

  3. 单点故障:如果未使用 Redis 集群,单个 Redis 节点可能成为单点故障。即使在集群模式下,网络分区也可能导致问题。

  4. 资源消耗:订阅者需要保持长连接以接收消息,这可能会消耗服务器资源,尤其是当订阅者数量很多时。

  5. 缩放限制:尽管 Redis 可以通过集群来缩放,但在极高的负载下,Redis 集群也可能面临性能瓶颈。

  6. 复杂的部署和管理:对于大型系统,维护 Redis 集群可能需要专业知识和额外的运维工作。

在选择 Redisson 作为发布/订阅解决方案时,需要根据实际应用的需求权衡这些优缺点。对于需要高吞吐量、低延迟和简单实现的场景,Redisson 是一个很好的选择。

然而,如果你的系统需要复杂的消息传递保证或持久性,可能需要考虑其他消息队列系统,如 RabbitMQ、Apache Kafka 等。

工具类和示例

使用 Redisson 实现发布/订阅模式可以通过创建一个服务类来封装 Redisson 的发布/订阅功能,并使用 Spring 的注解来定义单频道和多频道的订阅。

下面是一个实现的例子,包括了工具类的创建和单频道订阅、多频道订阅的示例。

Redisson 配置首先,需要配置 Redisson 客户端作为一个 Spring Bean,以便在整个应用程序中使用。

import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        Config config = new Config();
         config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        return Redisson.create(config);
    }
}

RedisPubSubService 工具类:创建一个服务类来封装 Redisson 的发布/订阅逻辑。

import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RedisPubSubService {

    private final RedissonClient redissonClient;

    @Autowired
    public RedisPubSubService(RedissonClient redissonClient) { 
  this.redissonClient = redissonClient;
    }

    // 订阅单个频道
    public <T> void subscribeToTopic(String topicName, MessageListener<T> listener) {
        RTopic topic = redissonClient.getTopic(topicName);
        topic.addListener(listener);
    }

    // 订阅多个频道
    public <T> void subscribeToTopics(String[] topicNames, MessageListener<T> listener) {
        for (String topicName : topicNames) {
            RTopic topic = redissonClient.getTopic(topicName);
            topic.addListener(listener);
        }
    }

    // 发布消息到频道
    public <T> long publishToTopic(String topicName, T message) {
          RTopic topic = redissonClient.getTopic(topicName);
        return topic.publish(message);
    }
}

单频道订阅示例:创建一个 Spring 组件,用于订阅单个频道。

import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class SingleTopicSubscriber {

    private final RedisPubSubService redisPubSubService;

    @Autowired
    public SingleTopicSubscriber(RedisPubSubService redisPubSubService) {
        this.redisPubSubService = redisPubSubService;
    }

    @PostConstruct
    public void subscribeToTopic() {
        redisPubSubService.subscribeToTopic("mySingleTopic", new MessageListener<String>() {
            @Override
            public void onMessage(CharSequence channel, String msg) {
                System.out.println("Received message on single topic: " + msg);
            }
        });
    }
}

多频道订阅示例:创建一个 Spring 组件,用于同时订阅多个频道。

import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class MultiTopicSubscriber {

    private final RedisPubSubService redisPubSubService;

    @Autowired
    public MultiTopicSubscriber(RedisPubSubService redisPubSubService) {
        this.redisPubSubService = redisPubSubService;
    }

    @PostConstruct
    public void subscribeToTopics() {
        String[] topics = new String[]{"topic1", "topic2", "topic3"};
        redisPubSubService.subscribeToTopics(topics, new MessageListener<String>() {
            @Override
            public void onMessage(CharSequence channel, String msg) {
            System.out.println("Received message on " + channel + ": " + msg);
            }
        });
    }
}

发布消息示例:创建一个 REST 控制器,用于发布消息到指定的频道。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/pubsub")
public class PubSubController {

    private final RedisPubSubService redisPubSubService;

    @Autowired
    public PubSubController(RedisPubSubService redisPubSubService) {
        this.redisPubSubService = redisPubSubService;
    }

    @PostMapping("/publish/{topic}")
    public String publishMessage(@PathVariable String topic, @RequestBody String message) {
        redisPubSubService.publishToTopic(topic, message);
        return "Message published to " + topic;
    }
}

在这个控制器中,publishMessage 方法允许客户端通过发送 POST 请求到 /pubsub/publish/{topic} 来发布消息到指定的频道。

这些示例展示了如何在 Spring 应用程序中使用 Redisson 实现发布/订阅模式。RedisPubSubService 服务类封装了 Redisson 的发布/订阅操作,

而 SingleTopicSubscriber 和 MultiTopicSubscriber 组件分别用于订阅单个频道和多个频道。PubSubController 提供了一个 RESTful 接口来发布消息。