Apache Airflow中队列线程池使用

  |   0 评论   |   0 浏览

背景

airflow + celery,其中 airflow 接收到用户的dag请求后,会将相关数据持久化到数据库(如MySQL)中;然后airflow scheduler会将任务调度到 Celery Broker中。

Celery worker会从Broker中取出来任务,执行,将结果存储在Result Backend中。

问题

在airflow中,会遇到希望将不同种类的任务,交给不同的机器组进行执行的需求。如将长运行时间任务和短运行时间任务分开,将CPU任务和GPU任务分开等需求。

那么如何实现呢?分三步,先建立不同的Pool,再设置任务运行的Pool,最后指定每个worker可以执行的Pool类型。

初体验

建立Pool

在airflow控制台上,admin -> Pools -> Create 中,创建自定义 Pool,如system_pool。

设置任务运行的Pool

在 dag 文件中,Operator的设置中,指定其运行的Pool,如:

# 定义DAG
def task_run_cmd(dag, **kwargs):
    # PythonOperator
    task = PythonOperator(
        task_id='task_run_cmd',
        provide_context=True,
        python_callable=real_run_cmd,  # 指定要执行的函数
        pool = "system_pool",
        dag=dag)
    return task

指定每个worker可以执行的Pool类型

worker的节点标签。启动worker时,只执行spark任务,则:

airflow worker -q spark

参考