掘金 后端 ( ) • 2024-05-09 10:58

概述:

RTopic 是 Redisson 提供的一个 Java 对象,它实现了发布/订阅 (pub/sub) 模式,基于 Redis 的 PUBLISHSUBSCRIBE 命令。在发布/订阅模式中, 发布者将消息发送到一个主题(Topic),而订阅者监听这个主题以接收消息。这允许消息的发送者和接收者解耦,因为它们不需要知道对方的存在。

原理:

当使用 RTopic 时,它的工作原理如下:

  1. 发布 (Publish):当一个客户端想要发送消息时,它使用 Redis 的 PUBLISH 命令将消息发送到特定的频道(Channel)。
  2. 订阅 (Subscribe):另一端的客户端使用 Redis 的 SUBSCRIBE 命令订阅一个或多个频道。当有消息发送到这些频道时,Redis 会自动将消息推送给所有订阅者。
  3. 消息处理:Redisson 为 RTopic 实现了监听器接口,允许应用程序定义如何处理接收到的消息。当消息到达时,Redisson 会调用这些监听器。

优点:

  1. 解耦:发布者和订阅者之间不需要知道对方,可以独立进行扩展和修改。
  2. 简单易用:Redisson 提供了简单的 API 来进行消息的发布和订阅,开发者可以轻松集成到自己的应用中。
  3. 实时性:Redis 的发布/订阅机制提供了低延迟的消息传递,适合需要实时通信的应用。
  4. 可扩展性:可以有多个订阅者同时订阅同一个主题,而且可以通过增加 Redis 实例来水平扩展系统。
  5. 高吞吐量:Redis 作为内存数据结构存储,能够处理大量的消息。

缺点:

  1. 消息持久性:在 Redis 的发布/订阅模式中,如果没有订阅者在线,消息将会丢失。这意味着它不保证消息的持久性。
  2. 消息积压:如果订阅者处理消息的速度跟不上发布者的速度,Redis 不会为订阅者维护消息队列,这可能导致消息丢失或订阅者过载。
  3. 可靠性:在网络分区或 Redis 实例故障的情况下,可能会丢失正在传输的消息。
  4. 缺乏消息确认:Redis 的发布/订阅模式不支持消息确认机制,发布者无法确保消息被订阅者成功接收。
  5. 单点故障:如果使用单个 Redis 节点,那么它可能成为单点故障。虽然可以通过 Redis 集群来提高可用性,但这增加了系统的复杂性。

流程图:

基于 Redis Topic (发布/订阅模式) 的消息简易流程图。

graph LR
    A[Publisher] -- Publish Message --> B[Redis Topic] 
    C[Subscriber 1] -- Subscribe --> B
    D[Subscriber 2] -- Subscribe --> B
    B -- Dispatch Message --> C
    B -- Dispatch Message --> D
  • "Publisher" 代表消息发布者,它发布消息到 Redis Topic。
  • "Redis Topic" 代表 Redis 中的一个 Topic,它是消息的中转站。
  • "Subscriber 1" 和 "Subscriber 2" 代表消息订阅者,它们订阅 Redis Topic 来接收消息。
  • Redis Topic 将接收到的消息分发给所有订阅了该 Topic 的订阅者。

可以使用时序图来描述基于 Redis Topic (RTopic) 的发布/订阅消息传递过程

时序图:

sequenceDiagram
    participant Publisher
    participant Redis Topic
    participant Subscriber1
    participant Subscriber2    
    Publisher->>+Redis Topic: Publish Message
    Redis Topic->>-Subscriber1: Dispatch Message
    Redis Topic->>Subscriber2: Dispatch Message
    Subscriber1->>+Subscriber1: Process Message
    Subscriber2->>+Subscriber2: Process Message
  • "Publisher" 是消息的发布者。
  • "Redis Topic" 是 Redis 中的发布/订阅机制中的主题。
  • "Subscriber1" 和 "Subscriber2" 是订阅了 Redis Topic 的订阅者。
  • "Publisher" 发布消息到 "Redis Topic"。
  • "Redis Topic" 收到消息后,将消息分发给 "Subscriber1" 和 "Subscriber2"。
  • "Subscriber1" 和 "Subscriber2" 接收并处理消息。

普通版:

使用示例:

import org.redisson.Redisson;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.client.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Author derek_smart
 * @Date 202/4/29 09:21
 * @Description Topic 使用示例
 * 1. 高并发:使用 `RTopic` 可以支持多个订阅者并发接收消息,因为 Redisson 已经为处理了并发问题。
 * 2. 异步处理:Redisson 提供了异步接口,可以使用这些接口来进行非阻塞的消息发布和订阅。
 * 3. 批量操作:虽然 `RTopic` 本身不提供批量发布消息的方法,但可以在应用层面上实现消息的批量处理。
 * 4. 异常处理:在消息监听器中处理可能发生的任何异常,并实现适当的兜底策略。
 * 5. 兜底策略:在出现异常时,可以记录日志、重试操作或者将消息转移到另一个存储系统,比如数据库或日志文件。
 * <p>
 */
public class RedissonTopicExample {

    private static final Logger logger = LoggerFactory.getLogger(RedissonTopicExample.class);
    private final RedissonClient redissonClient;
    private final RTopic topic;
    private final ExecutorService executorService;

    public RedissonTopicExample(RedissonClient redissonClient, String topicName) {
        this.redissonClient = redissonClient;
        this.topic = redissonClient.getTopic(topicName, StringCodec.INSTANCE);
        this.executorService = Executors.newCachedThreadPool();
    }

    public void subscribe() {
        topic.addListener(String.class, new MessageListener<String>() {
            @Override
            public void onMessage(CharSequence channel, String msg) {
                try {
                    // 处理消息
                    handleMessage(msg);
                } catch (Exception e) {
                    logger.error("Error handling message: {}", msg, e);
                    // 兜底策略,比如重试或者将消息存储起来以便后续处理
                    fallbackStrategy(msg);
                }
            }
        });

        // 监听订阅和取消订阅事件
        topic.addListener(new StatusListener() {
            @Override
            public void onSubscribe(String channel) {
                // 处理订阅事件
                logger.info("Subscribed to channel: {}", channel);
            }

            @Override
            public void onUnsubscribe(String channel) {
                // 处理取消订阅事件
                logger.info("Unsubscribed from channel: {}", channel);
            }
        });
    }

    public void publishAsync(String message) {
        // 异步发布消息
        CompletableFuture<Void> future = (CompletableFuture<Void>) topic.publishAsync(message).thenAccept(response -> {
            // 处理响应,例如确认消息发送成功
            logger.info("Message published: {}", message);
        });

        // 异常处理
        future.exceptionally(e -> {
            logger.error("Failed to publish message: {}", message, e);
            // 兜底策略
            fallbackStrategy(message);
            return null;
        });
    }

    /**
     * @param message
     */
    private void handleMessage(String message) {
        // TODO: 实现消息处理逻辑
    }

    /**
     * @param message
     */
    private void fallbackStrategy(String message) {
        // TODO: 实现兜底策略,比如将消息存储到数据库或文件中以便后续处理
    }

    /**
     * 关闭资源
     */
    public void shutdown() {
        executorService.shutdown();
        redissonClient.shutdown();
    }

    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {
        // 初始化Redisson客户端
        RedissonClient redissonClient = Redisson.create();
        RedissonTopicExample example = new RedissonTopicExample(redissonClient, "myTopic");

        // 订阅
        example.subscribe();

        // 异步发布消息
        example.publishAsync("Hello, Redisson!");

        // ...

        // 关闭资源
        example.shutdown();
    }
}

1714357107793.png

功能点:

确保高并发、异步处理、批量操作、异常处理和兜底策略:

  1. 高并发:使用 RTopic 可以支持多个订阅者并发接收消息,因为 Redisson 已经为处理了并发问题。
  2. 异步处理:Redisson 提供了异步接口,可以使用这些接口来进行非阻塞的消息发布和订阅。
  3. 批量操作:虽然 RTopic 本身不提供批量发布消息的方法,但可以在应用层面上实现消息的批量处理。
  4. 异常处理:在消息监听器中处理可能发生的任何异常,并实现适当的兜底策略。
  5. 兜底策略:在出现异常时,可以记录日志、重试操作或者将消息转移到另一个存储系统,比如数据库或日志文件。

在这个示例中,创建了一个 RedissonTopicExample 类,它订阅了一个主题,并且可以异步地发布消息。使用了一个 ExecutorService 来处理并发任务。消息监听器中包含了异常处理和兜底策略。还添加了一个 StatusListener 来监听订阅和取消订阅事件。

请注意,实际的兜底策略和消息处理逻辑需要根据应用需求来实现。此外,在生产环境中,可能需要进一步考虑消息的持久性、确保消息传递的顺序性、处理网络故障和其他潜在的风险因素。

增强版:

功能点:

  1. 使用 Redisson 的 RBatch 批量发布消息:这可以减少网络往返次数并提高性能。
  2. 引入消息确认机制:确保消息被成功处理。
  3. 使用自定义线程池:为消息处理配置专用的线程池,以便更好地控制并发性和资源使用。
  4. 增加消息重试机制:在消息处理失败时,实现重试逻辑。
  5. 优化异常处理和兜底策略:提供更详细的异常处理和更健壮的兜底策略。
  6. 使用配置类:允许从外部配置类中读取配置,使得类更灵活。
  7. 优化资源关闭:确保所有资源在关闭时能够正确释放。
  8. 监控和指标:引入监控和指标,以便跟踪和优化系统性能。

优化后的 RedissonTopicExample 类:

import org.redisson.api.RBatch;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * @Author derek_smart
 * @Date 202/4/29 10:11
 * @Description Topic 优化版本
 *
 * 1. 使用 Redisson 的 `RBatch` 批量发布消息:这可以减少网络往返次数并提高性能。
 * 2. 引入消息确认机制:确保消息被成功处理。
 * 3. 使用自定义线程池:为消息处理配置专用的线程池,以便更好地控制并发性和资源使用。
 * 4. 增加消息重试机制:在消息处理失败时,实现重试逻辑。
 * 5. 优化异常处理和兜底策略:提供更详细的异常处理和更健壮的兜底策略。
 * 6. 使用配置类:允许从外部配置类中读取配置,使得类更灵活。
 * 7. 优化资源关闭:确保所有资源在关闭时能够正确释放。
 * 8. 监控和指标:引入监控和指标,以便跟踪和优化系统性能。
 * <p>
 */
public class RedissonTopicExample {

    private static final Logger logger = LoggerFactory.getLogger(RedissonTopicExample.class);
    private final RedissonClient redissonClient;
    private final RTopic topic;
    private final String topicName; // 存储主题名称
    private final ExecutorService messageExecutorService;
    private final int maxRetryAttempts;
    private final long retryDelay;

    public RedissonTopicExample(RedissonClient redissonClient, String topicName, int maxRetryAttempts, long retryDelay) {
        this.redissonClient = redissonClient;
        this.topicName = topicName; // 保存主题名称
        this.topic = redissonClient.getTopic(topicName);
        this.messageExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        this.maxRetryAttempts = maxRetryAttempts;
        this.retryDelay = retryDelay;
    }

    public void subscribe() {
        topic.addListener(String.class, (channel, msg) -> messageExecutorService.submit(() -> {
            int attempt = 0;
            boolean successful = false;
            while (attempt <  maxRetryAttempts && !successful) {
                try {
                    handleMessage(msg);
                    successful = true;
                } catch (Exception e) {
                    attempt++;
                    logger.error("Attempt {} failed for message: {}", attempt, msg, e);
                    if (attempt < maxRetryAttempts) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(retryDelay);
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    } else {
                        fallbackStrategy(msg);
                    }
                }
            }
        }));
    }

    public void publishBatch(List<String> messages) {
        RBatch batch = redissonClient.createBatch();
        messages.forEach(message -> {
            RTopic batchTopic = (RTopic) batch.getTopic(topicName); // 使用保存的主题名称
            batchTopic.publishAsync(message);
        });
        batch.execute();
    }

    private void handleMessage(String message) {
        // TODO: Implement message handling logic
    }

    private void fallbackStrategy(String message) {
        // TODO: Implement fallback strategy, e.g., storing the message for later processing
    }

    public void shutdown() {
        try {
            messageExecutorService.shutdown();
            if (!messageExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {
                messageExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            messageExecutorService.shutdownNow();
        }
        redissonClient.shutdown();
    }

    // Other methods...

    public static void main(String[] args) {
        // Initialize Redisson client and RedissonTopicExample...
    }
}

1714357161778.png

总结:

优化后的版本中,为消息处理引入了自定义的线程池 messageExecutorService。实现了消息重试逻辑,并在达到最大重试次数后调用兜底策略。还添加了批量发布消息的方法 publishBatch,它使用 Redisson 的 RBatch 来优化网络使用。 这个类只是一个示例,实际应用中可能需要根据具体需求进一步调整。例如,可能需要为兜底策略实现更复杂的逻辑,比如将消息发送到死信队列、事件存储或者其他类型的持久化存储。此外,监控和指标的具体实现将取决于监控系统和需求。

Note:

总的来说,RTopic 在需要解耦、实时性和高吞吐量的场景下非常有用。然而,如果应用需要确保消息的持久性和可靠性,可能需要考虑其他消息队列解决方案,如 Apache Kafka 或 RabbitMQ。