airflow.providers.apache.kafka.operators.consume¶
属性¶
类¶
一个 operator,用于从 Kafka 主题(或多个主题)消费消息并处理它们。 |
模块内容¶
- class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[源码]¶
基类:
airflow.models.BaseOperator
一个 operator,用于从 Kafka 主题(或多个主题)消费消息并处理它们。
此 operator 创建一个 Kafka 消费者,该消费者从集群读取一批消息,并使用用户提供的可调用函数处理这些消息。消费者将继续批量读取,直到到达日志末尾或达到最大消息数为止。
- 参数:
kafka_config_id (str) – 要使用的连接对象,默认为 "kafka_default"
topics (str | collections.abc.Sequence[str]) – 消费者应订阅的主题列表或正则表达式模式。
apply_function (Callable[Ellipsis, Any] | str | None) – 应该应用于每次获取一个消息的函数。字符串格式为执行该函数的 dag 文件名和函数名,用 . 分隔。
apply_function_batch (Callable[Ellipsis, Any] | str | None) – 应该应用于获取的消息批次的函数。不能与 apply_function 一起使用。旨在用于事务性工作负载,其中可能在对消息进行操作之前或之后调用耗时任务。
apply_function_args (collections.abc.Sequence[Any] | None) – 应该应用于可调用函数的额外位置参数,默认为 None
apply_function_kwargs (dict[Any, Any] | None) – 应该应用于可调用函数的额外关键字参数,默认为 None
commit_cadence (str | None) – 消费者何时应提交 offset ("never", "end_of_batch","end_of_operator"),默认为 "end_of_operator";如果为 end_of_operator,则根据 max_messages 参数调用 commit()。Operator 在处理完 operator 中的最大消息数量(通过 apply_function 方法处理)后进行提交。如果为 end_of_batch,则根据 max_batch_size 参数调用 commit()。Operator 在处理完每个批次的所有消息(通过 apply_function 方法处理)后进行提交。如果为 never,则调用 close() 而不调用 commit() 方法。
max_messages (int | None) – operator 应从 Kafka 读取的最大总消息数,默认为 None 表示读取到主题末尾。
max_batch_size (int) – 消费者 polling 时应读取的最大消息数,默认为 1000
poll_timeout (float) – Kafka 消费者在判断没有更多可用消息之前应等待的时间,默认为 60
另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:ConsumeFromTopicOperator