Airflow 2025 峰会将于 10 月 07-09 日召开。立即注册享早鸟票优惠!

对象存储

在 2.8.0 版本中添加。

所有主要的云提供商都在对象存储中提供持久化数据存储。这些不是经典的“POSIX”文件系统。为了存储数百 PB 的数据而没有任何单点故障,对象存储用更简单的对象名 => 数据模型取代了传统的文件系统目录树。为了启用远程访问,对象操作通常作为(较慢的)HTTP REST 操作提供。

Airflow 在对象存储(如 s3、gcs 和 Azure Blob Storage)之上提供了一个通用抽象层。这个抽象层允许你在 DAGs 中使用多种对象存储系统,而无需更改代码来处理每一种不同的对象存储系统。此外,它还允许你使用大多数可以处理类文件对象的标准 Python 模块,例如 shutil

对特定对象存储系统的支持取决于你已安装的 Provider(提供者)。例如,如果你安装了 apache-airflow-providers-google Provider(提供者),你就可以使用 gcs Scheme(方案)进行对象存储。默认情况下,Airflow 支持 file Scheme(方案)。

注意

支持 s3 需要你安装 apache-airflow-providers-amazon[s3fs]。这是因为它依赖于 aiobotocore,该库默认不会安装,因为它可能与 botocore 产生依赖冲突。

云对象存储不是真正的文件系统

对象存储看起来像文件系统,但它们并非真正的文件系统。它们不支持真正的文件系统所支持的所有操作。主要区别包括:

  • 不保证原子重命名操作。这意味着如果你将文件从一个位置移动到另一个位置,它会被先复制然后删除。如果复制失败,你将丢失文件。

  • 目录是模拟的,可能导致对其操作变慢。例如,列出目录可能需要列出 bucket 中的所有对象并按前缀过滤它们。

  • 在文件内进行查找可能需要显著的调用开销,损害性能,或者根本不受支持。

Airflow 依赖 fsspec 来在不同对象存储系统之间提供一致的使用体验。它实现了本地文件缓存以加速访问。但是,在设计 DAGs 时,你应该了解对象存储的限制。

基本用法

要使用对象存储,你需要使用要交互的对象的 URI 实例化一个 Path(参见下文)对象。例如,要指向 s3 中的一个 bucket,你可以这样做:

from airflow.sdk import ObjectStoragePath

base = ObjectStoragePath("s3://aws_default@my-bucket/")

URI 中的用户名部分代表 Airflow 连接 ID,是可选的。它也可以作为单独的关键字参数传入

# Equivalent to the previous example.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")

列出文件对象

@task
def list_files() -> list[ObjectStoragePath]:
    files = [f for f in base.iterdir() if f.is_file()]
    return files

在目录树内导航

base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"

# prints ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)

打开文件

@task
def read_file(path: ObjectStoragePath) -> str:
    with path.open() as f:
        return f.read()

利用 XCOM,你可以在任务之间传递路径

@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
    return path / "new_file.txt"


@task
def write_file(path: ObjectStoragePath, content: str):
    with path.open("wb") as f:
        f.write(content)


new_file = create(base)
write = write_file(new_file, b"data")

read >> write

配置

在其基本用法中,对象存储抽象层不需要太多配置,并依赖于标准的 Airflow 连接机制。这意味着你可以使用 conn_id 参数来指定要使用的连接。连接中的任何设置都会下推到底层实现。例如,如果你使用 s3,你可以指定 aws_access_key_idaws_secret_access_key,也可以添加额外的参数,例如 endpoint_url 来指定自定义端点。

备选后端

可以为 Scheme(方案)或 Protocol(协议)配置备选后端。这通过将 backend 附加到 Scheme(方案)来完成。例如,要为 dbfs Scheme(方案)启用 Databricks 后端,你可以执行以下操作:

from airflow.sdk import ObjectStoragePath
from airflow.sdk.io import attach

from fsspec.implementations.dbfs import DBFSFileSystem

attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")

注意

为了在任务之间重用注册,请确保在 DAG 的顶层附加后端。否则,该后端将无法在多个任务之间使用。

路径 API

对象存储抽象被实现为一个 Path API,并构建在 Universal Pathlib 之上。这意味着你与对象存储交互时,基本上可以使用与本地文件系统相同的 API。在本节中,我们仅列出这两种 API 之间的差异。超出标准 Path API 的扩展操作,例如复制和移动,列在下一节中。有关每个操作的详细信息,例如它们接受哪些参数,请参阅 ObjectStoragePath 类的文档。

mkdir

在指定路径或 bucket/container 内创建一个目录条目。对于没有真正目录的系统,它可能仅为此实例创建一个目录条目,而不影响实际文件系统。

如果 parentsTrue,则会根据需要创建此路径的任何缺失父级。

touch

在此给定路径创建文件,或更新时间戳。如果 truncateTrue,则文件将被截断,这是默认行为。如果文件已存在,当 exists_ok 为 True 时函数会成功(并且其修改时间更新为当前时间),否则会引发 FileExistsError

stat

返回一个类似 stat_result 的对象,该对象支持以下属性:st_sizest_mtimest_mode,但它也像一个字典一样,可以提供有关对象的额外元数据。例如,对于 s3,它将返回额外的键,例如:['ETag', 'ContentType']。如果你的代码需要在不同的对象存储之间移植,请不要依赖扩展元数据。

扩展

以下操作不属于标准 Path API 的一部分,但对象存储抽象层支持它们。

bucket

返回 bucket 名称。

校验和

返回文件的校验和。

container

bucket 的别名

fs

访问已实例化文件系统的便捷属性

key

返回对象 key。

namespace

返回对象的 namespace。通常这是 Protocol(协议),例如带有 bucket 名称的 s3://

path

fsspec 兼容的路径,用于文件系统实例

protocol

filesystem_spec Protocol(协议)。

read_block

从此给定路径的文件中读取一块字节。

从文件 offset 位置开始,读取 length 字节。如果设置了 delimiter(分隔符),则确保读取操作在位于 offset 和 offset + length 范围内的分隔符边界处开始和停止。如果 offset 为零,则从零开始。返回的字节串将包含结束分隔符字符串。

如果 offset + length 超出文件末尾 (eof),则读取到 eof。

sign

创建一个表示给定路径的签名 URL。某些实现允许生成临时 URL,作为委托凭证的一种方式。

size

返回给定路径处文件的大小(以字节为单位)。

storage_options

实例化底层文件系统的存储选项。

ukey

文件属性的哈希值,用于判断文件是否已更改。

复制和移动

本文档记录了 copymove 操作的预期行为,特别是跨对象存储(例如文件 -> s3)的行为。每个方法将文件或目录从 source 位置复制或移动到 target 位置。预期的行为与 fsspec 指定的行为相同。对于跨对象存储的目录复制,Airflow 需要遍历目录树并单独复制每个文件。这是通过将每个文件从源流式传输到目标来完成的。

外部集成

许多其他项目,如 DuckDB、Apache Iceberg 等,都可以利用对象存储抽象。通常这是通过传递底层 fsspec 实现来完成的。为此,ObjectStoragePath 公开了 fs 属性。例如,以下代码适用于 duckdb,以便使用来自 Airflow 的连接详细信息连接到 s3,并读取由 ObjectStoragePath 指示的 Parquet 文件

import duckdb
from airflow.sdk import ObjectStoragePath

path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")

此条目是否有帮助?