对象存储 XCom 后端

默认的 XCom 后端是 BaseXCom 类,它将 XCom 存储在 Airflow 数据库中。这对于小的值是没问题的,但对于大的值或大量的 XCom 来说可能会有问题。

要启用将 XCom 存储在对象存储中,您可以将 xcom_backend 配置选项设置为 airflow.providers.common.io.xcom.backend.XComObjectStorageBackend。您还需要将 xcom_objectstorage_path 设置为所需的位置。连接 ID 是从您提供的 URL 的用户部分获取的,例如 xcom_objectstorage_path = s3://conn_id@mybucket/key。此外,xcom_objectstorage_threshold 要求大于 -1。任何小于阈值(以字节为单位)的对象将存储在数据库中,而任何大于阈值的对象将存储在对象存储中。这将允许混合设置。如果 XCom 存储在对象存储中,则会在数据库中保存一个引用。最后,您可以将 xcom_objectstorage_compression 设置为 fsspec 支持的压缩方法,例如 zipsnappy,以便在将数据存储到对象存储之前对其进行压缩。

例如,以下配置会将任何大于 1MB 的内容存储在 S3 中,并使用 gzip 对其进行压缩

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip

使用本地文件系统的另一个示例

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = file://airflow/xcoms

本地文件系统方案也可以使用,它与上面显示的文件相同,参见 fsspec

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = local://airflow/xcoms

注意

压缩需要您的 Python 环境中安装了相应的支持。例如,要使用 snappy 压缩,您需要安装 python-snappy。Zip、gzip 和 bz2 可以直接使用。

这篇文章有用吗?