掘金 后端 ( ) • 2024-05-08 10:52

原因:

目前基于项目新的问题,一次性获取60多万数据,每秒执行该查询大约 0.66 次 ,三天下来I/O量达到恐怖的60多G,严重影响项目正常运行。以下是解决步骤,请勿喷。。。。

public List<String> loadSOrigin(List<String> c, List<Integer> w, List<Integer> l) {
    return shelfService.getShelfCodes(c, w, l);
}

基于此我这边的设计想通过Redis 进行缓存数据,减少数据库请求,应该该数据变化量不大,有一定的容错空间,但是其返回值来看,如果存储到redis 也将达到12M数据,形成一个大key.故此则得想办法进行解决该大key问题。

原理:

在 Redis 中,所谓的 "大 key" 通常指的是单个 key 所存储的数据量非常大,它可能会导致性能问题,比如在执行某些操作时(例如删除、备份、迁移)可能会阻塞 Redis 服务器。对于是否算作 "大 key" 并没有一个严格的界限,这取决于具体的使用场景和 Redis 服务器的配置。然而,一般来说,一个超过 1 MB 的 key 可以被认为是一个较大的 key。 在目前情况下,12.58 MB 的数据存储在一个 List 类型的 key 中,可以认为是一个大 key。处理大 key 的策略包括:
1. 分页加载:不要一次性加载整个大 key 的所有元素,而是使用 LRANGE 命令分页加载,每次只获取部分元素。
2. 分割 key:可以将一个大的 List 分割成多个小的 List,例如根据数据的时间戳、ID 范围或其他逻辑进行分割,每个小 List 作为一个独立的 key 存储。
3. 数据压缩:如果数据是可以压缩的,可以在客户端对数据进行压缩后再存储到 Redis,这样可以减少内存的使用量。
4. 数据存储策略优化:评估是否所有数据都需要存储在 Redis 中,或者是否可以通过其他方式(如数据库)来存储部分数据。
5. 使用其他数据结构:如果适用,可以考虑使用其他更内存高效的数据结构,比如 zset(有序集合)
6. 定期清理:定期检查并清理不再需要的数据,以释放内存。
7. Redis 集群:如果单个 Redis 实例的内存不足以处理大量数据,可以考虑使用 Redis 集群来分散数据和负载。
8. 监控和警报:使用 Redis 的 INFO MEMORY 命令或其他监控工具来监控内存使用情况,并设置警报机制。

在实际操作之前,建议先分析你的应用场景和数据访问模式,以便选择最适合你需求的优化策略。如果需要进一步的帮助,可以考虑咨询 Redis 性能优化方面的专家。

实战解决:

设计思路是为了有效地处理大量数据的加载和缓存,同时确保数据的一致性和高效的缓存管理。基于此该基于以下设计思路进行设计: 1. 分布式环境下的数据一致性
   - 使用分布式锁(RLock)来确保在更新缓存时,只有一个进程/线程可以操作缓存,从而避免并发写入导致的数据不一致问题。
   - 使用缓存标记(cacheMarker)来指示缓存的状态,确保在更新数据时,可以先使缓存无效,然后再进行缓存更新操作。

2. 缓存失效和更新策略
   - 设置缓存的过期时间(TTL),确保缓存数据不会永久存储,从而避免潜在的陈旧数据问题。
   - 在更新缓存数据之前,先将缓存标记设置为无效,更新完成后再重新将其设置为有效,并更新过期时间,这样可以减少读取陈旧缓存数据的风险。

3. 处理大规模数据集
   - 由于单个 Redis 值可能会非常大,设计中采用了分批存储数据的方法。这样可以避免单个 Redis 值的大小超过处理能力或导致性能问题。
   - 通过使用子列表(sublist)的方式,将大数据集分解为更小的批次,并为每个批次生成单独的缓存键。

4. 优化读取性能
   - 在读取数据时,先检查缓存是否存在数据。如果存在,则直接从缓存中读取,避免了不必要的数据库访问,从而提高了性能。
   - 如果缓存中没有数据,则从数据库加载数据,并将结果存储到缓存中,以便后续请求可以快速获取。

5. 键名生成
   - 为了确保缓存的唯一性和可查询性,设计了一个方法(generateKey)来根据输入参数生成一个唯一的缓存键。这保证了相同的查询参数将会引用相同的缓存数据。

具体实现代码:

先根据对应参数生成对应key,后续获取生成都将使用到:

生成Key:

/**
 * 生成key
 *
 * @param c
 * @param w
 * @param l
 * @return
 */
private String generateKey(List<String> c, List<Integer> w, List<Integer> l) {
    // Implement a method to generate a unique key based on the method parameters
    // This could be a concatenation of the parameters' hashCodes or a more complex logic
    return "shelfCodes:" + c.hashCode() + ":" + w.hashCode() + ":" + l.hashCode();
}

image.png

简单版:

public List<String> loadSv(List<String> c, List<Integer> w, List<Integer> l) throws ExecutionException, InterruptedException {
    String key = generateKey(c, w, l);
    List<String> shelfCodes = new ArrayList<>();
    boolean dataFoundInRedis = false;
    // 尝试从 Redis 分批获取数据
    int index = 0;
    while (true) {
        String sublistKey = key + ":sublist:" + index;
        RList<String> sublist = redissonClient.getList(sublistKey, StringCodec.INSTANCE);
        if (sublist.isExists()) {
            shelfCodes.addAll(sublist.readAll());
            dataFoundInRedis = true;
            index++;
        } else {
            break;
        }
    }
    // 如果 Redis 中没有数据,则从数据库加载并存储到 Redis
    if (!dataFoundInRedis) {
        shelfCodes = shelfService.getShelfCodes(c, w, l);
        storeS(c, w, l, shelfCodes, ttl, timeUnit); // 设置过期时间
    }
    return shelfCodes;
}

企业微信截图_17149718201736.png

加锁版:

public List<String> loadS(List<String> c, List<Integer> w, List<Integer> l) {
    String key = generateKey(c, w, l);
    List<String> shelfCodes = new ArrayList<>();
    boolean dataFoundInRedis = false;

    // 获取缓存标记
    RBucket<Boolean> cacheMarker = redissonClient.getBucket(key + ":marker", StringCodec.INSTANCE);
    if (Boolean.TRUE.equals(cacheMarker.get())) {
        // 尝试从 Redis 分批获取数据
        int index = 0;
        while (true) {
            String sublistKey = key + ":sublist:" + index;
            RList<String> sublist = redissonClient.getList(sublistKey, StringCodec.INSTANCE);
            if (!sublist.isEmpty()) {
                shelfCodes.addAll(sublist);
                dataFoundInRedis = true;
                index++;
            } else {
                break;
            }
        }
    }

    // 如果 Redis 中没有数据,则从数据库加载并存储到 Redis
    if (!dataFoundInRedis) {
        // 使用分布式锁确保一致性
        RLock lock = redissonClient.getLock(key + ":lock");
        lock.lock();
        try {
            // 双重检查,确保数据没有被其他线程加载
            if (Boolean.TRUE.equals(cacheMarker.get())) {
                shelfCodes.clear();
                int index = 0;
                while (true) {
                    String sublistKey = key + ":sublist:" + index;
                    RList<String> sublist = redissonClient.getList(sublistKey, StringCodec.INSTANCE);
                    if (!sublist.isEmpty()) {
                        shelfCodes.addAll(sublist);
                        index++;
                    } else {
                        break;
                    }
                }
            } else {
                shelfCodes = shelfService.getShelfCodes(c,w,l);
                // 先使缓存标记无效
                cacheMarker.set(false);
                // 存储新的数据到缓存
                storeS(c,w,l, shelfCodes, ttl, timeUnit);
                // 重新标记缓存为有效
                cacheMarker.set(true, ttl, timeUnit);
            }
        } finally {
            lock.unlock();
        }
    }

    return shelfCodes;
}

image.png

image.png

流程图:
sequenceDiagram
    participant Client
    participant LargeDataStorageService as Service
    participant RedissonClient as Redis
    participant ShelfService as Database

    Client->>Service: loadShelf(classCodes, warehouseAreaIds, logicAreaIds)
    Service->>Redis: Check cacheMarker
    alt cacheMarker is true
        loop Until no more sublists
            Service->>Redis: Get sublist from cache
            Redis-->>Service: Return sublist
        end
    else cacheMarker is false or missing
        Service->>Redis: Acquire lock
        alt Double-check cacheMarker
            Service->>Redis: Check cacheMarker
            Redis-->>Service: Return cacheMarker state
            Service->>Service: Skip to release lock
        else cacheMarker still false
            Service->>Database: getShelfCodes from DB
            Database-->>Service: Return shelfCodes
            Service->>Service: Invalidate cacheMarker
            Service->>Redis: Store shelfCodes in cache
            Service->>Service: Set cacheMarker to true
        end
        Service->>Redis: Release lock
    end
    Service-->>Client: Return shelfCodes
解读:

1. 客户端 (Client) 请求 LargeDataStorageService 加载货架代码 (loadShelf)。
2. 服务首先检查 Redis 中的缓存标记 (cacheMarker)。
3. 如果缓存标记为真,表示缓存中有数据,服务将循环获取缓存中的所有子列表,直到没有更多子列表。
4. 如果缓存标记不存在或为假,服务将尝试获取 Redis 的分布式锁。
5. 在锁内部,服务再次检查缓存标记,以确保在等待锁的期间没有其他进程已经加载了数据并更新了缓存。
6. 如果缓存标记仍然为假,服务将从数据库 (ShelfService) 加载货架代码。
7. 服务将数据库中获取的货架代码存储到 Redis 缓存中,并将缓存标记设置为真。
8. 服务释放 Redis 的分布式锁。
9. 服务返回货架代码给客户端。 10.

分片存储:

/**
 * 分片存储
 *
 * @param c
 * @param w
 * @param l
 * @param s
 * @param ttl
 * @param timeUnit
 */
public void storeS(List<String> c, List<Integer> w, List<Integer> l, List<String> s, long ttl, TimeUnit timeUnit) {
    // Generate a unique key for the given parameters
    String key = generateKey(c, w, l);
    // Batch operation for atomic execution
    RBatch batch = redissonClient.createBatch();
    // Assuming each sublist has a reasonable size that Redis can handle efficiently
    int sublistSize = 1000; // Adjust this size as needed
    for (int i = 0; i < s.size(); i += sublistSize) {
        int end = Math.min(s.size(), i + sublistSize);
        List<String> sublist = s.subList(i, end);
        // Store each sublist in a separate key with a suffix
        String sublistKey = key + ":sublist:" + (i / sublistSize);
        RListAsync<String> listAsync = batch.getList(sublistKey, StringCodec.INSTANCE);
        listAsync.addAllAsync(sublist);
        listAsync.expireAsync(ttl, timeUnit);
    }
    // Execute batch commands in one round trip
    batch.execute();
}

企业微信截图_17149718523106.png

解读上述代码:

  1. loadS 方法

    • 接受c(c)、w (w)和 l(l)作为参数。
    • 使用这些参数生成一个唯一的缓存键。
    • 尝试从 Redis 缓存中分批获取数据,如果找到数据则返回。
    • 如果缓存中没有数据,使用分布式锁来防止多个进程同时加载和缓存数据。
    • 在锁内部,再次检查缓存标记以确认数据是否已被其他进程缓存。
    • 如果数据仍未被缓存,则调用 shelfService.getShelfCodes 方法从数据库中加载数据。
    • 使用 storeShelfCodes 方法将数据存储到 Redis,并设置缓存标记,标记数据现在可用。
    • 最后释放锁并返回加载的数据。
  2. storeS 方法

    • 接受相同的参数以及要存储的货架代码列表和过期时间设置。
    • 使用批处理操作将数据分批存储到 Redis 中,每个批次是一个子列表,这有助于避免单个 Redis 值过大。
    • 设置每个子列表的过期时间。
  3. generateKey 方法

    • 生成基于输入参数的唯一键,用于在 Redis 中标识特定的数据集。

这个类的实现具有以下特点:

  • 数据一致性:通过使用分布式锁和缓存标记来确保数据一致性,防止缓存污染。
  • 缓存失效策略:通过设置过期时间来确保缓存数据最终会被刷新,允许新的数据被加载和缓存。
  • 性能优化:通过分批处理大量数据来优化性能,避免单个大型 Redis 值可能导致的性能问题。

这个类的设计考虑了大规模数据处理的需求,并通过缓存机制来提高数据检索的效率。同时,它还考虑了缓存数据的一致性和时效性,以确保系统的稳定性和数据的准确性。

总结:

总体而言,LargeDataStorageService 的设计考虑到了分布式系统中数据一致性的重要性,缓存数据的有效管理,以及如何有效地处理和存储大量数据。这个设计通过减少对数据库的依赖和优化缓存的使用,提高了数据检索的效率和系统的整体性能。