资产定义¶
版本 2.4 新增。
版本 3.0 变更:此概念先前称为“数据集”。
什么是“资产”?¶
Airflow 资产是数据的逻辑分组。上游生产者任务可以更新资产,资产更新有助于调度下游消费者 DAG。
统一资源标识符 (URI) 定义资产
from airflow.sdk import Asset
example_asset = Asset("s3://asset-bucket/example.csv")
Airflow 对 URI 所代表的数据的内容或位置不做任何假设,并将 URI 视为一个字符串。这意味着 Airflow 会将任何正则表达式,例如 input_\d+.csv
,或文件 glob 模式,例如 input_2022*.csv
,视为尝试从一个声明创建多个资产,并且它们将无法工作。
您必须使用有效的 URI 创建资产。Airflow 核心和提供者定义了各种您可以使用的数据方案,例如 file
(核心)、postgres
(通过 Postgres 提供者)和 s3
(通过 Amazon 提供者)。第三方提供者和插件也可能提供自己的方案。这些预定义方案具有各自的语义,应遵循这些语义。您可以使用可选的 name 参数为资产提供一个更具可读性的标识符。
from airflow.sdk import Asset
example_asset = Asset(uri="s3://asset-bucket/example.csv", name="bucket-1")
什么是有效的 URI?¶
从技术上讲,URI 必须符合 RFC 3986 中的有效字符集,这基本上是 ASCII 字母数字字符,加上 %
、-
、_
、.
和 ~
。要标识无法用 URI 安全字符表示的资源,请使用百分比编码对资源名称进行编码。
URI 也区分大小写,因此 s3://example/asset
和 s3://Example/asset
被认为是不同的。请注意,URI 的 host 部分也区分大小写,这与 RFC 3986 不同。
对于预定义方案(例如 file
、postgres
和 s3
),您必须提供一个有意义的 URI。如果无法提供,请完全使用另一种没有语义限制的方案。Airflow 永远不会要求用户定义的 URI 方案(带前缀 x-)具有语义,因此这可能是一个不错的选择。如果您有一个只能稍后(例如,在任务执行期间)获得的 URI,请考虑改用 AssetAlias
并稍后更新 URI。
# invalid asset:
must_contain_bucket_name = Asset("s3://")
不要使用 airflow
方案,它保留用于 Airflow 内部使用。
Airflow 始终偏好在方案中使用小写字母,并且 URI 的主机部分需要区分大小写,以便正确区分资源。
# invalid assets:
reserved = Asset("airflow://example_asset")
not_ascii = Asset("èxample_datašet")
如果您想使用不包含额外语义约束的方案定义资产,请使用带有前缀 x-
的方案。Airflow 会跳过对这些方案的 URI 进行任何语义验证。
# valid asset, treated as a plain string
my_ds = Asset("x-my-thing://foobarbaz")
标识符不必是绝对的;它可以是没有方案的相对 URI,甚至只是一个简单的路径或字符串
# valid assets:
schemeless = Asset("//example/asset")
csv_file = Asset("example_asset")
非绝对标识符被认为是普通字符串,对 Airflow 不带任何语义含义。
关于资产的额外信息¶
如果需要,您可以在资产中包含一个额外字典
example_asset = Asset(
"s3://asset/example.csv",
extra={"team": "trainees"},
)
这可用于为资产提供自定义描述,例如目标文件的所有者是谁,或该文件用于何处。额外信息不影响资产的身份。
注意
安全注意:资产 URI 和额外字段未加密,它们以明文形式存储在 Airflow 的元数据数据库中。切勿在资产 URI 或额外键值中存储任何敏感值,特别是凭证!
创建一个任务来触发资产事件¶
一旦资产定义好,就可以通过指定 outlets
来创建任务以针对其触发事件
from airflow.sdk import DAG, Asset
from airflow.providers.standard.operators.python import PythonOperator
example_asset = Asset(name="example_asset", uri="s3://asset-bucket/example.csv")
def _write_example_asset():
"""Write data to example_asset..."""
with DAG(dag_id="example_asset", schedule="@daily"):
PythonOperator(task_id="example_asset", outlets=[example_asset], python_callable=_write_example_asset)
这相当多样板代码。Airflow 为这种简单但最常见的情况提供了一种简写方式:创建一个仅含一个任务并触发单个资产事件的 DAG。下面的代码块与上面的代码块完全等价
from airflow.sdk import asset
@asset(uri="s3://asset-bucket/example.csv", schedule="@daily")
def example_asset():
"""Write data to example_asset..."""
声明一个 @asset
会自动创建
一个
Asset
,其 name 设置为函数名。一个
DAG
,其 dag_id 设置为函数名。DAG
中的一个任务,其 task_id 设置为函数名,并且 outlet 指向创建的Asset
。
为触发中的资产事件附加额外信息¶
版本 2.10.0 新增。
带有资产出口的任务可以在触发资产事件之前可选地附加额外信息。这与关于资产的额外信息不同。资产的额外信息静态描述了资产 URI 指向的实体;而资产事件的额外信息则应用于标注触发的数据变更,例如更新改变了数据库中的多少行,或者覆盖的日期范围。
将额外信息附加到资产事件的最简单方法是从任务中 yield
一个 Metadata
对象
from airflow.sdk import Metadata, asset
@asset(uri="s3://asset/example.csv", schedule=None)
def example_s3(self): # 'self' here refers to the current asset.
df = ... # Get a Pandas DataFrame to write.
# Write df to asset...
yield Metadata(self, {"row_count": len(df)})
Airflow 会自动收集所有 yield 的元数据,并用相应的元数据对象的额外信息填充资产事件。
这也可以在经典操作符中完成。最好的方法是继承操作符并覆盖 execute
方法。或者,也可以在任务的 pre_execute
或 post_execute
钩子中添加额外信息。然而,如果您选择使用钩子,请记住任务重试时它们不会重新运行,这可能导致在某些情况下额外信息与实际数据不匹配。
实现同样效果的另一种方法是直接访问任务执行上下文中的 outlet_events
@asset(schedule=None)
def write_to_s3(self, context):
context["outlet_events"][self].extra = {"row_count": len(df)}
这里没什么魔法——Airflow 只是将 yield 的值写入完全相同的访问器。这在经典操作符中也适用,包括 execute
、pre_execute
和 post_execute
。
从之前触发的资产事件中获取信息¶
版本 2.10.0 新增。
在任务的 outlets
中定义的资产事件,如前一节所述,可以被在其 inlets
中声明相同资产的任务读取。资产事件条目包含 extra
(详情见前一节)、指示事件从任务中触发的时间的 timestamp
,以及将事件链接回其源的 source_task_instance
。
可以使用执行上下文中的 inlet_events
访问器读取入口资产事件。接前一节的 write_to_s3
资产
@asset(schedule=None)
def post_process_s3_file(context, write_to_s3): # Declaring an inlet to write_to_s3.
events = context["inlet_events"][write_to_s3]
last_row_count = events[-1].extra["row_count"]
inlet_events
映射中的每个值都是一个序列状对象,它按 timestamp
对给定资产的过去事件进行排序,从最早到最新。它支持 Python 列表的大部分接口,因此您可以使用 [-1]
访问最后一个事件,使用 [-2:]
访问最后两个事件等。访问器是惰性的,只有当您访问其中的项目时才会访问数据库。
@asset
、@task
和经典操作符之间的依赖关系¶
由于 @asset
只是一个包含任务和资产的 DAG 的简单包装器,因此在 @task
或经典操作符中读取和 @asset
非常容易。例如,上面的 post_process_s3_file
也可以写成一个任务(在一个 DAG 中,这里为简洁起见省略了)
@task(inlets=[write_to_s3])
def post_process_s3_file(*, inlet_events):
events = inlet_events[example_s3_asset]
last_row_count = events[-1].extra["row_count"]
post_process_s3_file()
反之亦然
example_asset = Asset("example_asset")
@task(outlets=[example_asset])
def emit_example_asset():
"""Write to example_asset..."""
@asset(schedule=None)
def process_example_asset(example_asset):
"""Process inlet example_asset..."""
在一个任务中输出到多个资产¶
一个任务可以为多个资产触发事件。这通常不建议,但在某些情况下需要,例如当您需要将一个数据源拆分成多个时。这对于任务来说很简单,因为 outlets
在设计上就是复数形式
from airflow.sdk import DAG, Asset, task
input_asset = Asset("input_asset")
out_asset_1 = Asset("out_asset_1")
out_asset_2 = Asset("out_asset_2")
with DAG(dag_id="process_input", schedule=None):
@task(inlets=[input_asset], outlets=[out_asset_1, out_asset_2])
def process_input():
"""Split input into two."""
其简写形式是 @asset.multi
from airflow.sdk import Asset, asset
input_asset = Asset("input_asset")
out_asset_1 = Asset("out_asset_1")
out_asset_2 = Asset("out_asset_2")
@asset.multi(schedule=None, outlets=[out_asset_1, out_asset_2])
def process_input(input_asset):
"""Split input into two."""
通过 AssetAlias 触发动态数据事件和创建资产¶
资产别名可用于触发与别名相关联的资产事件。下游可以依赖于解析的资产。此功能允许您基于资产更新定义 DAG 执行的复杂依赖关系。
如何使用 AssetAlias¶
AssetAlias
只有一个参数 name
,它唯一标识资产。任务必须首先将别名声明为出口,然后使用 outlet_events
或 yield Metadata
向其添加事件。
以下示例针对 S3 URI f"s3://bucket/my-task"
创建了一个资产事件,并附带可选的额外信息 extra
。如果资产不存在,Airflow 将动态创建它并记录警告消息。
通过 outlet_events 在任务执行期间触发资产事件
from airflow.sdk.definitions.asset import AssetAlias
@task(outlets=[AssetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
outlet_events[AssetAlias("my-task-outputs")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
通过 yielding Metadata 在任务执行期间触发资产事件
from airflow.sdk.definitions.asset.metadata import Metadata
@task(outlets=[AssetAlias("my-task-outputs")])
def my_task_with_metadata():
s3_asset = Asset(uri="s3://bucket/my-task", name="example_s3")
yield Metadata(s3_asset, extra={"k": "v"}, alias="my-task-outputs")
添加的资产只会触发一个资产事件,即使它被多次添加到别名中,或添加到多个别名中。但是,如果传递了不同的 extra
值,它可以触发多个资产事件。在下面的示例中,将触发两个资产事件。
from airflow.sdk.definitions.asset import AssetAlias
@task(
outlets=[
AssetAlias("my-task-outputs-1"),
AssetAlias("my-task-outputs-2"),
AssetAlias("my-task-outputs-3"),
]
)
def my_task_with_outlet_events(*, outlet_events):
outlet_events[AssetAlias("my-task-outputs-1")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
# This line won't emit an additional asset event as the asset and extra are the same as the previous line.
outlet_events[AssetAlias("my-task-outputs-2")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
# This line will emit an additional asset event as the extra is different.
outlet_events[AssetAlias("my-task-outputs-3")].add(Asset("s3://bucket/my-task"), extra={"k2": "v2"})
通过解析的资产别名获取之前触发的资产事件信息¶
如从之前触发的资产事件中获取信息中所述,可以使用执行上下文中的 inlet_events
访问器读取入口资产事件,您也可以使用资产别名访问由它们触发的资产事件。
with DAG(dag_id="asset-alias-producer"):
@task(outlets=[AssetAlias("example-alias")])
def produce_asset_events(*, outlet_events):
outlet_events[AssetAlias("example-alias")].add(Asset("s3://bucket/my-task"), extra={"row_count": 1})
with DAG(dag_id="asset-alias-consumer", schedule=None):
@task(inlets=[AssetAlias("example-alias")])
def consume_asset_alias_events(*, inlet_events):
events = inlet_events[AssetAlias("example-alias")]
last_row_count = events[-1].extra["row_count"]