airflow.providers.apache.kafka.triggers.await_message

AwaitMessageTrigger

一个 Trigger,等待符合特定条件的 Kafka 消息到达。

模块内容

class airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger(topics, apply_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5)[source]

基类: airflow.triggers.base.BaseTrigger

一个 Trigger,等待符合特定条件的 Kafka 消息到达。

此 Trigger 的消费者行为如下: - 轮询 Kafka 主题以获取消息,如果未返回消息则休眠 - 使用提供的可调用对象处理消息并提交消息偏移量

  • 如果可调用对象返回任何数据,则引发带有返回数据的 TriggerEvent

  • 否则继续处理下一条消息

参数:
  • kafka_config_id (str) – 要使用的连接对象,默认为 “kafka_default”

  • topics (collections.abc.Sequence[str]) – 应该搜索消息的主题(或主题正则表达式)

  • apply_function (str) – 应用于消息以确定匹配条件的函数位置。(以 Python 点符号表示为字符串)

  • apply_function_args (collections.abc.Sequence[Any] | None) – 应用于可调用对象的一组参数,默认为 None

  • apply_function_kwargs (dict[Any, Any] | None) – 应用于可调用对象的一组关键字参数,默认为 None,默认为 None

  • poll_timeout (float) – Kafka 客户端在从 Kafka 的 poll 请求返回之前应等待多久(秒),默认为 1

  • poll_interval (float) – Trigger 在到达 Kafka 日志末尾后应休眠多久(秒),默认为 5

topics[source]
apply_function[source]
apply_function_args = ()[source]
apply_function_kwargs[source]
kafka_config_id = 'kafka_default'[source]
poll_timeout = 1[source]
poll_interval = 5[source]
serialize()[source]

返回重建此 Trigger 所需的信息。

返回:

元组 (类路径,重新实例化所需的关键字参数)。

返回类型:

tuple[str, dict[str, Any]]

async run()[source]

在异步上下文中运行 Trigger。

Trigger 应在需要触发事件时 yield 一个 Event,并在完成时返回 None。因此,单事件 Trigger 应该 yield 后立即返回。

如果它 yield,很可能会很快恢复,但也可能不会(例如,如果工作负载正在转移到另一个 triggerer 进程,或者多事件 trigger 用于单事件任务延迟)。

在任何情况下,Trigger 类都应假定它们会被持久化,并在不再需要时依赖 cleanup() 方法被调用。

本条目是否有帮助?