资产感知调度¶
新增于版本 2.4。
快速入门¶
除了基于时间调度 DAG 外,您还可以基于任务更新资产时调度 DAG 运行。
from airflow.sdk import DAG, Asset
with DAG(...):
MyOperator(
# this task updates example.csv
outlets=[Asset("s3://asset-bucket/example.csv")],
...,
)
with DAG(
# this DAG should be run when example.csv is updated (by dag1)
schedule=[Asset("s3://asset-bucket/example.csv")],
...,
):
...

另请参阅
关于如何声明资产,请参阅资产定义。
使用资产调度 DAG¶
您可以使用资产在 DAG 中指定数据依赖关系。以下示例展示了当 producer
DAG 中的 producer
任务成功完成后,Airflow 如何调度 consumer
DAG。Airflow 仅在任务成功完成后才将资产标记为 updated
。如果任务失败或被跳过,则不会发生更新,Airflow 也不会调度 consumer
DAG。
example_asset = Asset("s3://asset/example.csv")
with DAG(dag_id="producer", ...):
BashOperator(task_id="producer", outlets=[example_asset], ...)
with DAG(dag_id="consumer", schedule=[example_asset], ...):
...
您可以在资产视图中找到资产和 DAG 之间的关系列表。
多个资产¶
因为 schedule
参数是一个列表,所以 DAG 可以要求多个资产。Airflow 会在 DAG 消耗的**所有**资产自上次运行以来至少更新一次后调度该 DAG
with DAG(
dag_id="multiple_assets_example",
schedule=[
example_asset_1,
example_asset_2,
example_asset_3,
],
...,
):
...
如果在所有消耗的资产更新之前,某个资产多次更新,下游 DAG 仍然只会运行一次,如下图所示
从触发资产事件中获取信息¶
触发的 DAG 可以使用 triggering_asset_events
模板或参数从触发它的资产中获取信息。更多信息请参见模板参考。
示例
example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")
with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
SQLExecuteQueryOperator(
task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_asset], ...
)
with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_asset], ...):
SQLExecuteQueryOperator(
task_id="query",
conn_id="snowflake_default",
sql="""
SELECT *
FROM my_db.my_schema.my_table
WHERE "updated_at" >= '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_start }}'
AND "updated_at" < '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_end }}';
""",
)
@task
def print_triggering_asset_events(triggering_asset_events=None):
for asset, asset_list in triggering_asset_events.items():
print(asset, asset_list)
print(asset_list[0].source_dag_run.dag_id)
print_triggering_asset_events()
请注意,此示例使用 (.values() | first | first) 来获取提供给 DAG 的资产中的第一个 AssetEvent。如果您有多个资产,并且可能伴随多个 AssetEvent,则实现可能会相当复杂。
通过 REST API 操作排队的资产事件¶
新增于版本 2.9。
在此示例中,当任务同时更新资产“asset-1”和“asset-2”时,DAG waiting_for_asset_1_and_2
将被触发。一旦“asset-1”更新,Airflow 会创建一个记录。这确保了当“asset-2”更新时,Airflow 知道触发该 DAG。我们称这些记录为排队的资产事件。
with DAG(
dag_id="waiting_for_asset_1_and_2",
schedule=[Asset("asset-1"), Asset("asset-2")],
...,
):
...
引入了 queuedEvent
API 端点来操作这些记录。
获取 DAG 的排队资产事件:
/assets/queuedEvent/{uri}
获取 DAG 的排队资产事件列表:
/dags/{dag_id}/assets/queuedEvent
删除 DAG 的排队资产事件:
/assets/queuedEvent/{uri}
删除 DAG 的排队资产事件列表:
/dags/{dag_id}/assets/queuedEvent
获取资产的排队资产事件:
/dags/{dag_id}/assets/queuedEvent/{uri}
删除资产的排队资产事件:
DELETE /dags/{dag_id}/assets/queuedEvent/{uri}
关于如何使用 REST API 以及这些端点所需的参数,请参阅Airflow API。
使用条件表达式进行高级资产调度¶
Apache Airflow 包含高级调度功能,可与资产一起使用条件表达式。此功能允许您基于资产更新为 DAG 执行定义复杂的依赖关系,使用逻辑运算符可以更好地控制工作流触发器。
资产的逻辑运算符¶
Airflow 支持两种逻辑运算符来组合资产条件:
AND (``&``): 指定仅在所有指定的资产都已更新后才应触发 DAG。
OR (``|``): 指定当任何指定的资产更新时,就应触发 DAG。
这些运算符使您能够配置 Airflow 工作流使用更复杂的资产更新条件,使其更具动态性和灵活性。
使用示例¶
基于多个资产更新进行调度
要仅在两个特定资产都已更新时调度 DAG 运行,请使用 AND 运算符(&
)
dag1_asset = Asset("s3://dag1/output_1.txt")
dag2_asset = Asset("s3://dag2/output_1.txt")
with DAG(
# Consume asset 1 and 2 with asset expressions
schedule=(dag1_asset & dag2_asset),
...,
):
...
基于任何资产更新进行调度
要在两个资产中的任意一个更新时触发 DAG 执行,请应用 OR 运算符(|
)
with DAG(
# Consume asset 1 or 2 with asset expressions
schedule=(dag1_asset | dag2_asset),
...,
):
...
复杂条件逻辑
对于需要更复杂条件的情况,例如当一个资产更新时或当另外两个资产都更新时触发 DAG,请结合使用 OR 和 AND 运算符
dag3_asset = Asset("s3://dag3/output_3.txt")
with DAG(
# Consume asset 1 or both 2 and 3 with asset expressions
schedule=(dag1_asset | (dag2_asset & dag3_asset)),
...,
):
...
基于资产别名进行调度¶
由于添加到别名的资产事件只是简单的资产事件,依赖实际资产的下游 DAG 可以正常读取其资产事件,无需考虑相关的别名。下游 DAG 也可以依赖资产别名。编写语法是按名称引用 AssetAlias
,并使用相关的资产事件进行调度。请注意,仅当别名解析为 Asset("s3://bucket/my-task")
时,具有 outlets=AssetAlias("xxx")
的任务才能触发 DAG。每当具有 outlet AssetAlias("out")
的任务在运行时与至少一个资产关联时,无论资产的身份如何,DAG 都会运行。如果在特定任务运行中没有资产与别名关联,下游 DAG 将不会被触发。这也意味着我们可以进行条件资产触发。
资产别名在 DAG 解析期间解析为资产。因此,如果“min_file_process_interval”配置设置为较高值,则资产别名可能无法解析。要解决此问题,您可以触发 DAG 解析。
with DAG(dag_id="asset-producer"):
@task(outlets=[Asset("example-alias")])
def produce_asset_events():
pass
with DAG(dag_id="asset-alias-producer"):
@task(outlets=[AssetAlias("example-alias")])
def produce_asset_events(*, outlet_events):
outlet_events[AssetAlias("example-alias")].add(Asset("s3://bucket/my-task"))
with DAG(dag_id="asset-consumer", schedule=Asset("s3://bucket/my-task")):
...
with DAG(dag_id="asset-alias-consumer", schedule=AssetAlias("example-alias")):
...
在提供的示例中,一旦 DAG asset-alias-producer
执行完成,资产别名 AssetAlias("example-alias")
将解析为 Asset("s3://bucket/my-task")
。然而,DAG asset-alias-consumer
必须等待下一次 DAG 重新解析来更新其调度。为解决此问题,当资产别名 AssetAlias("example-alias")
解析为这些 DAG 之前不依赖的资产时,Airflow 将重新解析依赖此别名的 DAG。结果,在 DAG asset-alias-producer
执行后,“asset-consumer”和“asset-alias-consumer”这两个 DAG 都将被触发。
结合资产调度和时间调度¶
AssetTimetable 集成¶
您可以使用 AssetOrTimeSchedule
同时基于资产事件和时间调度来调度 DAG。这使得您可以在 DAG 需要既通过数据更新触发,又根据固定时间表定期运行时创建工作流。
有关 AssetOrTimeSchedule
的更多详细信息,请参阅AssetOrTimeSchedule中的相应章节。