python Celery初体验

  |   0 评论   |   0 浏览

背景

Celery是Python开发的分布式任务调度模块。

Celery支持的Broker有:RabbitMQ, Redis, Amazon SQS等

Celery支持的结果存储有:AMQP, Redis, Memcached,SQLAlchemy, Django ORM, Apache Cassandra, Elasticsearch, Riak, MongoDB, CouchDB, Couchbase, ArangoDB, Amazon DynamoDB, Amazon S3, Microsoft Azure Block Blob, Microsoft Azure Cosmos DB, File system

其架构如下:

异步任务Async Task / 定时任务Beat --> 消息中间件Broker <-- 任务执行单元Worker --> 结果存储Backend

初体验

安装

pip3 install Celery flower redis -i https://mirrors.aliyun.com/pypi/simple/

第一个例子

目录结构为:

.
├── celery_config.py
├── client.py
└── task.py

配置文件 celery_config.py为:

BROKER_URL = 'redis://127.0.0.1:6379'

# 结果存储在redis中,默认保存24小时
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_TIMEZONE = 'Asia/Shanghai'

任务类型 task.py为:

from celery_config import *
from celery import Celery

app = Celery("task",broker=BROKER_URL,backend=CELERY_RESULT_BACKEND)

@app.task
def add(x,y):
    return x + y

客户端 client.py为:

from task import add

result = add.delay(5,9)
while not result.ready():
    print(result.get(timeout=1))

运行方式

启动 worker:

$ celery -A task worker --loglevel=info

[tasks]
  . task.add

[2020-06-26 23:08:32,085: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
[2020-06-26 23:08:32,117: INFO/MainProcess] mingle: searching for neighbors
[2020-06-26 23:08:33,197: INFO/MainProcess] mingle: all alone
[2020-06-26 23:08:33,241: INFO/MainProcess] celery@star ready.
[2020-06-26 23:08:49,298: INFO/MainProcess] Received task: task.add[399848b8-1737-481c-bb44-a315fb3481e1]
[2020-06-26 23:08:49,322: INFO/ForkPoolWorker-2] Task task.add[399848b8-1737-481c-bb44-a315fb3481e1] succeeded in 0.0176366479136s: 14

启动 client:

$ python3 client.py
14

可见,client发出了add的异步请求,到broker,然后由worker执行后,将结果写入了redis,最终由client取回。

同时,也测试了机器A上执行worker,机器B上执行client.py,也可以返回预期的结果。

redis中元数据

结果数据会保存在redis中一份,如下:

127.0.0.1:6379> get celery-task-meta-f6ab121c-acf0-4b37-8c44-2c4eb783eba7
"{\"status\": \"SUCCESS\", \"result\": 14, \"traceback\": null, \"children\": [], \"date_done\": \"2020-06-26T15:20:50.234294\", \"task_id\": \"f6ab121c-acf0-4b37-8c44-2c4eb783eba7\"}"

celery后台运动

启动

$ celery multi start -A task worker --loglevel=info
celery multi v4.4.6 (cliffs)
> Starting nodes...
	> worker@gateway: OK

关闭

$ celery multi stop -A task worker --loglevel=info
celery multi v4.4.6 (cliffs)
> Stopping nodes...
	> worker@gateway: TERM -> 13565

优雅关闭

$ celery multi stopwait -A task worker --loglevel=info
celery multi v4.4.6 (cliffs)
> Stopping nodes...
	> worker@gateway: TERM -> 13592
> Waiting for 1 node -> 13592.....
	> worker@gateway: OK
> worker@gateway: DOWN
> Waiting for 1 node -> None...

flower初体验

使用flower,可以通过查看redis,来监控celery的运行情况。

启动

flower -A task --address=0.0.0.0

访问 http://localhost:5555即可。

可以查看 worker情况,task情况,broker情况和整体的monitor情况。

flower本文不放图了,图可以参考 celery简介/使用/demo测试(python) 一文。

参考