SFTP 传感器

使用 SFTP 协议在服务器上查找特定文件或符合特定模式的文件。要获取有关此传感器的更多信息,请访问 SFTPSensor

tests/system/sftp/example_sftp_sensor.py

sftp_with_operator = SFTPSensor(task_id="sftp_operator", path=FULL_FILE_PATH, poke_interval=10)

我们也可以使用 Taskflow API。它接受与 SFTPSensor 相同的参数,以及:

op_args(可选)

一个位置参数列表,将在调用可调用对象时解包(模板化)

op_kwargs(可选)

一个关键字参数字典,将在您的函数中解包(模板化)

Python 可调用对象返回的任何内容都将放入 XCom。

tests/system/sftp/example_sftp_sensor.py

@task.sftp_sensor(  # type: ignore[attr-defined]
    task_id="sftp_sensor",  # type: ignore[attr-defined]
    path=FULL_FILE_PATH,
    poke_interval=10,
)
def sftp_sensor_decorator():
    print("Files were successfully found!")
    # add your logic
    return "done"

在可延迟模式下检查 SFTP 服务器上是否存在文件

tests/system/sftp/example_sftp_sensor.py

sftp_sensor_with_async = SFTPSensor(
    task_id="sftp_operator_async", path=FULL_FILE_PATH, poke_interval=10, deferrable=True
)

本条目有帮助吗?