airflow.providers.google.cloud.hooks.pubsub

此模块包含一个 Google Pub/Sub Hook。

异常

PubSubException

Exception 的别名。

PubSubHook

用于访问 Google Pub/Sub 的 Hook。

PubSubAsyncHook

获取 Google Cloud PubSub 异步 Hook 的类。

模块内容

exception airflow.providers.google.cloud.hooks.pubsub.PubSubException[source]

基类: Exception

Exception 的别名。

class airflow.providers.google.cloud.hooks.pubsub.PubSubHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, enable_message_ordering=False, **kwargs)[source]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

用于访问 Google Pub/Sub 的 Hook。

操作所应用的 Google Cloud 项目由 gcp_conn_id 引用的连接中嵌入的项目确定。

enable_message_ordering = False[source]
get_conn()[source]

检索 Google Cloud Pub/Sub 的连接。

返回:

Google Cloud Pub/Sub 客户端对象。

返回类型:

google.cloud.pubsub_v1.PublisherClient

property subscriber_client: google.cloud.pubsub_v1.SubscriberClient[source]

创建 SubscriberClient。

返回:

Google Cloud Pub/Sub 客户端对象。

返回类型:

google.cloud.pubsub_v1.SubscriberClient

publish(topic, messages, project_id=PROVIDE_PROJECT_ID)[source]

发布消息到 Pub/Sub 主题。

参数:
create_topic(topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=())[source]

创建一个 Pub/Sub 主题,如果它尚不存在。

参数:
  • topic (str) – 要创建的 Pub/Sub 主题名称;不要包含 projects/{project}/topics/ 前缀。

  • project_id (str) – 可选,要在其中创建主题的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • fail_if_exists (bool) – 如果设置,则在主题已存在时引发异常

  • labels (dict[str, str] | None) – 客户端分配的标签;请参阅 https://cloud.google.com/pubsub/docs/labels

  • message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – 限制发布到主题的消息可能存储在哪些 Google Cloud 区域的策略。如果不存在,则没有限制生效。Union[dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]

  • kms_key_name (str | None) – 用于保护对此主题上发布的消息访问的 Cloud KMS CryptoKey 的资源名称。预期格式为 projects/*/locations/*/keyRings/*/cryptoKeys/*

  • schema_settings (dict | google.cloud.pubsub_v1.types.SchemaSettings) – (可选)用于针对现有模式验证发布的消息的设置。预期格式为 projects/*/schemas/*

  • message_retention_duration (str | None) – (可选)指示消息发布到主题后保留的最短时长。预期格式是秒数,最多包含九位小数,以“s”结尾。示例:“3.5s”。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,请求将不会重试。

  • timeout (float | None) – (可选)等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则 timeout 适用于每次尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

delete_topic(topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, retry=DEFAULT, timeout=None, metadata=())[source]

删除一个 Pub/Sub 主题,如果它存在。

参数:
  • topic (str) – 要删除的 Pub/Sub 主题名称;不要包含 projects/{project}/topics/ 前缀。

  • project_id (str) – 可选,要在其中删除主题的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • fail_if_not_exists (bool) – 如果设置,则在主题不存在时引发异常

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,请求将不会重试。

  • timeout (float | None) – (可选)等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则 timeout 适用于每次尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

create_subscription(topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=())[source]

创建一个 Pub/Sub 订阅,如果它尚不存在。

参数:
  • topic (str) – 订阅将绑定的 Pub/Sub 主题名称;不要包含 projects/{project}/subscriptions/ 前缀。

  • project_id (str) – 可选,订阅将绑定的主题所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • subscription (str | None) – Pub/Sub 订阅名称。如果为空,将使用 uuid 模块生成一个随机名称

  • subscription_project_id (str | None) – 将在其中创建订阅的 Google Cloud 项目 ID。如果未指定,将使用 project_id

  • ack_deadline_secs (int) – 订阅者确认从订阅中拉取的每条消息所需的秒数。

  • fail_if_exists (bool) – 如果设置,则在主题已存在时引发异常

  • push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – 如果此订阅使用推送交付,此字段用于配置它。空的 pushConfig 表示订阅者将使用 API 方法拉取和确认消息。

  • retain_acked_messages (bool | None) – 指示是否保留已确认的消息。如果为 true,则消息不会从订阅的积压中删除,即使已确认,直到它们超出 message_retention_duration 窗口。如果您想 Seek 到时间戳,则此值必须为 true。

  • message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – 从消息发布之时起,未确认消息在订阅积压中保留多长时间。如果 retain_acked_messages 为 true,则这也配置了已确认消息的保留时间,从而配置了可以回溯到多久之前的 Seek 操作。默认为 7 天。不能超过 7 天或少于 10 分钟。

  • labels (dict[str, str] | None) – 客户端分配的标签;请参阅 https://cloud.google.com/pubsub/docs/labels

  • enable_message_ordering (bool) – 如果为 true,则在 PubsubMessage 中使用相同 ordering_key 发布的消息将按照 Pub/Sub 系统接收到的顺序传递给订阅者。否则,它们可能以任何顺序传递。

  • expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – 指定此订阅过期条件的策略。只要任何已连接的订阅者成功消费来自订阅的消息或正在对订阅执行操作,订阅就被视为处于活动状态。如果未设置 expiration_policy,将使用默认的 ttl 为 31 天的策略。expiration_policy.ttl 的最小允许值为 1 天。

  • filter – 使用 Cloud Pub/Sub 过滤语言编写的表达式。如果非空,则只有其 attributes 字段与过滤器匹配的 PubsubMessages 才会在此订阅上交付。如果为空,则不过滤任何消息。

  • dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – 指定此订阅中死信消息条件的策略。如果未设置 dead_letter_policy,则禁用死信功能。

  • retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – 指定 Pub/Sub 如何为此订阅重试消息交付的策略。如果未设置,则应用默认重试策略。这通常意味着对于健康的订阅者,消息将尽快重试。RetryPolicy 将在对给定消息进行 NACK 或超出确认截止时间事件时触发。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,请求将不会重试。

  • timeout (float | None) – (可选)等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则 timeout 适用于每次尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

返回:

订阅名称,如果未提供 subscription 参数,则为系统生成的值

返回类型:

str

delete_subscription(subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, retry=DEFAULT, timeout=None, metadata=())[source]

删除一个 Pub/Sub 订阅,如果它存在。

参数:
  • subscription (str) – 要删除的 Pub/Sub 订阅名称;不要包含 projects/{project}/subscriptions/ 前缀。

  • project_id (str) – 可选,订阅所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • fail_if_not_exists (bool) – 如果设置,则在主题不存在时引发异常

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,请求将不会重试。

  • timeout (float | None) – (可选)等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则 timeout 适用于每次尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

pull(subscription, max_messages, project_id=PROVIDE_PROJECT_ID, return_immediately=False, retry=DEFAULT, timeout=None, metadata=())[source]

从 Pub/Sub 订阅中最多拉取 max_messages 条消息。

参数:
  • subscription (str) – 要从中拉取的 Pub/Sub 订阅名称;不要包含 'projects/{project}/topics/' 前缀。

  • max_messages (int) – 从 Pub/Sub API 返回的最大消息数量。

  • project_id (str) – 可选,订阅所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • return_immediately (bool) – 如果设置,Pub/Sub API 将在没有可用消息时立即返回。否则,请求将阻塞一段未公开但有限的时间。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,请求将不会重试。

  • timeout (float | None) – (可选)等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则 timeout 适用于每次尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

返回:

一个 Pub/Sub ReceivedMessage 对象列表,每个对象包含一个 ackId 属性和一个 message 属性,message 属性包含 base64 编码的消息内容。请参阅 https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.ReceivedMessage

返回类型:

list[google.cloud.pubsub_v1.types.ReceivedMessage]

acknowledge(subscription, project_id, ack_ids=None, messages=None, retry=DEFAULT, timeout=None, metadata=())[source]

确认与 Pub/Sub 订阅中的 ack_ids 关联的消息。

参数:
  • subscription (str) – 要删除的 Pub/Sub 订阅名称;不要包含 'projects/{project}/topics/' 前缀。

  • ack_ids (list[str] | None) – 来自先前拉取响应的 ReceivedMessage ackIds 列表。与 messages 参数互斥。

  • messages (list[google.cloud.pubsub_v1.types.ReceivedMessage] | None) – 要确认的 ReceivedMessage 对象列表。与 ack_ids 参数互斥。

  • project_id (str) – 可选,要在其中创建主题的 Google Cloud 项目名称或 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,请求将不会重试。

  • timeout (float | None) – (可选)等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则 timeout 适用于每次尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

class airflow.providers.google.cloud.hooks.pubsub.PubSubAsyncHook(project_id=PROVIDE_PROJECT_ID, **kwargs)[source]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

获取 Google Cloud PubSub 异步 Hook 的类。

sync_hook_class[source]
project_id = None[source]
async acknowledge(subscription, project_id, ack_ids=None, messages=None, retry=DEFAULT, timeout=None, metadata=())[source]

确认与 Pub/Sub 订阅中的 ack_ids 关联的消息。

参数:
  • subscription (str) – 要删除的 Pub/Sub 订阅名称;不要包含 'projects/{project}/topics/' 前缀。

  • ack_ids (list[str] | None) – 来自先前拉取响应的 ReceivedMessage ackIds 列表。与 messages 参数互斥。

  • messages (list[google.cloud.pubsub_v1.types.ReceivedMessage] | None) – 要确认的 ReceivedMessage 对象列表。与 ack_ids 参数互斥。

  • project_id (str) – 可选,要在其中创建主题的 Google Cloud 项目名称或 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – (可选) 用于重试请求的重试对象。如果指定为 None,则不会重试请求。

  • timeout (float | None) – (可选)等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则 timeout 适用于每次尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

async pull(subscription, max_messages, project_id=PROVIDE_PROJECT_ID, return_immediately=False, retry=DEFAULT, timeout=None, metadata=())[source]

从 Pub/Sub 订阅中最多拉取 max_messages 条消息。

参数:
  • subscription (str) – 要从中拉取的 Pub/Sub 订阅名称;不要包含 'projects/{project}/topics/' 前缀。

  • max_messages (int) – 从 Pub/Sub API 返回的最大消息数量。

  • project_id (str) – 可选,订阅所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • return_immediately (bool) – 如果设置,Pub/Sub API 将在没有可用消息时立即返回。否则,请求将阻塞一段未公开但有限的时间。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – (可选) 用于重试请求的重试对象。如果指定为 None,则不会重试请求。

  • timeout (float | None) – (可选)等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则 timeout 适用于每次尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

返回:

一个 Pub/Sub ReceivedMessage 对象列表,每个对象包含一个 ackId 属性和一个 message 属性,message 属性包含 base64 编码的消息内容。请参阅 https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.ReceivedMessage

返回类型:

list[google.cloud.pubsub_v1.types.ReceivedMessage]

此条目有帮助吗?