Apache Kafka Operator

ConsumeFromTopicOperator

一个用于消费一个或多个 Kafka 主题的消息并对其进行处理的 Operator。该 Operator 创建一个 Kafka Consumer,它从集群中读取一批消息,并使用用户提供的可调用函数 apply_function 对其进行处理。Consumer 将持续以批次方式读取,直到达到日志末尾或达到最大读取消息数量 (max_messages) 为止。

有关参数定义,请参阅 ConsumeFromTopicOperator

使用该 Operator

tests/system/apache/kafka/example_dag_hello_kafka.py

t2 = ConsumeFromTopicOperator(
    kafka_config_id="t2",
    task_id="consume_from_topic",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.consumer_function",
    apply_function_kwargs={"prefix": "consumed:::"},
    commit_cadence="end_of_batch",
    max_messages=10,
    max_batch_size=2,
)

参考

更多信息请参阅 Apache Kafka Consumer 文档

ProduceToTopicOperator

一个用于向 Kafka 主题生产消息的 Operator。该 Operator 将生产由用户提供的 producer_function 函数创建的键/值对消息。

有关参数定义,请参阅 ProduceToTopicOperator

使用该 Operator

tests/system/apache/kafka/example_dag_hello_kafka.py

t1 = ProduceToTopicOperator(
    kafka_config_id="t1-3",
    task_id="produce_to_topic",
    topic="test_1",
    producer_function="example_dag_hello_kafka.producer_function",
)

参考

更多信息请参阅 Apache Kafka Producer 文档

这篇文档有帮助吗?