2025 年 Airflow 峰会将于 10 月 07-09 日举行。立即注册以获取早鸟票!

airflow.triggers.base

属性

log

DiscrimatedTriggerEvent

StartTriggerArgs

从触发器启动任务执行所需的参数。

BaseTrigger

所有触发器的基类。

BaseEventTrigger

用于根据外部事件调度 DAG 的触发器的基类。

TriggerEvent

当条件满足时,触发器可以触发的事件。

TaskSuccessEvent

生成此事件以使任务成功结束。

TaskFailedEvent

生成此事件以使任务失败结束。

TaskSkippedEvent

生成此事件以使任务以 '跳过' 状态结束。

函数

trigger_event_discriminator(v)

模块内容

airflow.triggers.base.log[source]
class airflow.triggers.base.StartTriggerArgs[source]

从触发器启动任务执行所需的参数。

trigger_cls: str[source]
next_method: str[source]
trigger_kwargs: dict[str, Any] | None = None[source]
next_kwargs: dict[str, Any] | None = None[source]
timeout: datetime.timedelta | None = None[source]
class airflow.triggers.base.BaseTrigger(**kwargs)[source]

基类: abc.ABC, airflow.utils.log.logging_mixin.LoggingMixin

所有触发器的基类。

触发器可以存在于两种上下文中

  • 在 Operator 内部,当它被传递给 TaskDeferred 时

  • 在触发器工作进程中主动运行

我们在两种情况下使用相同的类,并依赖所有 Trigger 类能够返回(可以使用 Airflow-JSON 编码的)参数,这些参数将允许它们在其他地方被重新实例化。

task_instance = None[source]
trigger_id = None[source]
abstract serialize()[source]

返回重新构造此 Trigger 所需的信息。

返回:

元组 (类路径, 重新实例化所需的关键字参数)。

返回类型:

tuple[str, dict[str, Any]]

abstract run()[source]
异步:

在异步上下文中运行触发器。

触发器应该在需要触发事件时 yield 一个 Event,并在完成时返回 None。单事件触发器应该 yield 然后立即返回。

如果它 yield,它很可能会很快恢复,但也有可能不会(例如,如果工作负载被移动到另一个触发器进程,或者多事件触发器被用于单事件任务延迟)。

在任何一种情况下,Trigger 类都应假定它们会被持久化,然后在不再需要时依赖于 cleanup() 方法被调用。

async cleanup()[source]

清理触发器。

当不再需要触发器并将其从活动触发器进程中移除时调用此方法。

此方法遵循 async/await 模式,允许在触发器主事件循环中运行清理。清理方法引发的异常会被忽略,因此如果您想调试它们并被通知清理方法失败,您应该用 try/except 块包裹您的代码并适当处理(以异步兼容的方式)。

static repr(classpath, kwargs)[source]
__repr__()[source]
class airflow.triggers.base.BaseEventTrigger(**kwargs)[source]

基类: BaseTrigger

用于根据外部事件调度 DAG 的触发器的基类。

BaseEventTriggerBaseTrigger 的子类,用于标识与事件驱动调度兼容的触发器。

static hash(classpath, kwargs)[source]

返回触发器类路径和 kwargs 的哈希值。这用于唯一标识一个触发器。

我们不希望将此逻辑放在 BaseTrigger 中,因为在用于延迟任务时,两个触发器可以具有相同的类路径和 kwargs。这对于事件驱动调度来说是不正确的。

class airflow.triggers.base.TriggerEvent(payload, **kwargs)[source]

基类: pydantic.BaseModel

当条件满足时,触发器可以触发的事件。

事件必须具有一个唯一标识值,该值在无论触发器在何处运行都应相同;这是为了确保如果同一触发器在两个位置运行(出于高可用性原因),我们可以对其事件进行去重。

payload: Any = None[source]

要发送回任务的事件负载。

必须是原生 JSON 可序列化的,或在 airflow 序列化代码中注册。

__repr__()[source]
class airflow.triggers.base.TaskSuccessEvent(*, xcoms=None, **kwargs)[source]

基类: BaseTaskEndEvent

生成此事件以使任务成功结束。

task_instance_state: airflow.utils.state.TaskInstanceState[source]
class airflow.triggers.base.TaskFailedEvent(*, xcoms=None, **kwargs)[source]

基类: BaseTaskEndEvent

生成此事件以使任务失败结束。

task_instance_state: airflow.utils.state.TaskInstanceState[source]
class airflow.triggers.base.TaskSkippedEvent(*, xcoms=None, **kwargs)[source]

基类: BaseTaskEndEvent

生成此事件以使任务以 '跳过' 状态结束。

task_instance_state: airflow.utils.state.TaskInstanceState[source]
airflow.triggers.base.trigger_event_discriminator(v)[source]
airflow.triggers.base.DiscrimatedTriggerEvent[source]

此条目有帮助吗?