Google Cloud 代管式 Apache Kafka 算子

Google Cloud 代管式 Apache Kafka 可帮助您设置、保护、维护和扩展 Apache Kafka 集群。

与 Apache Kafka 集群交互

要创建 Apache Kafka 集群,您可以使用 ManagedKafkaCreateClusterOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

create_cluster = ManagedKafkaCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster=CLUSTER_CONF,
    cluster_id=CLUSTER_ID,
)

要删除集群,您可以使用 ManagedKafkaDeleteClusterOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

delete_cluster = ManagedKafkaDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

要获取集群,您可以使用 ManagedKafkaGetClusterOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

get_cluster = ManagedKafkaGetClusterOperator(
    task_id="get_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

要获取集群列表,您可以使用 ManagedKafkaListClustersOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

list_clusters = ManagedKafkaListClustersOperator(
    task_id="list_clusters",
    project_id=PROJECT_ID,
    location=LOCATION,
)

要更新集群,您可以使用 ManagedKafkaUpdateClusterOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

update_cluster = ManagedKafkaUpdateClusterOperator(
    task_id="update_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    cluster=CLUSTER_TO_UPDATE,
    update_mask=CLUSTER_UPDATE_MASK,
)

与 Apache Kafka 主题交互

要创建 Apache Kafka 主题,您可以使用 ManagedKafkaCreateTopicOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

create_topic = ManagedKafkaCreateTopicOperator(
    task_id="create_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    topic=TOPIC_CONF,
)

要删除主题,您可以使用 ManagedKafkaDeleteTopicOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

delete_topic = ManagedKafkaDeleteTopicOperator(
    task_id="delete_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

要获取主题,您可以使用 ManagedKafkaGetTopicOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

get_topic = ManagedKafkaGetTopicOperator(
    task_id="get_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
)

要获取主题列表,您可以使用 ManagedKafkaListTopicsOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

list_topics = ManagedKafkaListTopicsOperator(
    task_id="list_topics",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

要更新主题,您可以使用 ManagedKafkaUpdateTopicOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

update_topic = ManagedKafkaUpdateTopicOperator(
    task_id="update_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    topic=TOPIC_TO_UPDATE,
    update_mask=TOPIC_UPDATE_MASK,
)

与 Apache Kafka 消费者组交互

要删除消费者组,您可以使用 ManagedKafkaDeleteConsumerGroupOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

delete_consumer_group = ManagedKafkaDeleteConsumerGroupOperator(
    task_id="delete_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

要获取消费者组,您可以使用 ManagedKafkaGetConsumerGroupOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

get_consumer_group = ManagedKafkaGetConsumerGroupOperator(
    task_id="get_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
)

要获取消费者组列表,您可以使用 ManagedKafkaListConsumerGroupsOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

list_consumer_groups = ManagedKafkaListConsumerGroupsOperator(
    task_id="list_consumer_groups",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

要更新消费者组,您可以使用 ManagedKafkaUpdateConsumerGroupOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

update_consumer_group = ManagedKafkaUpdateConsumerGroupOperator(
    task_id="update_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
    consumer_group={
        "topics": {},
    },
    update_mask={"paths": ["topics"]},
)

将 Apache Kafka Provider 与 Google Cloud 代管式 Apache Kafka 一起使用

要向主题生产数据,您可以使用 ProduceToTopicOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

produce_to_topic = ProduceToTopicOperator(
    task_id="produce_to_topic",
    kafka_config_id=CONNECTION_ID,
    topic=TOPIC_ID,
    producer_function=producer,
    poll_timeout=10,
)

要从主题消费数据,您可以使用 ConsumeFromTopicOperator

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

consume_from_topic = ConsumeFromTopicOperator(
    task_id="consume_from_topic",
    kafka_config_id=CONNECTION_ID,
    topics=[TOPIC_ID],
    apply_function=consumer,
    poll_timeout=20,
    max_messages=20,
    max_batch_size=20,
)

参考资料

更多信息,请参阅

此条目有帮助吗?