Google Cloud PubSub Operator(运算符)¶
Google Cloud PubSub 是一项全托管的实时消息服务,允许您在独立应用程序之间发送和接收消息。您可以利用 Cloud Pub/Sub 的灵活性来解耦托管在 Google Cloud 或互联网其他地方的系统和组件。
发布者应用程序可以将消息发送到某个主题,而其他应用程序可以订阅该主题来接收消息。通过解耦发送方和接收方,Google Cloud PubSub 允许开发者在独立编写的应用程序之间进行通信。
先决条件任务¶
要使用这些 Operator(运算符),您必须执行以下几项操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用结算功能,详情请参阅 Google Cloud 文档。
启用 API,详情请参阅 Cloud Console 文档。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息。
创建 PubSub 主题¶
PubSub 主题是发布者发送消息的命名资源。PubSubCreateTopicOperator
operator(运算符)用于创建主题。
tests/system/google/cloud/pubsub/example_pubsub.py
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
)
创建 PubSub 订阅¶
订阅(Subscription
)是一个命名资源,表示来自单个特定主题的消息流,将传递给订阅应用程序。PubSubCreateSubscriptionOperator
operator(运算符)用于创建订阅。
tests/system/google/cloud/pubsub/example_pubsub.py
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
)
发布 PubSub 消息¶
消息(Message
)是发布者发送到主题的数据和(可选)属性的组合,最终会传递给订阅者。PubSubPublishMessageOperator
operator(运算符)用于发布消息。
tests/system/google/cloud/pubsub/example_pubsub.py
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=PROJECT_ID,
topic=TOPIC_ID,
messages=[MESSAGE, MESSAGE],
)
从 PubSub 订阅拉取(Pull)消息¶
PubSubPullSensor
sensor(传感器)从 PubSub 订阅中拉取消息,并通过 XCom 传递它们。
tests/system/google/cloud/pubsub/example_pubsub.py
subscription = subscribe_task.output
pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
此外,对于此操作,您可以在可推迟模式(deferrable mode)下使用 sensor(传感器)。
tests/system/google/cloud/pubsub/example_pubsub_deferrable.py
pull_messages_async = PubSubPullSensor(
task_id="pull_messages_async",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
deferrable=True,
)
tests/system/google/cloud/pubsub/example_pubsub.py
pull_messages_operator = PubSubPullOperator(
task_id="pull_messages_operator",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
要从 XCom 拉取消息,请使用 BashOperator
。
tests/system/google/cloud/pubsub/example_pubsub.py
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
tests/system/google/cloud/pubsub/example_pubsub.py
pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
删除 PubSub 订阅¶
PubSubDeleteSubscriptionOperator
operator(运算符)用于删除订阅。
tests/system/google/cloud/pubsub/example_pubsub.py
unsubscribe_task = PubSubDeleteSubscriptionOperator(
task_id="unsubscribe_task",
project_id=PROJECT_ID,
subscription=subscription,
)
删除 PubSub 主题¶
PubSubDeleteTopicOperator
operator(运算符)用于删除主题。
tests/system/google/cloud/pubsub/example_pubsub.py
delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)
参考资料¶
更多信息,请参阅