对象存储

本教程演示如何使用对象存储 API 来管理位于对象存储上的对象,例如 S3、GCS 和 Azure Blob 存储。该 API 作为 Airflow 2.8 的一部分引入。

本教程涵盖了数据工程和数据科学工作流程中常用的简单模式:访问 Web API,保存并分析结果。

先决条件

要完成本教程,您需要以下几项:

  • DuckDB,一个进程内分析数据库,可以通过运行 pip install duckdb 进行安装。

  • 一个 S3 存储桶,以及包含 s3fs 的 Amazon 提供程序。您可以通过运行 pip install apache-airflow-providers-amazon[s3fs] 来安装提供程序包。或者,您可以通过在 create_object_storage_path 函数中将 URL 更改为您提供程序的相应 URL,来使用不同的存储提供程序,例如将 s3:// 替换为 Google Cloud Storage 的 gs://,并安装不同的提供程序。

  • pandas,您可以通过运行 pip install pandas 来安装。

创建 ObjectStoragePath

ObjectStoragePath 是一个类似路径的对象,表示对象存储上的路径。它是对象存储 API 的基本构建块。

airflow/example_dags/tutorial_objectstorage.py[源代码]

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

提供给 ObjectStoragePath 的 URL 的用户名部分应该是连接 ID。指定的连接将用于获取访问后端的正确凭据。如果省略,将使用后端的默认连接。

连接 ID 也可以通过关键字参数传入

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

当重用为其他目的定义的 URL(例如数据集)时,这很有用,这些 URL 通常不包含用户名部分。如果同时指定了 URL 的用户名值和显式关键字参数,则显式关键字参数优先。

在 DAG 的根目录中实例化 ObjectStoragePath 是安全的。在路径被使用之前,不会创建连接。这意味着您可以在 DAG 的全局范围内创建路径,并在多个任务中使用它。

将数据保存到对象存储

ObjectStoragePath 的行为方式与 pathlib.Path 对象类似。您可以使用它直接将数据保存到对象存储并从中加载数据。因此,典型的流程可能如下所示:

airflow/example_dags/tutorial_objectstorage.py[源代码]

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

get_air_quality_data 调用芬兰气象研究所的 API,以获取赫尔辛基地区的空气质量数据。它从生成的 JSON 创建 Pandas DataFrame。然后,它将数据保存到对象存储,并在运行时将其转换为 parquet 格式。

对象的键是从任务的逻辑日期自动生成的,因此我们可以每天运行此任务,并且每天都会创建一个新对象。我们将此键与基本路径连接起来,以创建对象的完整路径。最后,在将对象写入存储后,我们返回对象的路径。这允许我们在下一个任务中使用该路径。

分析数据

在理解数据的过程中,您通常需要对其进行分析。Duck DB 是一个很好的工具。它是一个进程内分析数据库,允许您在内存中的数据上运行 SQL 查询。

由于数据已经是 parquet 格式,我们可以使用 read_parquet,并且由于 Duck DB 和 ObjectStoragePath 都使用 fsspec,我们可以将 ObjectStoragePath 的后端注册到 Duck DB。ObjectStoragePath 公开了 fs 属性以实现此目的。然后,我们可以使用 Duck DB 中的 register_filesystem 函数将后端注册到 Duck DB。

在 Duck DB 中,我们可以根据数据创建表,并在其上运行查询。查询将以 dataframe 的形式返回,该 dataframe 可用于进一步分析或保存到对象存储。

airflow/example_dags/tutorial_objectstorage.py[源代码]

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

您可能会注意到,analyze 函数不知道对象的原始路径,但它是作为参数传入的并通过 XCom 获取的。您不需要重新实例化 Path 对象。连接详细信息也会透明地处理。

整合所有内容

最终的 DAG 如下所示,它包装了一些东西,以便我们可以运行它

airflow/example_dags/tutorial_objectstorage.py[源代码]


import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.org.cn/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path
    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())
    obj_path = get_air_quality_data()
    analyze(obj_path)
tutorial_objectstorage()

此条目是否有帮助?