Airflow 101:构建您的第一个工作流
欢迎来到 Apache Airflow 的世界!在本教程中,我们将引导您掌握 Airflow 的核心概念,帮助您编写第一个 DAG。无论您是熟悉 Python,还是刚刚入门,我们都会让学习过程变得轻松有趣。
什么是 DAG?
从本质上讲,DAG 是一组任务的集合,按照它们之间的关系和依赖进行组织。它就像工作流的路线图,展示每个任务如何相互关联。别担心,这听起来可能有点复杂;我们会一步一步为您拆解。
示例管道定义
让我们从一个简单的管道定义示例开始。虽然乍一看可能显得有点庞大,但我们会逐行解释清楚。
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
了解 DAG 定义文件
可以将 Airflow 的 Python 脚本视为一种配置文件,用代码来描述 DAG 的结构。您在这里定义的实际任务会在另一个环境中运行,这意味着该脚本并不是用来进行数据处理的。它的主要职责是定义 DAG 对象,并且需要快速求值,因为 DAG 文件处理器会定期检查它是否有变动。
导入模块
要开始编写代码,首先需要导入所需的库。这是所有 Python 脚本的常规第一步。
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
想了解 Python 与 Airflow 如何处理模块的更多细节,请查阅 模块管理。
设置默认参数
在创建 DAG 与任务时,您可以直接为每个任务传入参数,亦或在一个字典中定义一组默认参数。后一种方式通常更高效且更简洁。
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
如果想深入了解 BaseOperator 的参数,请参考 airflow.sdk.BaseOperator 的文档。
创建 DAG
接下来,需要创建一个 DAG 对象来容纳我们的任务。我们会为 DAG 提供唯一标识 dag_id,以及前面定义的默认参数,并为其设置一个每天执行一次的调度。
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
了解 Operator
Operator 表示 Airflow 中的一个工作单元。它们是工作流的构建块,用来定义要执行的任务。虽然可以使用 Operator 完成大多数工作,Airflow 还提供了 TaskFlow API,一种更 Pythonic 的定义工作流的方式,我们稍后会简要说明。
所有 Operator 都继承自 BaseOperator,该基类包含运行任务所需的基本参数。常用的 Operator 包括 PythonOperator、BashOperator 与 KubernetesPodOperator。本教程中我们将重点使用 BashOperator 来执行简单的 bash 命令。
定义任务
要使用 Operator,需要将其实例化为任务。任务决定了 Operator 在 DAG 中的执行方式。下面的示例中,我们实例化了两次 BashOperator,分别运行两个不同的 bash 脚本。task_id 用作每个任务的唯一标识。
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
请注意我们将 Operator 特有的参数(如 bash_command)与从 BaseOperator 继承的通用参数(如 retries)混合使用,这样可以让代码更加简洁。在第二个任务中,我们甚至覆盖了 retries 参数,将其设为 3。
任务参数的优先级如下:
显式传入的参数
来自
default_args字典的值Operator 本身的默认值(若存在)
注意
请记住,每个任务必须包含或继承 task_id 与 owner 参数。否则 Airflow 会抛出错误。幸运的是,新的 Airflow 安装会默认将 owner 设为 airflow,因此您只需确保设置了 task_id 即可。
使用 Jinja 进行模板化
Airflow 使用 Jinja 模板引擎,为您提供内置参数和宏,以增强工作流的可配置性。本节将介绍 Airflow 中模板化的基础知识,重点讲解最常用的模板变量 {{ ds }}(即执行日期的字符串表示)。
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
您会看到 templated_command 中使用了 {% %} 块的逻辑,并引用了像 {{ ds }} 这样的参数。您也可以将文件传递给 bash_command,例如 bash_command='templated_command.sh',以便更好地组织代码。甚至可以定义 user_defined_macros 与 user_defined_filters,在模板中使用自定义变量和过滤器。有关自定义过滤器的更多信息,请参考 Jinja 文档。
若想了解模板中可引用的变量与宏,请阅读 模板参考。
为 DAG 与任务添加文档
您可以为 DAG 或单个任务添加文档。目前 DAG 文档支持 Markdown,任务文档则可以是纯文本、Markdown、reStructuredText、JSON 或 YAML。建议在 DAG 文件的开头加入文档说明。
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
设置依赖关系
在 Airflow 中,任务之间可以相互依赖。例如,若您有任务 t1、t2 与 t3,可以通过多种方式定义它们的依赖关系。
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
请注意,如果 DAG 中出现循环依赖或同一依赖被多次引用,Airflow 将抛出错误。
处理时区
创建支持时区的 DAG 非常简单,只需使用 pendulum 提供的时区感知日期即可。请避免使用标准库的 timezone,因为它已知存在局限。
回顾
恭喜!现在您应该已经掌握了如何创建 DAG、定义任务及其依赖关系,并在 Airflow 中使用模板。您的代码应类似如下:
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
测试您的管道
现在是测试管道的时候了!首先确保脚本能够成功解析。若您已将代码保存为 tutorial.py,且放置在 airflow.cfg 中指定的 DAG 文件夹下,可执行以下命令:
python ~/airflow/dags/tutorial.py
如果脚本运行无误,恭喜您!DAG 已正确设置。
命令行元数据验证
让我们通过运行几个命令进一步验证脚本。
# initialize the database tables
airflow db migrate
# print the list of active Dags
airflow dags list
# prints the list of tasks in the "tutorial" Dag
airflow tasks list tutorial
# prints the graphviz representation of "tutorial" Dag
airflow dags show tutorial
测试任务实例和 DAG 运行
您可以为指定的 逻辑日期 测试特定的任务实例。这相当于模拟调度器在该日期和时间运行任务。
注意
请注意,调度器是为某一特定日期和时间**运行**任务,而不是在该日期或时间**执行**任务。逻辑日期 是 DAG 运行的 **名称**,通常对应工作流所处理时间段的**结束时间**——或手动触发 DAG 时的触发时间。
Airflow 使用这个逻辑日期来组织和追踪每一次运行;在 UI、日志以及代码中,您都通过该日期来引用具体的执行。当通过 UI 或 API 触发 DAG 时,可以自行提供逻辑日期,以便工作流在特定时间点进行回放。
# command layout: command subcommand [dag_id] [task_id] [(optional) date]
# testing print_date
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
您还可以通过以下命令查看模板渲染后的结果:
# testing templated
airflow tasks test tutorial templated 2015-06-01
该命令会输出详细日志并执行您的 bash 命令。
请记住,airflow tasks test 命令在本地运行任务实例,只会把日志输出到标准输出,并不会在数据库中记录状态。这是一种快速测试单个任务实例的便捷方式。
同理,airflow dags test 会在本地运行一次 DAG,且同样不会在数据库中注册任何状态,非常适合对整个 DAG 进行本地测试。
下一步?
到此为止!您已经成功编写并测试了第一个 Airflow 管道。接下来,建议将代码提交到代码库,并让 Scheduler 对其进行调度,这样您的 DAG 将能够每日自动触发执行。
以下是一些后续的建议:
另请参阅
继续教程的下一步:使用 TaskFlow API 编写 Pythonic DAG
浏览 核心概念 页面,深入了解 DAG、任务、算子等 Airflow 基础知识。