airflow.providers.google.cloud.sensors.pubsub
¶
此模块包含 Google PubSub 传感器。
模块内容¶
类¶
从 PubSub 订阅中拉取消息并通过 XCom 传递它们。 |
- exception airflow.providers.google.cloud.sensors.pubsub.PubSubMessageTransformException[源代码]¶
基类:
airflow.exceptions.AirflowException
当消息无法转换 PubSub 接收的格式时引发。
- class airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor(*, project_id, subscription, max_messages=5, return_immediately=True, ack_messages=False, gcp_conn_id='google_cloud_default', messages_callback=None, impersonation_chain=None, poke_interval=10.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]¶
基类:
airflow.sensors.base.BaseSensorOperator
从 PubSub 订阅中拉取消息并通过 XCom 传递它们。
始终等待至少从订阅返回一条消息。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:从 PubSub 订阅中拉取消息
另请参阅
如果您不想等待至少一条消息到达,请改用操作符:
PubSubPullOperator
此传感器操作符将从指定的 PubSub 订阅中拉取最多
max_messages
条消息。当订阅返回消息时,将满足 poke 方法的标准,并且消息将从操作符返回并通过 XCom 传递给下游任务。如果
ack_messages
设置为 True,消息将在返回之前立即被确认,否则,下游任务将负责确认它们。如果您想要一个非阻塞的任务,不需要等待消息,请改用
PubSubPullOperator
。project_id
和subscription
是模板化的,因此您可以在其中使用变量。- 参数
project_id (str) – 订阅的 Google Cloud 项目 ID(模板化)
subscription (str) – Pub/Sub 订阅名称。不包括完整的订阅路径。
max_messages (int) – 每个 PubSub 拉取请求要检索的最大消息数
return_immediately (bool) – 如果此字段设置为 true,即使
Pull
响应中没有可返回的消息,系统也会立即响应。否则,系统可能会等待(在有限的时间内),直到至少有一条消息可用,而不是返回没有消息。警告:不鼓励将此字段设置为true
,因为它会对Pull
操作的性能产生不利影响。我们建议用户不要设置此字段。ack_messages (bool) – 如果为 True,每条消息将立即被确认,而不是由任何下游任务确认
gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。
messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可选)用于处理接收到的消息的回调。其返回值将保存到 XCom。如果您要拉取大型消息,您可能需要提供自定义回调。如果未提供,则默认实现将使用 google.protobuf.json_format.MessageToDict 函数将 ReceivedMessage 对象转换为 JSON 可序列化的字典。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接在前身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(模板化)。
deferrable (bool) – 在可延迟模式下运行传感器
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[源代码]¶