Apache Kafka 传感器

AwaitMessageSensor

一个传感器,它会推迟执行,直到特定的消息被发布到 Kafka 主题。该传感器将创建一个消费者来读取 Kafka 主题中的消息,直到找到满足 apply_function 参数中定义的条件的消息。如果 apply_function 返回任何数据,则会引发一个 TriggerEvent,并且 AwaitMessageSensor 成功完成。

有关参数定义,请参考 AwaitMessageSensor

使用传感器

tests/system/apache/kafka/example_dag_hello_kafka.py

t5 = AwaitMessageSensor(
    kafka_config_id="t5",
    task_id="awaiting_message",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.await_function",
    xcom_push_key="retrieved_message",
)

参考

更多信息,请参阅 Apache Kafka Consumer 文档

AwaitMessageTriggerFunctionSensor

与上面的 AwaitMessageSensor 类似,该传感器将推迟执行,直到从 Kafka 主题中消费到满足其 apply_function 条件的消息。一旦遇到一个积极事件,AwaitMessageTriggerFunctionSensor 将触发提供给 event_triggered_function 的可调用对象。之后,传感器将再次推迟,继续消费消息。

有关参数定义,请参考 AwaitMessageTriggerFunctionSensor

使用传感器

tests/system/apache/kafka/example_dag_event_listener.py

listen_for_message = AwaitMessageTriggerFunctionSensor(
    kafka_config_id="fizz_buzz_2",
    task_id="listen_for_message",
    topics=["fizz_buzz"],
    apply_function="example_dag_event_listener.await_function",
    event_triggered_function=wait_for_event,
)

参考

更多信息,请参阅 Apache Kafka Consumer 文档

本条目有帮助吗?