动态任务映射

动态任务映射允许工作流根据当前数据在运行时创建多个任务,而不是 DAG 作者必须事先知道需要多少个任务。

这类似于在 for 循环中定义任务,但不是让 DAG 文件自己获取数据并执行此操作,而是调度程序可以根据先前任务的输出执行此操作。在执行映射任务之前,调度程序将创建任务的 n 个副本,每个输入一个。

也可以让任务对映射任务的收集输出进行操作,通常称为映射和 reduce。

简单映射

在最简单的形式中,您可以使用 expand() 函数而不是直接调用任务来映射直接在 DAG 文件中定义的列表。

如果您想查看动态任务映射的简单用法,可以查看以下内容

airflow/example_dags/example_dynamic_task_mapping.py[源代码]

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping."""

from __future__ import annotations

from datetime import datetime

from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(dag_id="example_dynamic_task_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

这将在执行时在任务日志中显示 总计为 9

这是生成的 DAG 结构

../_images/mapping-simple-graph.png

网格视图还在详细信息面板中提供了映射任务的可见性

../_images/mapping-simple-grid.png

注意

只允许将关键字参数传递给 expand()

注意

从映射任务传递的值是惰性代理

在上面的示例中,sum_it 接收到的 valuesadd_one 的每个映射实例返回的所有值的聚合。但是,由于不可能事先知道我们将有多少个 add_one 实例,因此 values 不是普通的列表,而是一个“惰性序列”,仅在被询问时才检索每个单独的值。因此,如果您直接运行 print(values),您将得到如下内容

LazyXComAccess(dag_id='simple_mapping', run_id='test_run', task_id='add_one')

您可以在此对象上使用正常的序列语法(例如 values[0]),或者使用 for 循环正常迭代它。list(values) 将为您提供一个“真实的”list,但由于这会急切地从*所有*引用的上游映射任务加载值,因此您必须注意如果映射数量很大,潜在的性能影响。

请注意,当您将此代理对象推送到 XCom 时,同样适用。Airflow 会尝试变得智能并自动强制转换值,但会为此发出警告,以便您了解这一点。例如

@task
def forward_values(values):
    return values  # This is a lazy proxy!

将发出如下警告

Coercing mapped lazy proxy return value from task forward_values to list, which may degrade
performance. Review resource requirements for this operation, and call list() explicitly to suppress this message. See Dynamic Task Mapping documentation for more information about lazy proxy objects.

可以通过修改任务来抑制该消息,如下所示

@task
def forward_values(values):
    return list(values)

注意

不需要 reduce 任务。

尽管我们在这里展示了一个“reduce”任务(sum_it),但您不必拥有一个,即使映射的任务没有下游任务,它们仍然会被执行。

任务生成的映射

我们上面展示的示例都可以通过 DAG 文件中的 for 循环来实现,但动态任务映射的真正威力在于能够让任务生成要迭代的列表。

@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    print(arg)


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

make_list 任务作为普通任务运行,并且必须返回一个列表或字典(请参阅可以扩展哪些数据类型?),然后 consumer 任务将被调用四次,每次使用 make_list 返回值中的一个值。

重复映射

一个映射任务的结果也可以用作下一个映射任务的输入。

with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    first = add_one.expand(x=[1, 2, 3])
    second = add_one.expand(x=first)

这将产生 [3, 4, 5] 的结果。

添加不扩展的参数

除了传递在运行时扩展的参数外,还可以传递不变的参数——为了清楚地区分这两种参数,我们使用了不同的函数,expand() 用于映射的参数,partial() 用于未映射的参数。

@task
def add(x: int, y: int):
    return x + y


added_values = add.partial(y=10).expand(x=[1, 2, 3])
# This results in add function being expanded to
# add(x=1, y=10)
# add(x=2, y=10)
# add(x=3, y=10)

这将导致值为 11、12 和 13。

这对于将连接 ID、数据库表名或存储桶名称等内容传递给任务也很有用。

映射多个参数

除了单个参数之外,还可以传递多个参数来扩展。这将产生创建“叉积”的效果,使用每种参数组合调用映射的任务。

@task
def add(x: int, y: int):
    return x + y


added_values = add.expand(x=[2, 4, 8], y=[5, 10])
# This results in the add function being called with
# add(x=2, y=5)
# add(x=2, y=10)
# add(x=4, y=5)
# add(x=4, y=10)
# add(x=8, y=5)
# add(x=8, y=10)

这将导致 add 任务被调用 6 次。但是请注意,扩展的顺序不能保证。

命名映射

默认情况下,映射的任务会被分配一个整数索引。可以使用基于任务输入的名称在 Airflow UI 中覆盖每个映射任务的整数索引。这是通过使用 map_index_template 为任务提供 Jinja 模板来完成的。当扩展看起来像 .expand(<property>=...) 时,这通常看起来像 map_index_template="{{ task.<property> }}"。在使用任务上下文执行每个扩展任务后,将呈现此模板。这意味着您可以像这样引用任务上的属性

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


# The two expanded task instances will be named "2024-01-01" and "2024-01-02".
SQLExecuteQueryOperator.partial(
    ...,
    sql="SELECT * FROM data WHERE date = %(date)s",
    map_index_template="""{{ task.parameters['date'] }}""",
).expand(
    parameters=[{"date": "2024-01-01"}, {"date": "2024-01-02"}],
)

在上面的示例中,扩展的任务实例将分别命名为“2024-01-01”和“2024-01-02”。这些名称将显示在 Airflow UI 中,而不是“0”和“1”。

由于模板是在主执行块之后呈现的,因此也可以动态地注入到呈现上下文中。当在 Jinja 模板语法中难以表达呈现所需名称的逻辑时,这很有用,尤其是在 taskflow 函数中。例如

from airflow.operators.python import get_current_context


@task(map_index_template="{{ my_variable }}")
def my_task(my_value: str):
    context = get_current_context()
    context["my_variable"] = my_value * 3
    ...  # Normal execution...


# The task instances will be named "aaa" and "bbb".
my_task.expand(my_value=["a", "b"])

使用非 TaskFlow 运算符进行映射

也可以将 partialexpand 与经典样式的运算符一起使用。某些参数不可映射,必须传递给 partial(),例如 task_idqueuepool 以及 BaseOperator 的大多数其他参数。

airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py[源代码]

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping with non-TaskFlow operators."""

from __future__ import annotations

from datetime import datetime

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG


class AddOneOperator(BaseOperator):
    """A custom operator that adds one to the input."""

    def __init__(self, value, **kwargs):
        super().__init__(**kwargs)
        self.value = value

    def execute(self, context):
        return self.value + 1


class SumItOperator(BaseOperator):
    """A custom operator that sums the input."""

    template_fields = ("values",)

    def __init__(self, values, **kwargs):
        super().__init__(**kwargs)
        self.values = values

    def execute(self, context):
        total = sum(self.values)
        print(f"Total was {total}")
        return total


with DAG(
    dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
    start_date=datetime(2022, 3, 4),
    catchup=False,
):
    # map the task to a list of values
    add_one_task = AddOneOperator.partial(task_id="add_one").expand(value=[1, 2, 3])

    # aggregate (reduce) the mapped tasks results
    sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)

注意

只允许将关键字参数传递给 partial()

映射经典运算符的结果

如果要映射经典运算符的结果,则应显式引用*输出*,而不是运算符本身。

# Create a list of data inputs.
extract = ExtractOperator(task_id="extract")

# Expand the operator to transform each input.
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)

# Collect the transformed inputs, expand the operator to load each one of them to the target.
load = LoadOperator.partial(task_id="load").expand(input=transform.output)

混合使用 TaskFlow 和经典运算符

在此示例中,您将定期向 S3 存储桶交付数据,并且希望对到达的每个文件应用相同的处理,无论每次到达多少个文件。

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator


with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
    list_filenames = S3ListOperator(
        task_id="get_input",
        bucket="example-bucket",
        prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
    )

    @task
    def count_lines(aws_conn_id, bucket, filename):
        hook = S3Hook(aws_conn_id=aws_conn_id)

        return len(hook.read_key(filename, bucket).splitlines())

    @task
    def total(lines):
        return sum(lines)

    counts = count_lines.partial(aws_conn_id="aws_default", bucket=list_filenames.bucket).expand(
        filename=list_filenames.output
    )

    total(lines=counts)

为非 TaskFlow 运算符分配多个参数

有时,上游需要为下游运算符指定多个参数。为此,您可以使用 expand_kwargs 函数,该函数采用一系列映射来进行映射。

BashOperator.partial(task_id="bash").expand_kwargs(
    [
        {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
        {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
    ],
)

这会在运行时生成两个任务实例,分别打印 12

还可以将 expand_kwargs 与大多数运算符参数混合使用,例如 PythonOperator 的 op_kwargs

def print_args(x, y):
    print(x)
    print(y)
    return x + y


PythonOperator.partial(task_id="task-1", python_callable=print_args).expand_kwargs(
    [
        {"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
        {"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
    ]
)

expand 类似,您还可以针对返回字典列表的 XCom 或每个返回字典的 XCom 列表进行映射。重复使用上面的 S3 示例,您可以使用映射的任务来执行“分支”并将文件复制到不同的存储桶

list_filenames = S3ListOperator(...)  # Same as the above example.


@task
def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        dest_bucket_name = "my_text_bucket"
    else:
        dest_bucket_name = "my_other_bucket"
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": dest_bucket_name,
    }


copy_kwargs = create_copy_kwargs.expand(filename=list_filenames.output)

# Copy files to another bucket, based on the file's extension.
copy_filenames = S3CopyObjectOperator.partial(
    task_id="copy_files", source_bucket_name=list_filenames.bucket
).expand_kwargs(copy_kwargs)

映射任务组

与 TaskFlow 任务类似,您也可以在 @task_group 修饰的函数上调用 expandexpand_kwargs 来创建映射的任务组

注意

为了简洁起见,省略了本节中各个任务的实现。

@task_group
def file_transforms(filename):
    return convert_to_yaml(filename)


file_transforms.expand(filename=["data1.json", "data2.json"])

在上面的例子中,任务 convert_to_yaml 在运行时被扩展成两个任务实例。第一个扩展将接收 "data1.json" 作为输入,第二个扩展将接收 "data2.json"

任务组函数中的值引用

任务函数 (@task) 和任务*组*函数 (@task_group) 之间的一个重要区别是,由于任务组没有关联的工作器,因此任务组函数中的代码无法解析传递给它的参数;真实值仅在引用传递给任务时才会被解析。

例如,以下代码将*无法*工作

@task
def my_task(value):
    print(value)


@task_group
def my_task_group(value):
    if not value:  # DOES NOT work as you'd expect!
        task_a = EmptyOperator(...)
    else:
        task_a = PythonOperator(...)
    task_a << my_task(value)


my_task_group.expand(value=[0, 1, 2])

my_task_group 中的代码被执行时,value 将仍然只是一个引用,而不是真实值,因此 if not value 分支将无法按预期工作。但是,如果将该引用传递给任务,则在执行任务时它将被解析,因此三个 my_task 实例将分别接收 1、2 和 3。

因此,重要的是要记住,如果打算对传递给任务组函数的值执行任何逻辑,则必须始终使用任务来运行逻辑,例如用于条件的 @task.branch(或 BranchPythonOperator),以及用于循环的任务映射方法。

注意

不允许在映射的任务组中进行任务映射

目前不允许在映射的任务组内嵌套进行任务映射。虽然此功能的技术方面并不是特别困难,但我们决定有意省略此功能,因为它增加了相当大的 UI 复杂性,并且对于一般用例可能不是必需的。将来可能会根据用户反馈重新考虑此限制。

深度优先执行

如果映射的任务组包含多个任务,则组中的所有任务将针对相同的输入“一起”扩展。例如

@task_group
def file_transforms(filename):
    converted = convert_to_yaml(filename)
    return replace_defaults(converted)


file_transforms.expand(filename=["data1.json", "data2.json"])

由于组 file_transforms 被扩展成两个,因此任务 convert_to_yamlreplace_defaults 在运行时都将成为两个实例。

可以通过像这样分别扩展两个任务来实现类似的效果

converted = convert_to_yaml.expand(filename=["data1.json", "data2.json"])
replace_defaults.expand(filename=converted)

然而,区别在于任务组允许其中的每个任务仅依赖于其“相关输入”。对于上面的例子,replace_defaults 将只依赖于相同扩展组的 convert_to_yaml,而不是相同任务但在不同组中的实例。这种策略称为*深度优先执行*(与简单的无组*广度优先执行*形成对比),允许更合理的逻辑任务分离、细粒度的依赖规则和准确的资源分配——使用上面的例子,第一个 replace_defaults 将能够在 convert_to_yaml("data2.json") 完成之前运行,并且不需要关心它是否成功。

依赖于映射的任务组的输出

与映射的任务组类似,依赖于映射的任务组的输出也将自动聚合组的结果

@task_group
def add_to(value):
    value = add_one(value)
    return double(value)


results = add_to.expand(value=[1, 2, 3])
consumer(results)  # Will receive [4, 6, 8].

也可以对来自普通映射任务的结果执行任何操作。

根据映射的任务组的输出进行分支

虽然不可能根据映射任务的结果实现分支逻辑(例如使用 @task.branch),但可以根据任务组的*输入*进行分支。以下示例演示了如何根据映射任务组的输入执行三个任务之一。

inputs = ["a", "b", "c"]


@task_group(group_id="my_task_group")
def my_task_group(input):
    @task.branch
    def branch(element):
        if "a" in element:
            return "my_task_group.a"
        elif "b" in element:
            return "my_task_group.b"
        else:
            return "my_task_group.c"

    @task
    def a():
        print("a")

    @task
    def b():
        print("b")

    @task
    def c():
        print("c")

    branch(input) >> [a(), b(), c()]


my_task_group.expand(input=inputs)

从映射的任务中过滤项目

映射的任务可以通过返回 None 来删除任何传递给其下游任务的元素。例如,如果我们只想将具有某些扩展名的文件从一个 S3 存储桶复制到另一个存储桶,我们可以像这样实现 create_copy_kwargs

@task
def create_copy_kwargs(filename):
    # Skip files not ending with these suffixes.
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        return None
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


# copy_kwargs and copy_files are implemented the same.

这使得 copy_files 仅针对 .json.yml 文件进行扩展,而忽略其余文件。

转换扩展数据

由于通常希望转换任务映射的输出数据格式,尤其是来自非 TaskFlow 运算符的输出数据格式(其中输出格式是预先确定的,并且不容易转换,例如上面例子中的 create_copy_kwargs),因此可以使用特殊的 map() 函数轻松执行此类转换。因此,上面的例子可以修改如下

from airflow.exceptions import AirflowSkipException

list_filenames = S3ListOperator(...)  # Unchanged.


def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        raise AirflowSkipException(f"skipping {filename!r}; unexpected suffix")
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


copy_kwargs = list_filenames.output.map(create_copy_kwargs)

# Unchanged.
copy_filenames = S3CopyObjectOperator.partial(...).expand_kwargs(copy_kwargs)

有几点需要注意

  1. map() 的可调用参数(示例中的 create_copy_kwargs)**不得**是任务,而应该是普通的 Python 函数。转换是下游任务(即 copy_files)的“预处理”的一部分,而不是 DAG 中的独立任务。

  2. 可调用对象始终只接受一个位置参数。此函数针对用于任务映射的可迭代对象中的每个项目调用,类似于 Python 的内置 map() 的工作方式。

  3. 由于可调用对象作为下游任务的一部分执行,因此您可以使用任何现有技术来编写任务函数。例如,要将组件标记为已跳过,您应该引发 AirflowSkipException。请注意,此处返回 None **不起作用**。

组合上游数据(也称为“压缩”)

通常还需要将多个输入源组合成一个任务映射可迭代对象。这通常称为“压缩”(类似于 Python 的内置 zip() 函数),并且也作为下游任务的预处理执行。

这对于任务映射中的条件逻辑特别有用。例如,如果要从 S3 下载文件,但要重命名这些文件,则可以执行如下操作

list_filenames_a = S3ListOperator(
    task_id="list_files_in_a",
    bucket="bucket",
    prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = ["rename_1", "rename_2", "rename_3", ...]

filenames_a_b = list_filenames_a.output.zip(list_filenames_b)


@task
def download_filea_from_a_rename(filenames_a_b):
    fn_a, fn_b = filenames_a_b
    S3Hook().download_file(fn_a, local_path=fn_b)


download_filea_from_a_rename.expand(filenames_a_b=filenames_a_b)

zip 函数接受任意数量的位置参数,并返回一个由位置参数计数组成的元组的可迭代对象。默认情况下,压缩后的可迭代对象的长度与最短的压缩后的可迭代对象的长度相同,多余的项目将被删除。可以传递一个可选的关键字参数 default 来切换行为以匹配 Python 的 itertools.zip_longest——压缩后的可迭代对象的长度将与*最长*的压缩后的可迭代对象的长度相同,缺少的项目将填充 default 提供的值。

可以扩展哪些数据类型?

目前,只能针对字典、列表或存储在 XCom 中作为任务结果的这些类型之一进行映射。

如果上游任务返回不可映射的类型,则映射的任务将在运行时失败,并出现 UnmappableXComTypePushed 异常。例如,您不能让上游任务返回一个普通的字符串——它必须是一个列表或一个字典。

模板化字段和映射参数如何交互?

运算符的所有参数都可以映射,即使是不接受模板化参数的参数也可以映射。

如果一个字段被标记为模板化并且被映射,则它**不会被模板化**。

例如,这将打印 {{ ds }} 而不是日期戳

@task
def make_list():
    return ["{{ ds }}"]


@task
def printer(val):
    print(val)


printer.expand(val=make_list())

如果您想插入值,请自己调用 task.render_template,或者使用插值

@task
def make_list(ds=None):
    return [ds]


@task
def make_list(**context):
    return [context["task"].render_template("{{ ds }}", context)]

对映射的任务设置限制

您可以对任务设置两个限制

  1. 可以创建的映射任务实例的数量,作为扩展的结果。

  2. 映射的任务可以同时运行的数量。

  • 限制映射任务的数量

    [core] max_map_length 配置选项是 expand 可以创建的最大任务数量——默认值为 1024。

    如果源任务(我们之前例子中的 make_list)返回的列表长度超过此值,则会导致*该*任务失败。

  • 限制映射任务的并行副本数

    如果您不希望大型映射任务占用所有可用的运行器插槽,则可以使用任务上的 max_active_tis_per_dag 设置来限制可以同时运行的数量。

    但是请注意,这适用于针对所有活动 DagRun 的该任务的所有副本,而不仅仅是针对这一个特定的 DagRun。

    @task(max_active_tis_per_dag=16)
    def add_one(x: int):
        return x + 1
    
    
    BashOperator.partial(task_id="my_task", max_active_tis_per_dag=16).expand(bash_command=commands)
    

自动跳过零长度映射

如果输入为空(长度为零),则不会创建新任务,并且映射的任务将被标记为 SKIPPED

此条目对您有帮助吗?