A First Look at Celery in Python
Background
Celery is a distributed task queue written in Python.
Brokers supported by Celery include RabbitMQ, Redis, and Amazon SQS.
Supported result backends include: AMQP, Redis, Memcached, SQLAlchemy, Django ORM, Apache Cassandra, Elasticsearch, Riak, MongoDB, CouchDB, Couchbase, ArangoDB, Amazon DynamoDB, Amazon S3, Microsoft Azure Block Blob, Microsoft Azure Cosmos DB, and the file system.
The architecture looks like this:
async tasks (Async Task) / periodic tasks (Beat) --> message broker (Broker) <-- task execution units (Worker) --> result store (Backend)
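The example below only exercises the async-task path; the Beat component in the diagram would be driven by a periodic schedule declared in the configuration. A minimal sketch, assuming the task.add task defined later and the old-style uppercase setting names (the entry name, interval, and arguments are illustrative):

# Hypothetical beat schedule: run task.add(5, 9) every 30 seconds
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'task.add',
        'schedule': 30.0,  # seconds
        'args': (5, 9),
    },
}

A beat process would then run alongside the worker, started with celery -A task beat.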
Getting Started
Installation
pip3 install Celery flower redis -i https://mirrors.aliyun.com/pypi/simple/
First Example
The directory layout:
.
├── celery_config.py
├── client.py
└── task.py
Configuration file celery_config.py:
BROKER_URL = 'redis://127.0.0.1:6379'
# Results are stored in Redis and kept for 24 hours by default
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_TIMEZONE = 'Asia/Shanghai'
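The 24-hour retention mentioned in the comment can be tuned. A minimal sketch, assuming the old-style setting name (value in seconds):

# Keep task results for one hour instead of the default 24 hours
CELERY_TASK_RESULT_EXPIRES = 3600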
Task definitions, task.py:
from celery import Celery
from celery_config import BROKER_URL, CELERY_RESULT_BACKEND

app = Celery("task", broker=BROKER_URL, backend=CELERY_RESULT_BACKEND)

@app.task
def add(x, y):
    return x + y
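Instead of importing the settings and passing broker/backend explicitly, the app can also load the configuration module in one call, which additionally picks up CELERY_TIMEZONE. An equivalent sketch:

from celery import Celery

app = Celery("task")
# Loads BROKER_URL, CELERY_RESULT_BACKEND, CELERY_TIMEZONE, ... from celery_config.py
app.config_from_object('celery_config')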
Client, client.py:
from task import add

result = add.delay(5, 9)
# get() blocks until the worker has finished and the
# result has been written to the backend
print(result.get(timeout=10))
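The result does not have to be read in the same process: any client that knows the task id can fetch it from the backend later. A sketch, using the task id that appears in the worker log below:

from celery.result import AsyncResult
from task import app

res = AsyncResult('399848b8-1737-481c-bb44-a315fb3481e1', app=app)
print(res.status, res.result)  # e.g. SUCCESS 14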
Running
Start the worker:
$ celery -A task worker --loglevel=info
[tasks]
. task.add
[2020-06-26 23:08:32,085: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
[2020-06-26 23:08:32,117: INFO/MainProcess] mingle: searching for neighbors
[2020-06-26 23:08:33,197: INFO/MainProcess] mingle: all alone
[2020-06-26 23:08:33,241: INFO/MainProcess] celery@star ready.
[2020-06-26 23:08:49,298: INFO/MainProcess] Received task: task.add[399848b8-1737-481c-bb44-a315fb3481e1]
[2020-06-26 23:08:49,322: INFO/ForkPoolWorker-2] Task task.add[399848b8-1737-481c-bb44-a315fb3481e1] succeeded in 0.0176366479136s: 14
Run the client:
$ python3 client.py
14
As shown above, the client sends an asynchronous add request to the broker, the worker executes the task and writes the result to Redis, and the client finally fetches the result back.
I also verified the distributed case: with the worker running on machine A and client.py running on machine B, the expected result is returned as well.
Metadata in Redis
A copy of the result data is also kept in Redis:
127.0.0.1:6379> get celery-task-meta-f6ab121c-acf0-4b37-8c44-2c4eb783eba7
"{\"status\": \"SUCCESS\", \"result\": 14, \"traceback\": null, \"children\": [], \"date_done\": \"2020-06-26T15:20:50.234294\", \"task_id\": \"f6ab121c-acf0-4b37-8c44-2c4eb783eba7\"}"
Running Celery in the Background
Start
$ celery multi start -A task worker --loglevel=info
celery multi v4.4.6 (cliffs)
> Starting nodes...
> worker@gateway: OK
Stop
$ celery multi stop -A task worker --loglevel=info
celery multi v4.4.6 (cliffs)
> Stopping nodes...
> worker@gateway: TERM -> 13565
Graceful stop
$ celery multi stopwait -A task worker --loglevel=info
celery multi v4.4.6 (cliffs)
> Stopping nodes...
> worker@gateway: TERM -> 13592
> Waiting for 1 node -> 13592.....
> worker@gateway: OK
> worker@gateway: DOWN
> Waiting for 1 node -> None...
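After starting workers in the background, one way to confirm they are alive without digging through logs is Celery's inspection API. A minimal sketch, reusing the app from task.py:

from task import app

# Returns e.g. {'worker@gateway': {'ok': 'pong'}}, or None if no worker responds
print(app.control.inspect().ping())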
A First Look at Flower
With Flower, you can monitor Celery's activity by inspecting the broker (Redis in this setup).
Start
flower -A task --address=0.0.0.0
Then open http://localhost:5555 in a browser.
There you can inspect the workers, tasks, broker, and an overall monitor view.
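Flower also exposes an HTTP API on the same port, so the same information can be pulled programmatically. A sketch, assuming the requests package is installed:

import requests

# List the tasks Flower has seen, with their current states
tasks = requests.get('http://localhost:5555/api/tasks').json()
for task_id, info in tasks.items():
    print(task_id, info.get('state'))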
Flower screenshots are omitted here; for images, see the article celery简介/使用/demo测试(python).