BranchDateTimeOperator¶
使用 BranchDateTimeOperator
将工作流分支到两个执行路径之一,取决于时间是否落在两个目标参数给定的范围内,
此操作符有两种模式。第一种模式是使用当前时间(DAG 执行时的机器时钟时间),第二种模式是使用它运行所在的 DAG 运行的 logical_date
。
与当前时间一起使用¶
上述用法在某些情况下可能有用 - 例如当 DAG 用于执行清理和维护,并且不适用于任何需要回填的 DAG,因为“当前时间”使回填非幂等,其结果取决于 DAG 实际运行的时间。即使按计划运行,它也可能略微不确定。DAGRun 的调度和执行之间可能会有一些时间差,这可能意味着即使 DAGRun 已正确调度,用于分支决定的实际时间将与调度时间不同,并且分支决定可能会因这些延迟而异。
airflow/example_dags/example_branch_datetime_operator.py
empty_task_11 = EmptyOperator(task_id="date_in_range", dag=dag1)
empty_task_21 = EmptyOperator(task_id="date_outside_range", dag=dag1)
cond1 = BranchDateTimeOperator(
task_id="datetime_branch",
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag1,
)
# Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [empty_task_11, empty_task_21]
目标参数 target_upper
和 target_lower
可以接收一个 datetime.datetime
、一个 datetime.time
或 None
。当使用一个 datetime.time
对象时,它将与当前日期组合以便允许进行比较。如果 target_upper
设置为一个发生在给定的 target_lower
之前的 datetime.time
,一天将被添加到 target_upper
中。这样做是为了允许跨越两天的时段。
airflow/example_dags/example_branch_datetime_operator.py
empty_task_12 = EmptyOperator(task_id="date_in_range", dag=dag2)
empty_task_22 = EmptyOperator(task_id="date_outside_range", dag=dag2)
cond2 = BranchDateTimeOperator(
task_id="datetime_branch",
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.time(0, 0, 0),
target_lower=pendulum.time(15, 0, 0),
dag=dag2,
)
# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run empty_task_12 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [empty_task_12, empty_task_22]
如果目标参数设置为 None
,操作符将仅使用非 None
的目标进行单边比较。将 target_upper
和 target_lower
都设置为 None
将引发异常。
与逻辑日期一起使用¶
这种用法对“数据范围”更友好。该 logical_date
在 DAG 重新运行时不会改变,也不受执行延迟的影响,因此这种方法适用于可能需要回填的幂等 DAG 运行。
airflow/example_dags/example_branch_datetime_operator.py
empty_task_13 = EmptyOperator(task_id="date_in_range", dag=dag3)
empty_task_23 = EmptyOperator(task_id="date_outside_range", dag=dag3)
cond3 = BranchDateTimeOperator(
task_id="datetime_branch",
use_task_logical_date=True,
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag3,
)
# Run empty_task_13 if cond3 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond3 >> [empty_task_13, empty_task_23]
BranchDayOfWeekOperator¶
使用 BranchDayOfWeekOperator
根据星期几的值来分支你的工作流。
airflow/example_dags/example_branch_day_of_week_operator.py
empty_task_1 = EmptyOperator(task_id="branch_true")
empty_task_2 = EmptyOperator(task_id="branch_false")
empty_task_3 = EmptyOperator(task_id="branch_weekend")
empty_task_4 = EmptyOperator(task_id="branch_mid_week")
branch = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="branch_true",
follow_task_ids_if_false="branch_false",
week_day="Monday",
)
branch_weekend = BranchDayOfWeekOperator(
task_id="make_weekend_choice",
follow_task_ids_if_true="branch_weekend",
follow_task_ids_if_false="branch_mid_week",
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
)
# Run empty_task_1 if branch executes on Monday, empty_task_2 otherwise
branch >> [empty_task_1, empty_task_2]
# Run empty_task_3 if it's a weekend, empty_task_4 otherwise
empty_task_2 >> branch_weekend >> [empty_task_3, empty_task_4]