掘金 后端 ( ) • 2024-05-07 13:43

概述:

RSet 是 Redisson 提供的一个分布式 Java Set 实现,它基于 Redis 的 set 数据结构。Redis set 是字符串类型的无序集合,且集合中的元素具有唯一性,即集合中不允许有重复的元素。

原理:

RSet 的原理是将 Java Set 接口的操作映射到 Redis 的 set 数据结构的命令。Redis 提供了一系列操作 set 的命令,

  • SADD:向集合添加一个或多个成员
  • SREM:移除集合中一个或多个成员
  • SMEMBERS:返回集合中的所有成员
  • SISMEMBER:判断成员元素是否是集合的成员
  • SCARD:获取集合的成员数
  • SUNION, SINTER, SDIFF:集合的并集、交集、差集操作

Redisson 的 RSet 通过调用这些命令来实现分布式的 set 功能。

优点:

  1. 分布式特性:RSet 可以跨多个节点使用,适用于分布式系统和微服务架构。
  2. 性能:由于 Redis 是基于内存的数据结构存储,所以 RSet 提供快速的读写性能。
  3. 高可用和持久化:Redis 支持数据的持久化,并且可以配置为主从复制模式,提供高可用性。
  4. 原子操作:Redis 的操作是原子性的,这意味着 RSet 的操作也是原子性的,适合并发环境。
  5. 丰富的操作:RSet 支持 Redis set 提供的丰富操作,如集合运算等。
  6. 简单易用:Redisson 提供了与 Java 标准 Set 接口一致的操作,使得开发者可以方便地使用 RSet

缺点:

  1. 内存限制:由于 Redis 是基于内存的,大量数据的存储可能受到物理内存的限制。
  2. 成本:相比于基于磁盘的数据库,维护足够内存可能导致更高的成本。
  3. 数据一致性:在 Redis 集群模式下,网络分区或其他问题可能会影响数据的最终一致性。
  4. 复杂的集群管理:在分布式环境下,管理 Redis 集群可能比较复杂,需要处理节点的添加、移除和故障转移。
  5. 数据安全性:如果没有合适的配置和管理,Redis 数据可能会面临安全风险,如未授权的访问等。

流程图:

graph TD
    Start(开始) -->|获取/创建 RSet| GetRSet[获取RSet引用]
    GetRSet --> |添加元素| AddElement[向RSet添加元素]
    AddElement --> |检查元素是否存在| CheckElement[检查元素是否存在于RSet]
    CheckElement --> |元素存在| ElementExists{元素存在吗?}
    ElementExists --> |是| RemoveElement[从RSet中移除元素]
    ElementExists --> |否| DoNothing[不执行任何操作]
    RemoveElement --> |检查是否需要清空集合| IsEmpty[是否清空RSet]
    DoNothing --> |结束流程| End[结束]
    IsEmpty --> |是| ClearSet[清空RSet]
    IsEmpty --> |否| End
    ClearSet --> |结束流程| End

流程图:

  • 开始操作,然后添加一个元素到 RSet
  • 添加元素后,检查该元素是否存在于 RSet 中。
  • 根据元素是否存在,决定是移除元素还是结束流程。
  • 如果元素存在于 RSet,则将其移除。
  • 如果元素不存在,或者在移除元素之后,流程结束。

时序图:

sequenceDiagram
    participant Client as 客户端
    participant RSet as Redisson RSet
    participant Redis as Redis服务器    
    Client->>+RSet: 创建或获取RSet引用
    RSet->>+Redis: 查询RSet是否存在
    Redis-->>-RSet: 返回结果
    RSet-->>-Client: RSet引用

    Client->>+RSet: 添加元素(element)
    RSet->>+Redis: SADD element
    Redis-->>-RSet: 返回添加结果
    RSet-->>-Client: 添加成功/失败

    Client->>+RSet: 检查元素是否存在(element)
    RSet->>+Redis: SISMEMBER element
    Redis-->>-RSet: 返回存在结果
    RSet-->>-Client: 存在/不存在

    alt 如果元素存在
	Client->>+RSet: 移除元素(element)
        RSet->>+Redis: SREM element
        Redis-->>-RSet: 返回移除结果
        RSet-->>-Client: 移除成功/失败
    end

    Client->>+RSet: 检查RSet是否为空
    RSet->>+Redis: SCARD (获取集合大小)
    Redis-->>-RSet: 返回集合大小
    RSet-->>-Client: 集合大小

    alt 如果集合为空
        Client->>+RSet: 清空RSet
        RSet->>+Redis: DEL RSet
        Redis-->>-RSet: 返回删除结果
        RSet-->>-Client: 清空成功/失败
    end

时序图:

  • 客户端首先创建或获取 RSet 的引用。
  • RSet 询问 Redis 服务器该集合是否存在。
  • Redis 服务器返回结果给 RSet,然后 RSet 将引用返回给客户端。
  • 客户端请求向 RSet 添加一个元素。
  • RSet 向 Redis 服务器发送 SADD 命令添加元素。
  • Redis 服务器返回添加结果,RSet 将结果返回给客户端。
  • 客户端检查一个元素是否存在于 RSet 中。
  • RSet 向 Redis 服务器发送 SISMEMBER 命令检查元素。
  • Redis 服务器返回存在结果,RSet 将结果返回给客户端。
  • 如果元素存在,客户端请求从 RSet 移除该元素。
  • RSet 向 Redis 服务器发送 SREM 命令移除元素。
  • Redis 服务器返回移除结果,RSet 将结果返回给客户端。
  • 客户端检查 RSet 是否为空。
  • RSet 向 Redis 服务器发送 SCARD 命令获取集合大小。
  • Redis 服务器返回集合大小,RSet 将结果返回给客户端。
  • 如果集合为空,客户端请求清空 RSet
  • RSet 向 Redis 服务器发送 DEL 命令删除集合。
  • Redis 服务器返回删除结果,RSet 将结果返回给客户端。

工具类:

功能点:

  1. 容错机制:实现重试逻辑、异常处理和可能的回退策略。
  2. 易用性:提供简单的API,隐藏底层的复杂性。
  3. 高可用性:确保在 Redis 实例不可用时有策略来处理。
  4. 功能丰富:除了基本的 Set 操作,还可以添加监听器、排序、和元素过期等高级特性。
  5. 配置化重试策略:允许自定义重试次数和重试间隔,甚至使用更复杂的重试库,如 Resilience4j。
  6. 异步操作支持:为了不阻塞当前线程,可以提供异步API,让调用者能够以非阻塞的方式处理操作结果。
  7. 更细粒度的异常处理:区分不同类型的异常,并为每种异常类型提供不同的处理策略。
  8. 事件监听:允许注册事件监听器,以便在添加、删除或更新元素时触发自定义行为。
  9. 集群支持:确保在 Redis 集群环境中 RSet 操作的正确性,处理可能的跨槽(cross-slot)操作。
  10. 日志记录增强:提供更详细的日志记录,包括操作成功时的日志。
  11. 资源管理:确保在操作完成后释放所有资源,例如关闭连接。
  12. 监控和度量:集成监控系统来跟踪操作的性能和成功率。

源码:

import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * @Author derek_smart
 * @Date 202/4/25 15:26
 * @Description RSet 工具类
 * <p>
 */
public class EnhancedRSet<E> {

    private static final Logger logger = LoggerFactory.getLogger(EnhancedRSet.class);

    private final RedissonClient redissonClient;
    private final RSet<E> rSet;
    // Executor for asynchronous operations
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    public EnhancedRSet(RedissonClient redissonClient, String setName) {
        this.redissonClient = redissonClient;
        this.rSet = redissonClient.getSet(setName);
    }

    public boolean add(E e) {
        try {
            return retry(() -> rSet.add(e));
        } catch (Exception ex) {
            logger.error("Failed to add element to set", ex);
            // Implement fallback strategy if needed
            return false;
        }
    }

    public boolean remove(Object o) {
        try {
            return retry(() -> rSet.remove(o));
        } catch (Exception ex) {
            logger.error("Failed to remove element from set", ex);
            return false;
        }
    }

    public boolean contains(Object o) {
        try {
            return retry(() -> rSet.contains(o));
        } catch (Exception ex) {
            logger.error("Failed to check if element is in set", ex);
            return false;
        }
    }

    public int size() {
        try {
            return retry(() -> rSet.size());
        } catch (Exception ex) {
            logger.error("Failed to get the size of the set", ex);
            return -1;
        }
    }

    public void clear() {
        try {
            retry(() -> {
                rSet.clear();
                return null;
            });
        } catch (Exception ex) {
            logger.error("Failed to clear the set", ex);
        }
    }

    // Implement other methods like addAll, removeAll, iterator, etc.

    // Retry logic for operations
    private <T> T retry(Callable<T> operation) throws Exception {
        int attempts = 3;
        for (int i = 0; i < attempts; i++) {
            try {
                return operation.call();
            } catch (Exception ex) {
                logger.warn("Operation failed on attempt {}", i + 1, ex);
                if (i == attempts - 1) {
                    throw ex;
                }
                // Optionally, add some delay here
            }
        }
        return null;
    }


    // Example of additional features: Element expiration
    public void addWithExpiration(E e, long timeToLive, TimeUnit timeUnit) {
        try {
            retry(() -> {
                rSet.add(e);
                rSet.expire(timeToLive, timeUnit);
                return null;
            });
        } catch (Exception ex) {
            logger.error("Failed to add element with expiration to set", ex);
        }
    }

    // Callable interface for retry operations
    @FunctionalInterface
    private interface Callable<T> {
        T call() throws Exception;
    }


    // Improved retry with custom configuration
    private <T> T retry(Callable<T> operation, int maxAttempts, long delay, TimeUnit timeUnit) throws Exception {
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return operation.call();
            } catch (Exception ex) {
                logger.warn("Operation failed on attempt {}", i + 1, ex);
                if (i < maxAttempts - 1) {
                    timeUnit.sleep(delay);
                } else {
                    throw ex;
                }
            }
        }
        return null;
    }

    // Asynchronous add operation
    public CompletableFuture<Boolean> addAsync(E e) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return retry(() -> rSet.add(e), 3, 100, TimeUnit.MILLISECONDS);
            } catch (Exception ex) {
                logger.error("Failed to add element to set asynchronously", ex);
                throw new RuntimeException("Async operation failed", ex);
            }
        }, executorService);
    }

    // ... other asynchronous operations ...

    // Enhanced logging
    public boolean addAyns(E e) {
        try {
            boolean result = retry(() -> rSet.add(e), 3, 100, TimeUnit.MILLISECONDS);
            logger.info("Element added to set: {}", e);
            return result;
        } catch (Exception ex) {
            logger.error("Failed to add element to set", ex);
            return false;
        }
    }

    // ... existing code ...

    // Cleanup resources
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
        }
    }
}

1714290046134.png

使用示例:

让通过一个示例来演示如何使用 EnhancedRSet 类,并调用其中的所有方法。将创建一个 EnhancedRSet 实例,执行一系列操作,并在结束时清理资源。 假设已经有了一个 RedissonClient 实例。

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * @Author derek_smart
 * @Date 202/4/25 17:25
 * @Description EnhancedRSet 测试类
 * <p>
 */
public class EnhancedRSetExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 配置 Redisson
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redissonClient = Redisson.create(config);

        // 创建 EnhancedRSet 实例
        EnhancedRSet<String> enhancedRSet = new EnhancedRSet<>(redissonClient, "mySet");

        // 添加元素
        enhancedRSet.add("element1");
        enhancedRSet.add("element2");

        // 异步添加元素
        enhancedRSet.addAsync("element3").thenAccept(result -> {
            if (result) {
                System.out.println("Element3 added asynchronously");
            }
        }).get(); // 等待异步操作完成,实际应用中应避免使用 get() 阻塞主线程

        // 检查元素是否存在
        boolean contains = enhancedRSet.contains("element1");
        System.out.println("Set contains element1: " + contains);

        // 获取集合大小
        int size = enhancedRSet.size();
        System.out.println("Size of set: " + size);

        // 移除元素
        enhancedRSet.remove("element2");

        // 清空集合
        enhancedRSet.clear();

        // 添加元素并设置过期时间
        enhancedRSet.addWithExpiration("element4", 10, TimeUnit.SECONDS);

        // 关闭 EnhancedRSet 实例并释放资源
        enhancedRSet.shutdown();

        // 关闭 Redisson 客户端
        redissonClient.shutdown();
    }
}

1714289640296.png

使用总结:

这个示例中,首先配置了 Redisson 客户端,然后创建了 EnhancedRSet 实例。执行了一系列的操作,包括添加、异步添加、检查存在性、获取大小、移除和清空集合。还使用了 addWithExpiration 方法添加了一个带过期时间的元素。

对于异步添加操作,使用了 CompletableFuturethenAccept 方法来处理异步结果。在实际应用中,通常不会在异步操作后立即调用 get() 方法,因为这会阻塞调用线程,而是根据程序的需求处理异步结果。

最后,调用 shutdown 方法来关闭 EnhancedRSet 实例并释放资源,然后关闭 Redisson 客户端。

请注意,这个示例假设 Redis 服务器已经在本地运行并且监听默认端口 6379。在实际部署中,需要根据实际的 Redis 服务器配置来设置 Config 对象。此外,由于异步操作可能会抛出异常,在 get() 方法后添加了异常处理(ExecutionExceptionInterruptedException)。