DAG 运行¶
DAG 运行是表示 DAG 在时间上的一个实例化对象。任何时候 DAG 被执行时,都会创建一个 DAG 运行,并且其中所有的任务都会被执行。DAG 运行的状态取决于任务的状态。每个 DAG 运行都是相互独立运行的,这意味着你可以同时运行同一个 DAG 的多个实例。
DAG 运行状态¶
DAG 运行状态在其执行完成时确定。DAG 的执行取决于其包含的任务及其依赖关系。当所有任务都处于终端状态(即没有可能转换到其他状态)时,例如 success
(成功)、failed
(失败)或 skipped
(跳过),状态会被分配给 DAG 运行。DAG 运行的状态是基于所谓的“叶子节点”或简称为“叶子”来分配的。叶子节点是没有子任务的任务。
DAG 运行有两种可能的终端状态
success
(成功):如果所有叶子节点状态都为success
或skipped
,则为success
。failed
(失败):如果任何叶子节点状态为failed
或upstream_failed
(上游失败),则为failed
。
注意
请注意,如果您的某些任务定义了特定的 触发规则,则需要小心。这可能会导致一些意想不到的行为,例如,如果您有一个叶子任务,其触发规则为“all_done”,则无论其他任务的状态如何,它都会被执行,如果它成功了,那么整个 DAG 运行也会被标记为 success
,即使中间有其他任务失败了。
在 Airflow 2.7 中新增
当前有正在运行的 DAG 运行的 DAGs 可以在 UI 面板的“运行中”选项卡中显示。类似地,最新 DAG 运行被标记为失败的 DAGs 可以在“失败”选项卡中找到。
数据区间¶
Airflow 中的每个 DAG 运行都有一个分配的“数据区间”,代表其操作的时间范围。例如,对于使用 @daily
调度的 DAG,其每个数据区间将从每天午夜 (00:00) 开始,并在午夜 (24:00) 结束。
DAG 运行通常在其关联的数据区间结束*之后*进行调度,以确保运行能够收集该时间段内的所有数据。换句话说,覆盖 2020-01-01 数据期间的运行通常不会在 2020-01-01 结束之前开始运行,即在 2020-01-02 00:00:00 之后。
Airflow 中的所有日期在某种程度上都与数据区间概念相关联。“逻辑日期”(在 Airflow 2.2 版本之前也称为 execution_date
)例如,表示数据区间的开始,而不是 DAG 实际执行的时间。
类似地,由于 DAG 及其任务的 start_date
参数指向相同的逻辑日期,它标志着 *DAG 的第一个数据区间*的开始,而不是 DAG 中的任务何时开始运行。换句话说,DAG 运行只会在 start_date
之后调度一个区间。
提示
如果 cron 表达式或 timedelta 对象不足以表达您的 DAG 调度、逻辑日期或数据区间,请参阅 时间表。有关 logical date
的更多信息,请参阅 运行 DAG 和 execution_date 是什么意思?
重新运行 DAG¶
有时您可能希望再次执行您的 DAG。其中一种情况是计划的 DAG 运行失败时。
追赶执行¶
使用 start_date
(可能还有 end_date
)和非资产计划定义的 Airflow DAG 定义了一系列区间,调度器将这些区间转化为独立的 DAG 运行并执行。默认情况下,自上一个数据区间以来尚未运行的 DAG 运行不会在 DAG 激活时由调度器创建 (Airflow 配置 scheduler.catchup_by_default=False
)。调度器只为最新的区间创建 DAG 运行。
如果您在 DAG 中设置 catchup=True
,调度器将为自上一个数据区间以来尚未运行(或已被清除)的任何数据区间启动一个 DAG 运行。这个概念被称为追赶执行 (Catchup)。
如果您的 DAG 没有编写为处理其追赶执行(例如,不限于区间,而是使用 Now
),那么您会希望关闭追赶执行,这是默认设置,或者可以在 DAG 定义中显式设置 catchup=False
,如果您的 Airflow 环境更改了默认配置。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import datetime
import pendulum
dag = DAG(
"tutorial",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": datetime.timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="A simple tutorial DAG",
schedule="@daily",
)
在上面的示例中,如果调度器守护程序在 2016-01-02 早上 6 点(或从命令行)拾取了该 DAG,则会创建一个数据区间在 2016-01-01 和 2016-01-02 之间的 DAG 运行,下一个将在 2016-01-03 凌晨刚过午夜时创建,数据区间在 2016-01-02 和 2016-01-03 之间。
请注意,使用 datetime.timedelta
对象作为计划可能会导致不同的行为。在这种情况下,创建的单个 DAG 运行将覆盖 2016-01-01 06:00 和 2016-01-02 06:00 之间的数据(一个计划区间现在结束)。有关基于 cron 和基于 delta 的计划之间差异的更详细描述,请查看 时间表比较。
如果 dag.catchup
的值改为 True
,调度器将为 2015-12-01 和 2016-01-02 之间的每个已完成区间(但不包括 2016-01-02 的区间,因为它尚未完成)创建一个 DAG 运行,并且调度器将按顺序执行它们。
当您在指定时间内关闭 DAG 然后重新启用它时,也会触发追赶执行。
这种行为非常适合可以轻松按周期划分的原子资产。如果您的 DAG 内部执行追赶执行,则关闭 catchup 是个不错的选择。
回填执行¶
您可能希望在指定的历史时期内运行 DAG。例如,创建了一个 start_date
为 2024-11-21 的 DAG,但另一个用户需要一个月前的数据输出,即 2024-10-21。这个过程称为回填执行 (Backfill)。
这可以通过 API 或 CLI 完成。对于 CLI 使用,运行以下命令
airflow backfill create --dag-id DAG_ID \
--start-date START_DATE \
--end-date END_DATE \
backfill 命令 将重新运行指定开始日期和结束日期范围内的 dag_id 的所有实例,针对所有区间。
重新运行任务¶
在计划运行期间,某些任务可能会失败。在查阅日志并修复错误后,您可以通过清除计划日期的任务实例来重新运行任务。清除任务实例会创建该任务实例的记录。当前任务实例的 try_number
会递增,max_tries
设置为 0
,状态设置为 None
,这将导致任务重新运行。
在 Tree 或 Graph 视图中单击失败的任务,然后单击 Clear。Executor 将重新运行它。
您可以选择多个选项来重新运行 -
Past (过去) - DAG 最近数据区间之前运行中的该任务的所有实例
Future (将来) - DAG 最近数据区间之后运行中的该任务的所有实例
Upstream (上游) - 当前 DAG 中的上游任务
Downstream (下游) - 当前 DAG 中的下游任务
Recursive (递归) - 子 DAG 和父 DAG 中的所有任务
Failed (失败) - 仅限 DAG 最近一次运行中失败的任务
您还可以通过 CLI 使用以下命令清除任务
airflow tasks clear dag_id \
--task-regex task_regex \
--start-date START_DATE \
--end-date END_DATE
对于指定的 dag_id
和时间区间,该命令将清除所有与正则表达式匹配的任务实例。有关更多选项,您可以查看 clear 命令 的帮助。
airflow tasks clear --help
任务实例历史记录¶
当任务实例重试或被清除时,会保留任务实例历史记录。您可以在 Grid 视图中单击任务实例来查看此历史记录。

注意
上面显示的尝试次数选择器仅适用于已重试或已清除的任务。
历史记录显示了特定运行结束时任务实例属性的值。在日志页面上,您还可以查看任务实例每次尝试的日志。这对于调试非常有用。

注意
相关的任务实例对象,如 XComs、渲染后的模板字段等,不会保留在历史记录中。仅保留任务实例属性,包括日志。
外部触发器¶
请注意,DAG 运行也可以通过 CLI 手动创建。只需运行命令 -
airflow dags trigger --logical-date logical_date run_id
通过调度器外部创建的 DAG 运行会关联到触发器的时间戳,并在 UI 中与计划的 DAG 运行一起显示。DAG 内部传递的逻辑日期可以使用 -e
参数指定。默认值为 UTC 时区的当前日期。
此外,您还可以使用 Web UI 手动触发 DAG 运行(选项卡 Dags -> 列 Links -> 按钮 Trigger Dag)
触发 DAG 时传递参数¶
通过 CLI、REST API 或 UI 触发 DAG 时,可以将 DAG 运行的配置作为 JSON blob 传递。
带参数的 DAG 示例
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
dag = DAG(
"example_parameterized_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
parameterized_task = BashOperator(
task_id="parameterized_task",
bash_command="echo value: {{ dag_run.conf['conf1'] }}",
dag=dag,
)
注意:来自 dag_run.conf
的参数只能在 operator 的模板字段中使用。
使用 CLI¶
airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
注意事项¶
可以通过 UI 将任务实例标记为失败。这可用于停止正在运行的任务实例。
可以通过 UI 将任务实例标记为成功。这主要用于修复误报,或者例如,当修复已在 Airflow 外部应用时。