airflow.providers.apache.kafka.operators.consume

模块内容

ConsumeFromTopicOperator

一个从 Kafka 主题(或多个主题)消费并处理消息的操作符。

属性

VALID_COMMIT_CADENCE

airflow.providers.apache.kafka.operators.consume.VALID_COMMIT_CADENCE[源代码]
class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[源代码]

基类:airflow.models.BaseOperator

一个从 Kafka 主题(或多个主题)消费并处理消息的操作符。

该操作符创建一个 Kafka 消费者,从集群读取一批消息,并使用用户提供的可调用函数处理它们。消费者将继续批量读取,直到到达日志末尾或读取到最大消息数。

参数
  • kafka_config_id (str) – 要使用的连接对象,默认为“kafka_default”

  • topics (str | collections.abc.Sequence[str]) – 消费者应订阅的主题列表或正则表达式模式。

  • apply_function (Callable[Ellipsis, Any] | str | None) – 应该一次应用于获取的消息的函数。执行函数的 dag 文件名和函数名用 . 分隔。

  • apply_function_batch (Callable[Ellipsis, Any] | str | None) – 应该应用于一批获取的消息的函数。不能与 apply_function 一起使用。适用于事务性工作负载,在对消息执行操作之前或之后可能调用昂贵的任务。

  • apply_function_args (collections.abc.Sequence[Any] | None) – 应该应用于可调用函数的其他参数,默认为 None

  • apply_function_kwargs (dict[Any, Any] | None) – 应该应用于可调用函数的其他关键字参数,默认为 None

  • commit_cadence (str | None) – 消费者何时提交偏移量(“never”、“end_of_batch”、“end_of_operator”),默认为 “end_of_operator”;如果为 end_of_operator,则根据 max_messages 参数调用 commit()。提交是在操作符处理完操作符中最大消息数量的 apply_function 方法之后进行的。如果为 end_of_batch,则根据 max_batch_size 参数调用 commit()。提交是在每批消息都被 apply_function 方法处理之后进行的。如果为 never,则在不调用 commit() 方法的情况下调用 close()。

  • max_messages (int | None) – 操作符应从 Kafka 读取的最大消息总数,默认为 None,表示读取到主题末尾。

  • max_batch_size (int) – 消费者在轮询时应读取的最大消息数,默认为 1000

  • poll_timeout (float) – Kafka 消费者在确定没有更多可用消息之前应等待的时间,默认为 60

另请参阅

有关如何使用此操作符的更多信息,请查看指南:ConsumeFromTopicOperator

BLUE = '#ffefeb'[源代码]
ui_color[源代码]
template_fields = ('topics', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文是与渲染 jinja 模板时使用的相同的字典。

有关更多上下文,请参阅 get_template_context。

此条目是否有帮助?