airflow.providers.standard.triggers.external_task

WorkflowTrigger

一个用于监控 Apache Airflow 中任务、任务组和 DAG 执行的 trigger。

DagStateTrigger

异步等待指定 run_id 的 DAG 完成。

模块内容

class airflow.providers.standard.triggers.external_task.WorkflowTrigger(external_dag_id, run_ids=None, execution_dates=None, logical_dates=None, external_task_ids=None, external_task_group_id=None, failed_states=None, skipped_states=None, allowed_states=None, poke_interval=2.0, soft_fail=False, **kwargs)[source]

基类: airflow.triggers.base.BaseTrigger

一个用于监控 Apache Airflow 中任务、任务组和 DAG 执行的 trigger。

参数:
  • external_dag_id (str) – 外部 DAG 的 ID。

  • run_ids (list[str] | None) – 外部 DAG 的 run id 列表。

  • external_task_ids (Collection[str] | None) – 要等待的外部任务 ID 集合。

  • external_task_group_id (str | None) – 要等待的外部任务组的 ID。

  • failed_states (Iterable[str] | None) – 外部任务被视为失败的状态。

  • skipped_states (Iterable[str] | None) – 外部任务被视为跳过的状态。

  • allowed_states (Iterable[str] | None) – 外部任务被视为成功的状态。

  • poke_interval (float) – 检查外部任务状态的间隔(秒)。

  • soft_fail (bool) – 如果为 True,则当外部任务失败时,此 trigger 不会导致整个 DAG 失败。

  • logical_dates (list[datetime.datetime] | None) – 外部 DAG 的逻辑日期列表。

external_dag_id[source]
external_task_ids = None[source]
external_task_group_id = None[source]
failed_states = None[source]
skipped_states = None[source]
allowed_states = None[source]
run_ids = None[source]
poke_interval = 2.0[source]
soft_fail = False[source]
execution_dates = None[source]
logical_dates = None[source]
serialize()[source]

序列化 trigger 参数和模块路径。

async run()[source]

定期检查任务、任务组或 DAG 状态。

class airflow.providers.standard.triggers.external_task.DagStateTrigger(dag_id, states, run_ids=None, execution_dates=None, poll_interval=5.0)[source]

基类: airflow.triggers.base.BaseTrigger

异步等待指定 run_id 的 DAG 完成。

参数:
  • dag_id (str) – 包含您要等待的任务的 dag_id。

  • states (list[airflow.utils.state.DagRunState]) – 允许的状态,默认为 ['success']

  • run_ids (list[str] | None) – DAG 运行的 run_id。

  • poll_interval (float) – 检查状态的时间间隔(秒)。默认值为 5.0 秒。

dag_id[source]
states[source]
run_ids = None[source]
execution_dates = None[source]
poll_interval = 5.0[source]
serialize()[source]

序列化 DagStateTrigger 参数和类路径。

async run()[source]

定期检查 DAG 运行是否存在,以及是否已达到其中一个状态。

async validate_count_dags_af_3(runs_ids_or_dates_len=0)[source]
count_dags(*, session=NEW_SESSION)[source]

统计数据库中符合我们条件的 DAG 运行数量。

此条目有帮助吗?