Apache Kafka 传感器

AwaitMessageSensor

一种传感器,它会一直延迟,直到特定消息发布到 Kafka 主题。该传感器将创建一个消费者,从 Kafka 主题读取消息,直到找到满足 apply_function 参数中定义的条件的消息。如果 apply_function 返回任何数据,则会引发 TriggerEvent,并且 AwaitMessageSensor 成功完成。

有关参数定义,请查看 AwaitMessageSensor

使用传感器

tests/system/providers/apache/kafka/example_dag_hello_kafka.py[源代码]

t5 = AwaitMessageSensor(
    kafka_config_id="t5",
    task_id="awaiting_message",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.await_function",
    xcom_push_key="retrieved_message",
)

参考

有关更多信息,请参阅 Apache Kafka 消费者文档

AwaitMessageTriggerFunctionSensor

与上面的 AwaitMessageSensor 类似,此传感器将一直延迟,直到它从 Kafka 主题中消耗一条满足其 apply_function 条件的消息。一旦遇到正事件,AwaitMessageTriggerFunctionSensor 将触发提供给 event_triggered_function 的可调用对象。之后,传感器将再次延迟,继续消耗消息。

有关参数定义,请查看 AwaitMessageTriggerFunctionSensor

使用传感器

tests/system/providers/apache/kafka/example_dag_event_listener.py[源代码]

listen_for_message = AwaitMessageTriggerFunctionSensor(
    kafka_config_id="fizz_buzz_2",
    task_id="listen_for_message",
    topics=["fizz_buzz"],
    apply_function="example_dag_event_listener.await_function",
    event_triggered_function=wait_for_event,
)

参考

有关更多信息,请参阅 Apache Kafka 消费者文档

此条目有帮助吗?