掘金 后端 ( ) • 2024-03-18 00:10

highlight: a11y-dark theme: channing-cyan

目前,想用python开发异步任务,比较好用的就只用celery。不过celery坑十分的多,光是issue就几百个

但是想用好celery还是很容易的

今天就介绍一下celery在生产中的使用技巧,包括celery config,日志配置,异步函数配置,任务结果记录,任务监控等

github easy_celery

Celery 介绍

Celery 是 Python 的一个第三方模块,它本质上是一个任务调度框架。一般用来实现:

  • 异步任务:一些耗时的操作可以交给 celery 异步执行,而不用等着程序处理完才知道结果。比如:视频转码、邮件发送、消息推送等等。
  • 定时任务:比如定时推送消息、定时爬取数据、定时统计数据等等。

模块特点

  • 灵活:是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务。
  • 快速:一个单进程的 celery 每分钟可处理上百万个任务。
  • 开源:是一个完全基于 Python 语言编写的开源项目。

使用场景

前面刚提到,celery 适用于一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如:

  • 场景一:在 Web 开发中,对新用户的注册,我们通常会给他发一封激活邮件,而发邮件是个 IO 阻塞式任务,如果直接把它放到应用当中,就需要等邮件发出去之后才能进行下一步操作,此时用户只能等待再等待。更好的方式是在业务逻辑中触发一个发邮件的异步任务,而主程序可以继续往下运行。
  • 场景二:接口自动化测试平台中,当选择 N 条 case 后点击执行按钮,因为运行选择的多条 case 是一个比较耗时的过程,所以不可能在点击按钮后,页面一直处于等待运行结果的状态,那样会给用户一个页面卡住或者夯住的体验,所以运行用例的过程就应该是一个异步任务去处理。

工作流程

有一点需要注意,celery 是用来调度任务的,但它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的。因此要使用 celery 的话,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis 缓存、数据库等等。

一般最常用的是 RabbitMQ 或者 Redis,而我接下来的演示都会使用 Redis。

先来看一下 celery 的工作流程图:

image.png

主要包含以下几个模块:

  • 任务模块 Task:包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
  • 消息中间件 Broker:即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
  • 任务执行单元 Worker:是任务的执行单元,它实时监控消息队列,会将任务从队列中顺序取出并执行。
  • 任务结果存储 Backend:用于在任务结束之后保存状态信息和结果,以便查询,一般是数据库,当然只要具备存储功能都可以作为 Backend,比如 RabbitMQ,Redis 和 MongoDB 等。

安装 Celery

Celery 的安装比较简单,直接 pip install celery 即可。这里我本地的 Python 版本是 3.8.9,celery 版本是 5.2.7。

另外,由于 celery 本身不提供任务存储的功能,所以这里我使用 Redis 作为消息队列,负责存储任务。

后续 celery 就会将任务存到 broker 里面,要想实现这一点,就必须还要有能够操作相应 broker 的驱动。Python 操作 Redis 的驱动也叫 redis,操作 RabbitMQ 的驱动叫 pika,根据实际情况选择,安装方式如下:

# Redis 驱动
pip install redis

# RabbitMQ 驱动
pip install pika

实现异步任务示例

使用 celery 实现异步任务主要包含三个步骤:

  • 创建一个 celery 实例,并编写要变成异步任务的函数
  • 启动 celery worker
  • 调用异步任务

1)编写任务

新建一个工程,就叫 celery_demo,然后在里面新建一个 app.py 文件。

# celery_demo/app.py
import time
from celery import Celery

broker_url = "redis://127.0.0.1:6379/1"
result_backend = "redis://127.0.0.1:6379/2"

app = Celery(
    "celery_demo",
    broker=broker_url,
    backend=result_backend
)


@app.task
def my_task(name, age):
    print("准备执行任务啦")
    time.sleep(3)
    return f"name is {name}, age is {age}"

在这个文件里的代码做了这三件事:

  • 定义 celery 所需的配置信息
    • broker_url 配置 Redis 数据库地址,格式为 redis://:password@hostname:port/db_number
    • result_backend 配置任务结果存储位置,将保存每个任务的执行结果。
  • 实例化 Celery
    • 第一个参数是指定一个 name,名称可自定义
    • 后面两个参数分别是 broker 的地址、backend 的地址
  • 编写一个异步任务,这个任务先写在这里,以后再考虑是否抽离到专门写任务的模块里
    • 只有通过 @app.task 装饰后的函数,才能成为可被 celery 调度的任务
    • 需要注意的是,如果使用了多个装饰器,那么要确保 task 装饰器最后被应用,即写在最上面

函数被 @app.task 装饰之后,可以理解为它就变成了一个任务工厂,然后调用任务工厂的 delay 方法即可创建任务并发送到队列里面。我们也可以创建很多个任务工厂,但是这些任务工厂必须要让 worker 知道,否则不会生效。所以如果修改了某个任务工厂、或者添加、删除了某个任务工厂,那么一定要让 worker 知道,而做法就是先停止 celery worker 进程,然后再重新启动。

如果我们新建了一个任务工厂,然后在没有重启 worker 的情况下,就用调用它的 delay 方法创建任务、并发送到队列的话,那么会抛出一个 KeyError,提示找不到相应的任务工厂。

这点其实很好理解,因为代码已经加载到内存里面了,光修改了源文件而不重启是没用的。因为加载到内存里面的还是原来的代码,不是修改过后的。 :::

2)启动 worker

我们说执行任务的对象是 worker,那么我们是不是需要创建一个 worker 呢?显然是需要的,而创建 worker 可以使用如下命令创建:

celery -A app worker -l info

解释一下:

  • -A 参数:表示 celery 对象所在的 py 文件的文件名,本例是在 app.py 中,celery 会自动在该文件中寻找 celery 对象实例。当然,我们也可以自己指定,如对于本例可使用 -A app.app
  • -l 参数:指定日志级别,--loglevel 的缩写,默认为 warning。
  • -c 参数:表示明发数量,比如再加上 -c 10,表示限制并发数量为 10。
  • -D:启动 worker 默认是前台启动,加上 -D 会后台启动。

执行命令后可以看到输出如下:

以上就是在前台启动了一个 worker,正在等待从队列中获取任务,图中也显示了相应的信息。然而此时队列中并没有任务,所以我们需要在另一个文件中创建任务并发送到队列里面去。

3)创建并调用任务

再新建一个 Python 文件用来调用上面编写的异步任务:

# celery_demo/run_async_task.py
import time
from app import my_task

start = time.perf_counter()
my_task.delay("张三", 13)
print(time.perf_counter() - start)

这个文件没几行,主要做了三件事:

  • 从刚才的 app.py 中导入 my_task
  • 通过 delay() 方法 或 apply_async() 方法(这个方法后面会讲)来调用任务。
  • 统计任务执行时间。

::: tip 小贴士 这里要注意,创建任务不是直接调用 my_task 函数,因为那样的话就是在本地执行了,我们的目的是将任务发送到队列里面去,进而让监听队列的 worker 从队列里面获取任务并执行。

因为在 app.py 里面,my_task@app.task 装饰了,所以我们需要调用它的 delay 方法。

调用 delay 之后,就会创建一个任务,然后发送到队列里面去(也就是我们这里的 Redis)。至于参数,普通函数调用的时候怎么传,在 delay 里面依旧怎么传。 :::

现在,执行该文件,发现只用了约 0.06 秒,而 my_task 函数里面明明 sleep 了 3 秒。所以说明这一步是不会阻塞的,调用 my_task.delay() 只是创建一个任务并发送至队列。再看一下 worker 的输出信息:

可以看到任务已经被消费者接收并且消费了,而且调用 delay 方法是不会阻塞的,花费的那 0.008 秒是用在了其他地方,比如连接 Redis 发送任务等等。

4)Redis 中存储的信息

再看看 Redis 中存储的信息,1 号库用作 broker,负责存储任务;2 号库用作 backend,负责存储执行结果。我们来看 2 号库:

# 选择 2 号库
127.0.0.1:6379> select 2
OK
127.0.0.1:6379[2]>

# 查看里面所有的 key
# 因为我只将任务执行了一遍,所以只有一个 key
127.0.0.1:6379[2]> keys *
1) "celery-task-meta-7b74ed26-eaac-45cb-9079-938f4b7bc101"
127.0.0.1:6379[2]>

# 查看任务的相关信息,会返回一个 JSON 字符串
# 里面包含了任务的状态、返回值、异常回溯信息(如果执行没有出错则为 None)等等
127.0.0.1:6379[2]> get celery-task-meta-7b74ed26-eaac-45cb-9079-938f4b7bc101
"{\"status\": \"SUCCESS\", \"result\": \"name is \\u5f20\\u4e09, age is 13\", \"traceback\": null, \"children\": [], \"date_done\": \"2022-09-23T02:51:11.851307\", \"task_id\": \"7b74ed26-eaac-45cb-9079-938f4b7bc101\"}"
127.0.0.1:6379[2]>

# 再看看这个 key 是否有过期时间
# 瞧,原来它会在大约 24 小时后过期(celery 默认设置的是 24h,该值是可配置的)
127.0.0.1:6379[2]> ttl celery-task-meta-7b74ed26-eaac-45cb-9079-938f4b7bc101
(integer) 86074
127.0.0.1:6379[2]>

以上我们就启动了一个 worker 并成功消费了队列中的任务,并且还从 Redis 里面拿到了执行信息。当然啦,如果只能通过查询 backend 才能拿到信息的话,那 celery 就太不智能了。我们也可以直接从程序中获取。

5)查询任务执行信息

Redis(backend)里面存储了很多关于任务的信息,这些信息我们可以直接在程序中获取。

比如修改此时的 run_async_task.py,让它在调用任务的同时打印一些结果看看:

# celery_demo/run_async_task
from app import my_task

# 调用任务
res = my_task.delay("张三", 13)

# 打印返回值类型
print(type(res))
"""
<class 'celery.result.AsyncResult'>
"""

# 直接打印,显示任务的 id
print(res)
"""
7a7c8473-d918-4387-b759-f042fccced7e
"""

# 获取状态, 显然此刻没有执行完
# 因此结果是PENDING, 表示等待状态
print(res.status)
"""
PENDING
"""

# 获取 id,两种方式均可
print(res.task_id)
print(res.id)
"""
7a7c8473-d918-4387-b759-f042fccced7e
7a7c8473-d918-4387-b759-f042fccced7e
"""

# 获取任务执行结束时的时间
# 任务还没有结束, 所以返回None
print(res.date_done)
"""
None
"""

# 获取任务的返回值, 可以通过 result 或者 get()
# 注意: 如果是 result, 那么任务还没有执行完的话会直接返回 None
# 如果是 get(), 那么会阻塞直到任务完成
print(res.result)
print(res.get())
"""
None
name is 张三, age is 13
"""

# 再次查看状态和执行结束时的时间
# 发现 status 变成 SUCCESS
# date_done 变成了执行结束时的时间
print(res.status)
# 但显示的是 UTC 时间
print(res.date_done)
"""
SUCCESS
2022-09-23 03:35:26.867303
"""

可以打印结果可以发现,调用完任务工厂的 delay 方法之后,会创建一个任务并发送至队列,同时返回一个 AsyncResult 对象,基于此对象我们可以拿到任务执行时的所有信息,具体可参考官方文档 - celery.result

另外我们说结果需要存储在 backend 中,如果没有配置 backend,那么获取结果的时候会报错。至于 backend,因为它是存储结果的,所以一般会保存在数据库中,因为要持久化。我这里为了方便,就还是保存在 Redis 中。

Celery Borker的选择

不管是redis还是rabbitmq,生产上一定要先预估消息的总数,并且中间件一定要使用高可用的模式。避免中间件宕机造成不可预估的损失

redis

  • 快速:Redis是基于内存的数据存储,读写速度非常快,适合处理大量短期的任务。
  • 简单:Redis的配置相对较简单,易于部署和维护。
  • 功能限制:Redis的功能相对较少,不如RabbitMQ丰富。

rabbitmq

  • 可靠性:RabbitMQ提供了持久化、高可用性等特性,能够保证消息的可靠传输,适合生产环境中对可靠性有要求的场景。
  • 灵活性:RabbitMQ支持丰富的消息模式,包括点对点、发布/订阅、路由等,可以满足各种复杂的消息处理需求。
  • 复杂性:相比Redis,RabbitMQ的配置和部署相对复杂一些,需要更多的学习和调优。

简单搭建一个redis-sentinel

主从

version: '3'
services:
  master:
    image: redis
    container_name: redis-master
    command: redis-server  --requirepass zxcde@1  --masterauth zxcde@1
    ports:
      - 6380:6379
  slave1:
    image: redis
    container_name: redis-slave-1
    ports:
      - 6381:6379
    command:  redis-server --slaveof redis-master 6379 --masterauth zxcde@1  --requirepass zxcde@1
  slave2:
    image: redis
    container_name: redis-slave-2
    ports:
      - 6382:6379
    command: redis-server --slaveof redis-master 6379 --masterauth zxcde@1 --requirepass zxcde@1

image.png

哨兵

version: '3'
services:
  sentinel1:
    image: redis
    container_name: redis-sentinel-1
    ports:
      - 26379:26379
    command: redis-sentinel /usr/local/etc/redis/sentinel.conf
    volumes:
      - ./sentinel1.conf:/usr/local/etc/redis/sentinel.conf
  sentinel2:
    image: redis
    container_name: redis-sentinel-2
    ports:
    - 26380:26379
    command: redis-sentinel /usr/local/etc/redis/sentinel.conf
    volumes:
      - ./sentinel2.conf:/usr/local/etc/redis/sentinel.conf
  sentinel3:
    image: redis
    container_name: redis-sentinel-3
    ports:
      - 26381:26379
    command: redis-sentinel /usr/local/etc/redis/sentinel.conf
    volumes:
      - ./sentinel3.conf:/usr/local/etc/redis/sentinel.conf
networks:
  default:
    external:
      name: redis_default

port 26379
dir "/tmp"
sentinel myid c1d5bc05ab479807f24dc73a3bfd6f62986bd48c
sentinel deny-scripts-reconfig yes
sentinel monitor mymaster 172.26.0.3 6379 2
sentinel auth-pass mymaster zxcde@1
sentinel config-epoch mymaster 1
sentinel leader-epoch mymaster 1
# Generated by CONFIG REWRITE
sentinel known-replica mymaster 172.26.0.2 6379
sentinel known-replica mymaster 172.26.0.4 6379
sentinel known-sentinel mymaster 172.26.0.7 26379 082905d0d92e8af9839b771183a29feeda2bff46
sentinel known-sentinel mymaster 172.26.0.5 26379 5cf8ce9e522b640c18be002f39895df8843ea1d8
sentinel current-epoch 1

记得修改相对应的ip,复制出三个sentinel.conf

image.png

生产使用celery

上面的最简易代码示例主要是实现功能,让代码能跑通。而在实际开发中,我们需要做一些封装和抽象。

封装过后的项目目录如下:

celery_demo/
├── app.py
├── config.py
├── run_async_task.py
├── tasks/
│   └── my_tasks.py
└── ...

下面逐一讲解这些文件里都是哪些内容。

1)提取配置文件

实际开发中,一般都会将配置项统一写入到一个配置文件中,然后通过加载配置文件的方式来指定这些配置项。

配置文件 celery_demo/config.py

# celery_demo/config.py
broker_url = "redis://127.0.0.1:6379/1"
result_backend = "redis://127.0.0.1:6379/2"

主文件 app.py

# celery_demo/app.py
from celery import Celery
import config

# 指定一个 name 即可
app = Celery("celery_demo")
# 其它参数通过加载配置文件的方式指定
app.config_from_object(config)

2)将任务抽离出来

celery 可以支持非常多的定时任务,而不同种类的定时任务我们一般都会写在不同的模块中(当然这里的案例目前只有一个),然后再将这些模块组织在一个单独的目录中。

比如单独创建一个 celery_demo/tasks/my_tasks.py 用来放置一部分任务,随便往里面写点东西,当然你也可以创建更多的文件:

# celery_demo/tasks/my_tasks.py
from app import app


@app.task
def task1():
    print("我是task1")
    return "task1你好"


@app.task
def task2(name):
    print(f"我是{name}")
    return f"{name}你好"


@app.task
def task3():
    print("我是task3")
    return "task3你好"


@app.task
def task4(name):
    print(f"我是{name}")
    return f"{name}你好"

然后回到 celery_demo/app.py,通过 include 来加载这些任务:

# celery_demo/app.py
from celery import Celery
import config

# 通过 include 指定存放任务的 py 文件
app = Celery("celery_demo", include=["tasks.my_tasks"])

# 其它参数通过加载配置文件的方式指定
app.config_from_object(config)

这里需要注意,通过 include 指定存放任务的 py 文件时,它和 worker 启动路径之间的关系:

  • 我们是在 celery_demo/ 目录下启动的 worker,所以上面应该写成 "tasks.my_tasks"
    • 启动时的 -A 参数就是 -A app
  • 如果是在 celery_demo/ 的上一级目录启动 worker,这里就要写成 "celery_demo.tasks.my_tasks"
    • 启动时的 -A 参数也要换成 -A celery_demo.app

如果还有其它文件,比如 second_tasks.pythird_tasks.py,按照同样的方式加进这个参数列表就行了。

注意修改完后要重新启动 worker:

输出结果显示,任务工厂都已经被加载进来了,接下来我们调用这些任务并发送至队列。

3)调用任务

要调用这些任务,编辑现在的 run_async_task.py 如下:

# celery_demo/run_async_task.py
from tasks.my_tasks import *

# get() 方法用于获取任务的返回值,前面讲过
task1.delay().get()
task2.delay("张三").get()
task3.delay().get()
task4.delay("李四").get()

执行脚本,可以发现结果正常返回了,再来看看此时的 worker 的输出:

通过上面这些微小的抽象封装,添加任务就更加方便和优雅一些了。

生产使用日志

  • python的原生日志真的不会用
  • 所以我们在使用tornado的时候,就用了loguru
  • 毕竟叫做傻瓜式(stupidly)日志库

image.png

安装

pip install loguru

具体参考官方文档https://loguru.readthedocs.io/en/stable/overview.html

1) logurus

装饰器@setup_logging.connect,给我们提供了初始化日志的功能。

  • init_log:初始化loguru
  • InterceptHandler 将标准log转化为loguru
  • 关键代码在于logger_opt.log(mapper.get(record.levelno), record.getMessage())
@setup_logging.connect
def setup_logging(*args, **kwargs):
    init_log()
    logging.basicConfig(handlers=[InterceptHandler()], level="INFO")
def formatter(record):
    try:
        from celery._state import get_current_task
    except ImportError:
        task_id = ""
        task_name = ""
    else:
        task = get_current_task()
        if task is None:
            task_id = ""
            task_name = ""
        else:
            task_id = task.request.id
            task_name = task.name

    record["extra"].update(task_id=task_id, task_name=task_name)

    data = {
        "time": record["time"].strftime('%Y-%m-%d %H:%M:%S'),
        "level": record["level"].name,
        "message": record["message"],
        "task_id": task_id,
        "task_name": task_name,
        "function": record["function"]
    }

    record["extra"] = json.dumps(data, default=str, ensure_ascii=False)

    return "{extra}\n"


def init_log():
    # log
    if settings.service.server.env == "prod":
        logger.remove(handler_id=None)  # 生产环境禁止log
    time_name = ".{time:YYYY-MM-DD}"
    celery_log = os.path.join(base_path, settings.service.celery_log.celery_path,
                              settings.service.celery_log.celery_name)
    # 初始化log
    logger.add(
        celery_log + time_name,
        level=20,
        rotation="00:00",
        format=formatter
    )


class InterceptHandler(logging.Handler):
    def emit(self, record: logging.LogRecord):
        logger_opt = logger.opt(depth=6, exception=record.exc_info)
        mapper = {
            20: "INFO",
            10: "DEBUG",
            30: "WARNING",
            40: "ERROR",
            50: "CRITICAL"
        }
        logger_opt.log(mapper.get(record.levelno), record.getMessage())

执行asynio函数与记录任务状态

1) 任务callback

假如你想在任务执行失败的时候,打印错误信息并且发出报警,该怎么搞。有两个方法:

(1)link_error

(2)on_failure/on_success

link_error的方法比较爽,但是我没有亲测过,on_failure的方式,是当任务抛出异常的时候,会触发一些事件,提供给大家代码:

定义一个新类重写Task里的on_success和on_failure方法:


class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        """Success handler.

             Run by the worker if the task executes successfully.

             Arguments:
                 retval (Any): The return value of the task.
                 task_id (str): Unique id of the executed task.
                 args (Tuple): Original arguments for the executed task.
                 kwargs (Dict): Original keyword arguments for the executed task.

             Returns:
                 None: The return value of this handler is ignored.
        """

        success_onutc = datetime.datetime.utcnow()
        executivetime = str((success_onutc - self.created_onutc).seconds) + 's'

        mongo.insert_document(collection=table.TaskRecord, document={
            "msg": "",
            "job_id": task_id,
            "run_time": success_onutc,
            "state": "success",
            "rule_id": task_id,
            "rule_name": celery.current_task.name,
            "insert_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            # "args": args
        })
        mongo.close()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        success_onutc = datetime.datetime.utcnow()
        executivetime = str((success_onutc - self.created_onutc).seconds) + 's'

        mongo.insert_document(collection=table.TaskRecord, document={
            "msg": str(exc),
            "job_id": task_id,
            "run_time": success_onutc,
            "state": "error",
            "rule_id": task_id,
            "rule_name": self.name,
            "insert_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        })
        mongo.close()

2) 运行asyncio

由于tornado的async的特性,基本的一些类都被封装成了async,await异步函数。 但是celery是只支持同步函数,不支持异步函数。这问题一直困扰了我许久。 官方说要到6.0才支持,我真的会谢~~~~~~ image.png

但是运行asyncio函数,本质上还是得先启动一个asyncio loop。 于是我尝试这样

class CallbackTask(Task):
    created_onutc = None

    _event_loop = None

    @property
    def loop(self):
        if self._event_loop is None:
            self._event_loop = asyncio.get_event_loop()
        return self._event_loop

    @property
    def mongo(self):
        global mongo_instance
        if mongo_instance is None:
            mongo_instance = OpsMongo(settings.mongo.ops.uri)
        return mongo_instance

    @property
    def redis(self):
        global redis_instance
        if redis_instance is None:
            redis_sentinels = self.loop.run_until_complete(init_sentinel())
            redis_instance = RedisManager(redis_sentinels)
        return redis_instance
  • loop属性:作为异步编程的一部分,提供一个事件循环访问器。如果当前类实例没有关联的事件循环,则通过调用asyncio.get_event_loop()来获取或创建一个事件循环。

  • mongo属性: 提供对MongoDB实例的访问。如果全局变量mongo_instance未初始化,则会创建一个OpsMongo实例,并使用传入的MongoDB连接URI进行配置。

  • redis属性: 提供对Redis实例的访问。类似于MongoDB,如果全局变量redis_instance未初始化,则会异步初始化Redis Sentinel(哨兵),并基于初始化结果创建一个RedisManager实例来管理Redis连接。

这是使用的方法

@app.task(bind=True, base=CallbackTask)
def add(self, rule):
    task_id = self.request.id
    mongo: OpsMongo = self.mongo
    redis: RedisManager = self.redis
    self.loop.run_until_complete(get_attr(task_id, mongo, redis, rule))


async def get_attr(task_id: str, mongo: OpsMongo, redis: RedisManager, rule):
    att = AttrTask(task_id, mongo)
    await att.attr_monitor(rule, redis)

通过self.loop.run_until_complete运行异步函数即可

任务监控

1) flower

使用flower web 监控任务执行

通过日志可以看到任务执行的结果,而图形界面是对新手非常友好的。flower 就是这样一个可以很好与celery集成的监控任务执行情况的插件

flower 使用 pip 即可安装,简单得很

pip install flower 

然后通过指令

celery --broker=redis://127.0.0.1:6379/2 flower

运行起来之后,页面访问 http://127.0.0.1:5555 即可

image.png

2) grafana

安装好flower后访问localhost:5555/metrics

将Flower的指标添加到Prometheus的配置中,可以通过编辑Prometheus的配置文件prometheus.yml来完成

在初始状态下,prometheus.yml文件的内容如下:

global:
  scrape_interval:     15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: prometheus
    static_configs:
      - targets: ['localhost:9090']

需要将scrape_configs的定义修改为以下内容:

scrape_configs:
  - job_name: prometheus
    static_configs:
      - targets: ['localhost:9090']
  - job_name: flower
    static_configs:
      - targets: ['localhost:5555']

也可以直接在启动Prometheus时通过命令行指定示例prometheus.yml文件的路径(注意,您需要在etc/hosts配置中将flower指向localhost以正确解析DNS):

./prometheus --config.file=prometheus.yml

grafana使用这个官方的就好了,但是我还是觉得不好看 Download Grafana dashboard.

8cd6730433225c92148a5a3e6fb970e.jpg

参考资料