Google Cloud PubSub 操作符¶
Google Cloud PubSub 是一种完全托管的实时消息服务,允许您在独立的应用程序之间发送和接收消息。您可以利用 Cloud Pub/Sub 的灵活性来解耦托管在 Google Cloud 或 Internet 其他位置的系统和组件。
发布者应用程序可以将消息发送到主题,其他应用程序可以订阅该主题以接收消息。通过解耦发送者和接收者,Google Cloud PubSub 允许开发人员在独立编写的应用程序之间进行通信。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
按照 Google Cloud 文档 中的说明,为您的项目启用结算功能。
按照 Cloud Console 文档 中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
创建 PubSub 主题¶
PubSub 主题是一个命名资源,发布者将消息发送到该资源。PubSubCreateTopicOperator
操作符用于创建主题。
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
)
创建 PubSub 订阅¶
Subscription
是一个命名资源,表示来自单个特定主题的消息流,这些消息将传递到订阅应用程序。PubSubCreateSubscriptionOperator
操作符用于创建订阅。
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
)
发布 PubSub 消息¶
Message
是发布者发送到主题并最终传递给订阅者的数据和(可选)属性的组合。PubSubPublishMessageOperator
操作符用于发布消息。
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=PROJECT_ID,
topic=TOPIC_ID,
messages=[MESSAGE, MESSAGE],
)
从 PubSub 订阅拉取消息¶
PubSubPullSensor
传感器从 PubSub 订阅拉取消息,并通过 XCom 传递它们。
subscription = subscribe_task.output
pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
此外,对于此操作,您可以使用可延迟模式的传感器
pull_messages_async = PubSubPullSensor(
task_id="pull_messages_async",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
deferrable=True,
)
pull_messages_operator = PubSubPullOperator(
task_id="pull_messages_operator",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
要从 XCom 拉取消息,请使用 BashOperator
。
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
删除 PubSub 订阅¶
PubSubDeleteSubscriptionOperator
操作符用于删除订阅。
unsubscribe_task = PubSubDeleteSubscriptionOperator(
task_id="unsubscribe_task",
project_id=PROJECT_ID,
subscription=subscription,
)
删除 PubSub 主题¶
PubSubDeleteTopicOperator
操作符用于删除主题。
delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)