airflow.providers.apache.kafka.operators.consume

属性

VALID_COMMIT_CADENCE

ConsumeFromTopicOperator

一个 operator,用于从 Kafka 主题(或多个主题)消费消息并处理它们。

模块内容

airflow.providers.apache.kafka.operators.consume.VALID_COMMIT_CADENCE[源码]
class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[源码]

基类: airflow.models.BaseOperator

一个 operator,用于从 Kafka 主题(或多个主题)消费消息并处理它们。

此 operator 创建一个 Kafka 消费者,该消费者从集群读取一批消息,并使用用户提供的可调用函数处理这些消息。消费者将继续批量读取,直到到达日志末尾或达到最大消息数为止。

参数:
  • kafka_config_id (str) – 要使用的连接对象,默认为 "kafka_default"

  • topics (str | collections.abc.Sequence[str]) – 消费者应订阅的主题列表或正则表达式模式。

  • apply_function (Callable[Ellipsis, Any] | str | None) – 应该应用于每次获取一个消息的函数。字符串格式为执行该函数的 dag 文件名和函数名,用 . 分隔。

  • apply_function_batch (Callable[Ellipsis, Any] | str | None) – 应该应用于获取的消息批次的函数。不能与 apply_function 一起使用。旨在用于事务性工作负载,其中可能在对消息进行操作之前或之后调用耗时任务。

  • apply_function_args (collections.abc.Sequence[Any] | None) – 应该应用于可调用函数的额外位置参数,默认为 None

  • apply_function_kwargs (dict[Any, Any] | None) – 应该应用于可调用函数的额外关键字参数,默认为 None

  • commit_cadence (str | None) – 消费者何时应提交 offset ("never", "end_of_batch","end_of_operator"),默认为 "end_of_operator";如果为 end_of_operator,则根据 max_messages 参数调用 commit()。Operator 在处理完 operator 中的最大消息数量(通过 apply_function 方法处理)后进行提交。如果为 end_of_batch,则根据 max_batch_size 参数调用 commit()。Operator 在处理完每个批次的所有消息(通过 apply_function 方法处理)后进行提交。如果为 never,则调用 close() 而不调用 commit() 方法。

  • max_messages (int | None) – operator 应从 Kafka 读取的最大总消息数,默认为 None 表示读取到主题末尾。

  • max_batch_size (int) – 消费者 polling 时应读取的最大消息数,默认为 1000

  • poll_timeout (float) – Kafka 消费者在判断没有更多可用消息之前应等待的时间,默认为 60

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南:ConsumeFromTopicOperator

BLUE = '#ffefeb'[源码]
ui_color = '#ffefeb'[源码]
template_fields = ('topics', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[源码]
topics[源码]
apply_function = None[源码]
apply_function_batch = None[源码]
apply_function_args = ()[源码]
apply_function_kwargs[源码]
kafka_config_id = 'kafka_default'[源码]
commit_cadence = 'end_of_operator'[源码]
max_messages = True[源码]
max_batch_size = 1000[源码]
poll_timeout = 60[源码]
execute(context)[源码]

创建 operator 时需要派生此方法。

context 是用于渲染 jinja 模板的同一个字典。

请参考 get_template_context 获取更多 context 信息。

此条目是否有帮助?