airflow.providers.google.cloud.sensors.pubsub

此模块包含一个 Google PubSub 传感器。

异常

PubSubMessageTransformException

当消息转换为 pubsub 接收格式失败时抛出。

PubSubPullSensor

从 PubSub 订阅中拉取消息,并通过 XCom 传递它们。

模块内容

exception airflow.providers.google.cloud.sensors.pubsub.PubSubMessageTransformException[源]

基类: airflow.exceptions.AirflowException

当消息转换为 pubsub 接收格式失败时抛出。

class airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor(*, project_id, subscription, max_messages=5, return_immediately=True, ack_messages=False, gcp_conn_id='google_cloud_default', messages_callback=None, impersonation_chain=None, poke_interval=10.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源]

基类: airflow.sensors.base.BaseSensorOperator

从 PubSub 订阅中拉取消息,并通过 XCom 传递它们。

始终等待从订阅中返回至少一条消息。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南:从 PubSub 订阅拉取消息

另请参阅

如果您不想等待至少一条消息到来,请改用操作符: PubSubPullOperator

此传感器操作符将从指定的 PubSub 订阅中拉取最多 max_messages 条消息。当订阅返回消息时,poke 方法的条件将被满足,消息将从操作符返回,并通过 XCom 传递给下游任务。

如果将 ack_messages 设置为 True,消息将在返回前立即得到确认;否则,下游任务将负责确认它们。

如果您想要一个不等待消息的非阻塞任务,请改用 PubSubPullOperator

project_idsubscription 是模板化的,因此您可以在其中使用变量。

参数:
  • project_id (str) – 订阅的 Google Cloud 项目 ID(模板化)

  • subscription (str) – Pub/Sub 订阅名称。请勿包含完整的订阅路径。

  • max_messages (int) – 每次 PubSub 拉取请求检索的最大消息数

  • return_immediately (bool) – 如果此字段设置为 true,即使在 Pull 响应中没有可返回的消息,系统也会立即响应。否则,系统可能会等待(有限的时间)直到至少有一条消息可用,而不是返回零条消息。警告:不建议将此字段设置为 true,因为它会对 Pull 操作的性能产生不利影响。我们建议用户不要设置此字段。

  • ack_messages (bool) – 如果为 True,每条消息将立即得到确认,而不是由任何下游任务确认

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可选)用于处理接收到消息的回调函数。其返回值将保存到 XCom 中。如果您正在拉取大量消息,可能需要提供自定义回调函数。如果未提供,默认实现将使用 google.protobuf.json_format.MessageToDict 函数将 ReceivedMessage 对象转换为 JSON 可序列化的字典。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选)用于使用短期凭据模拟的服务账号,或获取列表中最后一个账号的 access_token 所需的链式账号列表,最后一个账号将在请求中被模拟。如果设置为字符串,该账号必须授予源账号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须将 Service Account Token Creator IAM 角色授予紧接在其前的身份,列表中的第一个账号将此角色授予源账号(模板化)。

  • deferrable (bool) – 在可延迟模式下运行 sensor

template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[源]
ui_color = '#ff7f50'[源]
gcp_conn_id = 'google_cloud_default'[源]
project_id[源]
subscription[源]
max_messages = 5[源]
return_immediately = True[源]
ack_messages = False[源]
messages_callback = None[源]
impersonation_chain = None[源]
deferrable = True[源]
poke_interval = 10.0[源]
poke(context)[源]

在派生此类时重写。

execute(context)[源]

如果 deferrable 为 True,Airflow 会在 worker 上运行此方法,并使用 trigger 进行延迟。

execute_complete(context, event)[源]

如果提供了 messages_callback,则执行它;否则,立即返回 trigger 事件消息。

此条目是否有帮助?