DAG 运行

DAG 运行是一个对象,表示 DAG 在时间中的一个实例化。任何时候执行 DAG,都会创建一个 DAG 运行,并执行其中的所有任务。DAG 运行的状态取决于任务状态。每个 DAG 运行彼此独立运行,这意味着你可以同时运行 DAG 的多个运行。

DAG 运行状态

当 DAG 执行完成后,将确定 DAG 运行状态。DAG 的执行取决于其包含的任务及其依赖项。当所有任务都处于终端状态之一(即如果不可能转换到其他状态)时,例如 successfailedskipped,将向 DAG 运行分配状态。DAG 运行的状态分配基于所谓的“叶节点”或简称“叶”。叶节点是没有子节点的任务。

DAG 运行有两个可能的终端状态

  • success 如果所有叶节点状态都是 successskipped

  • failed 如果任何叶节点状态都是 failedupstream_failed

注意

如果某些任务定义了一些特定的 触发规则,请小心。这些可能会导致一些意外的行为,例如,如果你有一个叶任务的触发规则为 “all_done”,它将执行,而不管其他任务的状态如何,如果它成功,那么整个 DAG 运行也将标记为 success,即使中间出现故障。

在 Airflow 2.7 中添加

在“正在运行”选项卡中,可以在 UI 仪表板上显示当前正在运行 DAG 运行的 DAG。同样,最新 DAG 运行标记为失败的 DAG 可以显示在“失败”选项卡中。

数据间隔

Airflow 中的每个 DAG 运行都有一个分配的“数据间隔”,表示其操作的时间范围。例如,对于使用 @daily 调度的 DAG,其每个数据间隔都将在每天午夜 (00:00) 开始,并在午夜 (24:00) 结束。

DAG 运行通常在关联的数据间隔结束后安排,以确保运行能够收集该时间段内的所有数据。换句话说,通常直到 2020-01-01 结束,即 2020-01-02 00:00:00 之后,才会开始运行涵盖 2020-01-01 数据周期的运行。

Airflow 中的所有日期在某种程度上都与数据间隔的概念相关。例如,DAG 运行的“逻辑日期”(在 Airflow 2.2 之前的版本中也称为 execution_date)表示数据间隔的开始,而不是 DAG 实际执行的时间。

类似地,由于 DAG 及其任务的 start_date 参数指向同一逻辑日期,因此它标记了DAG 的第一个数据间隔的开始,而不是 DAG 中的任务开始运行的时间。换句话说,DAG 运行只会在 start_date 之后的一个间隔内安排。

提示

如果 cron 表达式或 timedelta 对象不足以表示 DAG 的计划、逻辑日期或数据间隔,请参阅 时间表。有关 logical date 的更多信息,请参阅 运行 DAGexecution_date 的含义

重新运行 DAG

在某些情况下,您可能需要再次执行 DAG。其中一种情况是计划的 DAG 运行失败时。

追赶

使用 start_date(可能还有 end_date)和非数据集计划定义的 Airflow DAG 定义了一系列间隔,调度程序将这些间隔转换为各个 DAG 运行并执行。默认情况下,调度程序将为自上次数据间隔以来尚未运行(或已清除)的任何数据间隔启动 DAG 运行。此概念称为追赶。

如果您的 DAG 未编写为处理其追赶(即,不仅限于间隔,而是 Now 例如),那么您将希望关闭追赶。这可以通过在 DAG 中设置 catchup=False 或在配置文件中设置 catchup_by_default=False 来完成。关闭后,调度程序仅为最新间隔创建 DAG 运行。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

import datetime
import pendulum

dag = DAG(
    "tutorial",
    default_args={
        "depends_on_past": True,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=3),
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="A simple tutorial DAG",
    schedule="@daily",
    catchup=False,
)

在上面的示例中,如果调度程序守护程序在 2016-01-02 上午 6 点(或从命令行)获取 DAG,将创建一个数据在 2016-01-01 和 2016-01-02 之间的数据运行,下一个运行将在 2016-01-03 午夜后创建,数据间隔在 2016-01-02 和 2016-01-03 之间。

请注意,使用 datetime.timedelta 对象作为调度可能会导致不同的行为。在这种情况下,创建的单个 DAG 运行将涵盖 2016-01-01 06:00 和 2016-01-02 06:00 之间的数据(一个调度间隔现在结束)。有关 cron 和基于 delta 的调度之间差异的更详细描述,请参阅 时间表比较

如果 dag.catchup 值改为 True,调度程序将为 2015-12-01 和 2016-01-02 之间的每个已完成间隔创建一个 DAG 运行(但尚未为 2016-01-02 创建一个,因为该间隔尚未完成),并且调度程序将顺序执行它们。

当您在指定时间段内关闭 DAG 然后重新启用它时,也会触发追赶。

对于可以轻松地分成几个时期的原子数据集,这种行为非常棒。如果您的 DAG 在内部执行追赶,关闭追赶非常棒。

回填

在某些情况下,您可能希望为指定的历史时期运行 DAG,例如,使用 start_date 2019-11-21 创建数据填充 DAG,但另一位用户需要一个月前即 2019-10-21 的输出数据。此过程称为回填。

即使在禁用追赶的情况下,您可能也希望回填数据。这可以通过 CLI 完成。运行以下命令

airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \
    dag_id

回填命令 将重新运行 start date 和 end date 内所有间隔的 dag_id 的所有实例。

重新运行任务

有些任务可能在计划运行期间失败。在查看日志后修复错误后,可以通过清除计划日期的任务来重新运行任务。清除任务实例不会删除任务实例记录。相反,它将 max_tries 更新为 0,并将当前任务实例状态设置为 None,这会导致任务重新运行。

在树形或图形视图中单击失败的任务,然后单击清除。执行器将重新运行它。

您可以选择多个选项来重新运行 -

  • 过去 - DAG 最近数据间隔之前运行中的任务的所有实例

  • 未来 - DAG 最近数据间隔之后运行中的任务的所有实例

  • 上游 - 当前 DAG 中的上游任务

  • 下游 - 当前 DAG 中的下游任务

  • 递归 - 子 DAG 和父 DAG 中的所有任务

  • 失败 - DAG 最近运行中仅失败的任务

您还可以使用以下命令通过 CLI 清除任务

airflow tasks clear dag_id \
    --task-regex task_regex \
    --start-date START_DATE \
    --end-date END_DATE

对于指定的 dag_id 和时间间隔,该命令将清除与正则表达式匹配的所有任务实例。有关更多选项,您可以查看 clear 命令 的帮助

airflow tasks clear --help

外部触发器

请注意,还可以通过 CLI 手动创建 DAG 运行。只需运行命令 -

airflow dags trigger --exec-date logical_date run_id

在调度程序外部创建的 DAG 运行与触发器的时间戳相关联,并与计划的 DAG 运行一起显示在 UI 中。可以使用 -e 参数指定 DAG 内传递的逻辑日期。默认值是 UTC 时区中的当前日期。

此外,您还可以使用 Web UI 手动触发 DAG 运行(选项卡DAG -> 列链接 -> 按钮触发 DAG

触发 DAG 时传递参数

从 CLI、REST API 或 UI 触发 DAG 时,可以将 DAG 运行的配置作为 JSON blob 传递。

参数化 DAG 的示例

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    "example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)

parameterized_task = BashOperator(
    task_id="parameterized_task",
    bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    dag=dag,
)

注意:来自 dag_run.conf 的参数只能用于运算符的模板字段中。

使用 CLI

airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag

使用 UI

在 UI 中,触发 DAG 的参数可以通过 params 定义更好地表示,如 Params 文档中所述。通过定义的参数,将呈现用于值输入的适当形式。

如果 DAG 未定义 params,通常会跳过该表单,通过配置选项 show_trigger_form_if_no_params,可以强制显示仅字典输入的经典表单以传递配置选项。

../_images/example_passing_conf.png

请考虑将此类用法转换为 params,因为这是更便捷的方式,并且还允许验证用户输入。

谨记

  • 可以通过 UI 将任务实例标记为失败。这可用于停止运行任务实例。

  • 可以通过 UI 将任务实例标记为成功。这主要用于修复误报,或者例如,当修复已在 Airflow 外部应用时。

此条目是否有用?