airflow.providers.apache.kafka.operators.consume
¶
模块内容¶
类¶
一个从 Kafka 主题(或多个主题)消费并处理消息的操作符。 |
属性¶
- class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[源代码]¶
基类:
airflow.models.BaseOperator
一个从 Kafka 主题(或多个主题)消费并处理消息的操作符。
该操作符创建一个 Kafka 消费者,从集群读取一批消息,并使用用户提供的可调用函数处理它们。消费者将继续批量读取,直到到达日志末尾或读取到最大消息数。
- 参数
kafka_config_id (str) – 要使用的连接对象,默认为“kafka_default”
topics (str | collections.abc.Sequence[str]) – 消费者应订阅的主题列表或正则表达式模式。
apply_function (Callable[Ellipsis, Any] | str | None) – 应该一次应用于获取的消息的函数。执行函数的 dag 文件名和函数名用 . 分隔。
apply_function_batch (Callable[Ellipsis, Any] | str | None) – 应该应用于一批获取的消息的函数。不能与 apply_function 一起使用。适用于事务性工作负载,在对消息执行操作之前或之后可能调用昂贵的任务。
apply_function_args (collections.abc.Sequence[Any] | None) – 应该应用于可调用函数的其他参数,默认为 None
apply_function_kwargs (dict[Any, Any] | None) – 应该应用于可调用函数的其他关键字参数,默认为 None
commit_cadence (str | None) – 消费者何时提交偏移量(“never”、“end_of_batch”、“end_of_operator”),默认为 “end_of_operator”;如果为 end_of_operator,则根据 max_messages 参数调用 commit()。提交是在操作符处理完操作符中最大消息数量的 apply_function 方法之后进行的。如果为 end_of_batch,则根据 max_batch_size 参数调用 commit()。提交是在每批消息都被 apply_function 方法处理之后进行的。如果为 never,则在不调用 commit() 方法的情况下调用 close()。
max_messages (int | None) – 操作符应从 Kafka 读取的最大消息总数,默认为 None,表示读取到主题末尾。
max_batch_size (int) – 消费者在轮询时应读取的最大消息数,默认为 1000
poll_timeout (float) – Kafka 消费者在确定没有更多可用消息之前应等待的时间,默认为 60
另请参阅
有关如何使用此操作符的更多信息,请查看指南:ConsumeFromTopicOperator