Apache Kafka 传感器¶
AwaitMessageSensor¶
一个传感器,它会推迟执行,直到特定的消息被发布到 Kafka 主题。该传感器将创建一个消费者来读取 Kafka 主题中的消息,直到找到满足 apply_function
参数中定义的条件的消息。如果 apply_function
返回任何数据,则会引发一个 TriggerEvent
,并且 AwaitMessageSensor
成功完成。
有关参数定义,请参考 AwaitMessageSensor
。
使用传感器¶
tests/system/apache/kafka/example_dag_hello_kafka.py
t5 = AwaitMessageSensor(
kafka_config_id="t5",
task_id="awaiting_message",
topics=["test_1"],
apply_function="example_dag_hello_kafka.await_function",
xcom_push_key="retrieved_message",
)
参考¶
更多信息,请参阅 Apache Kafka Consumer 文档。
AwaitMessageTriggerFunctionSensor¶
与上面的 AwaitMessageSensor
类似,该传感器将推迟执行,直到从 Kafka 主题中消费到满足其 apply_function
条件的消息。一旦遇到一个积极事件,AwaitMessageTriggerFunctionSensor
将触发提供给 event_triggered_function
的可调用对象。之后,传感器将再次推迟,继续消费消息。
有关参数定义,请参考 AwaitMessageTriggerFunctionSensor
。
使用传感器¶
tests/system/apache/kafka/example_dag_event_listener.py
listen_for_message = AwaitMessageTriggerFunctionSensor(
kafka_config_id="fizz_buzz_2",
task_id="listen_for_message",
topics=["fizz_buzz"],
apply_function="example_dag_event_listener.await_function",
event_triggered_function=wait_for_event,
)
参考¶
更多信息,请参阅 Apache Kafka Consumer 文档。