时间表

对于具有基于时间调度的 DAG(与事件驱动相反),DAG 的内部“时间表”驱动调度。时间表还确定为 DAG 创建的每次运行的数据间隔和逻辑日期。

使用 cron 表达式或 timedelta 对象调度的 DAG 在内部转换为始终使用时间表。

如果 cron 表达式或 timedelta 足以满足您的用例,则无需担心编写自定义时间表,因为 Airflow 具有处理这些情况的默认时间表。但是,对于更复杂的调度要求,您可以创建自己的时间表类,并将其传递给 DAG 的 schedule 参数。

以下是一些自定义时间表实现有用的示例

  • 每天在不同时间发生的任务运行。例如,天文学家可能会发现,在黎明时运行任务来处理从前一晚收集的数据很有用。

  • 不遵循公历的计划。例如,为中国传统历法中的每个月创建一个运行。这在概念上类似于日出情况,但时间尺度不同。

  • 滚动窗口或重叠数据间隔。例如,您可能希望每天都有一个运行,但使每个运行都涵盖前七天的时间段。可以使用 cron 表达式来实现这一点,但自定义数据间隔提供了更自然的表示形式。

  • 数据间隔之间有“漏洞”而不是连续间隔的数据间隔,因为 cron 表达式和 timedelta 计划都表示连续间隔。请参阅数据间隔

Airflow 允许您在插件中编写自定义时间表,并由 DAG 使用。您可以在使用时间表自定义 DAG 调度操作指南中找到一个演示自定义时间表的示例。

注意

作为一般规则,请始终在代码中尽可能晚地访问需要访问数据库的变量、连接或任何其他内容。请参阅时间表,了解更多最佳实践。

内置时间表

Airflow 内置了几个常见的时间表,以涵盖最常见的用例。其他时间表可能在插件中可用。

CronTriggerTimetable

接受 cron 表达式并根据其触发 DAG 运行的时间表。

from airflow.timetables.trigger import CronTriggerTimetable


@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)  # At 01:00 on Wednesday
def example_dag():
    pass

您还可以向时间表提供静态数据间隔。可选的 interval 参数必须是 datetime.timedeltadateutil.relativedelta.relativedelta。使用这些参数时,触发的 DAG 运行的数据间隔跨越指定的时间长度,并结束于触发时间。

from datetime import timedelta

from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    # Runs every Friday at 18:00 to cover the work week (9:00 Monday to 18:00 Friday).
    schedule=CronTriggerTimetable(
        "0 18 * * 5",
        timezone="UTC",
        interval=timedelta(days=4, hours=9),
    ),
    ...,
)
def example_dag():
    pass

DeltaDataIntervalTimetable

使用时间增量调度数据间隔的时间表。您可以通过将 datetime.timedeltadateutil.relativedelta.relativedelta 提供给 DAG 的 schedule 参数来选择它。

此时间表侧重于数据间隔值,并且不一定将执行日期与任意边界(例如,一天或一小时的开始)对齐。

@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
    pass

CronDataIntervalTimetable

接受 cron 表达式的时间表,根据每个 cron 触发点之间的时间间隔创建数据间隔,并在每个数据间隔结束时触发 DAG 运行。

通过将有效的 cron 表达式作为字符串提供给 DAG 的 schedule 参数来选择此时间表,如DAG文档中所述。

@dag(schedule="0 1 * * 3")  # At 01:00 on Wednesday.
def example_dag():
    pass

EventsTimetable

传递 datetime 的列表以供 DAG 在之后运行。这对于根据体育赛事、计划的沟通活动和其他任意且不规则但可预测的时间表进行计时非常有用。

事件列表必须是有限的,并且大小合理,因为它必须在每次解析 DAG 时加载。可选地,使用 restrict_to_events 标志来强制手动运行 DAG,该 DAG 使用最近或第一个事件的时间作为数据间隔。否则,手动运行将以等于手动运行开始时间的 data_interval_startdata_interval_end 开始。您还可以使用 description 参数命名事件集,该参数将显示在 Airflow UI 中。

from airflow.timetables.events import EventsTimetable


@dag(
    schedule=EventsTimetable(
        event_dates=[
            pendulum.datetime(2022, 4, 5, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 17, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 22, 20, 50, tz="America/Chicago"),
        ],
        description="My Team's Baseball Games",
        restrict_to_events=False,
    ),
    ...,
)
def example_dag():
    pass

基于数据集事件的定时调度

将条件数据集表达式与基于时间的计划相结合可以增强调度的灵活性。

DatasetOrTimeSchedule 是一个专门的时间表,允许根据基于时间的计划和数据集事件调度 DAG。它还促进了按传统时间表创建的计划运行和独立运行的数据集触发运行。

此功能在需要 DAG 在数据集更新时以及以周期性间隔运行时特别有用。它确保工作流程对数据更改保持响应,并始终运行定期检查或更新。

以下是使用 DatasetOrTimeSchedule 的 DAG 的示例

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
    )
    # Additional arguments here, replace this comment with actual arguments
)
def example_dag():
    # DAG tasks go here
    pass

时间表对比

两个 cron 时间表之间的差异

Airflow 有两个时间表 CronTriggerTimetableCronDataIntervalTimetable,它们都接受 cron 表达式。

但是,两者之间存在差异:- CronTriggerTimetable 不处理数据间隔,而 CronDataIntervalTimetable 处理。- CronTriggerTimetableCronDataIntervalTimetablerun_idlogical_date 中的时间戳根据它们如何处理数据间隔而以不同的方式定义,如DAG 运行的触发时间中所述。

是否考虑数据间隔

CronTriggerTimetable 包含数据间隔。这意味着 data_interval_startdata_interval_end 的值(以及旧的 execution_date)是相同的;即 DAG 运行被触发的时间。

然而,CronDataIntervalTimetable 包含数据间隔。这意味着 data_interval_startdata_interval_end 的值(以及旧的 execution_date)是不同的。data_interval_start 是 DAG 运行被触发的时间,而 data_interval_end 是间隔的结束时间。

补漏行为

无论您使用 CronTriggerTimetable 还是 CronDataIntervalTimetable,当 catchupTrue 时,没有区别。

在某些情况下,您可能希望将 catchup 设置为 False,以防止运行不必要的 DAG:- 如果您创建一个新的 DAG,其开始日期在过去,并且不想运行过去的 DAG。如果 catchupTrue,Airflow 将运行该时间间隔内本应运行的所有 DAG。- 如果您暂停现有的 DAG,然后在稍后重新启动它,并且不想如果 catchupTrue

在这些情况下,run_id 中的 logical_date 基于 CronTriggerTimetableCronDataIntervalTimetable 如何处理数据间隔。

有关使用 catchup 时如何触发 DAG 运行的更多信息,请参阅 补漏

DAG 运行被触发的时间

CronTriggerTimetableCronDataIntervalTimetable 在同一时间触发 DAG 运行。但是,每个运行的 run_id 的时间戳是不同的。

例如,假设有一个 cron 表达式 @daily0 0 * * *,它计划每天凌晨 12 点运行。如果您在 1 月 31 日下午 3 点使用两个时间表启用 DAG,则:- CronTriggerTimetable 将在 2 月 1 日凌晨 12 点触发一个新的 DAG 运行。 run_id 时间戳为 2 月 1 日的午夜。- CronDataIntervalTimetable 立即触发一个新的 DAG 运行,因为 1 月 31 日凌晨 12 点开始的每日时间间隔的 DAG 运行尚未发生。 run_id 时间戳为 1 月 31 日的午夜,因为那是数据间隔的开始时间。

这是另一个示例,显示了在跳过 DAG 运行的情况下存在的差异。

假设有两个正在运行的 DAG,其 cron 表达式为 @daily0 0 * * *,它们使用两个不同的时间表。如果您在 1 月 31 日下午 3 点暂停 DAG,并在 2 月 2 日下午 3 点重新启用它们,则:- CronTriggerTimetable 跳过本应在 2 月 1 日和 2 日触发的 DAG 运行。下一个 DAG 运行将在 2 月 3 日凌晨 12 点触发。- CronDataIntervalTimetable 仅跳过本应在 2 月 1 日触发的 DAG 运行。在您重新启用 DAG 后,会立即触发 2 月 2 日的 DAG 运行。

在这些示例中,您可以看到,与 CronDataIntervalTimetable 的行为方式相比,CronTriggerTimetable 触发 DAG 运行的方式更直观,并且更类似于人们期望 cron 的行为方式。

cron 和增量数据间隔时间表之间的差异:

DeltaDataIntervalTimetableCronDataIntervalTimetable 之间进行选择取决于您的使用情况。如果您在 2 月 1 日 01:05 启用 DAG,则下表总结了创建的 DAG 运行及其涵盖的数据间隔,具体取决于 3 个参数:schedulestart_datecatchup

计划

开始日期

补漏

覆盖的间隔

备注

*/30 * * * *

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

与使用 timedelta 对象时的行为相同。

*/30 * * * *

year-02-01

False

  • 00:30 - 01:00

*/30 * * * *

year-02-01 00:10

True

  • 00:30 - 01:00

间隔 00:00 - 00:30 不在开始日期之后,因此被跳过。

*/30 * * * *

year-02-01 00:10

False

  • 00:30 - 01:00

无论开始日期如何,数据间隔都与小时/天/等边界对齐。

datetime.timedelta(minutes=30)

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

与使用 cron 表达式时的行为相同。

datetime.timedelta(minutes=30)

year-02-01

False

  • 00:35 - 01:05

间隔未与开始日期对齐,而是与当前时间对齐。

datetime.timedelta(minutes=30)

year-02-01 00:10

True

  • 00:10 - 00:40

间隔与开始日期对齐。下一个将在 5 分钟后触发,覆盖 00:40 - 01:10。

datetime.timedelta(minutes=30)

year-02-01 00:10

False

  • 00:35 - 01:05

间隔与当前时间对齐。下一个运行将在 30 分钟后触发。

此条目是否有帮助?