Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册可享早鸟票优惠!

资产定义

版本 2.4 新增。

版本 3.0 变更:此概念先前称为“数据集”。

什么是“资产”?

Airflow 资产是数据的逻辑分组。上游生产者任务可以更新资产,资产更新有助于调度下游消费者 DAG。

统一资源标识符 (URI) 定义资产

from airflow.sdk import Asset

example_asset = Asset("s3://asset-bucket/example.csv")

Airflow 对 URI 所代表的数据的内容或位置不做任何假设,并将 URI 视为一个字符串。这意味着 Airflow 会将任何正则表达式,例如 input_\d+.csv,或文件 glob 模式,例如 input_2022*.csv,视为尝试从一个声明创建多个资产,并且它们将无法工作。

您必须使用有效的 URI 创建资产。Airflow 核心和提供者定义了各种您可以使用的数据方案,例如 file(核心)、postgres(通过 Postgres 提供者)和 s3(通过 Amazon 提供者)。第三方提供者和插件也可能提供自己的方案。这些预定义方案具有各自的语义,应遵循这些语义。您可以使用可选的 name 参数为资产提供一个更具可读性的标识符。

from airflow.sdk import Asset

example_asset = Asset(uri="s3://asset-bucket/example.csv", name="bucket-1")

什么是有效的 URI?

从技术上讲,URI 必须符合 RFC 3986 中的有效字符集,这基本上是 ASCII 字母数字字符,加上 %-_.~。要标识无法用 URI 安全字符表示的资源,请使用百分比编码对资源名称进行编码。

URI 也区分大小写,因此 s3://example/assets3://Example/asset 被认为是不同的。请注意,URI 的 host 部分也区分大小写,这与 RFC 3986 不同。

对于预定义方案(例如 filepostgress3),您必须提供一个有意义的 URI。如果无法提供,请完全使用另一种没有语义限制的方案。Airflow 永远不会要求用户定义的 URI 方案(带前缀 x-)具有语义,因此这可能是一个不错的选择。如果您有一个只能稍后(例如,在任务执行期间)获得的 URI,请考虑改用 AssetAlias 并稍后更新 URI。

# invalid asset:
must_contain_bucket_name = Asset("s3://")

不要使用 airflow 方案,它保留用于 Airflow 内部使用。

Airflow 始终偏好在方案中使用小写字母,并且 URI 的主机部分需要区分大小写,以便正确区分资源。

# invalid assets:
reserved = Asset("airflow://example_asset")
not_ascii = Asset("èxample_datašet")

如果您想使用不包含额外语义约束的方案定义资产,请使用带有前缀 x- 的方案。Airflow 会跳过对这些方案的 URI 进行任何语义验证。

# valid asset, treated as a plain string
my_ds = Asset("x-my-thing://foobarbaz")

标识符不必是绝对的;它可以是没有方案的相对 URI,甚至只是一个简单的路径或字符串

# valid assets:
schemeless = Asset("//example/asset")
csv_file = Asset("example_asset")

非绝对标识符被认为是普通字符串,对 Airflow 不带任何语义含义。

关于资产的额外信息

如果需要,您可以在资产中包含一个额外字典

example_asset = Asset(
    "s3://asset/example.csv",
    extra={"team": "trainees"},
)

这可用于为资产提供自定义描述,例如目标文件的所有者是谁,或该文件用于何处。额外信息不影响资产的身份。

注意

安全注意:资产 URI 和额外字段未加密,它们以明文形式存储在 Airflow 的元数据数据库中。切勿在资产 URI 或额外键值中存储任何敏感值,特别是凭证!

创建一个任务来触发资产事件

一旦资产定义好,就可以通过指定 outlets 来创建任务以针对其触发事件

from airflow.sdk import DAG, Asset
from airflow.providers.standard.operators.python import PythonOperator

example_asset = Asset(name="example_asset", uri="s3://asset-bucket/example.csv")


def _write_example_asset():
    """Write data to example_asset..."""


with DAG(dag_id="example_asset", schedule="@daily"):
    PythonOperator(task_id="example_asset", outlets=[example_asset], python_callable=_write_example_asset)

这相当多样板代码。Airflow 为这种简单但最常见的情况提供了一种简写方式:创建一个仅含一个任务并触发单个资产事件的 DAG。下面的代码块与上面的代码块完全等价

from airflow.sdk import asset


@asset(uri="s3://asset-bucket/example.csv", schedule="@daily")
def example_asset():
    """Write data to example_asset..."""

声明一个 @asset 会自动创建

  • 一个 Asset,其 name 设置为函数名。

  • 一个 DAG,其 dag_id 设置为函数名。

  • DAG 中的一个任务,其 task_id 设置为函数名,并且 outlet 指向创建的 Asset

为触发中的资产事件附加额外信息

版本 2.10.0 新增。

带有资产出口的任务可以在触发资产事件之前可选地附加额外信息。这与关于资产的额外信息不同。资产的额外信息静态描述了资产 URI 指向的实体;而资产事件的额外信息则应用于标注触发的数据变更,例如更新改变了数据库中的多少行,或者覆盖的日期范围。

将额外信息附加到资产事件的最简单方法是从任务中 yield 一个 Metadata 对象

from airflow.sdk import Metadata, asset


@asset(uri="s3://asset/example.csv", schedule=None)
def example_s3(self):  # 'self' here refers to the current asset.
    df = ...  # Get a Pandas DataFrame to write.
    # Write df to asset...
    yield Metadata(self, {"row_count": len(df)})

Airflow 会自动收集所有 yield 的元数据,并用相应的元数据对象的额外信息填充资产事件。

这也可以在经典操作符中完成。最好的方法是继承操作符并覆盖 execute 方法。或者,也可以在任务的 pre_executepost_execute 钩子中添加额外信息。然而,如果您选择使用钩子,请记住任务重试时它们不会重新运行,这可能导致在某些情况下额外信息与实际数据不匹配。

实现同样效果的另一种方法是直接访问任务执行上下文中的 outlet_events

@asset(schedule=None)
def write_to_s3(self, context):
    context["outlet_events"][self].extra = {"row_count": len(df)}

这里没什么魔法——Airflow 只是将 yield 的值写入完全相同的访问器。这在经典操作符中也适用,包括 executepre_executepost_execute

从之前触发的资产事件中获取信息

版本 2.10.0 新增。

在任务的 outlets 中定义的资产事件,如前一节所述,可以被在其 inlets 中声明相同资产的任务读取。资产事件条目包含 extra(详情见前一节)、指示事件从任务中触发的时间的 timestamp,以及将事件链接回其源的 source_task_instance

可以使用执行上下文中的 inlet_events 访问器读取入口资产事件。接前一节的 write_to_s3 资产

@asset(schedule=None)
def post_process_s3_file(context, write_to_s3):  # Declaring an inlet to write_to_s3.
    events = context["inlet_events"][write_to_s3]
    last_row_count = events[-1].extra["row_count"]

inlet_events 映射中的每个值都是一个序列状对象,它按 timestamp 对给定资产的过去事件进行排序,从最早到最新。它支持 Python 列表的大部分接口,因此您可以使用 [-1] 访问最后一个事件,使用 [-2:] 访问最后两个事件等。访问器是惰性的,只有当您访问其中的项目时才会访问数据库。

@asset@task 和经典操作符之间的依赖关系

由于 @asset 只是一个包含任务和资产的 DAG 的简单包装器,因此在 @task 或经典操作符中读取和 @asset 非常容易。例如,上面的 post_process_s3_file 也可以写成一个任务(在一个 DAG 中,这里为简洁起见省略了)

@task(inlets=[write_to_s3])
def post_process_s3_file(*, inlet_events):
    events = inlet_events[example_s3_asset]
    last_row_count = events[-1].extra["row_count"]


post_process_s3_file()

反之亦然

example_asset = Asset("example_asset")


@task(outlets=[example_asset])
def emit_example_asset():
    """Write to example_asset..."""


@asset(schedule=None)
def process_example_asset(example_asset):
    """Process inlet example_asset..."""

在一个任务中输出到多个资产

一个任务可以为多个资产触发事件。这通常不建议,但在某些情况下需要,例如当您需要将一个数据源拆分成多个时。这对于任务来说很简单,因为 outlets 在设计上就是复数形式

from airflow.sdk import DAG, Asset, task

input_asset = Asset("input_asset")
out_asset_1 = Asset("out_asset_1")
out_asset_2 = Asset("out_asset_2")

with DAG(dag_id="process_input", schedule=None):

    @task(inlets=[input_asset], outlets=[out_asset_1, out_asset_2])
    def process_input():
        """Split input into two."""

其简写形式是 @asset.multi

from airflow.sdk import Asset, asset

input_asset = Asset("input_asset")
out_asset_1 = Asset("out_asset_1")
out_asset_2 = Asset("out_asset_2")


@asset.multi(schedule=None, outlets=[out_asset_1, out_asset_2])
def process_input(input_asset):
    """Split input into two."""

通过 AssetAlias 触发动态数据事件和创建资产

资产别名可用于触发与别名相关联的资产事件。下游可以依赖于解析的资产。此功能允许您基于资产更新定义 DAG 执行的复杂依赖关系。

如何使用 AssetAlias

AssetAlias 只有一个参数 name,它唯一标识资产。任务必须首先将别名声明为出口,然后使用 outlet_events 或 yield Metadata 向其添加事件。

以下示例针对 S3 URI f"s3://bucket/my-task" 创建了一个资产事件,并附带可选的额外信息 extra。如果资产不存在,Airflow 将动态创建它并记录警告消息。

通过 outlet_events 在任务执行期间触发资产事件

from airflow.sdk.definitions.asset import AssetAlias


@task(outlets=[AssetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[AssetAlias("my-task-outputs")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})

通过 yielding Metadata 在任务执行期间触发资产事件

from airflow.sdk.definitions.asset.metadata import Metadata


@task(outlets=[AssetAlias("my-task-outputs")])
def my_task_with_metadata():
    s3_asset = Asset(uri="s3://bucket/my-task", name="example_s3")
    yield Metadata(s3_asset, extra={"k": "v"}, alias="my-task-outputs")

添加的资产只会触发一个资产事件,即使它被多次添加到别名中,或添加到多个别名中。但是,如果传递了不同的 extra 值,它可以触发多个资产事件。在下面的示例中,将触发两个资产事件。

from airflow.sdk.definitions.asset import AssetAlias


@task(
    outlets=[
        AssetAlias("my-task-outputs-1"),
        AssetAlias("my-task-outputs-2"),
        AssetAlias("my-task-outputs-3"),
    ]
)
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[AssetAlias("my-task-outputs-1")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
    # This line won't emit an additional asset event as the asset and extra are the same as the previous line.
    outlet_events[AssetAlias("my-task-outputs-2")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
    # This line will emit an additional asset event as the extra is different.
    outlet_events[AssetAlias("my-task-outputs-3")].add(Asset("s3://bucket/my-task"), extra={"k2": "v2"})

通过解析的资产别名获取之前触发的资产事件信息

从之前触发的资产事件中获取信息中所述,可以使用执行上下文中的 inlet_events 访问器读取入口资产事件,您也可以使用资产别名访问由它们触发的资产事件。

with DAG(dag_id="asset-alias-producer"):

    @task(outlets=[AssetAlias("example-alias")])
    def produce_asset_events(*, outlet_events):
        outlet_events[AssetAlias("example-alias")].add(Asset("s3://bucket/my-task"), extra={"row_count": 1})


with DAG(dag_id="asset-alias-consumer", schedule=None):

    @task(inlets=[AssetAlias("example-alias")])
    def consume_asset_alias_events(*, inlet_events):
        events = inlet_events[AssetAlias("example-alias")]
        last_row_count = events[-1].extra["row_count"]

此条目有帮助吗?