对象存储

2.8.0 版本新增。

所有主要云提供商都在对象存储中提供持久化数据存储。这些并不是传统的 “POSIX” 文件系统。为了在没有单点故障的情况下存储数百 PB 的数据,对象存储用一种更简单的模型(对象名称 => 数据)取代了传统文件系统的目录树。为了实现远程访问,对象的操作通常以(较慢的)HTTP REST 方式提供。

Airflow 在对象存储之上提供了一个通用抽象层,支持 s3、gcs、Azure Blob Storage 等。该抽象层使您能够在 DAG 中使用多种对象存储系统,而无需为每种系统编写不同的代码。此外,它还能让您使用大多数标准 Python 模块(如 shutil),这些模块可以处理类文件对象。

对特定对象存储系统的支持取决于您已安装的 Provider。例如,如果您已安装 apache-airflow-providers-google,就可以使用 gcs 方案。Airflow 开箱即支持 file 方案。

注意

使用 s3 需要额外安装 apache-airflow-providers-amazon[s3fs]。原因是它依赖 aiobotocore,而后者默认不安装,以免与 botocore 产生依赖冲突。

云对象存储不是实际的文件系统

对象存储虽然表面上看起来像文件系统,但实际上并非真实的文件系统。它们并不支持真实文件系统的全部操作。关键区别包括:

  • 没有保证的原子重命名操作。这意味着将文件从一个位置移动到另一个位置时,会先复制再删除。如果复制失败,文件将会丢失。

  • 目录是模拟的,可能导致操作变慢。例如,列出一个目录可能需要列出桶中全部对象并按前缀过滤。

  • 在文件内部定位(seek)可能需要大量调用开销,影响性能,甚至根本不受支持。

Airflow 依赖 fsspec 在不同对象存储系统之间提供一致的使用体验。它实现了本地文件缓存以加速访问。不过,在设计 DAG 时应当了解对象存储的这些局限性。

基本用法

要使用对象存储,需要使用目标对象的 URI 实例化一个 Path(见下文)对象。例如,要指向 s3 中的一个桶,可按如下方式操作:

from airflow.sdk import ObjectStoragePath

base = ObjectStoragePath("s3://aws_default@my-bucket/")

URI 中的用户名部分代表 Airflow 连接 ID,属于可选项。也可以将其作为单独的关键字参数传入。

# Equivalent to the previous example.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")

列出文件对象

@task
def list_files() -> list[ObjectStoragePath]:
    files = [f for f in base.iterdir() if f.is_file()]
    return files

在目录树中导航

base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"

# prints ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)

打开文件

@task
def read_file(path: ObjectStoragePath) -> str:
    with path.open() as f:
        return f.read()

利用 XCOM,您可以在任务之间传递路径

@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
    return path / "new_file.txt"


@task
def write_file(path: ObjectStoragePath, content: str):
    with path.open("wb") as f:
        f.write(content)


new_file = create(base)
write = write_file(new_file, b"data")

read >> write

配置

在基本使用场景下,对象存储抽象几乎不需要额外配置,依赖标准的 Airflow 连接机制。也就是说,您可以使用 conn_id 参数指定要使用的连接。连接中的所有设置都会向下传递到底层实现。例如,使用 s3 时,可以通过连接指定 aws_access_key_idaws_secret_access_key,并可额外添加 endpoint_url 以指定自定义端点。

替代后端

可以为特定的 scheme 或 protocol 配置替代后端。这通过在 scheme 上附加一个 backend 来实现。例如,要为 dbfs scheme 启用 databricks 后端,可按以下方式操作:

from airflow.sdk import ObjectStoragePath
from airflow.sdk.io import attach

from fsspec.implementations.dbfs import DBFSFileSystem

attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")

注意

若希望在多个任务之间复用该注册,请确保在 DAG 顶层就附加后端。否则,后端在跨任务使用时将不可用。

路径 API

对象存储抽象实现为 Path API,并基于 Universal Pathlib。这意味着您基本可以使用与本地文件系统相同的 API 与对象存储交互。本节仅列出两者 API 的差异。对标准 Path API 之外的扩展操作(如复制、移动)将在下一节说明。如需了解每个操作接受哪些参数,请参考 ObjectStoragePath 类的文档。

mkdir

在指定路径或桶/容器内创建目录条目。对于没有真实目录概念的系统,这可能仅在当前实例上创建目录条目,不会影响实际文件系统。

如果 parentsTrue,则会根据需要创建该路径缺失的所有父级目录。

touch

在给定路径创建文件,或更新其时间戳。如果 truncateTrue(默认),文件会被截断。若文件已存在且 exists_ok 为真,则操作成功并将修改时间更新为当前时间;否则会抛出 FileExistsError

stat

返回一个类似 stat_result 的对象,支持属性 st_sizest_mtimest_mode,同时也表现为字典,可提供对象的额外元数据。例如,对 s3 会返回诸如 ['ETag','ContentType'] 的额外键。若代码需在不同对象存储之间可移植,请勿依赖这些扩展元数据。

扩展

以下操作不属于标准 Path API,但在对象存储抽象中得到支持。

bucket

返回存储桶名称。

checksum

返回文件的校验和。

container

bucket 的别名

fs

便捷属性,用于访问已实例化的文件系统对象

key

返回对象的键(key)。

namespace

返回对象的命名空间,通常为协议前缀(如 s3://)加上桶名。

path

fsspec 兼容的路径,用于配合文件系统实例使用

protocol

文件系统规范的协议。

read_block

从指定路径的文件中读取一定字节数的块。

从文件的 offset 位置开始读取 length 字节。如果设置了 delimiter,则确保读取的起始和结束都位于 delimiter 边界(即在 offset 与 offset+length 之后的 delimiter 位置)。若 offset 为 0,则从文件开头读取。返回的字节串将包含结束 delimiter。

如果 offset+length 超出文件末尾,则读取至 EOF。

sign

创建一个表示给定路径的签名 URL。某些实现允许生成临时 URL,以实现凭证委托。

size

返回指定路径文件的字节大小。

storage_options

实例化底层文件系统时使用的存储选项。

ukey

用于标识文件属性的哈希,以判断文件是否发生变化。

复制与移动

本节记录了 copymove 操作的预期行为,尤其是跨对象存储(例如 file → s3)时的表现。每个方法都会将 source 中的文件或目录复制/移动到 target 位置。其行为与 fsspec 的规定保持一致。跨对象存储进行目录复制时,Airflow 需要遍历目录树并逐个文件复制,这通过将源文件流式传输至目标实现。

外部集成

许多其他项目(如 DuckDB、Apache Iceberg 等)都可以利用对象存储抽象。通常通过传递底层的 fsspec 实现来完成。例如,下面的代码在 duckdb 中使用 ObjectStoragePath,从而让 Airflow 的连接信息用于连接 s3 并读取 parquet 文件。

import duckdb
from airflow.sdk import ObjectStoragePath

path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")

此条目是否有帮助?