基本概念

本教程将引导您了解一些基本的 Airflow 概念、对象以及在编写第一个 DAG 时的用法。

示例管道定义

这是一个基本管道定义的示例。如果这看起来很复杂,请不要担心,下面将逐行进行解释。

airflow/example_dags/tutorial.py[源代码]


import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

它是一个 DAG 定义文件

需要理解的一件事是(对于每个人来说,起初可能不是很直观),这个 Airflow Python 脚本实际上只是一个配置文件,它以代码的形式指定 DAG 的结构。这里定义的实际任务将在与此脚本的上下文不同的上下文中运行。不同的任务在不同的时间在不同的工作节点上运行,这意味着此脚本不能用于在任务之间进行交叉通信。请注意,为此我们有一个更高级的功能,称为XComs

人们有时会将 DAG 定义文件视为他们可以进行一些实际数据处理的地方 - 事实并非如此!该脚本的目的是定义一个 DAG 对象。它需要快速(几秒钟,而不是几分钟)评估,因为调度程序会定期执行它以反映是否有任何更改。

导入模块

Airflow 管道只是一个恰好定义 Airflow DAG 对象的 Python 脚本。让我们首先导入我们需要的库。

airflow/example_dags/tutorial.py[源代码]

import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅模块管理

默认参数

我们即将创建一个 DAG 和一些任务,并且我们可以选择显式地将一组参数传递给每个任务的构造函数(这将变得冗余),或者(更好!)我们可以定义一个默认参数字典,我们在创建任务时可以使用它。

airflow/example_dags/tutorial.py[源代码]

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'on_skipped_callback': another_function, #or list of functions
    # 'trigger_rule': 'all_success'
},

有关 BaseOperator 的参数及其作用的更多信息,请参阅airflow.models.baseoperator.BaseOperator文档。

另外,请注意,您可以轻松定义用于不同目的的不同参数集。一个例子是在生产环境和开发环境之间具有不同的设置。

实例化 DAG

我们需要一个 DAG 对象来将我们的任务嵌套到其中。在这里,我们传递一个字符串,该字符串定义 dag_id,它用作 DAG 的唯一标识符。我们还传递我们刚刚定义的默认参数字典,并为 DAG 定义 1 天的 schedule

airflow/example_dags/tutorial.py[源代码]

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

操作符

操作符定义 Airflow 完成的工作单元。使用操作符是在 Airflow 中定义工作的经典方法。对于某些用例,最好使用 TaskFlow API 在 Python 上下文中定义工作,如使用 TaskFlow中所述。现在,使用操作符有助于可视化我们的 DAG 代码中的任务依赖关系。

所有操作符都继承自 BaseOperator,其中包括在 Airflow 中运行工作所需的所有参数。从这里开始,每个操作符都包含其完成的工作类型的唯一参数。一些最受欢迎的操作符是 PythonOperator、BashOperator 和 KubernetesPodOperator。

Airflow 根据您传递给操作符的参数完成工作。在本教程中,我们使用 BashOperator 运行一些 bash 脚本。

任务

要在 DAG 中使用操作符,您必须将其实例化为任务。任务确定如何在 DAG 的上下文中执行操作符的工作。

在以下示例中,我们将 BashOperator 实例化为两个单独的任务,以便运行两个单独的 bash 脚本。每个实例化的第一个参数 task_id 作为任务的唯一标识符。

airflow/example_dags/tutorial.py[源代码]

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)

请注意,我们如何将操作符特定的参数(bash_command)和所有操作符通用的参数(从 BaseOperator 继承的 retries)传递给操作符的构造函数。这比为每个构造函数调用传递每个参数要简单得多。此外,请注意,在第二个任务中,我们使用 3 覆盖了 retries 参数。

任务的优先级规则如下

  1. 显式传递的参数

  2. default_args 字典中存在的值

  3. 操作符的默认值(如果存在)

注意

任务必须包含或继承参数 task_idowner,否则 Airflow 将引发异常。新安装的 Airflow 将为 owner 设置默认值“airflow”,因此您只需确保 task_id 具有值即可。

使用 Jinja 进行模板化

Airflow 利用了 Jinja 模板 的强大功能,并为管道作者提供了一组内置参数和宏。Airflow 还为管道作者提供了钩子来定义他们自己的参数、宏和模板。

本教程仅触及 Airflow 中可以使用模板完成的操作的皮毛,但本节的目标是让您了解此功能的存在,让您熟悉双大括号,并指出最常见的模板变量: {{ ds }} (今天的“日期戳”)。

airflow/example_dags/tutorial.py[源代码]

templated_command = textwrap.dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id="templated",
    depends_on_past=False,
    bash_command=templated_command,
)

请注意,templated_command{% %} 代码块中包含代码逻辑,引用 {{ ds }} 之类的参数,并像在 {{ macros.ds_add(ds, 7)}} 中一样调用函数。

也可以将文件传递给 bash_command 参数,例如 bash_command='templated_command.sh',其中文件位置相对于包含管道文件的目录(在本例中为 tutorial.py)。这可能是可取的,原因有很多,例如分离脚本的逻辑和管道代码,允许在以不同语言组成的文件中进行正确的代码突出显示,以及在构建管道时的总体灵活性。也可以将 template_searchpath 定义为指向 DAG 构造函数调用中的任何文件夹位置。

使用相同的 DAG 构造函数调用,可以定义 user_defined_macros,这允许您指定自己的变量。例如,将 dict(foo='bar') 传递给此参数允许您在模板中使用 {{ foo }}。此外,指定 user_defined_filters 允许您注册自己的过滤器。例如,将 dict(hello=lambda name: 'Hello %s' % name) 传递给此参数允许您在模板中使用 {{ 'world' | hello }}。有关自定义过滤器的更多信息,请参阅 Jinja 文档

有关可以在模板中引用的变量和宏的更多信息,请确保阅读模板参考

添加 DAG 和任务文档

我们可以为 DAG 或每个单独的任务添加文档。到目前为止,DAG 文档仅支持 Markdown,而任务文档支持纯文本、Markdown、reStructuredText、json 和 yaml。DAG 文档可以作为文档字符串编写在 DAG 文件的开头(推荐),也可以编写在文件中的任何其他位置。下面您可以找到有关如何实现任务和 DAG 文档以及屏幕截图的一些示例

airflow/example_dags/tutorial.py[源代码]

t1.doc_md = textwrap.dedent(
    """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)

dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
"""  # otherwise, type it like this
../_images/task_doc.png ../_images/dag_doc.png

设置依赖关系

我们有任务 t1t2t3,它们之间相互依赖。以下是定义它们之间依赖关系的几种方法。

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

请注意,在执行脚本时,如果 Airflow 在 DAG 中发现循环,或者某个依赖项被多次引用,则会引发异常。

使用时区

创建一个具有时区意识的 DAG 非常简单。只需确保使用 pendulum 提供具有时区意识的日期。请不要尝试使用标准库的 时区,因为它们已知存在限制,并且我们特意禁止在 DAG 中使用它们。

回顾

好的,我们现在有一个非常基本的 DAG。此时,你的代码应该如下所示:

airflow/example_dags/tutorial.py[源代码]


import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

测试

运行脚本

现在开始运行一些测试。首先,让我们确保管道已成功解析。

假设我们将上一步中的代码保存在 airflow.cfg 中引用的 DAG 文件夹中的 tutorial.py 中。DAG 的默认位置是 ~/airflow/dags

python ~/airflow/dags/tutorial.py

如果脚本没有引发异常,则表示你没有做任何严重错误的事情,并且你的 Airflow 环境是健全的。

命令行元数据验证

让我们运行一些命令以进一步验证此脚本。

# initialize the database tables
airflow db migrate

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial

# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree

测试

让我们通过运行特定日期的实际任务实例来进行测试。在此上下文中指定的日期称为逻辑日期(出于历史原因也称为执行日期),它模拟调度程序在特定日期和时间运行你的任务或 DAG,即使它实际上会立即运行(或在满足其依赖关系后立即运行)。

我们说调度程序是特定日期和时间运行你的任务,而不是特定日期和时间运行。这是因为 DAG 的每次运行在概念上代表的不是一个特定的日期和时间,而是两个时间之间的时间间隔,称为数据间隔。DAG 运行的逻辑日期是其数据间隔的开始。

# command layout: command subcommand [dag_id] [task_id] [(optional) date]

# testing print_date
airflow tasks test tutorial print_date 2015-06-01

# testing sleep
airflow tasks test tutorial sleep 2015-06-01

现在还记得我们之前使用模板做了什么吗?通过运行此命令,查看此模板是如何呈现和执行的:

# testing templated
airflow tasks test tutorial templated 2015-06-01

这应该会导致显示详细的事件日志,并最终运行你的 bash 命令并打印结果。

请注意,airflow tasks test 命令在本地运行任务实例,将其日志输出到 stdout(屏幕上),不处理依赖关系,也不会将状态(正在运行、成功、失败等)传达给数据库。它只是允许测试单个任务实例。

同样适用于 airflow dags test,但在 DAG 级别。它执行给定 DAG ID 的单个 DAG 运行。虽然它确实考虑了任务依赖关系,但不会在数据库中注册任何状态。它方便在本地测试 DAG 的完整运行,前提是例如如果你的某个任务期望在某个位置有数据,则该数据可用。

回填

一切看起来都运行良好,让我们运行一个回填。backfill 将会遵守你的依赖关系,将日志发送到文件中,并与数据库通信以记录状态。如果你的 Web 服务器已启动,你将能够跟踪进度。如果你有兴趣以可视方式跟踪回填进度,airflow webserver 将启动一个 Web 服务器。

请注意,如果你使用 depends_on_past=True,则各个任务实例将依赖于其先前任务实例的成功(即,根据逻辑日期的先前任务实例)。逻辑日期等于 start_date 的任务实例将忽略此依赖关系,因为不会为它们创建过去的任务实例。

在使用 depends_on_past=True 时,你可能还需要考虑 wait_for_downstream=True。虽然 depends_on_past=True 会导致任务实例依赖于其先前任务实例的成功,但 wait_for_downstream=True 也会导致任务实例等待先前任务实例的直接下游的所有任务实例成功。

此上下文中的日期范围是 start_date 和可选的 end_date,用于使用来自此 DAG 的任务实例填充运行计划。

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07

下一步是什么?

就这样!你已经编写、测试并回填了你的第一个 Airflow 管道。将你的代码合并到针对其运行调度程序的存储库中,应该会导致每天触发并运行。

以下是一些你可能想做的下一步:

另请参阅

  • 继续本教程的下一步:使用 TaskFlow

  • 跳到核心概念部分,详细了解 Airflow 的概念,例如 DAG、任务、运算符等

此条目是否有帮助?