airflow.providers.standard.sensors.external_task

ExternalDagLink

用于 ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 链接。

ExternalTaskSensor

等待其他 DAG、任务组或任务在特定逻辑日期完成。

ExternalTaskMarker

使用此 Operator 指示其他 DAG 上的任务依赖于此任务。

模块内容

基类: airflow.sdk.BaseOperatorLink

用于 ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 链接。

它允许用户访问由 ExternalTaskSensor 等待或由 ExternalTaskMarker 清除的 DAG。

name = 'External DAG'[source]

链接名称。这将是任务 UI 上的按钮名称。

外部系统链接。

参数:
返回:

外部系统链接

返回类型:

str

class airflow.providers.standard.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.sdk.bases.sensor.BaseSensorOperator

等待其他 DAG、任务组或任务在特定逻辑日期完成。

如果 external_task_group_idexternal_task_id 都为 None(默认),则 Sensor 会等待 DAG。无法同时设置 external_task_group_idexternal_task_id 的值。

默认情况下,ExternalTaskSensor 将等待外部任务成功,此时它也会成功。但是,默认情况下,如果外部任务失败,它**不会**失败,而是会继续检查状态直到 Sensor 超时(这样您就有时间重试外部任务,而无需同时清除 Sensor)。

默认情况下,如果外部任务跳过,ExternalTaskSensor 不会跳过。要更改此行为,只需设置 skipped_states=[TaskInstanceState.SKIPPED]。请注意,如果您正在监控多个任务,其中一个进入错误状态而另一个进入跳过状态,则 ExternalTaskSensor 将对首先看到的状态做出反应。如果同时发生,失败状态优先。

可以通过设置导致 Sensor 失败的状态来更改默认行为,例如,通过设置 allowed_states=[DagRunState.FAILED]failed_states=[DagRunState.SUCCESS],您将颠倒行为,得到一个在外部任务**失败**时变为绿色并在外部任务**成功**时立即变为红色的 Sensor!

请注意,检查 failed_states 时会遵守 soft_fail。因此,如果外部任务进入失败状态并且 soft_fail == True,Sensor 将会**跳过**而不是失败。因此,设置 soft_fail=Truefailed_states=[DagRunState.SKIPPED] 将导致 Sensor 在外部任务跳过时跳过。然而,这是一个牵强的例子——如果您想要这种行为,请考虑使用 skipped_states。使用 skipped_states 允许 Sensor 在目标失败时跳过,但在超时时仍进入失败状态。如上所述使用 soft_fail == True 将导致 Sensor 在目标失败时跳过,并且在超时时也跳过。

参数:
  • external_dag_id (str) – 包含您要等待的任务的 dag_id。(模板化)

  • external_task_id (str | None) – 包含您要等待的任务的 task_id。(模板化)

  • external_task_ids (collections.abc.Collection[str] | None) – 您要等待的任务 task_id 列表。(模板化)如果为 None(默认值),则 Sensor 等待 DAG。ExternalTaskSensor 可以传递 external_task_id 或 external_task_ids,但不能同时传递两者。

  • external_task_group_id (str | None) – 包含您要等待的任务组的 task_group_id。(模板化)

  • allowed_states (collections.abc.Iterable[str] | None) – 允许的状态的可迭代对象,默认是 ['success']

  • skipped_states (collections.abc.Iterable[str] | None) – 将此任务标记为跳过的状态的可迭代对象,默认是 None

  • failed_states (collections.abc.Iterable[str] | None) – 失败或不允许的状态的可迭代对象,默认是 None

  • execution_delta (datetime.timedelta | None) – 与上次执行时间差,用于查找,默认是当前任务或 DAG 的相同逻辑日期。对于昨天,使用 [正的!] datetime.timedelta(days=1)。ExternalTaskSensor 可以传递 execution_delta 或 execution_date_fn,但不能同时传递两者。

  • execution_date_fn (Callable | None) – 一个函数,接收当前执行的逻辑日期作为第一个位置参数,并可选地接收 context 字典中可用的任何数量的关键字参数,然后返回要查询的所需逻辑日期。ExternalTaskSensor 可以传递 execution_delta 或 execution_date_fn,但不能同时传递两者。

  • check_existence (bool) – 设置为 True 以检查外部任务是否存在(当 external_task_id 不为 None 时)或检查要等待的 DAG 是否存在(当 external_task_id 为 None 时),并在外部任务或 DAG 不存在时立即停止等待(默认值:False)。

  • poll_interval (float) – 轮询间隔(秒),用于检查状态

  • deferrable (bool) – 在可延迟模式下运行 Sensor

template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][source]
ui_color = '#4db7db'[source]
allowed_states[source]
skipped_states = [][source]
failed_states = [][source]
execution_delta = None[source]
execution_date_fn = None[source]
external_dag_id[source]
external_task_id = None[source]
external_task_ids = None[source]
external_task_group_id = None[source]
check_existence = False[source]
deferrable = True[source]
poll_interval = 2.0[source]
poke(context)[source]

派生此类时重写。

execute(context)[source]

在 worker 上运行,如果 deferrable 设置为 True,则使用 trigger 进行延迟。

execute_complete(context, event=None)[source]

当 trigger 触发时执行 - 立即返回。

get_count(dttm_filter, session, states)[source]

根据 dttm 过滤器和状态获取记录计数。

参数:
  • dttm_filter – 逻辑日期的日期时间过滤器

  • session – airflow 会话对象

  • states – 任务或 DAG 状态

返回:

符合过滤条件的记录计数

返回类型:

int

get_external_task_group_task_ids(session, dttm_filter)[source]
class airflow.providers.standard.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, logical_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[source]

基类: airflow.providers.standard.operators.empty.EmptyOperator

使用此 Operator 指示其他 DAG 上的任务依赖于此任务。

当此任务在选中“递归”的情况下被清除时,Airflow 将递归清除其他 DAG 上的该任务及其下游任务。传递依赖关系会一直追踪到达到 recursion_depth。

参数:
  • external_dag_id (str) – 包含需要被清除的依赖任务的 dag_id。

  • external_task_id (str) – 需要被清除的依赖任务的 task_id。

  • logical_date (str | datetime.datetime | None) – 需要被清除的依赖任务执行的逻辑日期。

  • recursion_depth (int) – 允许的最大传递依赖级别。默认值为 10。这主要用于防止循环依赖。如有必要,可以增加此数字。但是,太多级别的传递依赖会使在 Web UI 中清除任务变慢。

template_fields = ['external_dag_id', 'external_task_id', 'logical_date'][source]
ui_color = '#4db7db'[source]
external_dag_id[source]
external_task_id[source]
recursion_depth = 10[source]
classmethod get_serialized_fields()[source]

序列化 ExternalTaskMarker 以精确包含这些字段 + 模板化字段。

此条目是否有帮助?