airflow.sensors.external_task

模块内容

ExternalDagLink

ExternalTaskSensor 和 ExternalTaskMarker 的操作符链接。

ExternalTaskSensor

等待不同的 DAG、任务组或任务完成特定的逻辑日期。

ExternalTaskMarker

使用此操作符指示另一个 DAG 上的任务依赖于此任务。

ExternalTaskSensorLink

此外部链接已弃用。

基类: airflow.models.baseoperatorlink.BaseOperatorLink

ExternalTaskSensor 和 ExternalTaskMarker 的操作符链接。

它允许用户访问使用 ExternalTaskSensor 等待或被 ExternalTaskMarker 清除的 DAG。

name = '外部 DAG'[源代码]

链接到外部系统。

注意:此函数的旧签名是 (self, operator, dttm: datetime)。运行时仍然支持该签名,但已弃用。

参数
返回

链接到外部系统

返回类型

str

class airflow.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]

基类: airflow.sensors.base.BaseSensorOperator

等待不同的 DAG、任务组或任务完成特定的逻辑日期。

如果 external_task_group_idexternal_task_id 均为 None (默认值),则传感器将等待 DAG。 external_task_group_idexternal_task_id 的值不能同时设置。

默认情况下,ExternalTaskSensor 将等待外部任务成功,此时它也会成功。但是,默认情况下,如果外部任务失败,它将会失败,而是会继续检查状态,直到传感器超时(从而使你有时间重试外部任务,而无需清除传感器)。

默认情况下,如果外部任务跳过,ExternalTaskSensor 将不会跳过。要更改此行为,只需设置 skipped_states=[TaskInstanceState.SKIPPED]。请注意,如果要监视多个任务,并且一个任务进入错误状态,而另一个任务进入跳过状态,则外部任务将对它首先看到的状态做出反应。如果两者同时发生,则失败状态优先。

可以通过设置导致传感器失败的状态来更改默认行为,例如,通过设置 allowed_states=[DagRunState.FAILED]failed_states=[DagRunState.SUCCESS],你可以反转此行为,从而获得一个在外部任务失败时变为绿色,并且在外部任务成功时立即变为红色的传感器!

请注意,检查 failed_states 时会考虑 soft_fail。因此,如果外部任务进入失败状态,并且 soft_fail == True,则传感器将_跳过_而不是失败。因此,如果设置 soft_fail=Truefailed_states=[DagRunState.SKIPPED],则如果外部任务跳过,传感器将跳过。但是,这是一个虚构的例子——如果需要此行为,请考虑使用 skipped_states。使用 skipped_states 允许传感器在目标失败时跳过,但在超时时仍进入失败状态。如上所述,如果使用 soft_fail == True,则传感器会在目标失败时跳过,如果超时也会跳过。

参数
  • external_dag_id (str) – 包含你要等待的任务的 dag_id。(已模板化)

  • external_task_id (str | None) – 包含你要等待的任务的 task_id。(已模板化)

  • external_task_ids (Collection[str] | None) – 你要等待的 task_id 列表。(已模板化)如果为 None(默认值),则传感器等待 DAG。可以将 external_task_id 或 external_task_ids 传递给 ExternalTaskSensor,但不能同时传递两者。

  • external_task_group_id (str | None) – 包含你要等待的任务的任务组 ID。(已模板化)

  • allowed_states (Iterable[str] | None) – 允许状态的可迭代对象,默认为 ['success']

  • skipped_states (Iterable[str] | None) – 使此任务标记为跳过的状态的可迭代对象,默认为 None

  • failed_states (Iterable[str] | None) – 失败或不允许状态的可迭代对象,默认为 None

  • execution_delta (datetime.timedelta | None) – 要查看的与上一次执行的时间差,默认值与当前任务或 DAG 的逻辑日期相同。对于昨天,请使用 [正!] datetime.timedelta(days=1)。可以将 execution_delta 或 execution_date_fn 传递给 ExternalTaskSensor,但不能同时传递两者。

  • execution_date_fn (Callable | None) – 接收当前执行的逻辑日期作为第一个位置参数,并可以选择接收上下文字典中可用的任意数量的关键字参数的函数,并返回要查询的所需逻辑日期。可以将 execution_delta 或 execution_date_fn 传递给 ExternalTaskSensor,但不能同时传递两者。

  • check_existence ( bool ) – 设置为 True 以检查外部任务是否存在(当 external_task_id 不为 None 时)或检查要等待的 DAG 是否存在(当 external_task_id 为 None 时),如果外部任务或 DAG 不存在,则立即停止等待(默认值:False)。

  • poll_interval ( float ) – 轮询状态的间隔时间,以秒为单位。

  • deferrable ( bool ) – 以可延期模式运行传感器。

template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][source]
ui_color = '#4db7db'[source]
poke(context, session=NEW_SESSION)[source]

在派生此类时重写。

execute(context)[source]

如果 deferrable 设置为 True,则在 worker 上运行并使用触发器进行延迟。

execute_complete(context, event=None)[source]

当触发器触发时执行 - 立即返回。

get_count(dttm_filter, session, states)[source]

获取针对 dttm 过滤器和状态的记录计数。

参数
  • dttm_filter – 执行日期的日期时间过滤器

  • session – airflow 会话对象

  • states – 任务或 DAG 状态

返回

针对过滤器的记录计数

返回类型

int

get_external_task_group_task_ids(session, dttm_filter)[source]
class airflow.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, execution_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[source]

基类: airflow.operators.empty.EmptyOperator

使用此操作符指示另一个 DAG 上的任务依赖于此任务。

当选中“递归”清除此任务时,Airflow 将递归清除其他 DAG 上的任务及其下游任务。 传递依赖项将一直跟踪到达到 recursion_depth。

参数
  • external_dag_id ( str ) – 包含需要清除的依赖任务的 dag_id。

  • external_task_id ( str ) – 需要清除的依赖任务的 task_id。

  • execution_date ( str | datetime.datetime | None ) – 需要清除的依赖任务执行的逻辑日期。

  • recursion_depth ( int ) – 允许的最大传递依赖项级别。 默认为 10。这主要用于防止循环依赖。 如果必要,可以增加此数字。 但是,过多的传递依赖项级别会使在 Web UI 中清除任务的速度变慢。

template_fields = ['external_dag_id', 'external_task_id', 'execution_date'][source]
ui_color = '#4db7db'[source]
classmethod get_serialized_fields()[source]

序列化 ExternalTaskMarker 以准确包含这些字段 + templated_fields。

基类: ExternalDagLink

此外部链接已弃用。

请使用 airflow.sensors.external_task.ExternalDagLink

__attrs_post_init__()[source]

此条目是否有帮助?