airflow.providers.standard.sensors.external_task
¶
模块内容¶
类¶
ExternalTaskSensor 和 ExternalTaskMarker 的操作符链接。 |
|
等待不同的 DAG、任务组或任务在特定的逻辑日期完成。 |
|
使用此操作符来指示另一个 DAG 上的任务依赖于此任务。 |
- class airflow.providers.standard.sensors.external_task.ExternalDagLink[源代码]¶
基类:
airflow.models.baseoperatorlink.BaseOperatorLink
ExternalTaskSensor 和 ExternalTaskMarker 的操作符链接。
它允许用户访问使用 ExternalTaskSensor 等待或使用 ExternalTaskMarker 清除的 DAG。
- get_link(operator, *, ti_key)[源代码]¶
链接到外部系统。
注意:此函数的旧签名是
(self, operator, dttm: datetime)
。在运行时仍然支持,但已弃用。- 参数
operator (airflow.models.baseoperator.BaseOperator) – 此链接关联的 Airflow 操作符对象。
ti_key (airflow.models.taskinstancekey.TaskInstanceKey) – 返回链接的任务实例 ID。
- 返回
链接到外部系统
- 返回类型
- class airflow.providers.standard.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]¶
基类:
airflow.sensors.base.BaseSensorOperator
等待不同的 DAG、任务组或任务在特定的逻辑日期完成。
如果 external_task_group_id 和 external_task_id 都为
None
(默认),则传感器等待 DAG。 external_task_group_id 和 external_task_id 的值不能同时设置。默认情况下,ExternalTaskSensor 将等待外部任务成功,此时它也会成功。但是,默认情况下,如果外部任务失败,它将不会失败,而是会继续检查状态,直到传感器超时(从而使您有时间重试外部任务,而无需清除传感器)。
默认情况下,如果外部任务跳过,则 ExternalTaskSensor 不会跳过。要更改此设置,只需设置
skipped_states=[TaskInstanceState.SKIPPED]
。请注意,如果您正在监视多个任务,并且一个任务进入错误状态,另一个任务进入跳过状态,则外部任务将对首先看到的状态作出反应。如果两者同时发生,则失败状态具有优先权。可以通过设置导致传感器失败的状态来更改默认行为,例如,通过设置
allowed_states=[DagRunState.FAILED]
和failed_states=[DagRunState.SUCCESS]
,您将翻转行为以获得一个当外部任务失败时变为绿色,并且当外部任务成功时立即变为红色的传感器!请注意,在检查 failed_states 时,会考虑
soft_fail
。因此,如果外部任务进入失败状态,并且soft_fail == True
,则传感器将_跳过_而不是失败。因此,设置soft_fail=True
和failed_states=[DagRunState.SKIPPED]
将导致如果外部任务跳过,则传感器跳过。但是,这是一个人为的示例 - 如果您需要此行为,请考虑使用skipped_states
。如果目标失败,使用skipped_states
允许传感器跳过,但仍会在超时时进入失败状态。如上所述,使用soft_fail == True
将导致如果目标失败则传感器跳过,如果超时也跳过。- 参数
external_dag_id (str) – 包含您要等待的任务的 dag_id。(已模板化)
external_task_id (str | None) – 包含您要等待的任务的 task_id。(已模板化)
external_task_ids (collections.abc.Collection[str] | None) – 您要等待的任务的 task_ids 列表。(已模板化) 如果
None
(默认值),则传感器等待 DAG。可以将 external_task_id 或 external_task_ids 传递给 ExternalTaskSensor,但不能同时传递两者。external_task_group_id (str | None) – 包含您要等待的任务的任务组 ID。(已模板化)
allowed_states (collections.abc.Iterable[str] | None) – 允许状态的可迭代对象,默认为
['success']
skipped_states (collections.abc.Iterable[str] | None) – 使此任务标记为跳过的状态的可迭代对象,默认为
None
failed_states (collections.abc.Iterable[str] | None) – 失败或不允许状态的可迭代对象,默认为
None
execution_delta ( datetime.timedelta | None) – 与上一次执行的时间差,用于查找,默认值与当前任务或 DAG 的逻辑日期相同。对于昨天,请使用 [正!] datetime.timedelta(days=1)。 execution_delta 或 execution_date_fn 只能传递给 ExternalTaskSensor,不能同时传递两者。
execution_date_fn (Callable | None) – 接收当前执行的逻辑日期作为第一个位置参数,并可选择接收上下文字典中可用的任意数量的关键字参数的函数,并返回要查询的所需逻辑日期。execution_delta 或 execution_date_fn 只能传递给 ExternalTaskSensor,不能同时传递两者。
check_existence ( bool) – 设置为 True 以检查外部任务是否存在(当 external_task_id 不为 None 时)或检查要等待的 DAG 是否存在(当 external_task_id 为 None 时),如果外部任务或 DAG 不存在则立即停止等待(默认值:False)。
poll_interval ( float) – 轮询状态的周期,以秒为单位。
deferrable ( bool) – 在可延迟模式下运行传感器。
- template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][来源]¶
- class airflow.providers.standard.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, logical_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[来源]¶
基类:
airflow.operators.empty.EmptyOperator
使用此操作符来指示另一个 DAG 上的任务依赖于此任务。
当使用“递归”选项清除此任务时,Airflow 将递归清除另一个 DAG 上的任务及其下游任务。传递依赖关系会一直跟踪,直到达到 recursion_depth。
- 参数
external_dag_id ( str) – 包含需要清除的依赖任务的 dag_id。
external_task_id ( str) – 需要清除的依赖任务的 task_id。
logical_date ( str | datetime.datetime | None) – 需要清除的依赖任务执行的逻辑日期。
recursion_depth ( int) – 允许的最大传递依赖级别。默认值为 10。这主要用于防止循环依赖。如有必要,可以增加此数字。但是,过多的传递依赖级别会使在 Web UI 中清除任务的速度变慢。