Airflow Summit 2025 即将于 10 月 07-09 日召开。立即注册获取早鸟票!

DAG 运行

DAG 运行是表示 DAG 在时间上的一个实例化对象。任何时候 DAG 被执行时,都会创建一个 DAG 运行,并且其中所有的任务都会被执行。DAG 运行的状态取决于任务的状态。每个 DAG 运行都是相互独立运行的,这意味着你可以同时运行同一个 DAG 的多个实例。

DAG 运行状态

DAG 运行状态在其执行完成时确定。DAG 的执行取决于其包含的任务及其依赖关系。当所有任务都处于终端状态(即没有可能转换到其他状态)时,例如 success(成功)、failed(失败)或 skipped(跳过),状态会被分配给 DAG 运行。DAG 运行的状态是基于所谓的“叶子节点”或简称为“叶子”来分配的。叶子节点是没有子任务的任务。

DAG 运行有两种可能的终端状态

  • success(成功):如果所有叶子节点状态都为 successskipped,则为 success

  • failed(失败):如果任何叶子节点状态为 failedupstream_failed(上游失败),则为 failed

注意

请注意,如果您的某些任务定义了特定的 触发规则,则需要小心。这可能会导致一些意想不到的行为,例如,如果您有一个叶子任务,其触发规则为“all_done”,则无论其他任务的状态如何,它都会被执行,如果它成功了,那么整个 DAG 运行也会被标记为 success,即使中间有其他任务失败了。

在 Airflow 2.7 中新增

当前有正在运行的 DAG 运行的 DAGs 可以在 UI 面板的“运行中”选项卡中显示。类似地,最新 DAG 运行被标记为失败的 DAGs 可以在“失败”选项卡中找到。

数据区间

Airflow 中的每个 DAG 运行都有一个分配的“数据区间”,代表其操作的时间范围。例如,对于使用 @daily 调度的 DAG,其每个数据区间将从每天午夜 (00:00) 开始,并在午夜 (24:00) 结束。

DAG 运行通常在其关联的数据区间结束*之后*进行调度,以确保运行能够收集该时间段内的所有数据。换句话说,覆盖 2020-01-01 数据期间的运行通常不会在 2020-01-01 结束之前开始运行,即在 2020-01-02 00:00:00 之后。

Airflow 中的所有日期在某种程度上都与数据区间概念相关联。“逻辑日期”(在 Airflow 2.2 版本之前也称为 execution_date)例如,表示数据区间的开始,而不是 DAG 实际执行的时间。

类似地,由于 DAG 及其任务的 start_date 参数指向相同的逻辑日期,它标志着 *DAG 的第一个数据区间*的开始,而不是 DAG 中的任务何时开始运行。换句话说,DAG 运行只会在 start_date 之后调度一个区间。

提示

如果 cron 表达式或 timedelta 对象不足以表达您的 DAG 调度、逻辑日期或数据区间,请参阅 时间表。有关 logical date 的更多信息,请参阅 运行 DAGexecution_date 是什么意思?

重新运行 DAG

有时您可能希望再次执行您的 DAG。其中一种情况是计划的 DAG 运行失败时。

追赶执行

使用 start_date(可能还有 end_date)和非资产计划定义的 Airflow DAG 定义了一系列区间,调度器将这些区间转化为独立的 DAG 运行并执行。默认情况下,自上一个数据区间以来尚未运行的 DAG 运行不会在 DAG 激活时由调度器创建 (Airflow 配置 scheduler.catchup_by_default=False)。调度器只为最新的区间创建 DAG 运行。

如果您在 DAG 中设置 catchup=True,调度器将为自上一个数据区间以来尚未运行(或已被清除)的任何数据区间启动一个 DAG 运行。这个概念被称为追赶执行 (Catchup)。

如果您的 DAG 没有编写为处理其追赶执行(例如,不限于区间,而是使用 Now),那么您会希望关闭追赶执行,这是默认设置,或者可以在 DAG 定义中显式设置 catchup=False,如果您的 Airflow 环境更改了默认配置。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

import datetime
import pendulum

dag = DAG(
    "tutorial",
    default_args={
        "depends_on_past": True,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=3),
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="A simple tutorial DAG",
    schedule="@daily",
)

在上面的示例中,如果调度器守护程序在 2016-01-02 早上 6 点(或从命令行)拾取了该 DAG,则会创建一个数据区间在 2016-01-01 和 2016-01-02 之间的 DAG 运行,下一个将在 2016-01-03 凌晨刚过午夜时创建,数据区间在 2016-01-02 和 2016-01-03 之间。

请注意,使用 datetime.timedelta 对象作为计划可能会导致不同的行为。在这种情况下,创建的单个 DAG 运行将覆盖 2016-01-01 06:00 和 2016-01-02 06:00 之间的数据(一个计划区间现在结束)。有关基于 cron 和基于 delta 的计划之间差异的更详细描述,请查看 时间表比较

如果 dag.catchup 的值改为 True,调度器将为 2015-12-01 和 2016-01-02 之间的每个已完成区间(但不包括 2016-01-02 的区间,因为它尚未完成)创建一个 DAG 运行,并且调度器将按顺序执行它们。

当您在指定时间内关闭 DAG 然后重新启用它时,也会触发追赶执行。

这种行为非常适合可以轻松按周期划分的原子资产。如果您的 DAG 内部执行追赶执行,则关闭 catchup 是个不错的选择。

回填执行

您可能希望在指定的历史时期内运行 DAG。例如,创建了一个 start_date2024-11-21 的 DAG,但另一个用户需要一个月前的数据输出,即 2024-10-21。这个过程称为回填执行 (Backfill)。

这可以通过 API 或 CLI 完成。对于 CLI 使用,运行以下命令

airflow backfill create --dag-id DAG_ID \
    --start-date START_DATE \
    --end-date END_DATE \

backfill 命令 将重新运行指定开始日期和结束日期范围内的 dag_id 的所有实例,针对所有区间。

重新运行任务

在计划运行期间,某些任务可能会失败。在查阅日志并修复错误后,您可以通过清除计划日期的任务实例来重新运行任务。清除任务实例会创建该任务实例的记录。当前任务实例的 try_number 会递增,max_tries 设置为 0,状态设置为 None,这将导致任务重新运行。

在 Tree 或 Graph 视图中单击失败的任务,然后单击 Clear。Executor 将重新运行它。

您可以选择多个选项来重新运行 -

  • Past (过去) - DAG 最近数据区间之前运行中的该任务的所有实例

  • Future (将来) - DAG 最近数据区间之后运行中的该任务的所有实例

  • Upstream (上游) - 当前 DAG 中的上游任务

  • Downstream (下游) - 当前 DAG 中的下游任务

  • Recursive (递归) - 子 DAG 和父 DAG 中的所有任务

  • Failed (失败) - 仅限 DAG 最近一次运行中失败的任务

您还可以通过 CLI 使用以下命令清除任务

airflow tasks clear dag_id \
    --task-regex task_regex \
    --start-date START_DATE \
    --end-date END_DATE

对于指定的 dag_id 和时间区间,该命令将清除所有与正则表达式匹配的任务实例。有关更多选项,您可以查看 clear 命令 的帮助。

airflow tasks clear --help

任务实例历史记录

当任务实例重试或被清除时,会保留任务实例历史记录。您可以在 Grid 视图中单击任务实例来查看此历史记录。

../_images/task_instance_history.png

注意

上面显示的尝试次数选择器仅适用于已重试或已清除的任务。

历史记录显示了特定运行结束时任务实例属性的值。在日志页面上,您还可以查看任务实例每次尝试的日志。这对于调试非常有用。

../_images/task_instance_history_log.png

注意

相关的任务实例对象,如 XComs、渲染后的模板字段等,不会保留在历史记录中。仅保留任务实例属性,包括日志。

外部触发器

请注意,DAG 运行也可以通过 CLI 手动创建。只需运行命令 -

airflow dags trigger --logical-date logical_date run_id

通过调度器外部创建的 DAG 运行会关联到触发器的时间戳,并在 UI 中与计划的 DAG 运行一起显示。DAG 内部传递的逻辑日期可以使用 -e 参数指定。默认值为 UTC 时区的当前日期。

此外,您还可以使用 Web UI 手动触发 DAG 运行(选项卡 Dags -> 列 Links -> 按钮 Trigger Dag

触发 DAG 时传递参数

通过 CLI、REST API 或 UI 触发 DAG 时,可以将 DAG 运行的配置作为 JSON blob 传递。

带参数的 DAG 示例

import pendulum

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

dag = DAG(
    "example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)

parameterized_task = BashOperator(
    task_id="parameterized_task",
    bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    dag=dag,
)

注意:来自 dag_run.conf 的参数只能在 operator 的模板字段中使用。

使用 CLI

airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag

注意事项

  • 可以通过 UI 将任务实例标记为失败。这可用于停止正在运行的任务实例。

  • 可以通过 UI 将任务实例标记为成功。这主要用于修复误报,或者例如,当修复已在 Airflow 外部应用时。

本条目有帮助吗?