airflow.providers.google.cloud.hooks.pubsub
¶
此模块包含一个 Google Pub/Sub Hook。
模块内容¶
类¶
用于访问 Google Pub/Sub 的 Hook。 |
|
用于获取 Google Cloud PubSub 异步 hook 的类。 |
- class airflow.providers.google.cloud.hooks.pubsub.PubSubHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, enable_message_ordering=False, **kwargs)[源代码]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
用于访问 Google Pub/Sub 的 Hook。
操作所针对的 Google Cloud 项目由 gcp_conn_id 引用的连接中嵌入的项目确定。
- get_conn()[源代码]¶
检索与 Google Cloud Pub/Sub 的连接。
- 返回
Google Cloud Pub/Sub 客户端对象。
- 返回类型
google.cloud.pubsub_v1.PublisherClient
- subscriber_client()[源代码]¶
创建 SubscriberClient。
- 返回
Google Cloud Pub/Sub 客户端对象。
- 返回类型
google.cloud.pubsub_v1.SubscriberClient
- publish(topic, messages, project_id=PROVIDE_PROJECT_ID)[源代码]¶
将消息发布到 Pub/Sub 主题。
- 参数
topic (str) – 要发布到的 Pub/Sub 主题;不包括
projects/{project}/topics/
前缀。messages (list[dict]) – 要发布的消息;如果消息中的数据字段已设置,它应该是一个字节串 (utf-8 编码) https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage
project_id (str) – 可选,要发布到的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用来自 Google Cloud 连接的默认 project_id。
- create_topic(topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=())[源代码]¶
如果 Pub/Sub 主题尚不存在,则创建它。
- 参数
topic (str) – 要创建的 Pub/Sub 主题名称;不包括
projects/{project}/topics/
前缀。project_id (str) – 可选,要在其中创建主题的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用来自 Google Cloud 连接的默认 project_id。
fail_if_exists (bool) – 如果设置,则在主题已存在时引发异常
labels (dict[str, str] | None) – 客户端分配的标签;请参阅 https://cloud.google.com/pubsub/docs/labels
message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – 约束消息发布到主题时可以存储的 Google Cloud 区域集的策略。如果不存在,则不生效任何约束。Union[dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]
kms_key_name (str | None) – 用于保护对发布到此主题的消息的访问的 Cloud KMS CryptoKey 的资源名称。预期格式为
projects/*/locations/*/keyRings/*/cryptoKeys/*
。schema_settings (dict | google.cloud.pubsub_v1.types.SchemaSettings) – (可选)用于根据现有架构验证发布的消息的设置。预期格式为
projects/*/schemas/*
。message_retention_duration (str | None) – (可选) 表示在消息发布到主题后保留消息的最短持续时间。预期格式是以秒为单位的持续时间,最多九个小数位,以“s”结尾。例如:“3.5s”。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选) 一个重试对象,用于重试请求。如果指定为 None,则不会重试请求。
timeout (float | None) – (可选) 请求完成的等待时间,以秒为单位。请注意,如果指定了重试,则超时适用于每次单独的尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选) 提供给方法的其他元数据。
- delete_topic(topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, retry=DEFAULT, timeout=None, metadata=())[source]¶
如果 Pub/Sub 主题存在,则删除它。
- 参数
topic (str) – 要删除的 Pub/Sub 主题名称;不要包含
projects/{project}/topics/
前缀。project_id (str) – (可选) 要在其中删除主题的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
fail_if_not_exists (bool) – 如果设置,则在主题不存在时引发异常。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选) 一个重试对象,用于重试请求。如果指定为 None,则不会重试请求。
timeout (float | None) – (可选) 请求完成的等待时间,以秒为单位。请注意,如果指定了重试,则超时适用于每次单独的尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选) 提供给方法的其他元数据。
- create_subscription(topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=())[source]¶
如果 Pub/Sub 订阅尚不存在,则创建它。
- 参数
topic (str) – 订阅将绑定到的 Pub/Sub 主题名称;不要包含
projects/{project}/subscriptions/
前缀。project_id (str) – (可选) 订阅将绑定到的主题的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
subscription (str | None) – Pub/Sub 订阅名称。如果为空,将使用 uuid 模块生成一个随机名称。
subscription_project_id (str | None) – 将在其中创建订阅的 Google Cloud 项目 ID。如果未指定,将使用
project_id
。ack_deadline_secs (int) – 订阅者必须确认从订阅中提取的每条消息的秒数。
fail_if_exists (bool) – 如果设置,则在主题已存在时引发异常
push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – 如果此订阅使用推送传递,则此字段用于配置它。空的
pushConfig
表示订阅者将使用 API 方法拉取和确认消息。retain_acked_messages (bool | None) – 指示是否保留已确认的消息。如果为 true,则即使确认了消息,在它们超出
message_retention_duration
窗口之前,也不会从订阅的积压中清除消息。如果您想 Seek 到时间戳,则必须为 true。message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – 从发布消息的那一刻起,在订阅的积压中保留未确认消息的时间。如果
retain_acked_messages
为 true,则这也配置了已确认消息的保留,从而配置了Seek
可以向后追溯的时间。默认为 7 天。不能超过 7 天或少于 10 分钟。labels (dict[str, str] | None) – 客户端分配的标签;请参阅 https://cloud.google.com/pubsub/docs/labels
enable_message_ordering (bool) – 如果为 true,则在 PubsubMessage 中使用相同 ordering_key 发布的消息将按照 Pub/Sub 系统接收它们的顺序传递给订阅者。否则,它们可以按任意顺序传递。
expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – 一项策略,用于指定此订阅过期的条件。只要任何连接的订阅者成功地从订阅中消费消息或正在对订阅发出操作,订阅就被认为是活动的。如果未设置 expiration_policy,则将使用 ttl 为 31 天的默认策略。expiration_policy.ttl 允许的最小值是 1 天。
filter – 使用 Cloud Pub/Sub 过滤器语言编写的表达式。如果非空,则只有属性字段与过滤器匹配的 PubsubMessages 才会在此订阅上交付。如果为空,则不会过滤掉任何消息。
dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – 一项策略,用于指定此订阅中死信消息的条件。如果未设置 dead_letter_policy,则禁用死信。
retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – 一项策略,用于指定 Pub/Sub 如何为此订阅重试消息传递。如果未设置,则应用默认的重试策略。这通常意味着对于健康的订阅者,将尽快重试消息。对于给定的消息,将对 NACK 或超出确认截止期限的事件触发 RetryPolicy。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选) 一个重试对象,用于重试请求。如果指定为 None,则不会重试请求。
timeout (float | None) – (可选) 请求完成的等待时间,以秒为单位。请注意,如果指定了重试,则超时适用于每次单独的尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选) 提供给方法的其他元数据。
- 返回
如果未提供
subscription
参数,则订阅名称将是系统生成的值。- 返回类型
- delete_subscription(subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, retry=DEFAULT, timeout=None, metadata=())[source]¶
如果 Pub/Sub 订阅存在,则删除它。
- 参数
subscription (str) – 要删除的 Pub/Sub 订阅名称;不要包含
projects/{project}/subscriptions/
前缀。project_id (str) – (可选) 订阅所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
fail_if_not_exists (bool) – 如果设置,则在主题不存在时引发异常。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选) 一个重试对象,用于重试请求。如果指定为 None,则不会重试请求。
timeout (float | None) – (可选) 请求完成的等待时间,以秒为单位。请注意,如果指定了重试,则超时适用于每次单独的尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选) 提供给方法的其他元数据。
- pull(subscription, max_messages, project_id=PROVIDE_PROJECT_ID, return_immediately=False, retry=DEFAULT, timeout=None, metadata=())[源代码]¶
从 Pub/Sub 订阅中拉取最多
max_messages
条消息。- 参数
subscription (str) – 要从中拉取的 Pub/Sub 订阅名称;不包括 ‘projects/{project}/topics/’ 前缀。
max_messages (int) – 从 Pub/Sub API 返回的最大消息数。
project_id (str) – 可选,订阅所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
return_immediately (bool) – 如果设置,如果没有可用消息,Pub/Sub API 将立即返回。否则,请求将阻塞一段未公开但有界的时间。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选) 一个重试对象,用于重试请求。如果指定为 None,则不会重试请求。
timeout (float | None) – (可选) 请求完成的等待时间,以秒为单位。请注意,如果指定了重试,则超时适用于每次单独的尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选) 提供给方法的其他元数据。
- 返回
Pub/Sub ReceivedMessage 对象的列表,每个对象都包含一个
ackId
属性和一个message
属性,其中包括 base64 编码的消息内容。请参阅 https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.ReceivedMessage- 返回类型
list[google.cloud.pubsub_v1.types.ReceivedMessage]
- acknowledge(subscription, project_id, ack_ids=None, messages=None, retry=DEFAULT, timeout=None, metadata=())[源代码]¶
确认与 Pub/Sub 订阅中的
ack_ids
关联的消息。- 参数
subscription (str) – 要删除的 Pub/Sub 订阅名称;不包括 ‘projects/{project}/topics/’ 前缀。
ack_ids (list[str] | None) – 来自先前拉取响应的 ReceivedMessage ackIds 列表。与
messages
参数互斥。messages (list[google.cloud.pubsub_v1.types.ReceivedMessage] | None) – 要确认的 ReceivedMessage 对象列表。与
ack_ids
参数互斥。project_id (str) – 可选,要在其中创建主题的 Google Cloud 项目名称或 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选) 一个重试对象,用于重试请求。如果指定为 None,则不会重试请求。
timeout (float | None) – (可选) 请求完成的等待时间,以秒为单位。请注意,如果指定了重试,则超时适用于每次单独的尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选) 提供给方法的其他元数据。
- class airflow.providers.google.cloud.hooks.pubsub.PubSubAsyncHook(project_id=PROVIDE_PROJECT_ID, **kwargs)[源代码]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
用于获取 Google Cloud PubSub 异步 hook 的类。
- async acknowledge(subscription, project_id, ack_ids=None, messages=None, retry=DEFAULT, timeout=None, metadata=())[源代码]¶
确认与 Pub/Sub 订阅中的
ack_ids
关联的消息。- 参数
subscription (str) – 要删除的 Pub/Sub 订阅名称;不包括 ‘projects/{project}/topics/’ 前缀。
ack_ids (list[str] | None) – 来自先前拉取响应的 ReceivedMessage ackIds 列表。与
messages
参数互斥。messages (list[google.cloud.pubsub_v1.types.ReceivedMessage] | None) – 要确认的 ReceivedMessage 对象列表。与
ack_ids
参数互斥。project_id (str) – 可选,要在其中创建主题的 Google Cloud 项目名称或 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,则不会重试请求。
timeout (float | None) – (可选) 请求完成的等待时间,以秒为单位。请注意,如果指定了重试,则超时适用于每次单独的尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选) 提供给方法的其他元数据。
- async pull(subscription, max_messages, project_id=PROVIDE_PROJECT_ID, return_immediately=False, retry=DEFAULT, timeout=None, metadata=())[源代码]¶
从 Pub/Sub 订阅中拉取最多
max_messages
条消息。- 参数
subscription (str) – 要从中拉取的 Pub/Sub 订阅名称;不包括 ‘projects/{project}/topics/’ 前缀。
max_messages (int) – 从 Pub/Sub API 返回的最大消息数。
project_id (str) – 可选,订阅所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
return_immediately (bool) – 如果设置,如果没有可用消息,Pub/Sub API 将立即返回。否则,请求将阻塞一段未公开但有界的时间。
retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,则不会重试请求。
timeout (float | None) – (可选) 请求完成的等待时间,以秒为单位。请注意,如果指定了重试,则超时适用于每次单独的尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选) 提供给方法的其他元数据。
- 返回
Pub/Sub ReceivedMessage 对象的列表,每个对象都包含一个
ackId
属性和一个message
属性,其中包括 base64 编码的消息内容。请参阅 https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.ReceivedMessage- 返回类型
list[google.cloud.pubsub_v1.types.ReceivedMessage]