数据感知调度

2.4 版本新增。

快速开始

除了基于时间调度 DAG 之外,您还可以根据任务何时更新数据集来调度 DAG 运行。

from airflow.datasets import Dataset

with DAG(...):
    MyOperator(
        # this task updates example.csv
        outlets=[Dataset("s3://dataset-bucket/example.csv")],
        ...,
    )


with DAG(
    # this DAG should be run when example.csv is updated (by dag1)
    schedule=[Dataset("s3://dataset-bucket/example.csv")],
    ...,
):
    ...
../_images/dataset-scheduled-dags.png

什么是“数据集”?

Airflow 数据集是数据的逻辑分组。上游生产者任务可以更新数据集,数据集更新有助于调度下游消费者 DAG。

统一资源标识符 (URI) 定义数据集

from airflow.datasets import Dataset

example_dataset = Dataset("s3://dataset-bucket/example.csv")

Airflow 不对 URI 表示的数据内容或位置做任何假设,并将 URI 视为字符串。这意味着 Airflow 将任何正则表达式(如 input_\d+.csv)或文件 glob 模式(如 input_2022*.csv)视为尝试从一个声明创建多个数据集,它们将无法工作。

您必须使用有效的 URI 创建数据集。Airflow 核心和提供者定义了各种您可以使用的 URI 方案,例如 file(核心)、postgres(由 Postgres 提供者提供)和 s3(由 Amazon 提供者提供)。第三方提供者和插件也可能提供他们自己的方案。这些预定义的方案具有各自的语义,需要遵循。

什么是有效的 URI?

从技术上讲,URI 必须符合 RFC 3986 中有效的字符集,基本上是 ASCII 字母数字字符,加上 %-_.~。要标识无法用 URI 安全字符表示的资源,请使用 百分号编码 对资源名称进行编码。

URI 也区分大小写,因此 s3://example/datasets3://Example/Dataset 被认为是不同的。请注意,URI 的 *主机* 部分也区分大小写,这与 RFC 3986 不同。

不要使用 airflow 方案,该方案保留给 Airflow 内部使用。

Airflow 始终优先在方案中使用小写字母,并且 URI 的主机部分需要区分大小写,以便正确区分资源。

# invalid datasets:
reserved = Dataset("airflow://example_dataset")
not_ascii = Dataset("èxample_datašet")

如果您想使用不包含其他语义约束的方案定义数据集,请使用带有前缀 x- 的方案。Airflow 会跳过对使用这些方案的 URI 的任何语义验证。

# valid dataset, treated as a plain string
my_ds = Dataset("x-my-thing://foobarbaz")

标识符不必是绝对的;它可以是无方案的相对 URI,甚至只是一个简单的路径或字符串。

# valid datasets:
schemeless = Dataset("//example/dataset")
csv_file = Dataset("example_dataset")

非绝对标识符被认为是纯字符串,对 Airflow 没有任何语义含义。

关于数据集的额外信息

如果需要,您可以在数据集中包含一个额外的字典。

example_dataset = Dataset(
    "s3://dataset/example.csv",
    extra={"team": "trainees"},
)

这可用于向数据集提供自定义描述,例如谁拥有目标文件的所有权,或者该文件用于什么目的。额外信息不影响数据集的标识。这意味着即使额外的字典不同,DAG 仍会由具有相同 URI 的数据集触发。

with DAG(
    dag_id="consumer",
    schedule=[Dataset("s3://dataset/example.csv", extra={"different": "extras"})],
):
    ...

with DAG(dag_id="producer", ...):
    MyOperator(
        # triggers "consumer" with the given extra!
        outlets=[Dataset("s3://dataset/example.csv", extra={"team": "trainees"})],
        ...,
    )

注意

安全注意:数据集 URI 和额外字段未加密,它们以明文形式存储在 Airflow 的元数据数据库中。请勿在数据集 URI 或额外的键值中存储任何敏感值,尤其是凭据!

如何在您的 DAG 中使用数据集

您可以使用数据集在 DAG 中指定数据依赖项。以下示例显示了在 producer DAG 中的 producer 任务成功完成后,Airflow 如何调度 consumer DAG。仅当任务成功完成时,Airflow 才会将数据集标记为 updated。如果任务失败或跳过,则不会发生更新,并且 Airflow 不会调度 consumer DAG。

example_dataset = Dataset("s3://dataset/example.csv")

with DAG(dag_id="producer", ...):
    BashOperator(task_id="producer", outlets=[example_dataset], ...)

with DAG(dag_id="consumer", schedule=[example_dataset], ...):
    ...

您可以在 数据集视图 中找到数据集和 DAG 之间关系的列表。

多个数据集

由于 schedule 参数是一个列表,因此 DAG 可以需要多个数据集。在 DAG 消费的所有数据集自上次 DAG 运行以来至少更新一次后,Airflow 才会调度 DAG。

with DAG(
    dag_id="multiple_datasets_example",
    schedule=[
        example_dataset_1,
        example_dataset_2,
        example_dataset_3,
    ],
    ...,
):
    ...

如果一个数据集在所有消耗的数据集更新之前更新多次,则下游 DAG 仍然只运行一次,如此图所示。

graph dataset_event_timeline { graph [layout=neato] { node [margin=0 fontcolor=blue width=0.1 shape=point label=""] e1 [pos="1,2.5!"] e2 [pos="2,2.5!"] e3 [pos="2.5,2!"] e4 [pos="4,2.5!"] e5 [pos="5,2!"] e6 [pos="6,2.5!"] e7 [pos="7,1.5!"] r7 [pos="7,1!" shape=star width=0.25 height=0.25 fixedsize=shape] e8 [pos="8,2!"] e9 [pos="9,1.5!"] e10 [pos="10,2!"] e11 [pos="11,1.5!"] e12 [pos="12,2!"] e13 [pos="13,2.5!"] r13 [pos="13,1!" shape=star width=0.25 height=0.25 fixedsize=shape] } { node [shape=none label="" width=0] end_ds1 [pos="14,2.5!"] end_ds2 [pos="14,2!"] end_ds3 [pos="14,1.5!"] } { node [shape=none margin=0.25 fontname="roboto,sans-serif"] example_dataset_1 [ pos="-0.5,2.5!"] example_dataset_2 [ pos="-0.5,2!"] example_dataset_3 [ pos="-0.5,1.5!"] dag_runs [label="DagRuns created" pos="-0.5,1!"] } edge [color=lightgrey] example_dataset_1 -- e1 -- e2 -- e4 -- e6 -- e13 -- end_ds1 example_dataset_2 -- e3 -- e5 -- e8 -- e10 -- e12 -- end_ds2 example_dataset_3 -- e7 -- e9 -- e11 -- end_ds3 }

向正在发出的数据集事件附加额外信息

2.10.0 版本新增。

具有数据集出口的任务可以选择在发出数据集事件之前附加额外信息。这与 关于数据集的额外信息 不同。数据集上的额外信息静态地描述了数据集 URI 指向的实体;而*数据集事件*上的额外信息则应用于注释触发数据更改,例如数据库中更改了多少行,或它涵盖的日期范围。

将额外信息附加到数据集事件的最简单方法是从任务中 yield 一个 Metadata 对象。

from airflow.datasets import Dataset
from airflow.datasets.metadata import Metadata

example_s3_dataset = Dataset("s3://dataset/example.csv")


@task(outlets=[example_s3_dataset])
def write_to_s3():
    df = ...  # Get a Pandas DataFrame to write.
    # Write df to dataset...
    yield Metadata(example_s3_dataset, {"row_count": len(df)})

Airflow 会自动收集所有产生的元数据,并使用相应元数据对象的额外信息填充数据集事件。

这也可以在经典操作符中完成。最好的方法是子类化操作符并覆盖 execute。或者,也可以在任务的 pre_executepost_execute 钩子中添加额外信息。但是,如果您选择使用钩子,请记住它们在重试任务时不会重新运行,并且可能导致额外信息在某些情况下与实际数据不匹配。

实现相同目的的另一种方法是直接访问任务执行上下文中的 outlet_events

@task(outlets=[example_s3_dataset])
def write_to_s3(*, outlet_events):
    outlet_events[example_s3_dataset].extra = {"row_count": len(df)}

这里没有什么魔法——Airflow 只是将产生的值写入到完全相同的访问器。这在经典操作符(包括 executepre_executepost_execute)中也适用。

从先前发出的数据集事件中获取信息

2.10.0 版本新增。

任务的 outlets 中定义的数据集的事件(如上一节所述)可以由在其 inlets 中声明相同数据集的任务读取。数据集事件条目包含 extra(有关详细信息,请参见上一节)、timestamp(指示事件何时从任务发出)和 source_task_instance(将事件链接回其源)。

可以使用执行上下文中的 inlet_events 访问器读取入口数据集事件。继续上一节中的 write_to_s3 任务

@task(inlets=[example_s3_dataset])
def post_process_s3_file(*, inlet_events):
    events = inlet_events[example_s3_dataset]
    last_row_count = events[-1].extra["row_count"]

inlet_events 映射中的每个值都是一个类似序列的对象,该对象按 timestamp 对给定数据集的过去事件进行排序,从最早到最新。它支持大多数 Python 的列表接口,因此您可以使用 [-1] 访问最后一个事件,使用 [-2:] 访问最后两个事件,等等。该访问器是惰性的,仅当您访问其中的项时才会命中数据库。

从触发数据集事件中获取信息

触发的 DAG 可以使用 triggering_dataset_events 模板或参数从触发它的数据集中获取信息。有关详细信息,请参见 模板参考

示例

example_snowflake_dataset = Dataset("snowflake://my_db/my_schema/my_table")

with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
    SQLExecuteQueryOperator(
        task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_dataset], ...
    )

with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_dataset], ...):
    SQLExecuteQueryOperator(
        task_id="query",
        conn_id="snowflake_default",
        sql="""
          SELECT *
          FROM my_db.my_schema.my_table
          WHERE "updated_at" >= '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_start }}'
          AND "updated_at" < '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_end }}';
        """,
    )

    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            print(dataset, dataset_list)
            print(dataset_list[0].source_dag_run.dag_id)

    print_triggering_dataset_events()

请注意,此示例使用了 (.values() | first | first) 来获取传递给 DAG 的一个数据集中的第一个值,以及该数据集的第一个 DatasetEvent。 如果您有多个数据集,并且每个数据集可能都有多个 DatasetEvent,那么实现起来可能会非常复杂。

通过 REST API 操作排队的数据集事件

2.9 版本新增功能。

在此示例中,当任务更新“dataset-1”和“dataset-2”这两个数据集时,DAG waiting_for_dataset_1_and_2 将会被触发。一旦“dataset-1”被更新,Airflow 会创建一个记录。这确保了 Airflow 知道在“dataset-2”被更新时触发 DAG。我们将此类记录称为排队的数据集事件。

with DAG(
    dag_id="waiting_for_dataset_1_and_2",
    schedule=[Dataset("dataset-1"), Dataset("dataset-2")],
    ...,
):
    ...

引入 queuedEvent API 端点来操作此类记录。

  • 获取 DAG 的排队数据集事件:/datasets/queuedEvent/{uri}

  • 获取 DAG 的排队数据集事件:/dags/{dag_id}/datasets/queuedEvent

  • 删除 DAG 的排队数据集事件:/datasets/queuedEvent/{uri}

  • 删除 DAG 的排队数据集事件:/dags/{dag_id}/datasets/queuedEvent

  • 获取数据集的排队数据集事件:/dags/{dag_id}/datasets/queuedEvent/{uri}

  • 删除数据集的排队数据集事件:DELETE /dags/{dag_id}/datasets/queuedEvent/{uri}

有关如何使用 REST API 以及这些端点所需的参数,请参阅 Airflow API

使用条件表达式进行高级数据集调度

Apache Airflow 包含使用带数据集的条件表达式的高级调度功能。此功能允许您基于数据集更新定义 DAG 执行的复杂依赖关系,并使用逻辑运算符来更好地控制工作流触发器。

数据集的逻辑运算符

Airflow 支持两个用于组合数据集条件的逻辑运算符

  • AND (``&``):指定仅当所有指定的数据集都已更新后,才应触发 DAG。

  • OR (``|``):指定当任何指定的数据集被更新时,应触发 DAG。

这些运算符使您能够配置 Airflow 工作流以使用更复杂的数据集更新条件,从而使其更具动态性和灵活性。

示例用法

基于多个数据集更新进行调度

要计划一个 DAG 仅在两个特定数据集都已更新时才运行,请使用 AND 运算符 (&)

dag1_dataset = Dataset("s3://dag1/output_1.txt")
dag2_dataset = Dataset("s3://dag2/output_1.txt")

with DAG(
    # Consume dataset 1 and 2 with dataset expressions
    schedule=(dag1_dataset & dag2_dataset),
    ...,
):
    ...

基于任何数据集更新进行调度

要在两个数据集中的任何一个被更新时触发 DAG 执行,请应用 OR 运算符 (|)

with DAG(
    # Consume dataset 1 or 2 with dataset expressions
    schedule=(dag1_dataset | dag2_dataset),
    ...,
):
    ...

复杂的条件逻辑

对于需要更复杂条件的情况,例如当一个数据集被更新或当其他两个数据集都被更新时触发 DAG,请组合使用 OR 和 AND 运算符

dag3_dataset = Dataset("s3://dag3/output_3.txt")

with DAG(
    # Consume dataset 1 or both 2 and 3 with dataset expressions
    schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
    ...,
):
    ...

通过 DatasetAlias 动态发出数据事件和创建数据集

数据集别名可以用于发出与别名关联的数据集的数据集事件。下游可以依赖于已解析的数据集。此功能允许您根据数据集更新为 DAG 执行定义复杂的依赖关系。

如何使用 DatasetAlias

DatasetAlias 具有一个单独的参数 name,用于唯一标识数据集。任务必须首先将别名声明为出口,并使用 outlet_events 或产生 Metadata 来向其添加事件。

以下示例创建一个针对 S3 URI f"s3://bucket/my-task" 的数据集事件,并带有可选的额外信息 extra。如果数据集不存在,Airflow 将动态创建它并记录警告消息。

在任务执行期间通过 outlet_events 发出数据集事件

from airflow.datasets import DatasetAlias


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[DatasetAlias("my-task-outputs")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})

在任务执行期间通过产生 Metadata 发出数据集事件

from airflow.datasets.metadata import Metadata


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_metadata():
    s3_dataset = Dataset("s3://bucket/my-task")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")

即使数据集多次添加到别名中,或者添加到多个别名中,也仅为添加的数据集发出一个数据集事件。但是,如果传递了不同的 extra 值,则可以发出多个数据集事件。在以下示例中,将发出两个数据集事件。

from airflow.datasets import DatasetAlias


@task(
    outlets=[
        DatasetAlias("my-task-outputs-1"),
        DatasetAlias("my-task-outputs-2"),
        DatasetAlias("my-task-outputs-3"),
    ]
)
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[DatasetAlias("my-task-outputs-1")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
    # This line won't emit an additional dataset event as the dataset and extra are the same as the previous line.
    outlet_events[DatasetAlias("my-task-outputs-2")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
    # This line will emit an additional dataset event as the extra is different.
    outlet_events[DatasetAlias("my-task-outputs-3")].add(Dataset("s3://bucket/my-task"), extra={"k2": "v2"})

基于数据集别名进行调度

由于添加到别名的数据集事件只是简单的数据集事件,因此依赖于实际数据集的下游 DAG 可以正常读取其数据集事件,而无需考虑关联的别名。下游 DAG 也可以依赖数据集别名。创作语法是通过名称引用 DatasetAlias,并且将选取关联的数据集事件进行调度。请注意,当且仅当别名解析为 Dataset("s3://bucket/my-task") 时,才会通过具有 outlets=DatasetAlias("xxx") 的任务触发 DAG。无论数据集的标识如何,只要具有出口 DatasetAlias("out") 的任务在运行时与至少一个数据集相关联,DAG 就会运行。如果对于特定的给定任务运行,没有数据集与别名相关联,则不会触发下游 DAG。这也意味着我们可以进行有条件的数据集触发。

在 DAG 解析期间,数据集别名会解析为数据集。因此,如果“min_file_process_interval”配置设置为较高的值,则数据集别名可能无法解析。要解决此问题,您可以触发 DAG 解析。

with DAG(dag_id="dataset-producer"):

    @task(outlets=[Dataset("example-alias")])
    def produce_dataset_events():
        pass


with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(Dataset("s3://bucket/my-task"))


with DAG(dag_id="dataset-consumer", schedule=Dataset("s3://bucket/my-task")):
    ...

with DAG(dag_id="dataset-alias-consumer", schedule=DatasetAlias("example-alias")):
    ...

在提供的示例中,一旦执行 DAG dataset-alias-producer,数据集别名 DatasetAlias("example-alias") 将被解析为 Dataset("s3://bucket/my-task")。但是,DAG dataset-alias-consumer 将必须等待下一次 DAG 重新解析才能更新其计划。为了解决这个问题,当数据集别名 DatasetAlias("example-alias") 解析为这些 DAG 以前不依赖的数据集时,Airflow 将重新解析依赖于数据集别名的 DAG。因此,在执行 DAG dataset-alias-producer 后,将触发 “dataset-consumer” 和 “dataset-alias-consumer” DAG。

通过已解析的数据集别名获取先前发出的数据集事件的信息

正如 从先前发出的数据集事件中获取信息 中提到的,可以使用执行上下文中的 inlet_events 访问器读取入口数据集事件,您还可以使用数据集别名来访问它们触发的数据集事件。

with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(
            Dataset("s3://bucket/my-task"), extra={"row_count": 1}
        )


with DAG(dag_id="dataset-alias-consumer", schedule=None):

    @task(inlets=[DatasetAlias("example-alias")])
    def consume_dataset_alias_events(*, inlet_events):
        events = inlet_events[DatasetAlias("example-alias")]
        last_row_count = events[-1].extra["row_count"]

结合数据集和基于时间的计划

DatasetTimetable 集成

您可以使用 DatasetOrTimeSchedule 基于数据集事件和基于时间的计划来安排 DAG。 这允许您在 DAG 需要通过数据更新触发并根据固定时间表定期运行时创建工作流。

有关 DatasetOrTimeSchedule 的更多详细信息,请参阅 DatasetOrTimeSchedule 中的相应部分。

这个条目有帮助吗?