DAG 运行¶
DAG 运行是表示 DAG 在时间上的实例化的对象。 每次执行 DAG 时,都会创建一个 DAG 运行,并且执行其中的所有任务。DAG 运行的状态取决于任务状态。每个 DAG 运行都是彼此独立运行的,这意味着您可以同时运行多个 DAG 运行。
DAG 运行状态¶
DAG 运行状态在 DAG 执行完成时确定。DAG 的执行取决于其包含的任务及其依赖关系。当所有任务都处于终端状态之一(即,如果没有可能转换为另一种状态)时,例如 success
,failed
或 skipped
时,状态将分配给 DAG 运行。DAG 运行的状态是根据所谓的“叶节点”或简称为“叶子”来分配的。叶节点是没有子节点的任务。
DAG 运行有两种可能的终端状态
如果所有叶节点状态均为
success
或skipped
,则为success
,如果任何叶节点状态为
failed
或upstream_failed
,则为failed
。
注意
请注意,如果您的某些任务定义了一些特定的触发规则。这些可能会导致一些意外行为,例如,如果您有一个触发规则为 “all_done” 的叶子任务,则无论其余任务的状态如何都将执行该任务,如果它成功,则整个 DAG 运行也将被标记为 success
,即使中间的某些任务失败了。
在 Airflow 2.7 中添加
在 UI 仪表板上的“正在运行”选项卡中可以显示当前正在运行 DAG 运行的 DAG。同样,可以在“失败”选项卡上找到其最新 DAG 运行标记为失败的 DAG。
数据间隔¶
Airflow 中的每个 DAG 运行都分配有一个“数据间隔”,表示它运行的时间范围。例如,对于使用 @daily
调度的 DAG,其每个数据间隔都将在每天午夜(00:00)开始,并在午夜(24:00)结束。
通常在关联的数据间隔结束后调度 DAG 运行,以确保该运行能够收集该时间段内的所有数据。换句话说,覆盖 2020-01-01 数据周期的运行通常不会在 2020-01-01 结束后(即在 2020-01-02 00:00:00 之后)才开始运行。
Airflow 中的所有日期都以某种方式与数据间隔概念相关联。例如,DAG 运行的“逻辑日期”(在 2.2 之前的 Airflow 版本中也称为 execution_date
)表示数据间隔的开始,而不是 DAG 实际执行的时间。
同样,由于 DAG 及其任务的 start_date
参数指向同一逻辑日期,因此它标记的是DAG 的第一个数据间隔的开始,而不是 DAG 中的任务将开始运行的时间。换句话说,DAG 运行仅在 start_date
之后的一个间隔被调度。
提示
如果 cron 表达式或 timedelta 对象不足以表达 DAG 的计划、逻辑日期或数据间隔,请参阅 时间表。有关 logical date
的更多信息,请参阅 运行 DAG 和 execution_date 是什么意思?
重新运行 DAG¶
在某些情况下,您可能需要再次执行 DAG。其中一种情况是,当计划的 DAG 运行失败时。
追赶¶
使用 start_date
、可能还有 end_date
和非数据集计划定义的 Airflow DAG 定义了一系列间隔,调度程序将其转换为单独的 DAG 运行并执行。默认情况下,调度程序将启动自上次数据间隔以来(或已清除)尚未运行的任何数据间隔的 DAG 运行。此概念称为追赶。
如果您的 DAG 未编写为处理其追赶(即,不限于间隔,而是例如 Now
),则您将需要关闭追赶。这可以通过在 DAG 中设置 catchup=False
或在配置文件中设置 catchup_by_default=False
来完成。关闭后,调度程序仅为最新间隔创建 DAG 运行。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
import datetime
import pendulum
dag = DAG(
"tutorial",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": datetime.timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="A simple tutorial DAG",
schedule="@daily",
catchup=False,
)
在上面的示例中,如果调度程序守护程序在 2016-01-02 上午 6 点(或从命令行)拾取了 DAG,则将创建一个数据介于 2016-01-01 和 2016-01-02 之间的 DAG 运行,下一个 DAG 运行将在 2016-01-03 的早晨午夜之后创建,数据间隔在 2016-01-02 和 2016-01-03 之间。
请注意,使用 datetime.timedelta
对象作为计划可能会导致不同的行为。在这种情况下,创建的单个 DAG 运行将覆盖 2016-01-01 06:00 到 2016-01-02 06:00 之间的数据(一个计划间隔在现在结束)。有关基于 cron 和 delta 的计划之间差异的更详细描述,请查看 时间表比较
如果 dag.catchup
值改为 True
,则调度程序将为 2015-12-01 和 2016-01-02 之间每个已完成的间隔创建一个 DAG 运行(但尚未为 2016-01-02 创建,因为该间隔尚未完成),并且调度程序将按顺序执行它们。
当您关闭 DAG 一段时间然后再重新启用它时,也会触发追赶。
此行为非常适合可以轻松拆分为周期的原子数据集。如果您的 DAG 在内部执行追赶,则关闭追赶非常有用。
回填¶
有时您可能需要为指定的历史时期运行 DAG,例如,使用 start_date
2019-11-21 创建数据填充 DAG,但是另一个用户需要一个月前(即 2019-10-21)的输出数据。此过程称为回填。
即使在禁用追赶的情况下,您也可能需要回填数据。这可以通过 CLI 完成。运行以下命令
airflow dags backfill \
--start-date START_DATE \
--end-date END_DATE \
dag_id
回填命令将为开始日期和结束日期内的所有间隔重新运行 dag_id 的所有实例。
重新运行任务¶
在计划运行时,某些任务可能会失败。在查看日志后修复错误后,可以通过清除计划日期的任务来重新运行这些任务。清除任务实例会创建任务实例的记录。当前任务实例的 try_number
会递增,max_tries
设置为 0
,状态设置为 None
,这会导致任务重新运行。
在“树”或“图形”视图中单击失败的任务,然后单击“清除”。执行程序将重新运行它。
您可以选择多个选项来重新运行 -
过去 - DAG 的最新数据间隔之前运行中的所有任务实例
未来 - DAG 的最新数据间隔之后运行中的所有任务实例
上游 - 当前 DAG 中的上游任务
下游 - 当前 DAG 中的下游任务
递归 - 子 DAG 和父 DAG 中的所有任务
失败 - DAG 最近一次运行中仅失败的任务
您也可以使用命令通过 CLI 清除任务
airflow tasks clear dag_id \
--task-regex task_regex \
--start-date START_DATE \
--end-date END_DATE
对于指定的 dag_id
和时间间隔,此命令会清除与正则表达式匹配的所有任务实例。如需了解更多选项,可以查看clear 命令的帮助文档。
airflow tasks clear --help
任务实例历史记录¶
当任务实例重试或被清除时,任务实例的历史记录会被保留。您可以通过点击网格视图中的任务实例来查看此历史记录。
注意
上面显示的尝试选择器仅适用于已重试或清除的任务。
历史记录显示特定运行结束时任务实例属性的值。在日志页面上,您还可以查看每次任务实例尝试的日志。这对于调试很有用。
注意
相关的任务实例对象,如 XComs、渲染的模板字段等,不会保存在历史记录中。仅保留任务实例属性,包括日志。
外部触发器¶
请注意,也可以通过 CLI 手动创建 DAG 运行。只需运行以下命令 -
airflow dags trigger --exec-date logical_date run_id
在调度程序外部创建的 DAG 运行会与触发器的的时间戳关联,并与计划的 DAG 运行一起显示在 UI 中。可以使用 -e
参数指定在 DAG 中传递的逻辑日期。默认值为 UTC 时区的当前日期。
此外,您还可以使用 Web UI 手动触发 DAG 运行(选项卡 DAGs -> 列 Links -> 按钮 Trigger Dag)。
触发 DAG 时传递参数¶
从 CLI、REST API 或 UI 触发 DAG 时,可以传递 DAG 运行的配置作为 JSON blob。
参数化 DAG 的示例
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
"example_parameterized_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
parameterized_task = BashOperator(
task_id="parameterized_task",
bash_command="echo value: {{ dag_run.conf['conf1'] }}",
dag=dag,
)
注意:dag_run.conf
中的参数只能在运算符的模板字段中使用。
使用 CLI¶
airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
需要记住¶
可以通过 UI 将任务实例标记为失败。这可以用于停止正在运行的任务实例。
可以通过 UI 将任务实例标记为成功。这主要用于修复误报,或者例如,当修复已在 Airflow 外部应用时。