airflow.providers.google.cloud.operators.pubsub
¶
此模块包含 Google PubSub 操作符。
模块内容¶
类¶
创建一个 PubSub 主题。 |
|
创建一个 PubSub 订阅。 |
|
删除一个 PubSub 主题。 |
|
删除一个 PubSub 订阅。 |
|
将消息发布到 PubSub 主题。 |
|
从 PubSub 订阅中拉取消息并通过 XCom 传递它们。 |
- class airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, gcp_conn_id='google_cloud_default', labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]¶
基类:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
创建一个 PubSub 主题。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南:创建 PubSub 主题
默认情况下,如果主题已存在,此操作符不会导致 DAG 失败。
with DAG("successful DAG") as dag: create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic_again = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic >> create_topic_again
可以配置该操作符,以便在主题已存在时失败。
with DAG("failing DAG") as dag: create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic_again = PubSubCreateTopicOperator( project_id="my-project", topic="my_new_topic", fail_if_exists=True ) create_topic >> create_topic_again
project_id
和topic
都被模板化,因此你可以在它们的值中使用 Jinja 模板。- 参数
project_id (str) – 可选,要在其中创建主题的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用来自 Google Cloud 连接的默认 project_id。
topic (str) – 要创建的主题。不要包含完整的主题路径。换句话说,不要使用
projects/{project}/topics/{topic}
,而只提供{topic}
。(已模板化)gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。
labels (dict[str, str] | None) – 客户端分配的标签;参见 https://cloud.google.com/pubsub/docs/labels
message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – 约束可能存储在主题上的已发布消息的 Google Cloud 区域集合的策略。如果不存在,则不生效任何约束。Union[dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]
kms_key_name (str | None) – 用于保护对此主题上发布的消息的访问的 Cloud KMS CryptoKey 的资源名称。预期格式为
projects/*/locations/*/keyRings/*/cryptoKeys/*
。retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果未指定 None,则不会重试请求。
timeout (float | None) – (可选)等待请求完成的时间(以秒为单位)。请注意,如果指定了重试,则超时适用于每次尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的其他元数据。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭证模拟,或获取列表中最后一个帐户的 access_token 所需的链接帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则帐户必须授予原始帐户服务帐户令牌创建者 IAM 角色。如果设置为序列,则列表中的标识必须将服务帐户令牌创建者 IAM 角色授予紧接其前的标识,列表中的第一个帐户将此角色授予原始帐户(已模板化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator(*, topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, gcp_conn_id='google_cloud_default', push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]¶
基类:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
创建一个 PubSub 订阅。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:创建 PubSub 订阅
默认情况下,订阅将在
project_id
中创建。如果指定了subscription_project_id
且 Google Cloud 凭据允许,则订阅可以在与主题不同的项目中创建。默认情况下,如果订阅已存在,此操作符不会导致 DAG 失败。但是,主题必须存在于项目中。
with DAG("successful DAG") as dag: create_subscription = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription_again = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription >> create_subscription_again
可以配置此操作符,使其在订阅已存在时失败。
with DAG("failing DAG") as dag: create_subscription = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription_again = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription", fail_if_exists=True ) create_subscription >> create_subscription_again
最后,订阅不是必需的。如果未传递,操作符将为订阅的名称生成一个通用唯一标识符。
with DAG("DAG") as dag: PubSubCreateSubscriptionOperator(project_id="my-project", topic="my-topic")
project_id
、topic
、subscription
、subscription_project_id
和impersonation_chain
都是模板化的,因此您可以在其值中使用 Jinja 模板。- 参数
project_id (str) – 可选,主题所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
topic (str) – 要创建的主题。不要包含完整的主题路径。换句话说,不要使用
projects/{project}/topics/{topic}
,而只提供{topic}
。(已模板化)subscription (str | None) – Pub/Sub 订阅名称。如果为空,将使用 uuid 模块生成一个随机名称。
subscription_project_id (str | None) – 将创建订阅的 Google Cloud 项目 ID。如果为空,将使用
topic_project
。ack_deadline_secs (int) – 订阅者确认从订阅中拉取的每条消息的秒数。
gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。
push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – 如果此订阅使用推送交付,则此字段用于配置它。空的
pushConfig
表示订阅者将使用 API 方法拉取和确认消息。retain_acked_messages (bool | None) – 指示是否保留已确认的消息。如果为 true,则即使消息被确认,也不会从订阅的积压中删除,直到它们超出
message_retention_duration
窗口。如果要查找时间戳,则必须为 true。message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – 从消息发布的那一刻起,在订阅的积压中保留未确认消息的时间。如果
retain_acked_messages
为 true,则这也配置了已确认消息的保留,从而配置了Seek
可以追溯到的时间。默认为 7 天。不能超过 7 天或少于 10 分钟。labels (dict[str, str] | None) – 客户端分配的标签;参见 https://cloud.google.com/pubsub/docs/labels
enable_message_ordering (bool) – 如果为 true,则在 PubsubMessage 中使用相同 ordering_key 发布的消息将按照 Pub/Sub 系统接收的顺序传递给订阅者。否则,它们可以以任何顺序传递。
expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – 指定此订阅过期条件的策略。只要任何连接的订阅者成功地从订阅中消费消息或在订阅上发出操作,就认为订阅处于活动状态。如果未设置 expiration_policy,则将使用 ttl 为 31 天的默认策略。expiration_policy.ttl 的最小允许值为 1 天。
filter – 以 Cloud Pub/Sub 过滤语言编写的表达式。如果非空,则只有 attributes 字段与过滤器匹配的 PubsubMessages 才会在此订阅上交付。如果为空,则不会过滤掉任何消息。
dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – 指定此订阅中死信消息条件的策略。如果未设置 dead_letter_policy,则禁用死信。
retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – 指定 Pub/Sub 如何为此订阅重试消息传递的策略。如果未设置,则应用默认重试策略。这通常意味着对于健康的订阅者,消息将尽快重试。对于给定消息,RetryPolicy 将在 NACK 或确认截止日期超出事件时触发。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果未指定 None,则不会重试请求。
timeout (float | None) – (可选)等待请求完成的时间(以秒为单位)。请注意,如果指定了重试,则超时适用于每次尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的其他元数据。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭证模拟,或获取列表中最后一个帐户的 access_token 所需的链接帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则帐户必须授予原始帐户服务帐户令牌创建者 IAM 角色。如果设置为序列,则列表中的标识必须将服务帐户令牌创建者 IAM 角色授予紧接其前的标识,列表中的第一个帐户将此角色授予原始帐户(已模板化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'subscription', 'subscription_project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[源]¶
基类:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
删除一个 PubSub 主题。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:删除 PubSub 主题
默认情况下,如果主题不存在,此操作符不会导致 DAG 失败。
with DAG("successful DAG") as dag: PubSubDeleteTopicOperator(project_id="my-project", topic="non_existing_topic")
可以配置此操作符,以便在主题不存在时失败。
with DAG("failing DAG") as dag: PubSubDeleteTopicOperator( project_id="my-project", topic="non_existing_topic", fail_if_not_exists=True, )
project_id
和topic
都被模板化,因此你可以在它们的值中使用 Jinja 模板。- 参数
project_id (str) – 可选,要使用的 Google Cloud 项目 ID(已模板化)。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
topic (str) – 要删除的主题。不要包含完整的主题路径。换句话说,不要使用
projects/{project}/topics/{topic}
,而只提供{topic}
。(已模板化)fail_if_not_exists (bool) – 如果为 True 且主题不存在,则任务失败
gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果未指定 None,则不会重试请求。
timeout (float | None) – (可选)等待请求完成的时间(以秒为单位)。请注意,如果指定了重试,则超时适用于每次尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的其他元数据。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭证模拟,或获取列表中最后一个帐户的 access_token 所需的链接帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则帐户必须授予原始帐户服务帐户令牌创建者 IAM 角色。如果设置为序列,则列表中的标识必须将服务帐户令牌创建者 IAM 角色授予紧接其前的标识,列表中的第一个帐户将此角色授予原始帐户(已模板化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')[源]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator(*, subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[源]¶
基类:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
删除一个 PubSub 订阅。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:删除 PubSub 订阅
默认情况下,如果订阅不存在,此操作符不会导致 DAG 失败。
with DAG("successful DAG") as dag: PubSubDeleteSubscriptionOperator(project_id="my-project", subscription="non-existing")
可以配置此操作符,使其在订阅已存在时失败。
with DAG("failing DAG") as dag: PubSubDeleteSubscriptionOperator( project_id="my-project", subscription="non-existing", fail_if_not_exists=True, )
project_id
和subscription
已模板化,因此您可以在其值中使用 Jinja 模板。- 参数
project_id (str) – 可选,要使用的 Google Cloud 项目 ID(已模板化)。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
subscription (str) – 要删除的订阅。不要包含完整的订阅路径。换句话说,不要使用
projects/{project}/subscription/{subscription}
,而只提供{subscription}
。(已模板化)fail_if_not_exists (bool) – 如果为 True 且订阅不存在,则任务失败
gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果未指定 None,则不会重试请求。
timeout (float | None) – (可选)等待请求完成的时间(以秒为单位)。请注意,如果指定了重试,则超时适用于每次尝试。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的其他元数据。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭证模拟,或获取列表中最后一个帐户的 access_token 所需的链接帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则帐户必须授予原始帐户服务帐户令牌创建者 IAM 角色。如果设置为序列,则列表中的标识必须将服务帐户令牌创建者 IAM 角色授予紧接其前的标识,列表中的第一个帐户将此角色授予原始帐户(已模板化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[源]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator(*, topic, messages, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', enable_message_ordering=False, impersonation_chain=None, **kwargs)[源]¶
基类:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
将消息发布到 PubSub 主题。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:发布 PubSub 消息
每个任务将所有提供的消息发布到同一个 Google Cloud 项目中的同一个主题。如果主题不存在,此任务将失败。
m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}} m2 = {"data": b"Knock, knock"} m3 = {"attributes": {"foo": ""}} m4 = {"data": b"Who's there?", "attributes": {"ordering_key": "knock_knock"}} t1 = PubSubPublishMessageOperator( project_id="my-project", topic="my_topic", messages=[m1, m2, m3], create_topic=True, dag=dag, ) t2 = PubSubPublishMessageOperator( project_id="my-project", topic="my_topic", messages=[m4], create_topic=True, enable_message_ordering=True, dag=dag, )
project_id
、topic
和messages
已模板化,因此您可以在其值中使用 Jinja 模板。- 参数
project_id (str) – 可选,要使用的 Google Cloud 项目 ID(已模板化)。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
topic (str) – 要发布的主题。不要包含完整的主题路径。换句话说,不要使用
projects/{project}/topics/{topic}
,而只提供{topic}
。(已模板化)messages (list) – 要发布到主题的消息列表。每条消息都是一个字典,其中包含一个或多个以下键值对:* ‘data’:字节串(utf-8 编码)* ‘attributes’:{‘key1’:‘value1’,…} 每条消息必须至少包含一个非空的 ‘data’ 值或一个至少包含一个键的属性字典(已模板化)。请参阅 https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。
enable_message_ordering (bool) – 如果为 true,则具有相同 ordering_key 的 PubsubMessage 中发布的消息将按照 Pub/Sub 系统接收它们的顺序传递给订阅者。否则,它们可能会以任何顺序传递。默认为 False。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭证模拟,或获取列表中最后一个帐户的 access_token 所需的链接帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则帐户必须授予原始帐户服务帐户令牌创建者 IAM 角色。如果设置为序列,则列表中的标识必须将服务帐户令牌创建者 IAM 角色授予紧接其前的标识,列表中的第一个帐户将此角色授予原始帐户(已模板化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'messages', 'enable_message_ordering', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator(*, project_id, subscription, max_messages=5, ack_messages=False, messages_callback=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
基类:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
从 PubSub 订阅中拉取消息并通过 XCom 传递它们。
如果队列为空,则返回空列表 - 绝不会等待消息。如果您确实需要等待,请改用
airflow.providers.google.cloud.sensors.PubSubPullSensor
。另请参阅
有关如何使用此操作符和 PubSubPullSensor 的更多信息,请查看指南:从 PubSub 订阅拉取消息
此操作符将从指定的 PubSub 订阅中拉取最多
max_messages
条消息。当订阅返回消息时,消息将立即从操作符返回,并通过 XCom 传递给下游任务。如果
ack_messages
设置为 True,消息将在返回之前立即被确认,否则,下游任务将负责确认它们。project_id `` 和 ``subscription
是模板化的,因此您可以在其值中使用 Jinja 模板。- 参数
project_id (str) – 订阅的 Google Cloud 项目 ID(模板化)
subscription (str) – Pub/Sub 订阅名称。不要包括完整的订阅路径。
max_messages (int) – 每个 PubSub 拉取请求要检索的最大消息数
ack_messages (bool) – 如果为 True,则每条消息将被立即确认,而不是由任何下游任务确认
gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。
messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可选)用于处理接收消息的回调。其返回值将保存到 XCom。如果要拉取大量消息,可能需要提供自定义回调。如果未提供,默认实现将使用 google.protobuf.json_format.MessageToDict 函数将 ReceivedMessage 对象转换为 JSON 可序列化的字典。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭证模拟,或获取列表中最后一个帐户的 access_token 所需的链接帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则帐户必须授予原始帐户服务帐户令牌创建者 IAM 角色。如果设置为序列,则列表中的标识必须将服务帐户令牌创建者 IAM 角色授予紧接其前的标识,列表中的第一个帐户将此角色授予原始帐户(已模板化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[source]¶