Apache Kafka 传感器¶
AwaitMessageSensor¶
一个传感器,它会延迟执行,直到特定的消息发布到 Kafka 主题。该传感器将创建一个消费者,从 Kafka 主题读取消息,直到找到符合 apply_function
参数中定义的标准的消息。如果 apply_function
返回任何数据,则会引发 TriggerEvent
,并且 AwaitMessageSensor
将成功完成。
有关参数定义,请查看 AwaitMessageSensor
。
使用传感器¶
t5 = AwaitMessageSensor(
kafka_config_id="t5",
task_id="awaiting_message",
topics=["test_1"],
apply_function="example_dag_hello_kafka.await_function",
xcom_push_key="retrieved_message",
)
参考¶
有关更多信息,请参阅 Apache Kafka 消费者文档。
AwaitMessageTriggerFunctionSensor¶
与上面的 AwaitMessageSensor
类似,此传感器将延迟执行,直到它从 Kafka 主题消耗满足其 apply_function
标准的消息。一旦遇到正事件,AwaitMessageTriggerFunctionSensor
将触发提供给 event_triggered_function
的可调用对象。之后,传感器将再次延迟执行,继续消耗消息。
有关参数定义,请查看 AwaitMessageTriggerFunctionSensor
。
使用传感器¶
listen_for_message = AwaitMessageTriggerFunctionSensor(
kafka_config_id="fizz_buzz_2",
task_id="listen_for_message",
topics=["fizz_buzz"],
apply_function="example_dag_event_listener.await_function",
event_triggered_function=wait_for_event,
)
参考¶
有关更多信息,请参阅 Apache Kafka 消费者文档。