DAG 运行¶
DAG 运行是一个对象,表示 DAG 在时间中的一个实例化。任何时候执行 DAG,都会创建一个 DAG 运行,并执行其中的所有任务。DAG 运行的状态取决于任务状态。每个 DAG 运行彼此独立运行,这意味着你可以同时运行 DAG 的多个运行。
DAG 运行状态¶
当 DAG 执行完成后,将确定 DAG 运行状态。DAG 的执行取决于其包含的任务及其依赖项。当所有任务都处于终端状态之一(即如果不可能转换到其他状态)时,例如 success
、failed
或 skipped
,将向 DAG 运行分配状态。DAG 运行的状态分配基于所谓的“叶节点”或简称“叶”。叶节点是没有子节点的任务。
DAG 运行有两个可能的终端状态
success
如果所有叶节点状态都是success
或skipped
,failed
如果任何叶节点状态都是failed
或upstream_failed
。
注意
如果某些任务定义了一些特定的 触发规则,请小心。这些可能会导致一些意外的行为,例如,如果你有一个叶任务的触发规则为 “all_done”,它将执行,而不管其他任务的状态如何,如果它成功,那么整个 DAG 运行也将标记为 success
,即使中间出现故障。
在 Airflow 2.7 中添加
在“正在运行”选项卡中,可以在 UI 仪表板上显示当前正在运行 DAG 运行的 DAG。同样,最新 DAG 运行标记为失败的 DAG 可以显示在“失败”选项卡中。
数据间隔¶
Airflow 中的每个 DAG 运行都有一个分配的“数据间隔”,表示其操作的时间范围。例如,对于使用 @daily
调度的 DAG,其每个数据间隔都将在每天午夜 (00:00) 开始,并在午夜 (24:00) 结束。
DAG 运行通常在关联的数据间隔结束后安排,以确保运行能够收集该时间段内的所有数据。换句话说,通常直到 2020-01-01 结束,即 2020-01-02 00:00:00 之后,才会开始运行涵盖 2020-01-01 数据周期的运行。
Airflow 中的所有日期在某种程度上都与数据间隔的概念相关。例如,DAG 运行的“逻辑日期”(在 Airflow 2.2 之前的版本中也称为 execution_date
)表示数据间隔的开始,而不是 DAG 实际执行的时间。
类似地,由于 DAG 及其任务的 start_date
参数指向同一逻辑日期,因此它标记了DAG 的第一个数据间隔的开始,而不是 DAG 中的任务开始运行的时间。换句话说,DAG 运行只会在 start_date
之后的一个间隔内安排。
提示
如果 cron 表达式或 timedelta 对象不足以表示 DAG 的计划、逻辑日期或数据间隔,请参阅 时间表。有关 logical date
的更多信息,请参阅 运行 DAG 和 execution_date 的含义
重新运行 DAG¶
在某些情况下,您可能需要再次执行 DAG。其中一种情况是计划的 DAG 运行失败时。
追赶¶
使用 start_date
(可能还有 end_date
)和非数据集计划定义的 Airflow DAG 定义了一系列间隔,调度程序将这些间隔转换为各个 DAG 运行并执行。默认情况下,调度程序将为自上次数据间隔以来尚未运行(或已清除)的任何数据间隔启动 DAG 运行。此概念称为追赶。
如果您的 DAG 未编写为处理其追赶(即,不仅限于间隔,而是 Now
例如),那么您将希望关闭追赶。这可以通过在 DAG 中设置 catchup=False
或在配置文件中设置 catchup_by_default=False
来完成。关闭后,调度程序仅为最新间隔创建 DAG 运行。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
import datetime
import pendulum
dag = DAG(
"tutorial",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": datetime.timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="A simple tutorial DAG",
schedule="@daily",
catchup=False,
)
在上面的示例中,如果调度程序守护程序在 2016-01-02 上午 6 点(或从命令行)获取 DAG,将创建一个数据在 2016-01-01 和 2016-01-02 之间的数据运行,下一个运行将在 2016-01-03 午夜后创建,数据间隔在 2016-01-02 和 2016-01-03 之间。
请注意,使用 datetime.timedelta
对象作为调度可能会导致不同的行为。在这种情况下,创建的单个 DAG 运行将涵盖 2016-01-01 06:00 和 2016-01-02 06:00 之间的数据(一个调度间隔现在结束)。有关 cron 和基于 delta 的调度之间差异的更详细描述,请参阅 时间表比较
如果 dag.catchup
值改为 True
,调度程序将为 2015-12-01 和 2016-01-02 之间的每个已完成间隔创建一个 DAG 运行(但尚未为 2016-01-02 创建一个,因为该间隔尚未完成),并且调度程序将顺序执行它们。
当您在指定时间段内关闭 DAG 然后重新启用它时,也会触发追赶。
对于可以轻松地分成几个时期的原子数据集,这种行为非常棒。如果您的 DAG 在内部执行追赶,关闭追赶非常棒。
回填¶
在某些情况下,您可能希望为指定的历史时期运行 DAG,例如,使用 start_date
2019-11-21 创建数据填充 DAG,但另一位用户需要一个月前即 2019-10-21 的输出数据。此过程称为回填。
即使在禁用追赶的情况下,您可能也希望回填数据。这可以通过 CLI 完成。运行以下命令
airflow dags backfill \
--start-date START_DATE \
--end-date END_DATE \
dag_id
回填命令 将重新运行 start date 和 end date 内所有间隔的 dag_id 的所有实例。
重新运行任务¶
有些任务可能在计划运行期间失败。在查看日志后修复错误后,可以通过清除计划日期的任务来重新运行任务。清除任务实例不会删除任务实例记录。相反,它将 max_tries
更新为 0
,并将当前任务实例状态设置为 None
,这会导致任务重新运行。
在树形或图形视图中单击失败的任务,然后单击清除。执行器将重新运行它。
您可以选择多个选项来重新运行 -
过去 - DAG 最近数据间隔之前运行中的任务的所有实例
未来 - DAG 最近数据间隔之后运行中的任务的所有实例
上游 - 当前 DAG 中的上游任务
下游 - 当前 DAG 中的下游任务
递归 - 子 DAG 和父 DAG 中的所有任务
失败 - DAG 最近运行中仅失败的任务
您还可以使用以下命令通过 CLI 清除任务
airflow tasks clear dag_id \
--task-regex task_regex \
--start-date START_DATE \
--end-date END_DATE
对于指定的 dag_id
和时间间隔,该命令将清除与正则表达式匹配的所有任务实例。有关更多选项,您可以查看 clear 命令 的帮助
airflow tasks clear --help
外部触发器¶
请注意,还可以通过 CLI 手动创建 DAG 运行。只需运行命令 -
airflow dags trigger --exec-date logical_date run_id
在调度程序外部创建的 DAG 运行与触发器的时间戳相关联,并与计划的 DAG 运行一起显示在 UI 中。可以使用 -e
参数指定 DAG 内传递的逻辑日期。默认值是 UTC 时区中的当前日期。
此外,您还可以使用 Web UI 手动触发 DAG 运行(选项卡DAG -> 列链接 -> 按钮触发 DAG)
触发 DAG 时传递参数¶
从 CLI、REST API 或 UI 触发 DAG 时,可以将 DAG 运行的配置作为 JSON blob 传递。
参数化 DAG 的示例
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
"example_parameterized_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
parameterized_task = BashOperator(
task_id="parameterized_task",
bash_command="echo value: {{ dag_run.conf['conf1'] }}",
dag=dag,
)
注意:来自 dag_run.conf
的参数只能用于运算符的模板字段中。
使用 CLI¶
airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
谨记¶
可以通过 UI 将任务实例标记为失败。这可用于停止运行任务实例。
可以通过 UI 将任务实例标记为成功。这主要用于修复误报,或者例如,当修复已在 Airflow 外部应用时。