Apache Kafka Operator¶
ConsumeFromTopicOperator¶
一个用于消费一个或多个 Kafka 主题的消息并对其进行处理的 Operator。该 Operator 创建一个 Kafka Consumer,它从集群中读取一批消息,并使用用户提供的可调用函数 apply_function
对其进行处理。Consumer 将持续以批次方式读取,直到达到日志末尾或达到最大读取消息数量 (max_messages
) 为止。
有关参数定义,请参阅 ConsumeFromTopicOperator
。
使用该 Operator¶
tests/system/apache/kafka/example_dag_hello_kafka.py
t2 = ConsumeFromTopicOperator(
kafka_config_id="t2",
task_id="consume_from_topic",
topics=["test_1"],
apply_function="example_dag_hello_kafka.consumer_function",
apply_function_kwargs={"prefix": "consumed:::"},
commit_cadence="end_of_batch",
max_messages=10,
max_batch_size=2,
)
参考¶
更多信息请参阅 Apache Kafka Consumer 文档。
ProduceToTopicOperator¶
一个用于向 Kafka 主题生产消息的 Operator。该 Operator 将生产由用户提供的 producer_function
函数创建的键/值对消息。
有关参数定义,请参阅 ProduceToTopicOperator
。
使用该 Operator¶
tests/system/apache/kafka/example_dag_hello_kafka.py
t1 = ProduceToTopicOperator(
kafka_config_id="t1-3",
task_id="produce_to_topic",
topic="test_1",
producer_function="example_dag_hello_kafka.producer_function",
)
参考¶
更多信息请参阅 Apache Kafka Producer 文档。