Apache AirFlow 初体验
背景
AirFlow是一个支持DAG的工作流调度系统。本文给一个从零开始的基础使用示例。
初体验
快速安装
sudo docker pull apache/airflow:1.10.10-python3.7
或者直接安装
本文使用了阿里云的源,安装在用户自己的目录下。
# 需要的依赖
sudo yum install python3-devel mariadb-devel
sudo pip3 install mysqlclient
export AIRFLOW_HOME=~/airflow
pip3 install apache-airflow -i https://mirrors.aliyun.com/pypi/simple/ --user
安装结果
Successfully installed apache-airflow-1.10.10 argcomplete-1.11.1 dill-0.3.2 flask-caching-1.3.3 funcsigs-1.0.2 future-0.18.2 gunicorn-19.10.0 numpy-1.19.0 pandas-0.25.3 psutil-5.7.0 pygments-2.6.1 setproctitle-1.1.10 typing-extensions-3.7.4.2
初始化数据库
airflow initdb
如果使用 --user
安装的,airflow在 ~/.local/bin/
下面。
启动服务
airflow webserver -p 8081
使用用浏览器打开 localhost:8081
即可。
确认安装成功
执行示例
# 第一个,能很快执行完的例子。
airflow run example_bash_operator runme_0 2015-01-01
# 第二个,需要执行2分钟的例子。
airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
正式安装
和前面快速安装相比,修改配置文件,使用了MySQL数据库,来使多个airflow可以共享一个数据库。
修改配置文件
修改配置文件,如下:
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
mysql数据库的默认配置修改如下:
explicit_defaults_for_timestamp = 1
重新初始化数据库
airflow initdb
启动完整服务
先前台启动,观察是否正常。
airflow webserver -p 8081
airflow scheduler
airflow worker
如果都正常,之后可以从后台启动。
airflow webserver -p 8081 -D
airflow scheduler -D
airflow worker -D
使用 celery调度器
先安装好 celery 调度器,可以参考本站的相关文章 。
然后修改配置文件:
broker_url = redis://127.0.0.1:6379
result_backend = redis://127.0.0.1:6379/0
# result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
使用
DAG定义文件
DAG定义文件,是用来定义DAG对象的执行关系的,不执行具体的业务逻辑。DAG定义文件需要执行的足够快(秒级),会被调度器不停的调度。
airflow的pipeline就是一个DAG定义文件。
常用命令
# 查看dags
airflow list_dags
# 查看指定dags下的任务
airflow list_tasks my_demo_task
# 查看指定dags下的任务,以树形图展示
airflow list_tasks my_demo_task --tree
# 测试任务,格式:airflow test dag_id task_id execution_time
airflow test my_demo_task test1 2020-06-26
# 触发任务(也可以在web界面点trigger按钮)
airflow trigger_dag my_demo_task
# 守护进程运行webserver, 默认端口为8080,也可以通过`-p`来指定
airflow webserver -D
# 守护进程运行调度器
airflow scheduler -D
# 守护进程运行调度器
airflow worker -D
# 暂停任务
airflow pause dag_id
# 取消暂停,等同于在web管理界面打开off按钮
airflow unpause dag_id
# 清空任务状态
airflow clear dag_id
# 运行task
airflow run dag_id task_id execution_date
worker
airflow支持Celery, Dash和Kubernetes执行器。
完整的基本使用示例
这里我们从头创建一个task。
定义demo的DAG文件
编辑文件 ~/airflow/dags/demo.py
,内容如下:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import dates
from airflow.utils.helpers import chain
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
def default_options():
default_args = {
'owner': 'airflow', # 拥有者名称
'start_date': dates.days_ago(1), # 第一次开始执行的时间,为 UTC 时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(seconds=5) # 失败重试间隔
}
return default_args
# 定义DAG
def test1(dag):
t = "pwd"
# operator 支持多种类型, 这里使用 BashOperator
task = BashOperator(
task_id='test1', # task_id
bash_command=t, # 指定要执行的命令
dag=dag # 指定归属的dag
)
return task
def hello_world_1():
current_time = str(datetime.today())
print('hello world at {}'.format(current_time))
def test2(dag):
# PythonOperator
task = PythonOperator(
task_id='test2',
python_callable=hello_world_1, # 指定要执行的函数
dag=dag)
return task
def test3(dag):
t = "date"
task = BashOperator(
task_id='test3',
bash_command=t,
dag=dag)
return task
with DAG(
'test_task', # dag_id
default_args=default_options(), # 指定默认参数
schedule_interval="20 8 * * *" # 执行周期
) as d:
task1 = test1(d)
task2 = test2(d)
task3 = test3(d)
chain(task1, task2, task3) # 指定执行顺序
执行 python3 ~/airflow/dags/demo.py
,如果没有输出,表示没有错误。
触发任务
在页面上,找到对应的DAG,然后点击 Trigger DAG
,可以看到任务被顺利执行完成,结果在每个Task的日志中可以看到。
如果点了 Trigger DAG
后,任务没运行。那可以点中具体的步骤再手动运行。
通过API触发任务:
curl -X POST \
http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"conf":"{\"key\":\"value\"}"}'
传递参数
对于单次运行的任务,可以传递参数到工作流中,方法如下:
- 修改配置文件
airflow.cfg
,如下:
dag_run_conf_overrides_params = True
- 调用API时传递参数
curl -X POST \
http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"conf":"{\"name\":\"abeffect\"}"}'
- 对于PythonOperator,要设置
provide_context=True
,然后在kwargs
中的params
可能找到刚刚传入的值。示例如下:
def hello_world_1(ds, **kwargs):
logging.info(ds)
logging.info(kwargs)
logging.info(kwargs.get('params').get('name'))
current_time = str(datetime.today())
print('hello world at {}'.format(current_time))
def test2(dag):
# PythonOperator
task = PythonOperator(
task_id='test2',
provide_context=True,
python_callable=hello_world_1, # 指定要执行的函数
dag=dag)
return task
如果使用了模版的话,可以使用
`{{ dag_run.conf['name'] }}`
来使用。
- BashOperator不支持参数传入,如果要执行Bash的话,可以用PythonOperator来间接实现,如下:
def hello_world_1(ds, **kwargs):
logging.info(ds)
logging.info(kwargs)
logging.info(kwargs.get('params'))
logging.info(kwargs['dag_run'].conf)
# 拼出cmd,通过popen来执行,且拿到结果.
cmd = "echo {0}".format(kwargs.get('params').get('name'))
logging.info("to run cmd: {0}".format(t))
response = os.popen(cmd).read()
logging.info(response)
# 可以自定义返回值
return "i'm from hello world 1"
- 在页面的
Trigger DAG
中,也支持传入JSON,实现上面的同样的效果。
其它设置
时区设置
默认是utf时区,可以自己改成本地时区。修改 airflow.cfg
:
# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
# default_timezone = utc
default_timezone = system
权限管理(未完成)
安装模块:flask-bcrypt
sudo pip3 install flask-bcrypt
修改 airflow.cfg
的配置,
[webserver]
rbac = True
authenticate = True
filter_by_owner = True
注册用户
Tasks间参数传递(未完成)
钉钉发送消息
- 增加一个钉钉机器人,记下token。
- 编辑文件:
from airflow.contrib.operators.dingding_operator import DingdingOperator
def send_dingding_msg(dag, **context):
task = DingdingOperator(
task_id='text_msg_remind_all',
dingding_conn_id='dingding_default',
message_type='text',
message='hello, I'm from dingding. ',
at_mobiles=['170xxxxxx18'],
at_all=False,
dag=dag)
return task
后记
对于300个调度DAG,1000-2000个Task的情况,是使用体验良好的。