Google Cloud PubSub Operator(运算符)

Google Cloud PubSub 是一项全托管的实时消息服务,允许您在独立应用程序之间发送和接收消息。您可以利用 Cloud Pub/Sub 的灵活性来解耦托管在 Google Cloud 或互联网其他地方的系统和组件。

发布者应用程序可以将消息发送到某个主题,而其他应用程序可以订阅该主题来接收消息。通过解耦发送方和接收方,Google Cloud PubSub 允许开发者在独立编写的应用程序之间进行通信。

先决条件任务

要使用这些 Operator(运算符),您必须执行以下几项操作

创建 PubSub 主题

PubSub 主题是发布者发送消息的命名资源。PubSubCreateTopicOperator operator(运算符)用于创建主题。

tests/system/google/cloud/pubsub/example_pubsub.py

    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
    )

创建 PubSub 订阅

订阅(Subscription)是一个命名资源,表示来自单个特定主题的消息流,将传递给订阅应用程序。PubSubCreateSubscriptionOperator operator(运算符)用于创建订阅。

tests/system/google/cloud/pubsub/example_pubsub.py

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
    )

发布 PubSub 消息

消息(Message)是发布者发送到主题的数据和(可选)属性的组合,最终会传递给订阅者。PubSubPublishMessageOperator operator(运算符)用于发布消息。

tests/system/google/cloud/pubsub/example_pubsub.py

    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        messages=[MESSAGE, MESSAGE],
    )

从 PubSub 订阅拉取(Pull)消息

PubSubPullSensor sensor(传感器)从 PubSub 订阅中拉取消息,并通过 XCom 传递它们。

tests/system/google/cloud/pubsub/example_pubsub.py

    subscription = subscribe_task.output

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

此外,对于此操作,您可以在可推迟模式(deferrable mode)下使用 sensor(传感器)。

tests/system/google/cloud/pubsub/example_pubsub_deferrable.py

pull_messages_async = PubSubPullSensor(
    task_id="pull_messages_async",
    ack_messages=True,
    project_id=PROJECT_ID,
    subscription=subscription,
    deferrable=True,
)

tests/system/google/cloud/pubsub/example_pubsub.py


    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

要从 XCom 拉取消息,请使用 BashOperator

tests/system/google/cloud/pubsub/example_pubsub.py

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""

tests/system/google/cloud/pubsub/example_pubsub.py

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)

删除 PubSub 订阅

PubSubDeleteSubscriptionOperator operator(运算符)用于删除订阅。

tests/system/google/cloud/pubsub/example_pubsub.py

    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=PROJECT_ID,
        subscription=subscription,
    )

删除 PubSub 主题

PubSubDeleteTopicOperator operator(运算符)用于删除主题。

tests/system/google/cloud/pubsub/example_pubsub.py

    delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)

参考资料

更多信息,请参阅

此条目有帮助吗?