Apache Airflow中队列线程池使用
背景
airflow + celery,其中 airflow 接收到用户的dag请求后,会将相关数据持久化到数据库(如MySQL)中;然后airflow scheduler会将任务调度到 Celery Broker中。
Celery worker会从Broker中取出来任务,执行,将结果存储在Result Backend中。
问题
在airflow中,会遇到希望将不同种类的任务,交给不同的机器组进行执行的需求。如将长运行时间任务和短运行时间任务分开,将CPU任务和GPU任务分开等需求。
那么如何实现呢?分三步,先建立不同的Pool,再设置任务运行的Pool,最后指定每个worker可以执行的Pool类型。
初体验
建立Pool
在airflow控制台上,admin -> Pools -> Create 中,创建自定义 Pool,如system_pool。
设置任务运行的Pool
在 dag 文件中,Operator的设置中,指定其运行的Pool,如:
# 定义DAG
def task_run_cmd(dag, **kwargs):
# PythonOperator
task = PythonOperator(
task_id='task_run_cmd',
provide_context=True,
python_callable=real_run_cmd, # 指定要执行的函数
pool = "system_pool",
dag=dag)
return task
指定每个worker可以执行的Pool类型
worker的节点标签。启动worker时,只执行spark任务,则:
airflow worker -q spark