掘金 后端 ( ) • 2024-04-28 15:07

theme: vuepress

前言

这里我将通过 redis-py 简易封装一个异步的Redis客户端,然后主要讲解设计一个支持各种缓存代理(本地内存、Redis等)的缓存装饰器,用于在减少一些不必要的计算、存储层的查询、网络IO等。

具体代码都封装在 HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com) 中,以大家便捷使用。

异步redis客户端

首先安装 redis-py 库

pip install redis

Redis 之前是不支持异步的,后面为了统一异步redis操作与python常用的redis.py 的api接口一致,aioredis的作者已经将 aioredis 加入了redis中维护,安装的版本大于 4.2.0rc1 就行。

  • aioredis:https://github.com/aio-libs-abandoned/aioredis-py
  • redis:https://github.com/redis/redis-py

BaseRedisManager 封装

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { redis连接处理模块 }
# @Date: 2023/05/03 21:13
from datetime import timedelta
from typing import Optional, Union

from redis import Redis
from redis import asyncio as aioredis

from py_tools import constants
from py_tools.decorators.cache import CacheMeta, cache_json, RedisCacheProxy, AsyncRedisCacheProxy


class BaseRedisManager:
    """Redis客户端管理器"""

client: Union[Redis, aioredis.Redis] = None
    cache_key_prefix = constants.CACHE_KEY_PREFIX

    @classmethod
    def init_redis_client(
            cls,
            async_client: bool = False,
            host: str = "localhost",
            port: int = 6379,
            db: int = 0,
            password: Optional[str] = None,
            max_connections: Optional[int] = None,
            **kwargs
    ):
        """
        初始化 Redis 客户端。

        Args:
        async_client (bool): 是否使用异步客户端,默认为 False(同步客户端)
        host (str): Redis 服务器的主机名,默认为 'localhost'
        port (int): Redis 服务器的端口,默认为 6379
        db (int): 要连接的数据库编号,默认为 0
        password (Optional[str]): 密码可选
        max_connections (Optional[int]): 最大连接数。默认为 None(不限制连接数)
        **kwargs: 传递给 Redis 客户端的其他参数

        Returns:
        None
        """
        if cls.client is None:
            redis_client_cls = Redis
            if async_client:
                redis_client_cls = aioredis.Redis

            cls.client = redis_client_cls(
                host=host, port=port, db=db, password=password, max_connections=max_connections, **kwargs
            )

        return cls.client

    @classmethod
    def cache_json(
            cls,
            ttl: Union[int, timedelta] = 60,
            key_prefix: str = None,
    ):
        """
        缓存装饰器(仅支持缓存能够json序列化的数据)
        缓存函数整体结果
        Args:
        ttl: 过期时间 默认60s
        key_prefix: 默认的key前缀, 再未指定key时使用

        Returns:
        """
        key_prefix = key_prefix or cls.cache_key_prefix
        if isinstance(ttl, timedelta):
            ttl = int(ttl.total_seconds())

        cache_proxy = RedisCacheProxy(cls.client)
        if isinstance(cls.client, aioredis.Redis):
            cache_proxy = AsyncRedisCacheProxy(cls.client)

        return cache_json(cache_proxy=cache_proxy, key_prefix=key_prefix, ttl=ttl)

还是跟之前封装客户端一样的简易封装,由类属性 client 维护真正操作的redis的客户端,通过 init_redis_client 方法进行初始化。这样封装的目的就是在系统中只初始化一份 redis 客户端,操作时可以直接使用类方法。BaseRedisManager 只实现一些通用的 redis 操作(有待挖掘),具体还是需要业务Manager来继承封装业务中操作redis的方法。目前只实现了一个redis的缓存装饰器,其实内部就是组织参数设置redis代理,然后调用另外一个通用的缓存装饰器,这样使用的时候不需要制定缓存代理了。

缓存装饰器

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 缓存装饰器模块 }
# @Date: 2023/05/03 19:23
import asyncio
import functools
import hashlib
import json
from datetime import timedelta

import cacheout
from redis import Redis
from redis import asyncio as aioredis
from pydantic import BaseModel, Field
from typing import Union
from py_tools import constants

MEMORY_PROXY = MemoryCacheProxy(cache_client=cacheout.Cache(maxsize=1024))


def cache_json(
        cache_proxy: BaseCacheProxy = MEMORY_PROXY,
        key_prefix: str = constants.CACHE_KEY_PREFIX,
        ttl: Union[int, timedelta] = 60,
):
    """
    缓存装饰器(仅支持缓存能够json序列化的数据)
    Args:
    cache_proxy: 缓存代理客户端, 默认系统内存
    ttl: 过期时间 默认60s
    key_prefix: 默认的key前缀

    Returns:
    """
    key_prefix = f"{key_prefix}:cache_json"
    if isinstance(ttl, timedelta):
        ttl = int(ttl.total_seconds())

    def _cache(func):

        def _gen_key(*args, **kwargs):
            """生成缓存的key"""

            # 根据函数信息与参数生成
            # key => 函数所在模块:函数名:函数位置参数:函数关键字参数 进行hash
            param_args_str = ",".join([str(arg) for arg in args])
            param_kwargs_str = ",".join(sorted([f"{k}:{v}" for k, v in kwargs.items()]))
            hash_str = f"{func.__module__}:{func.__name__}:{param_args_str}:{param_kwargs_str}"
            hash_ret = hashlib.sha256(hash_str.encode()).hexdigest()

            # 根据哈希结果生成key 默认前缀:函数所在模块:函数名:hash
            hash_key = f"{key_prefix}:{func.__module__}:{func.__name__}:{hash_ret}"
            return hash_key

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            """同步处理"""

            # 生成缓存的key
            hash_key = _gen_key(*args, **kwargs)

            # 先从缓存获取数据
            cache_data = cache_proxy.get(hash_key)
            if cache_data:
                # 有直接返回
                print(f"命中缓存: {hash_key}")
                return json.loads(cache_data)

            # 没有,执行函数获取结果
            ret = func(*args, **kwargs)

            # 缓存结果
            cache_proxy.set(key=hash_key, value=json.dumps(ret), ttl=ttl)
            return ret

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            """异步处理"""

            # 生成缓存的key
            hash_key = _gen_key(*args, **kwargs)

            # 先从缓存获取数据
            cache_data = await cache_proxy.get(hash_key)
            if cache_data:
                # 有直接返回
                return json.loads(cache_data)

            # 没有,执行函数获取结果
            ret = await func(*args, **kwargs)

            # 缓存结果
            await cache_proxy.set(key=hash_key, value=json.dumps(ret), ttl=ttl)
            return ret

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return _cache

cache_json 是一个带单参数的缓存装饰器,可以指定一些缓存的代理、缓存key前缀、缓存ttl等。

内部实现了同步、异步函数的缓存处理,关键点其实就是如何构造唯一的缓存key,这里就是根据key前缀与函数的一些签名信息来构造的。

def _gen_key(*args, **kwargs):
    """生成缓存的key"""

    # 没有传递key信息,根据函数信息与参数生成
    # key => 函数所在模块:函数名:函数位置参数:函数关键字参数 进行hash
    param_args_str = ",".join([str(arg) for arg in args])
    param_kwargs_str = ",".join(sorted([f"{k}:{v}" for k, v in kwargs.items()]))
    hash_str = f"{func.__module__}:{func.__name__}:{param_args_str}:{param_kwargs_str}"
    hash_ret = hashlib.sha256(hash_str.encode()).hexdigest()

    # 根据哈希结果生成key 默认前缀:函数所在模块:函数名:hash
    hash_key = f"{key_prefix}:{func.__module__}:{func.__name__}:{hash_ret}"
    return hash_key

函数所在模块:函数名:函数位置参数:函数关键字参数 进行hash,在处理关键字参数的需要排个序,来保证相同的参数,顺序不同但缓存key一致。后面的逻辑就是常见的设置缓存操作。

image.png

缓存代理类

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 缓存装饰器模块 }
# @Date: 2023/05/03 19:23
import asyncio
import functools
import hashlib
import json
from datetime import timedelta

import cacheout
from redis import Redis
from redis import asyncio as aioredis
from pydantic import BaseModel, Field
from typing import Union
from py_tools import constants


class CacheMeta(BaseModel):
    """缓存元信息"""

    key: str = Field(description="缓存的key")
    ttl: Union[int, timedelta] = Field(description="缓存有效期")
    cache_client: str = Field(description="缓存的客户端(Redis、Memcached等)")
    data_type: str = Field(description="缓存的数据类型(str、list、hash、set)")


class BaseCacheProxy(object):
    """缓存代理基类"""

    def __init__(self, cache_client):
        self.cache_client = cache_client  # 具体的缓存客户端,例如Redis、Memcached等

    def set(self, key: str, value: str, ttl: int):
        raise NotImplemented

    def get(self, key):
        cache_data = self.cache_client.get(key)
        return cache_data


class RedisCacheProxy(BaseCacheProxy):
    """同步redis缓存代理"""

    def __init__(self, cache_client: Redis):
        super().__init__(cache_client)

    def set(self, key, value, ttl):
        self.cache_client.setex(name=key, value=value, time=ttl)


class AsyncRedisCacheProxy(BaseCacheProxy):
    """异步Redis缓存代理"""

    def __init__(self, cache_client: aioredis.Redis):
        super().__init__(cache_client)

    async def set(self, key, value, ttl):
        await self.cache_client.setex(name=key, value=value, time=ttl)

    async def get(self, key):
        cache_data = await self.cache_client.get(key)
        return cache_data


class MemoryCacheProxy(BaseCacheProxy):
    """系统内存缓存代理"""

    def __init__(self, cache_client: cacheout.Cache):
        super().__init__(cache_client)

    def set(self, key, value, ttl):
        self.cache_client.set(key=key, value=value, ttl=ttl)


MEMORY_PROXY = MemoryCacheProxy(cache_client=cacheout.Cache(maxsize=1024))

这里设置一个缓存代理抽象类是用于封装屏蔽不同缓存客户端的操作不一致性。统一成如下入口

def set(self, key: str, value: str, ttl: int):
    raise NotImplemented

def get(self, key):
    cache_data = self.cache_client.get(key)
    return cache_data

让具体的缓存客户端重写(实现)这两个方法,以达到缓存装饰器的通用性。目前只实现了同步、异步redis缓存代理以及通过 cacheout 库实现的本地内存缓存代理,后面接入其他的缓存代理(例如Memcached等)就不用动cache_json函数了,只要继承 BaseCacheProxy,实现具体的 set、get 操作即可。

pip install python-memcached
import memcache

class MemcacheCacheProxy(BaseCacheProxy):

    def __init__(self, cache_client: memcache.Client):
        super().__init__(cache_client)

    def set(self, key, value, ttl):
        self.cache_client.set(key, value, time=ttl)

由于获取缓存的方法逻辑一致,故而直接复用就行。

测试Demo

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @File: cache.py
# @Desc: { cache demo 模块 }
# @Date: 2024/04/23 11:11
import asyncio
import time
from datetime import timedelta

import cacheout

from py_tools.connections.db.redis_client import BaseRedisManager
from py_tools.decorators.cache import cache_json, MemoryCacheProxy, RedisCacheProxy, AsyncRedisCacheProxy


class RedisManager(BaseRedisManager):
    client = None


class AsyncRedisManager(BaseRedisManager):
    client = None


RedisManager.init_redis_client(async_client=False)
AsyncRedisManager.init_redis_client(async_client=True)

memory_proxy = MemoryCacheProxy(cache_client=cacheout.Cache())
redis_proxy = RedisCacheProxy(cache_client=RedisManager.client)
aredis_proxy = AsyncRedisCacheProxy(cache_client=AsyncRedisManager.client)


@cache_json(key_prefix="demo", ttl=3)
def memory_cache_demo_func(name: str, age: int):
    return {"test_memory_cache": "hui-test", "name": name, "age": age}


@cache_json(cache_proxy=redis_proxy, ttl=10)
def redis_cache_demo_func(name: str, age: int):
    return {"test_redis_cache": "hui-test", "name": name, "age": age}


@cache_json(cache_proxy=aredis_proxy, ttl=timedelta(minutes=1))
async def aredis_cache_demo_func(name: str, age: int):
    return {"test_async_redis_cache": "hui-test", "name": name, "age": age}


@AsyncRedisManager.cache_json(ttl=30)
async def aredis_manager_cache_demo_func(name: str, age: int):
    return {"test_async_redis_manager_cache": "hui-test", "name": name, "age": age}


def memory_cache_demo():
    print("memory_cache_demo")
    ret1 = memory_cache_demo_func(name="hui", age=18)
    print("ret1", ret1)
    print()

    ret2 = memory_cache_demo_func(name="hui", age=18)
    print("ret2", ret2)
    print()

    time.sleep(3)
    ret3 = memory_cache_demo_func(age=18, name="hui")
    print("ret3", ret3)
    print()

    assert ret1 == ret2 == ret3


def redis_cache_demo():
    print("redis_cache_demo")
    ret1 = redis_cache_demo_func(name="hui", age=18)
    print("ret1", ret1)
    print()

    ret2 = redis_cache_demo_func(name="hui", age=18)
    print("ret2", ret2)

    assert ret1 == ret2


async def aredis_cache_demo():
    print("aredis_cache_demo")
    ret1 = await aredis_cache_demo_func(name="hui", age=18)
    print("ret1", ret1)
    print()

    ret2 = await aredis_cache_demo_func(name="hui", age=18)
    print("ret2", ret2)

    assert ret1 == ret2


async def aredis_manager_cache_demo():
    print("aredis_manager_cache_demo")
    ret1 = await aredis_manager_cache_demo_func(name="hui", age=18)
    print("ret1", ret1)
    print()

    ret2 = await aredis_manager_cache_demo_func(name="hui", age=18)
    print("ret2", ret2)

    assert ret1 == ret2


async def main():
    memory_cache_demo()

    redis_cache_demo()

    await aredis_cache_demo()

    await aredis_manager_cache_demo()


if __name__ == '__main__':
    asyncio.run(main())

输出结果

memory_cache_demo
ret1 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}

命中缓存: demo:cache_json:__main__:memory_cache_demo_func:46c6a618a88eb5067a00915c10c97c6c72d5073ecf9b04060433de75b2d21f51
ret2 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}

ret3 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}

redis_cache_demo
ret1 {'test_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}

命中缓存: py-tools:cache_json:__main__:redis_cache_demo_func:a00b13aa2e1e56ad328d1956bc3c3fb8e89b7007453a780e866cc3ccafb51d73
ret2 {'test_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}
aredis_cache_demo
ret1 {'test_async_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}

ret2 {'test_async_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}
aredis_manager_cache_demo
ret1 {'test_async_redis_manager_cache': 'hui-test', 'name': 'hui', 'age': 18}

ret2 {'test_async_redis_manager_cache': 'hui-test', 'name': 'hui', 'age': 18}

Redis 缓存情况

缓存信息还是挺清晰的就是有点长。由于是从主入口调用的函数,所以 func.__module__ 是 __main__。

这缓存装饰器一般适用于一些参数相同, 结果经常不变的情况下,以及允许短时间内出现数据不一致的场景。如下是一些典型的应用场景

  1. API Token缓存:对于需要使用API Token进行身份验证的API请求,通常API Token具有一定的有效期。在这种情况下,你可以使用缓存装饰器来缓存API Token,以避免在每次请求时重新生成或从后端服务获取。这样可以降低对后端服务的负载,并提高系统的响应速度。
  2. OSS Sign URL缓存:当需要生成签名URL来访问对象存储服务(如AWS S3、阿里云OSS)中的资源时,通常需要对URL进行签名以确保安全性。在这种情况下,你可以使用缓存装饰器来缓存已签名的URL,在一定时间内重复使用相同的签名URL,而不必重新计算签名。这样可以降低对签名计算资源的消耗,并减少重复的签名请求。
  3. 频繁查询的数据缓存:对于一些数据不经常变化但是频繁被查询的情况,比如一些静态配置信息、全局参数等,可以使用缓存装饰器将查询结果缓存起来,减少数据库查询次数,提高系统的响应速度。
  4. 外部API响应结果缓存:当你调用外部API获取数据时,有时这些数据在一段时间内不会发生变化。在这种情况下,你可以使用缓存装饰器来缓存外部API的响应结果,以避免频繁地向外部API发出请求。这不仅可以提高系统的性能,还可以降低对外部服务的依赖性。

总的来说,缓存装饰器可以应用于许多场景,特别是在需要提高性能、减少资源消耗和避免重复请求数据的情况下。通过合理地设置缓存时间,可以权衡数据的新鲜度和系统性能,从而实现更好的用户体验。

源代码

源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)