SFTP 传感器

使用 SFTP 协议在服务器中查找特定文件或具有特定模式的文件。要获取有关此传感器的更多信息,请访问 SFTPSensor

tests/system/providers/sftp/example_sftp_sensor.py[源代码]

sftp_with_operator = SFTPSensor(task_id="sftp_operator", path=FULL_FILE_PATH, poke_interval=10)

我们也可以使用 Taskflow API。它采用与 SFTPSensor 相同的参数,以及 -

op_args(可选)

调用可调用对象时将解包的位置参数列表(模板化)

op_kwargs(可选)

将在函数中解包的关键字参数字典(模板化)

Python 可调用对象返回的任何内容都将放入 XCom 中。

tests/system/providers/sftp/example_sftp_sensor.py[源代码]

@task.sftp_sensor(  # type: ignore[attr-defined]
    task_id="sftp_sensor",  # type: ignore[attr-defined]
    path=FULL_FILE_PATH,
    poke_interval=10,
)
def sftp_sensor_decorator():
    print("Files were successfully found!")
    # add your logic
    return "done"

在可延迟模式下检查 SFTP 服务器上是否存在文件

tests/system/providers/sftp/example_sftp_sensor.py[源代码]

sftp_sensor_with_async = SFTPSensor(
    task_id="sftp_operator_async", path=FULL_FILE_PATH, poke_interval=10, deferrable=True
)

此条目有帮助吗?