跨 DAG 依赖关系

当两个 DAG 具有依赖关系时,值得考虑将它们合并成一个 DAG,这通常更容易理解。对于同一个 DAG 上的任务,Airflow 还提供了更好的依赖关系可视化表示。但是,有时将所有相关任务放在同一个 DAG 上并不实际。例如

  • 两个 DAG 可能有不同的调度。例如,一个每周 DAG 可能包含依赖于另一个每日 DAG 上的任务的任务。

  • 不同的团队负责不同的 DAG,但这些 DAG 之间存在一些跨 DAG 依赖关系。

  • 一个任务可能依赖于同一个 DAG 上的另一个任务,但依赖于不同的 execution_date(数据间隔的开始)。

  • 对在不同时间运行的任务使用 execution_delta,例如 execution_delta=timedelta(hours=1) 来检查 1 小时前运行的任务。

ExternalTaskSensor 可用于在不同 DAG 之间建立此类依赖关系。当它与 ExternalTaskMarker 一起使用时,清除依赖任务也可以跨不同 DAG 进行。

ExternalTaskSensor

使用 ExternalTaskSensor 使一个 DAG 上的任务等待另一个 DAG 上的另一个任务在特定的 execution_date 完成。

ExternalTaskSensor 还提供了一些选项,可以通过 allowed_statesfailed_states 参数来设置远程 DAG 上的任务是成功还是失败。

airflow/example_dags/example_external_task_marker_dag.py[源代码]

child_task1 = ExternalTaskSensor(
    task_id="child_task1",
    external_dag_id=parent_dag.dag_id,
    external_task_id=parent_task.task_id,
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)

您也可以在可延迟模式下使用传感器来执行此操作

tests/system/providers/core/example_external_task_parent_deferrable.py[源代码]

external_task_sensor = ExternalTaskSensor(
    task_id="parent_task_sensor",
    external_task_id="child_task",
    external_dag_id="child_dag",
    deferrable=True,
)

具有任务组依赖关系的 ExternalTaskSensor

此外,我们还可以使用 ExternalTaskSensor 使一个 DAG 上的任务等待另一个 DAG 上的另一个 task_group 在特定的 execution_date 完成。

airflow/example_dags/example_external_task_marker_dag.py[源代码]

child_task2 = ExternalTaskSensor(
    task_id="child_task2",
    external_dag_id=parent_dag.dag_id,
    external_task_group_id="parent_dag_task_group_id",
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)

ExternalTaskMarker

如果希望在 parent_dag 上的 parent_task 被清除时,child_dag 上的 child_task1 在特定的 execution_date 也应该被清除,则应该使用 ExternalTaskMarker。请注意,仅当用户在清除 parent_task 时选择了“递归”时,才会清除 child_task1

airflow/example_dags/example_external_task_marker_dag.py[源代码]

parent_task = ExternalTaskMarker(
    task_id="parent_task",
    external_dag_id="example_external_task_marker_child",
    external_task_id="child_task1",
)

此条目对您有帮助吗?