Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册获取早鸟票!

云原生工作流与对象存储

在 2.8 版本中添加。

欢迎来到我们 Airflow 系列的最后一个教程!现在,你已经学习了如何使用 Python 和 Taskflow API 构建 DAG,使用 XComs 传递数据,并将任务串联成清晰、可重用的工作流。

在本教程中,我们将进一步介绍 对象存储 API。此 API 使读写云存储变得更容易,例如 Amazon S3、Google Cloud Storage (GCS) 或 Azure Blob Storage,而无需担心特定提供商的 SDK 或低级凭据管理。

我们将带你了解一个真实的用例

  1. 从公共 API 拉取数据

  2. 将数据以 Parquet 格式保存到对象存储

  3. 使用 DuckDB 通过 SQL 进行分析

在此过程中,我们将重点介绍新的 ObjectStoragePath 抽象,解释 Airflow 如何通过连接处理云凭据,并展示这如何实现可移植的、云无关的管道。

重要性

许多数据工作流依赖于文件——无论是原始 CSV、中间 Parquet 文件还是模型 artifact。传统上,你需要为此编写 S3 特定或 GCS 特定的代码。现在,有了 ObjectStoragePath,你可以编写适用于不同提供商的通用代码,只要你配置了正确的 Airflow 连接。

让我们开始吧!

前置条件

开始之前,请确保你已安装以下内容:

  • DuckDB,一个进程内 SQL 数据库:使用 pip install duckdb 安装

  • Amazon S3 访问权限带有 s3fs 的 Amazon Providerpip install apache-airflow-providers-amazon[s3fs] (你可以通过更改存储 URL 协议和安装相应的 Provider 来替换你偏好的 Provider。)

  • Pandas,用于处理表格数据:pip install pandas

创建 ObjectStoragePath

本教程的核心是 ObjectStoragePath,这是一个用于处理云对象存储路径的新抽象。可以将其视为 pathlib.Path,但作用于存储桶而非文件系统。

src/airflow/example_dags/tutorial_objectstorage.py

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

URL 语法很简单:protocol://bucket/path/to/file

  • protocol(例如 s3gsazure)决定了后端

  • URL 的“用户名”部分可以是 conn_id,告诉 Airflow 如何认证

  • 如果省略 conn_id,Airflow 将回退到该后端的默认连接

你也可以将 conn_id 作为关键字参数提供以提高清晰度

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

这在重用其他地方(例如在 Asset 中)定义的路径时,或者连接未直接包含在 URL 中时特别方便。关键字参数始终优先。

提示

你可以在全局 DAG 范围安全地创建一个 ObjectStoragePath。连接仅在使用路径时解析,而不是创建路径时解析。

将数据保存到对象存储

让我们获取一些数据并将其保存到云端。

src/airflow/example_dags/tutorial_objectstorage.py

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        logical_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": logical_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = logical_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path


以下是正在发生的事情:

  • 我们调用芬兰气象研究所的公共 API 获取赫尔辛基空气质量数据

  • 将 JSON 响应解析为 pandas DataFrame

  • 根据任务的逻辑日期生成文件名

  • 使用 ObjectStoragePath,我们将数据直接以 Parquet 格式写入云存储

这是一个经典的 Taskflow 模式。对象 key 每天变化,这使得我们可以每天运行此任务并随时间构建数据集。我们返回最终的对象路径,以便在下一个任务中使用。

这很棒的原因:无需 boto3、无需 GCS 客户端设置、无需凭据处理。只有适用于各种存储后端的简单文件语义。

使用 DuckDB 分析数据

现在,让我们使用 DuckDB 通过 SQL 分析这些数据。

src/airflow/example_dags/tutorial_objectstorage.py

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())


需要注意的几个关键点:

  • DuckDB 原生支持读取 Parquet

  • DuckDB 和 ObjectStoragePath 都依赖于 fsspec,这使得注册对象存储后端变得容易

  • 我们使用 path.fs 获取正确的文件系统对象并将其注册到 DuckDB

  • 最后,我们使用 SQL 查询 Parquet 文件并返回一个 pandas DataFrame

请注意,该函数不会手动重新创建路径——它使用 Xcom 从上一个任务中获取完整路径。这使得任务具有可移植性,并且与之前的逻辑解耦。

整合所有内容

以下是连接所有部分的完整 DAG

src/airflow/example_dags/tutorial_objectstorage.py


import pendulum
import requests

from airflow.sdk import ObjectStoragePath, dag, task

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.org.cn/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        logical_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": logical_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = logical_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path
    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())
    obj_path = get_air_quality_data()
    analyze(obj_path)
tutorial_objectstorage()

你可以触发此 DAG 并在 Airflow UI 的 Graph View 中查看它。每个任务都清晰地记录其输入和输出,你可以在 Xcom tab 中检查返回的路径。

接下来探索什么

以下是一些进一步探索的方法:

  • 使用对象传感器(例如 S3KeySensor)等待外部系统上传的文件

  • 协调 S3 到 GCS 的传输或跨区域数据同步

  • 添加分支逻辑来处理缺失或格式错误的文件

  • 尝试使用不同的格式,如 CSV 或 JSON

另请参阅

本条目是否有帮助?