DAG 运行

DAG 运行是表示 DAG 在时间上的实例化的对象。 每次执行 DAG 时,都会创建一个 DAG 运行,并且执行其中的所有任务。DAG 运行的状态取决于任务状态。每个 DAG 运行都是彼此独立运行的,这意味着您可以同时运行多个 DAG 运行。

DAG 运行状态

DAG 运行状态在 DAG 执行完成时确定。DAG 的执行取决于其包含的任务及其依赖关系。当所有任务都处于终端状态之一(即,如果没有可能转换为另一种状态)时,例如 successfailedskipped 时,状态将分配给 DAG 运行。DAG 运行的状态是根据所谓的“叶节点”或简称为“叶子”来分配的。叶节点是没有子节点的任务。

DAG 运行有两种可能的终端状态

  • 如果所有叶节点状态均为 successskipped,则为 success

  • 如果任何叶节点状态为 failedupstream_failed,则为 failed

注意

请注意,如果您的某些任务定义了一些特定的触发规则。这些可能会导致一些意外行为,例如,如果您有一个触发规则为 “all_done” 的叶子任务,则无论其余任务的状态如何都将执行该任务,如果它成功,则整个 DAG 运行也将被标记为 success,即使中间的某些任务失败了。

在 Airflow 2.7 中添加

在 UI 仪表板上的“正在运行”选项卡中可以显示当前正在运行 DAG 运行的 DAG。同样,可以在“失败”选项卡上找到其最新 DAG 运行标记为失败的 DAG。

数据间隔

Airflow 中的每个 DAG 运行都分配有一个“数据间隔”,表示它运行的时间范围。例如,对于使用 @daily 调度的 DAG,其每个数据间隔都将在每天午夜(00:00)开始,并在午夜(24:00)结束。

通常在关联的数据间隔结束后调度 DAG 运行,以确保该运行能够收集该时间段内的所有数据。换句话说,覆盖 2020-01-01 数据周期的运行通常不会在 2020-01-01 结束后(即在 2020-01-02 00:00:00 之后)才开始运行。

Airflow 中的所有日期都以某种方式与数据间隔概念相关联。例如,DAG 运行的“逻辑日期”(在 2.2 之前的 Airflow 版本中也称为 execution_date)表示数据间隔的开始,而不是 DAG 实际执行的时间。

同样,由于 DAG 及其任务的 start_date 参数指向同一逻辑日期,因此它标记的是DAG 的第一个数据间隔的开始,而不是 DAG 中的任务将开始运行的时间。换句话说,DAG 运行仅在 start_date 之后的一个间隔被调度。

提示

如果 cron 表达式或 timedelta 对象不足以表达 DAG 的计划、逻辑日期或数据间隔,请参阅 时间表。有关 logical date 的更多信息,请参阅 运行 DAGexecution_date 是什么意思?

重新运行 DAG

在某些情况下,您可能需要再次执行 DAG。其中一种情况是,当计划的 DAG 运行失败时。

追赶

使用 start_date、可能还有 end_date 和非数据集计划定义的 Airflow DAG 定义了一系列间隔,调度程序将其转换为单独的 DAG 运行并执行。默认情况下,调度程序将启动自上次数据间隔以来(或已清除)尚未运行的任何数据间隔的 DAG 运行。此概念称为追赶。

如果您的 DAG 未编写为处理其追赶(即,不限于间隔,而是例如 Now),则您将需要关闭追赶。这可以通过在 DAG 中设置 catchup=False 或在配置文件中设置 catchup_by_default=False 来完成。关闭后,调度程序仅为最新间隔创建 DAG 运行。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

import datetime
import pendulum

dag = DAG(
    "tutorial",
    default_args={
        "depends_on_past": True,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=3),
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="A simple tutorial DAG",
    schedule="@daily",
    catchup=False,
)

在上面的示例中,如果调度程序守护程序在 2016-01-02 上午 6 点(或从命令行)拾取了 DAG,则将创建一个数据介于 2016-01-01 和 2016-01-02 之间的 DAG 运行,下一个 DAG 运行将在 2016-01-03 的早晨午夜之后创建,数据间隔在 2016-01-02 和 2016-01-03 之间。

请注意,使用 datetime.timedelta 对象作为计划可能会导致不同的行为。在这种情况下,创建的单个 DAG 运行将覆盖 2016-01-01 06:00 到 2016-01-02 06:00 之间的数据(一个计划间隔在现在结束)。有关基于 cron 和 delta 的计划之间差异的更详细描述,请查看 时间表比较

如果 dag.catchup 值改为 True,则调度程序将为 2015-12-01 和 2016-01-02 之间每个已完成的间隔创建一个 DAG 运行(但尚未为 2016-01-02 创建,因为该间隔尚未完成),并且调度程序将按顺序执行它们。

当您关闭 DAG 一段时间然后再重新启用它时,也会触发追赶。

此行为非常适合可以轻松拆分为周期的原子数据集。如果您的 DAG 在内部执行追赶,则关闭追赶非常有用。

回填

有时您可能需要为指定的历史时期运行 DAG,例如,使用 start_date 2019-11-21 创建数据填充 DAG,但是另一个用户需要一个月前(即 2019-10-21)的输出数据。此过程称为回填。

即使在禁用追赶的情况下,您也可能需要回填数据。这可以通过 CLI 完成。运行以下命令

airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \
    dag_id

回填命令将为开始日期和结束日期内的所有间隔重新运行 dag_id 的所有实例。

重新运行任务

在计划运行时,某些任务可能会失败。在查看日志后修复错误后,可以通过清除计划日期的任务来重新运行这些任务。清除任务实例会创建任务实例的记录。当前任务实例的 try_number 会递增,max_tries 设置为 0,状态设置为 None,这会导致任务重新运行。

在“树”或“图形”视图中单击失败的任务,然后单击“清除”。执行程序将重新运行它。

您可以选择多个选项来重新运行 -

  • 过去 - DAG 的最新数据间隔之前运行中的所有任务实例

  • 未来 - DAG 的最新数据间隔之后运行中的所有任务实例

  • 上游 - 当前 DAG 中的上游任务

  • 下游 - 当前 DAG 中的下游任务

  • 递归 - 子 DAG 和父 DAG 中的所有任务

  • 失败 - DAG 最近一次运行中仅失败的任务

您也可以使用命令通过 CLI 清除任务

airflow tasks clear dag_id \
    --task-regex task_regex \
    --start-date START_DATE \
    --end-date END_DATE

对于指定的 dag_id 和时间间隔,此命令会清除与正则表达式匹配的所有任务实例。如需了解更多选项,可以查看clear 命令的帮助文档。

airflow tasks clear --help

任务实例历史记录

当任务实例重试或被清除时,任务实例的历史记录会被保留。您可以通过点击网格视图中的任务实例来查看此历史记录。

../_images/task_instance_history.png

注意

上面显示的尝试选择器仅适用于已重试或清除的任务。

历史记录显示特定运行结束时任务实例属性的值。在日志页面上,您还可以查看每次任务实例尝试的日志。这对于调试很有用。

../_images/task_instance_history_log.png

注意

相关的任务实例对象,如 XComs、渲染的模板字段等,不会保存在历史记录中。仅保留任务实例属性,包括日志。

外部触发器

请注意,也可以通过 CLI 手动创建 DAG 运行。只需运行以下命令 -

airflow dags trigger --exec-date logical_date run_id

在调度程序外部创建的 DAG 运行会与触发器的的时间戳关联,并与计划的 DAG 运行一起显示在 UI 中。可以使用 -e 参数指定在 DAG 中传递的逻辑日期。默认值为 UTC 时区的当前日期。

此外,您还可以使用 Web UI 手动触发 DAG 运行(选项卡 DAGs -> 列 Links -> 按钮 Trigger Dag)。

触发 DAG 时传递参数

从 CLI、REST API 或 UI 触发 DAG 时,可以传递 DAG 运行的配置作为 JSON blob。

参数化 DAG 的示例

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    "example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)

parameterized_task = BashOperator(
    task_id="parameterized_task",
    bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    dag=dag,
)

注意dag_run.conf 中的参数只能在运算符的模板字段中使用。

使用 CLI

airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag

使用 UI

在 UI 中,触发 DAG 的参数可以通过 params 定义更好地表示,如 Params 文档中所述。通过定义的参数,会渲染适当的值输入表单。

如果 DAG 未定义 params,则通常会跳过表单。可以通过配置选项 show_trigger_form_if_no_params 强制显示仅用于传递配置选项的经典字典输入表单。

../_images/example_passing_conf.png

请考虑将此类用法转换为 params,因为这是一种更便捷的方式,并且还允许验证用户输入。

需要记住

  • 可以通过 UI 将任务实例标记为失败。这可以用于停止正在运行的任务实例。

  • 可以通过 UI 将任务实例标记为成功。这主要用于修复误报,或者例如,当修复已在 Airflow 外部应用时。

此条目是否对您有帮助?