Apache AirFlow 初体验

  |   0 评论   |   0 浏览

背景

AirFlow是一个支持DAG的工作流调度系统。本文给一个从零开始的基础使用示例。

初体验

快速安装

sudo docker pull apache/airflow:1.10.10-python3.7

或者直接安装

本文使用了阿里云的源,安装在用户自己的目录下。

# 需要的依赖
sudo yum install python3-devel mariadb-devel
sudo pip3 install mysqlclient

export AIRFLOW_HOME=~/airflow
pip3 install apache-airflow -i https://mirrors.aliyun.com/pypi/simple/ --user

安装结果

Successfully installed apache-airflow-1.10.10 argcomplete-1.11.1 dill-0.3.2 flask-caching-1.3.3 funcsigs-1.0.2 future-0.18.2 gunicorn-19.10.0 numpy-1.19.0 pandas-0.25.3 psutil-5.7.0 pygments-2.6.1 setproctitle-1.1.10 typing-extensions-3.7.4.2

初始化数据库

airflow initdb

如果使用 --user安装的,airflow在 ~/.local/bin/下面。

启动服务

airflow webserver -p 8081

使用用浏览器打开 localhost:8081即可。

确认安装成功

执行示例

# 第一个,能很快执行完的例子。
airflow run example_bash_operator runme_0 2015-01-01

# 第二个,需要执行2分钟的例子。
airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02

正式安装

和前面快速安装相比,修改配置文件,使用了MySQL数据库,来使多个airflow可以共享一个数据库。

修改配置文件

修改配置文件,如下:

sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

mysql数据库的默认配置修改如下:

explicit_defaults_for_timestamp = 1

重新初始化数据库

airflow initdb

启动完整服务

先前台启动,观察是否正常。

airflow webserver -p 8081
airflow scheduler
airflow worker

如果都正常,之后可以从后台启动。

airflow webserver -p 8081 -D
airflow scheduler -D
airflow worker -D

使用 celery调度器

先安装好 celery 调度器,可以参考本站的相关文章 。

然后修改配置文件:

broker_url = redis://127.0.0.1:6379
result_backend = redis://127.0.0.1:6379/0
# result_backend = db+mysql://airflow:airflow@localhost:3306/airflow

使用

DAG定义文件

DAG定义文件,是用来定义DAG对象的执行关系的,不执行具体的业务逻辑。DAG定义文件需要执行的足够快(秒级),会被调度器不停的调度。

airflow的pipeline就是一个DAG定义文件。

常用命令

# 查看dags
airflow list_dags

# 查看指定dags下的任务
airflow list_tasks my_demo_task

# 查看指定dags下的任务,以树形图展示
airflow list_tasks my_demo_task --tree

# 测试任务,格式:airflow test dag_id task_id execution_time
airflow test my_demo_task test1 2020-06-26

# 触发任务(也可以在web界面点trigger按钮)
airflow trigger_dag my_demo_task

# 守护进程运行webserver, 默认端口为8080,也可以通过`-p`来指定
airflow webserver -D  

# 守护进程运行调度器   
airflow scheduler -D   

# 守护进程运行调度器  
airflow worker -D  

# 暂停任务
airflow pause dag_id    

# 取消暂停,等同于在web管理界面打开off按钮
airflow unpause dag_id   

# 清空任务状态
airflow clear dag_id   

# 运行task
airflow run dag_id task_id execution_date

worker

airflow支持Celery, Dash和Kubernetes执行器。

完整的基本使用示例

这里我们从头创建一个task。

定义demo的DAG文件

编辑文件 ~/airflow/dags/demo.py,内容如下:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils import dates
from airflow.utils.helpers import chain
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def default_options():
    default_args = {
        'owner': 'airflow',  # 拥有者名称
        'start_date': dates.days_ago(1),  # 第一次开始执行的时间,为 UTC 时间
        'retries': 1,  # 失败重试次数
        'retry_delay': timedelta(seconds=5)  # 失败重试间隔
    }
    return default_args


# 定义DAG
def test1(dag):
    t = "pwd"
    # operator 支持多种类型, 这里使用 BashOperator
    task = BashOperator(
        task_id='test1',  # task_id
        bash_command=t,  # 指定要执行的命令
        dag=dag  # 指定归属的dag
    )
    return task


def hello_world_1():
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))


def test2(dag):
    # PythonOperator
    task = PythonOperator(
        task_id='test2',
        python_callable=hello_world_1,  # 指定要执行的函数
        dag=dag)
    return task


def test3(dag):
    t = "date"
    task = BashOperator(
        task_id='test3',
        bash_command=t,
        dag=dag)
    return task


with DAG(
        'test_task',  # dag_id
        default_args=default_options(),  # 指定默认参数
        schedule_interval="20 8 * * *"  # 执行周期
) as d:
    task1 = test1(d)
    task2 = test2(d)
    task3 = test3(d)
    chain(task1, task2, task3)  # 指定执行顺序

执行 python3 ~/airflow/dags/demo.py,如果没有输出,表示没有错误。

触发任务

在页面上,找到对应的DAG,然后点击 Trigger DAG,可以看到任务被顺利执行完成,结果在每个Task的日志中可以看到。

如果点了 Trigger DAG后,任务没运行。那可以点中具体的步骤再手动运行。

通过API触发任务:

curl -X POST \
  http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{"conf":"{\"key\":\"value\"}"}'

传递参数

对于单次运行的任务,可以传递参数到工作流中,方法如下:

  1. 修改配置文件 airflow.cfg,如下:
dag_run_conf_overrides_params = True
  1. 调用API时传递参数
curl -X POST \
  http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{"conf":"{\"name\":\"abeffect\"}"}'
  1. 对于PythonOperator,要设置 provide_context=True,然后在 kwargs中的 params可能找到刚刚传入的值。示例如下:
def hello_world_1(ds, **kwargs):
    logging.info(ds)
    logging.info(kwargs)
    logging.info(kwargs.get('params').get('name'))
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))


def test2(dag):
    # PythonOperator
    task = PythonOperator(
        task_id='test2',
        provide_context=True,
        python_callable=hello_world_1,  # 指定要执行的函数
        dag=dag)
    return task

如果使用了模版的话,可以使用

`{{ dag_run.conf['name'] }}`

来使用。

  1. BashOperator不支持参数传入,如果要执行Bash的话,可以用PythonOperator来间接实现,如下:
def hello_world_1(ds, **kwargs):
    logging.info(ds)
    logging.info(kwargs)
    logging.info(kwargs.get('params'))
    logging.info(kwargs['dag_run'].conf)

    # 拼出cmd,通过popen来执行,且拿到结果. 
    cmd = "echo {0}".format(kwargs.get('params').get('name'))
    logging.info("to run cmd: {0}".format(t))
    response = os.popen(cmd).read()
    logging.info(response)

    # 可以自定义返回值
    return "i'm from hello world 1"
  1. 在页面的 Trigger DAG中,也支持传入JSON,实现上面的同样的效果。

其它设置

时区设置

默认是utf时区,可以自己改成本地时区。修改 airflow.cfg

# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
# default_timezone = utc
default_timezone = system

权限管理(未完成)

安装模块:flask-bcrypt

sudo pip3 install flask-bcrypt

修改 airflow.cfg的配置,

[webserver]
rbac = True
authenticate = True
filter_by_owner = True

注册用户

Tasks间参数传递(未完成)

钉钉发送消息

  1. 增加一个钉钉机器人,记下token。
  2. 编辑文件:
from airflow.contrib.operators.dingding_operator import DingdingOperator

def send_dingding_msg(dag, **context):
    task = DingdingOperator(
        task_id='text_msg_remind_all',
        dingding_conn_id='dingding_default',
        message_type='text',
        message='hello, I'm from dingding. ',
        at_mobiles=['170xxxxxx18'],
        at_all=False,
        dag=dag)
    return task

后记

对于300个调度DAG,1000-2000个Task的情况,是使用体验良好的。

参考