对象存储 XCom 后端

默认的 XCom 后端是 BaseXCom 类,它将 XCom 存储在 Airflow 数据库中。对于小的值,这很好,但对于大的值或大量的 XCom,可能会有问题。

要启用在对象存储中存储 XCom,您可以将 xcom_backend 配置选项设置为 airflow.providers.common.io.xcom.backend.XComObjectStorageBackend。您还需要将 xcom_objectstorage_path 设置为所需的位置。连接 ID 从您提供的 URL 的用户部分获取,例如 xcom_objectstorage_path = s3://conn_id@mybucket/key。此外,xcom_objectstorage_threshold 必须设置为大于 -1 的值。任何小于该阈值(以字节为单位)的对象都将存储在数据库中,任何大于该阈值的对象都将存储在对象存储中。这将允许混合设置。如果 XCom 存储在对象存储上,则将在数据库中保存一个引用。最后,您可以将 xcom_objectstorage_compression 设置为 fsspec 支持的压缩方法,如 zipsnappy,以便在将其存储在对象存储中之前压缩数据。

因此,例如,以下配置会将任何大于 1MB 的数据存储在 S3 中,并使用 gzip 进行压缩

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip

注意

压缩需要您的 Python 环境中安装了对它的支持。例如,要使用 snappy 压缩,您需要安装 python-snappy。Zip、gzip 和 bz2 可以直接使用。

此条目是否有帮助?