Google Cloud 代管式 Apache Kafka 算子¶
Google Cloud 代管式 Apache Kafka 可帮助您设置、保护、维护和扩展 Apache Kafka 集群。
与 Apache Kafka 集群交互¶
要创建 Apache Kafka 集群,您可以使用 ManagedKafkaCreateClusterOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
create_cluster = ManagedKafkaCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
location=LOCATION,
cluster=CLUSTER_CONF,
cluster_id=CLUSTER_ID,
)
要删除集群,您可以使用 ManagedKafkaDeleteClusterOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
delete_cluster = ManagedKafkaDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
要获取集群,您可以使用 ManagedKafkaGetClusterOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
get_cluster = ManagedKafkaGetClusterOperator(
task_id="get_cluster",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
)
要获取集群列表,您可以使用 ManagedKafkaListClustersOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
list_clusters = ManagedKafkaListClustersOperator(
task_id="list_clusters",
project_id=PROJECT_ID,
location=LOCATION,
)
要更新集群,您可以使用 ManagedKafkaUpdateClusterOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
update_cluster = ManagedKafkaUpdateClusterOperator(
task_id="update_cluster",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
cluster=CLUSTER_TO_UPDATE,
update_mask=CLUSTER_UPDATE_MASK,
)
与 Apache Kafka 主题交互¶
要创建 Apache Kafka 主题,您可以使用 ManagedKafkaCreateTopicOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
create_topic = ManagedKafkaCreateTopicOperator(
task_id="create_topic",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
topic_id=TOPIC_ID,
topic=TOPIC_CONF,
)
要删除主题,您可以使用 ManagedKafkaDeleteTopicOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
delete_topic = ManagedKafkaDeleteTopicOperator(
task_id="delete_topic",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
topic_id=TOPIC_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
要获取主题,您可以使用 ManagedKafkaGetTopicOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
get_topic = ManagedKafkaGetTopicOperator(
task_id="get_topic",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
topic_id=TOPIC_ID,
)
要获取主题列表,您可以使用 ManagedKafkaListTopicsOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
list_topics = ManagedKafkaListTopicsOperator(
task_id="list_topics",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
)
要更新主题,您可以使用 ManagedKafkaUpdateTopicOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
update_topic = ManagedKafkaUpdateTopicOperator(
task_id="update_topic",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
topic_id=TOPIC_ID,
topic=TOPIC_TO_UPDATE,
update_mask=TOPIC_UPDATE_MASK,
)
与 Apache Kafka 消费者组交互¶
要删除消费者组,您可以使用 ManagedKafkaDeleteConsumerGroupOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
delete_consumer_group = ManagedKafkaDeleteConsumerGroupOperator(
task_id="delete_consumer_group",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
consumer_group_id=CONSUMER_GROUP_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
要获取消费者组,您可以使用 ManagedKafkaGetConsumerGroupOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
get_consumer_group = ManagedKafkaGetConsumerGroupOperator(
task_id="get_consumer_group",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
consumer_group_id=CONSUMER_GROUP_ID,
)
要获取消费者组列表,您可以使用 ManagedKafkaListConsumerGroupsOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
list_consumer_groups = ManagedKafkaListConsumerGroupsOperator(
task_id="list_consumer_groups",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
)
要更新消费者组,您可以使用 ManagedKafkaUpdateConsumerGroupOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
update_consumer_group = ManagedKafkaUpdateConsumerGroupOperator(
task_id="update_consumer_group",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
consumer_group_id=CONSUMER_GROUP_ID,
consumer_group={
"topics": {},
},
update_mask={"paths": ["topics"]},
)
将 Apache Kafka Provider 与 Google Cloud 代管式 Apache Kafka 一起使用¶
要向主题生产数据,您可以使用 ProduceToTopicOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
produce_to_topic = ProduceToTopicOperator(
task_id="produce_to_topic",
kafka_config_id=CONNECTION_ID,
topic=TOPIC_ID,
producer_function=producer,
poll_timeout=10,
)
要从主题消费数据,您可以使用 ConsumeFromTopicOperator
。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
consume_from_topic = ConsumeFromTopicOperator(
task_id="consume_from_topic",
kafka_config_id=CONNECTION_ID,
topics=[TOPIC_ID],
apply_function=consumer,
poll_timeout=20,
max_messages=20,
max_batch_size=20,
)
参考资料¶
更多信息,请参阅