Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册获取早鸟票!

资产感知调度

新增于版本 2.4。

快速入门

除了基于时间调度 DAG 外,您还可以基于任务更新资产时调度 DAG 运行。

from airflow.sdk import DAG, Asset

with DAG(...):
    MyOperator(
        # this task updates example.csv
        outlets=[Asset("s3://asset-bucket/example.csv")],
        ...,
    )


with DAG(
    # this DAG should be run when example.csv is updated (by dag1)
    schedule=[Asset("s3://asset-bucket/example.csv")],
    ...,
):
    ...
../_images/asset_scheduled_dags.png

另请参阅

关于如何声明资产,请参阅资产定义

使用资产调度 DAG

您可以使用资产在 DAG 中指定数据依赖关系。以下示例展示了当 producer DAG 中的 producer 任务成功完成后,Airflow 如何调度 consumer DAG。Airflow 仅在任务成功完成后才将资产标记为 updated。如果任务失败或被跳过,则不会发生更新,Airflow 也不会调度 consumer DAG。

example_asset = Asset("s3://asset/example.csv")

with DAG(dag_id="producer", ...):
    BashOperator(task_id="producer", outlets=[example_asset], ...)

with DAG(dag_id="consumer", schedule=[example_asset], ...):
    ...

您可以在资产视图中找到资产和 DAG 之间的关系列表。

多个资产

因为 schedule 参数是一个列表,所以 DAG 可以要求多个资产。Airflow 会在 DAG 消耗的**所有**资产自上次运行以来至少更新一次后调度该 DAG

with DAG(
    dag_id="multiple_assets_example",
    schedule=[
        example_asset_1,
        example_asset_2,
        example_asset_3,
    ],
    ...,
):
    ...

如果在所有消耗的资产更新之前,某个资产多次更新,下游 DAG 仍然只会运行一次,如下图所示

graph asset_event_timeline { graph [layout=neato] { node [margin=0 fontcolor=blue width=0.1 shape=point label=""] e1 [pos="1,2.5!"] e2 [pos="2,2.5!"] e3 [pos="2.5,2!"] e4 [pos="4,2.5!"] e5 [pos="5,2!"] e6 [pos="6,2.5!"] e7 [pos="7,1.5!"] r7 [pos="7,1!" shape=star width=0.25 height=0.25 fixedsize=shape] e8 [pos="8,2!"] e9 [pos="9,1.5!"] e10 [pos="10,2!"] e11 [pos="11,1.5!"] e12 [pos="12,2!"] e13 [pos="13,2.5!"] r13 [pos="13,1!" shape=star width=0.25 height=0.25 fixedsize=shape] } { node [shape=none label="" width=0] end_ds1 [pos="14,2.5!"] end_ds2 [pos="14,2!"] end_ds3 [pos="14,1.5!"] } { node [shape=none margin=0.25 fontname="roboto,sans-serif"] example_asset_1 [ pos="-0.5,2.5!"] example_asset_2 [ pos="-0.5,2!"] example_asset_3 [ pos="-0.5,1.5!"] dag_runs [label="DagRuns created" pos="-0.5,1!"] } edge [color=lightgrey] example_asset_1 -- e1 -- e2 -- e4 -- e6 -- e13 -- end_ds1 example_asset_2 -- e3 -- e5 -- e8 -- e10 -- e12 -- end_ds2 example_asset_3 -- e7 -- e9 -- e11 -- end_ds3 }

从触发资产事件中获取信息

触发的 DAG 可以使用 triggering_asset_events 模板或参数从触发它的资产中获取信息。更多信息请参见模板参考

示例

example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")

with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
    SQLExecuteQueryOperator(
        task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_asset], ...
    )

with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_asset], ...):
    SQLExecuteQueryOperator(
        task_id="query",
        conn_id="snowflake_default",
        sql="""
          SELECT *
          FROM my_db.my_schema.my_table
          WHERE "updated_at" >= '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_start }}'
          AND "updated_at" < '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_end }}';
        """,
    )

    @task
    def print_triggering_asset_events(triggering_asset_events=None):
        for asset, asset_list in triggering_asset_events.items():
            print(asset, asset_list)
            print(asset_list[0].source_dag_run.dag_id)

    print_triggering_asset_events()

请注意,此示例使用 (.values() | first | first) 来获取提供给 DAG 的资产中的第一个 AssetEvent。如果您有多个资产,并且可能伴随多个 AssetEvent,则实现可能会相当复杂。

通过 REST API 操作排队的资产事件

新增于版本 2.9。

在此示例中,当任务同时更新资产“asset-1”和“asset-2”时,DAG waiting_for_asset_1_and_2 将被触发。一旦“asset-1”更新,Airflow 会创建一个记录。这确保了当“asset-2”更新时,Airflow 知道触发该 DAG。我们称这些记录为排队的资产事件。

with DAG(
    dag_id="waiting_for_asset_1_and_2",
    schedule=[Asset("asset-1"), Asset("asset-2")],
    ...,
):
    ...

引入了 queuedEvent API 端点来操作这些记录。

  • 获取 DAG 的排队资产事件: /assets/queuedEvent/{uri}

  • 获取 DAG 的排队资产事件列表: /dags/{dag_id}/assets/queuedEvent

  • 删除 DAG 的排队资产事件: /assets/queuedEvent/{uri}

  • 删除 DAG 的排队资产事件列表: /dags/{dag_id}/assets/queuedEvent

  • 获取资产的排队资产事件: /dags/{dag_id}/assets/queuedEvent/{uri}

  • 删除资产的排队资产事件: DELETE /dags/{dag_id}/assets/queuedEvent/{uri}

关于如何使用 REST API 以及这些端点所需的参数,请参阅Airflow API

使用条件表达式进行高级资产调度

Apache Airflow 包含高级调度功能,可与资产一起使用条件表达式。此功能允许您基于资产更新为 DAG 执行定义复杂的依赖关系,使用逻辑运算符可以更好地控制工作流触发器。

资产的逻辑运算符

Airflow 支持两种逻辑运算符来组合资产条件:

  • AND (``&``): 指定仅在所有指定的资产都已更新后才应触发 DAG。

  • OR (``|``): 指定当任何指定的资产更新时,就应触发 DAG。

这些运算符使您能够配置 Airflow 工作流使用更复杂的资产更新条件,使其更具动态性和灵活性。

使用示例

基于多个资产更新进行调度

要仅在两个特定资产都已更新时调度 DAG 运行,请使用 AND 运算符(&

dag1_asset = Asset("s3://dag1/output_1.txt")
dag2_asset = Asset("s3://dag2/output_1.txt")

with DAG(
    # Consume asset 1 and 2 with asset expressions
    schedule=(dag1_asset & dag2_asset),
    ...,
):
    ...

基于任何资产更新进行调度

要在两个资产中的任意一个更新时触发 DAG 执行,请应用 OR 运算符(|

with DAG(
    # Consume asset 1 or 2 with asset expressions
    schedule=(dag1_asset | dag2_asset),
    ...,
):
    ...

复杂条件逻辑

对于需要更复杂条件的情况,例如当一个资产更新时或当另外两个资产都更新时触发 DAG,请结合使用 OR 和 AND 运算符

dag3_asset = Asset("s3://dag3/output_3.txt")

with DAG(
    # Consume asset 1 or both 2 and 3 with asset expressions
    schedule=(dag1_asset | (dag2_asset & dag3_asset)),
    ...,
):
    ...

基于资产别名进行调度

由于添加到别名的资产事件只是简单的资产事件,依赖实际资产的下游 DAG 可以正常读取其资产事件,无需考虑相关的别名。下游 DAG 也可以依赖资产别名。编写语法是按名称引用 AssetAlias,并使用相关的资产事件进行调度。请注意,仅当别名解析为 Asset("s3://bucket/my-task") 时,具有 outlets=AssetAlias("xxx") 的任务才能触发 DAG。每当具有 outlet AssetAlias("out") 的任务在运行时与至少一个资产关联时,无论资产的身份如何,DAG 都会运行。如果在特定任务运行中没有资产与别名关联,下游 DAG 将不会被触发。这也意味着我们可以进行条件资产触发。

资产别名在 DAG 解析期间解析为资产。因此,如果“min_file_process_interval”配置设置为较高值,则资产别名可能无法解析。要解决此问题,您可以触发 DAG 解析。

with DAG(dag_id="asset-producer"):

    @task(outlets=[Asset("example-alias")])
    def produce_asset_events():
        pass


with DAG(dag_id="asset-alias-producer"):

    @task(outlets=[AssetAlias("example-alias")])
    def produce_asset_events(*, outlet_events):
        outlet_events[AssetAlias("example-alias")].add(Asset("s3://bucket/my-task"))


with DAG(dag_id="asset-consumer", schedule=Asset("s3://bucket/my-task")):
    ...

with DAG(dag_id="asset-alias-consumer", schedule=AssetAlias("example-alias")):
    ...

在提供的示例中,一旦 DAG asset-alias-producer 执行完成,资产别名 AssetAlias("example-alias") 将解析为 Asset("s3://bucket/my-task")。然而,DAG asset-alias-consumer 必须等待下一次 DAG 重新解析来更新其调度。为解决此问题,当资产别名 AssetAlias("example-alias") 解析为这些 DAG 之前不依赖的资产时,Airflow 将重新解析依赖此别名的 DAG。结果,在 DAG asset-alias-producer 执行后,“asset-consumer”和“asset-alias-consumer”这两个 DAG 都将被触发。

结合资产调度和时间调度

AssetTimetable 集成

您可以使用 AssetOrTimeSchedule 同时基于资产事件和时间调度来调度 DAG。这使得您可以在 DAG 需要既通过数据更新触发,又根据固定时间表定期运行时创建工作流。

有关 AssetOrTimeSchedule 的更多详细信息,请参阅AssetOrTimeSchedule中的相应章节。

此条目是否有帮助?