使用 airflow 替代你的 crontab

Airflow 是什么

Airflow 是 Airbnb 开发的用于工作流管理的开源项目,自带 UI,现在 Apache 下做孵化,地址是https://github.com/apache/incubator-airflow

airflow

Airflow 解决什么问题

Airflow 主要解决的问题可以参考 Airbnb 官方的博客: airflow-a-workflow-management-platform,简单来说就是管理和调度各种离线定时 Job ,可以替代 crontab。

当 cron job 规模达到数百上千时,其对人的要求将会非常高的,如果你的团队经历过这样的事情,应该能体会其中痛苦,所以使用类似 airflow 这样的工具代替 cron 来做定时任务将会极大提高工作效率。

开始使用 airflow 之前需要知道和准备的

Airflow 在 pip 上已经更名为 apache-airflow,下载最新版请使用后者 pip install apache-airflow

Airflow 1.8 版本依赖的是 MySQL 5.6 以上,5.7 以下报 1071, u'Specified key was too long; max key length is 767 bytes,如果你使用 MySQL 作为你的 airflow backend 请升级你的 MySQL 到最新版。

MySQL 5.6 升级到 5.7 在使用 airflow 时会报 1146, u"Table 'performance_schema.session_variables' doesn't exist",执行 mysql_upgrade -u root -p --force 解决。

Airflow 的 mysql driver 使用的是 mysqlclient mysql://root:@127.0.0.1/sqlalchemy_lab?charset=utf8,如果使用其他 driver 将报 syntax error。

基础概念

Airflow 中最基本的两个概念是:DAG 和 task。DAG 的全称是 Directed Acyclic Graph 是所有你想执行的任务的集合,在这个集合中你定义了他们的依赖关系,一个 DAG 是指一个 DAG object,一个 DAG object 可以在 Python 脚本中配置完成。

比如一个简单的的 DAG 包含三个 task:A、B、C,A 执行成功之后 B 才能执行,C 不依赖 A 和 B 即可执行。在这个简单的 DAG 中 A B C 可以是任何你想要执行的任务。

DAG 的定义使用 Python 完成的,其实就是一个 Python 文件,存放在 DAG 目录,Airflow 会动态的从这个目录构建 DAG object,每个 DAG object 代表了一个 workflow,每个 workflow 都可以包含任意个 task。

安装和使用

Airflow 是基于 Python 构建的,可以很容易用 pip 安装使用,pip install apache-airflow,默认情况下 airflow 会在 ~/airflow 目录存放相关配置。

Airflow 提供了一些列命令来完成 airflow 的初始化工作来和它的正确使用。

1
2
3
4
5
6
# 在 airflow 目录初始化数据库和 airflow 配置
airflow initdb
# 启动 airflow web
airflow webserver
# 开始调度
airflow scheduler

更详细的信息请参考文档 http://airflow.incubator.apache.org/

第一个 DAG

DAG 的配置用 Python 完成像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)

t2.set_upstream(t1) # t2 依赖 t1
t3.set_upstream(t1)

DAG 脚本的目的只是定义 DAG 的配置,并不包含任何的数据处理,在这里 operator 就是 task。

DAG 的实例化

一个 DAG 脚本是由 DAG object 的实例化和对应的 operator 组成的,除此之外我们还可以定义默认的参数提供给每个任务。

DAG 对象实例化可以根据我们的需要提供对应的初始化参数,实例化 DAG 对象需要提供唯一的 dag_id:

1
2
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(1))

Task 的实例化

1
2
3
4
5
6
7
8
9
10
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

task 对象的定义的就是 operator 的实例化,operator 有 task_id,用来区分任务,可以按照需要定制 bash_command,也可以传递参数等。

Task 的依赖

Task 之间是能相互建立依赖的,形如:

1
2
3
4
5
6
7
8
9
10
11
12
t2.set_upstream(t1)

# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')

Airflow 会自动检测环形依赖以防止 task 无法工作的情况出现,更复杂的情况请参考文档。

执行和测试

和 airflow.cfg 同级目录下建立 dag 目录,用来存放第一个 DAG 脚本,然后执行 python tutorial.py ,如果没有报错说明 tutorial 建立成功了。

Airflow 的命令行

Airflow 提供了一些列的命令行用来查看 DAG 和 task

1
2
3
4
5
6
7
8
# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

测试任务的执行

执行任务很简单,指定 DAG 并去指定 task 和执行的日期

1
2
3
4
5
6
7
# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

test 命令会执行任务并且输出到控制台,不会把任务的执行状态进行持久化

执行任务和并记录状态

执行任务在 Airflow 中称之为 backfill,以 backfill 执行会真正开始追踪任务的执行状态和依赖,并且会记录日志

1
2
3
4
5
# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

更多关于 DAG 和 operator

DAG 的 scope

Airflow 会默认加载任意它能导入到饿 DAG object,这就意味着只要是全局的 DAG object 都可以被导入,但是有时候为了让 DAG 不被导入,比如 SubDagOperator 就可以使用 local 的作用域。

1
2
3
4
5
6
dag_1 = DAG('this_dag_will_be_discovered')

def my_function()
dag_2 = DAG('but_this_dag_will_not')

my_function()

DAG 可以指定默认的参数

DAG 的默认参数会应用到所有的 operator 中。

1
2
3
4
5
6
7
default_args=dict(
start_date=datetime(2016, 1, 1),
owner='Airflow')

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

扩展性极强的 operator

Airflow operator 很容易扩展,这也是 airflow 几乎支持任何形式 task 重要原因。虽然 Airflow 支持不同的 task 可以传输数据,但是如果你的两个 task 之间确实需要共享数据,最好的办法是把他们写在一起。

常见问题

  1. 配置 Airflow 发送邮件

DAG 执行失败时会可以发送邮件,需要配置好 smtp,在 airflow 配置中默认会开启 starttls,但是最新版下面会报错,如果不在乎 ssl,改成 false 即可

  1. Airflow 支持配置 url 前缀

最新版已经支持配置 url 前缀,但是还有一些问题没有解决,比如很多页面的 url 都是硬编码的,如果启用配置 url 前缀,导致一些页面的 url 还是使用原来的相对路径会报 404 错误,这个可以修改页面调用的 url 路径即可

参考资料

三月沙 wechat
扫描关注 wecatch 的公众号