Airflow 101:构建你的第一个工作流¶
欢迎来到 Apache Airflow 的世界!在本教程中,我们将引导您了解 Airflow 的基本概念,帮助您理解如何编写您的第一个 DAG。无论您是否熟悉 Python,或者刚刚入门,我们都会让这段旅程变得愉快而简单。
什么是 DAG?¶
其核心是,DAG 是按反映任务关系和依赖关系的方式组织的任务集合。它就像您工作流的路线图,展示了每个任务如何相互连接。如果这听起来有点复杂,请不要担心;我们将逐步分解它。
流水线定义示例¶
让我们从一个简单的流水线定义示例开始。尽管一开始可能看起来令人不知所措,但我们将详细解释每一行。
src/airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
理解 DAG 定义文件¶
将 Airflow Python 脚本视为一个配置文件,它用代码定义了 DAG 的结构。您在此处定义的实际任务在不同的环境中运行,这意味着此脚本并非用于数据处理。它的主要作用是定义 DAG 对象,并且需要快速评估,因为 DAG 文件处理器会定期检查它是否有任何更改。
导入模块¶
要开始使用,我们需要导入必要的库。这是任何 Python 脚本中典型的第一步。
src/airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
有关 Python 和 Airflow 如何处理模块的更多详细信息,请查看模块管理。
设置默认参数¶
创建 DAG 及其任务时,您可以直接将参数传递给每个任务,也可以在字典中定义一组默认参数。后一种方法通常更高效、更简洁。
src/airflow/example_dags/tutorial.py
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
如果您想深入了解 BaseOperator 的参数,请查看 airflow.sdk.BaseOperator
文档。
创建 DAG¶
接下来,我们需要创建一个 DAG 对象来容纳我们的任务。我们将为 DAG 提供一个唯一的标识符,称为 dag_id
,并指定我们刚刚定义的默认参数。我们还将为我们的 DAG 设置每天运行的调度。
src/airflow/example_dags/tutorial.py
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
理解 Operator¶
Operator 在 Airflow 中代表一个工作单元。它们是工作流的构建块,允许您定义将执行哪些任务。虽然我们可以将 Operator 用于许多任务,但 Airflow 也提供了Taskflow API,这是一种更 Python 式的方式来定义工作流,我们稍后将对此进行介绍。
所有 Operator 都派生自 BaseOperator
,它包含在 Airflow 中运行任务所需的基本参数。一些流行的 Operator 包括 PythonOperator
、BashOperator
和 KubernetesPodOperator
。在本教程中,我们将重点介绍 BashOperator
来执行一些简单的 bash 命令。
定义任务¶
要使用 Operator,必须将其实例化为一个任务。任务规定了 Operator 将如何在 DAG 的上下文中执行其工作。在下面的示例中,我们两次实例化了 BashOperator 来运行两个不同的 bash 脚本。task_id
作为每个任务的唯一标识符。
src/airflow/example_dags/tutorial.py
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
请注意我们如何将特定于 Operator 的参数(如 bash_command
)与从 BaseOperator
继承的通用参数(如 retries
)混合使用。这种方法简化了我们的代码。在第二个任务中,我们甚至覆盖了 retries
参数,将其设置为 3
。
任务参数的优先级如下
显式传递的参数
default_args
字典中的值Operator 的默认值(如果可用)
注意
请记住,每个任务都必须包含或继承参数 task_id
和 owner
。否则,Airflow 将抛出错误。幸运的是,全新的 Airflow 安装将 owner
默认设置为 airflow
,因此您主要需要确保设置了 task_id
。
使用 Jinja 进行模板化¶
Airflow 利用了 Jinja 模板化 的强大功能,让您可以访问内置参数和宏来增强您的工作流。本节将向您介绍 Airflow 中模板化的基础知识,重点关注常用的模板变量:{{ ds }}
,它代表今天的日期戳。
src/airflow/example_dags/tutorial.py
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
您会注意到 templated_command
包含 {% %}
块中的逻辑,并引用 {{ ds }}
等参数。您还可以将文件传递给 bash_command
,例如 bash_command='templated_command.sh'
,以便更好地组织代码。您甚至可以定义 user_defined_macros
和 user_defined_filters
来创建自己的变量和过滤器用于模板。有关自定义过滤器的更多信息,请参阅 Jinja 文档。
有关可在模板中引用的变量和宏的更多信息,请阅读模板参考。
添加 DAG 和任务文档¶
您可以为您的 DAG 或单个任务添加文档。虽然 DAG 文档目前支持 markdown,但任务文档可以是纯文本、markdown、reStructuredText、JSON 或 YAML。一个好的做法是在 DAG 文件的开头包含文档。
src/airflow/example_dags/tutorial.py
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this


设置依赖关系¶
在 Airflow 中,任务可以相互依赖。例如,如果您有任务 t1
、t2
和 t3
,您可以通过几种方式定义它们的依赖关系
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
请注意,如果 Airflow 检测到 DAG 中的循环或某个依赖关系被多次引用,它将抛出错误。
处理时区¶
创建感知时区的 DAG 很简单。只需确保使用 pendulum
处理感知时区日期即可。避免使用标准库中的 timezone
,因为它们存在已知限制。
回顾¶
恭喜!现在,您应该对如何在 Airflow 中创建 DAG、定义任务及其依赖关系以及使用模板有了基本的了解。您的代码应该类似于以下内容
src/airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
测试你的流水线¶
现在是时候测试您的流水线了!首先,确保您的脚本能成功解析。如果您将代码保存在 airflow.cfg
中指定的 dags
文件夹下的 tutorial.py
中,您可以运行
python ~/airflow/dags/tutorial.py
如果脚本运行无误,恭喜!您的 DAG 已正确设置。
命令行元数据验证¶
让我们通过运行一些命令进一步验证您的脚本
# initialize the database tables
airflow db migrate
# print the list of active dags
airflow dags list
# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial
# prints the graphviz representation of "tutorial" DAG
airflow dags show tutorial
测试任务实例和 DAG 运行¶
您可以针对指定的 逻辑日期 测试特定的任务实例。这模拟了调度程序在特定日期和时间运行您的任务。
注意
请注意,调度程序是 针对 特定的日期和时间运行您的任务,而不是一定 在 该日期或时间运行。逻辑日期 是 DAG 运行的命名所依据的时间戳,它通常对应于您的工作流操作的时间段的 结束 时间 — 或 DAG 运行手动触发的时间。
Airflow 使用此逻辑日期来组织和跟踪每次运行;这是您在 UI、日志和代码中引用特定执行的方式。通过 UI 或 API 触发 DAG 时,您可以提供自己的逻辑日期,以便在特定时间点运行工作流。
# command layout: command subcommand [dag_id] [task_id] [(optional) date]
# testing print_date
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
您还可以通过运行以下命令查看您的模板是如何渲染的
# testing templated
airflow tasks test tutorial templated 2015-06-01
此命令将提供详细日志并执行您的 bash 命令。
请记住,airflow tasks test
命令在本地运行任务实例,将其日志输出到 stdout,并且不在数据库中跟踪状态。这是测试单个任务实例的便捷方法。
类似地,airflow dags test
运行单个 DAG 运行,而不在数据库中注册任何状态,这对于在本地测试整个 DAG 非常有用。
下一步是什么?¶
本教程到此结束!您已成功编写并测试了您的第一个 Airflow 流水线。在您继续您的旅程时,可以考虑将您的代码合并到一个配置了 Scheduler 的仓库中,这将允许您的 DAG 每天被触发和执行。
以下是关于您下一步的一些建议
另请参阅
继续本教程的下一步:使用 TaskFlow API 的 Python 式 DAG
探索核心概念部分,以获取有关 Airflow 概念(例如 DAG、任务、Operator 等)的详细解释。