airflow.providers.apache.kafka.triggers.await_message

模块内容

AwaitMessageTrigger

一个触发器,等待符合特定条件的消息到达 Kafka。

class airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger(topics, apply_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5)[source]

基类: airflow.triggers.base.BaseTrigger

一个触发器,等待符合特定条件的消息到达 Kafka。

此触发器的消费者行为如下: - 轮询 Kafka 主题以获取消息,如果没有返回消息,则休眠 - 使用提供的可调用对象处理消息并提交消息偏移量

  • 如果可调用对象返回任何数据,则使用返回数据引发 TriggerEvent

  • 否则继续下一条消息

参数
  • kafka_config_id (str) – 要使用的连接对象,默认为“kafka_default”

  • topics (collections.abc.Sequence[str]) – 应在其中搜索消息的主题(或主题正则表达式)

  • apply_function (str) – 应用于消息以确定匹配条件的功能的位置。(在 Python 中,以点号分隔的字符串)

  • apply_function_args (collections.abc.Sequence[Any] | None) – 应用于可调用对象的一组参数,默认为 None

  • apply_function_kwargs (dict[Any, Any] | None) – 应用于可调用对象的一组关键字参数,默认为 None,默认为 None

  • poll_timeout (float) – Kafka 客户端在从对 Kafka 的轮询请求返回之前应等待多长时间(秒),默认为 1

  • poll_interval (float) – 触发器在到达 Kafka 日志末尾后应休眠多长时间(秒),默认为 5

serialize()[source]

返回重建此触发器所需的信息。

返回

(类路径,重新实例化所需的关键字参数)的元组。

返回类型

tuple[str, dict[str, Any]]

async run()[source]

在异步上下文中运行触发器。

触发器应在想要触发事件时生成一个 Event,如果已完成则返回 None。因此,单事件触发器应先生成,然后立即返回。

如果生成事件,则很可能会很快恢复,但也可能不会(例如,如果工作负载正在被移动到另一个触发器进程,或者多事件触发器被用于单事件任务延迟)。

在任何一种情况下,触发器类都应假定它们将被持久化,然后依赖于在不再需要它们时调用 cleanup()。

此条目是否有帮助?