Apache Kafka 操作器

ConsumeFromTopicOperator

一个从一个或多个 Kafka 主题消费并处理消息的操作器。该操作器创建一个 Kafka 消费者,从集群中读取一批消息,并使用用户提供的可调用对象 apply_function 对其进行处理。消费者将继续批量读取,直到到达日志末尾或达到读取的最大消息数 (max_messages)。

有关参数定义,请查看 ConsumeFromTopicOperator

使用操作器

tests/system/providers/apache/kafka/example_dag_hello_kafka.py[源代码]

t2 = ConsumeFromTopicOperator(
    kafka_config_id="t2",
    task_id="consume_from_topic",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.consumer_function",
    apply_function_kwargs={"prefix": "consumed:::"},
    commit_cadence="end_of_batch",
    max_messages=10,
    max_batch_size=2,
)

参考

有关更多信息,请参阅 Apache Kafka 消费者文档

ProduceToTopicOperator

一个将消息生成到 Kafka 主题的操作器。该操作器将生成由用户提供的 producer_function 创建的键/值对消息。

有关参数定义,请查看 ProduceToTopicOperator

使用操作器

tests/system/providers/apache/kafka/example_dag_hello_kafka.py[源代码]

t1 = ProduceToTopicOperator(
    kafka_config_id="t1-3",
    task_id="produce_to_topic",
    topic="test_1",
    producer_function="example_dag_hello_kafka.producer_function",
)

参考

有关更多信息,请参阅 Apache Kafka 生产者文档

此条目有帮助吗?