动态任务映射¶
动态任务映射允许工作流在运行时根据当前数据创建多个任务,而不是 DAG 作者必须预先知道需要多少个任务。
这类似于在 for 循环中定义任务,但不是让 DAG 文件获取数据并自行执行此操作,而是调度程序可以根据先前任务的输出执行此操作。在执行映射任务之前,调度程序将创建任务的 n 个副本,每个输入一个副本。
也可以让一个任务对映射任务收集的输出进行操作,通常称为 map 和 reduce。
简单映射¶
在最简单的形式中,您可以使用 expand()
函数而不是直接调用任务,来映射在 DAG 文件中直接定义的列表。
如果您想查看动态任务映射的简单用法,可以查看以下内容
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://apache.ac.cn/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping."""
from __future__ import annotations
from datetime import datetime
from airflow.decorators import task
from airflow.models.dag import DAG
with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:
@task
def add_one(x: int):
return x + 1
@task
def sum_it(values):
total = sum(values)
print(f"Total was {total}")
added_values = add_one.expand(x=[1, 2, 3])
sum_it(added_values)
执行时,这将在任务日志中显示 Total was 9
。
这是生成的 DAG 结构
网格视图还会在详细信息面板中提供映射任务的可见性
注意
只允许将关键字参数传递给 expand()
。
注意
从映射任务传递的值是延迟代理
在上面的示例中,sum_it
收到的 values
是每个映射的 add_one
实例返回的所有值的聚合。但是,由于不可能预先知道我们将有多少 add_one
实例,因此 values
不是一个普通的列表,而是一个“延迟序列”,它仅在被请求时才检索每个单独的值。因此,如果您直接运行 print(values)
,您将得到类似以下内容
LazySelectSequence([15 items])
您可以在此对象上使用正常的序列语法(例如,values[0]
),或者使用 for
循环正常遍历它。list(values)
将为您提供一个“真实的”list
,但是由于这将急切地从 所有 引用的上游映射任务加载值,因此您必须注意如果映射数量很大,可能会产生潜在的性能影响。
请注意,当您将此代理对象推送到 XCom 时,情况也是如此。Airflow 会尝试变得智能并自动强制转换值,但会为此发出警告,以便您了解这一点。例如
@task
def forward_values(values):
return values # This is a lazy proxy!
将发出类似这样的警告
Coercing mapped lazy proxy return value from task forward_values to list, which may degrade
performance. Review resource requirements for this operation, and call list() explicitly to suppress this message. See Dynamic Task Mapping documentation for more information about lazy proxy objects.
可以通过像这样修改任务来抑制该消息
@task
def forward_values(values):
return list(values)
注意
不需要 reduce 任务。
尽管我们在这里展示了一个“reduce”任务(sum_it
),但您不必拥有一个,即使映射的任务没有下游任务,它们仍将被执行。
任务生成的映射¶
我们展示的上述示例都可以通过 DAG 文件中的 for
循环来实现,但动态任务映射的真正强大之处在于能够让任务生成要迭代的列表。
@task
def make_list():
# This can also be from an API call, checking a database, -- almost anything you like, as long as the
# resulting list/dictionary can be stored in the current XCom backend.
return [1, 2, {"a": "b"}, "str"]
@task
def consumer(arg):
print(arg)
with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
consumer.expand(arg=make_list())
make_list
任务作为普通任务运行,并且必须返回列表或字典(请参阅哪些数据类型可以展开?),然后将调用 consumer
任务四次,每次使用 make_list
的返回值中的每个值。
警告
任务生成的映射不能与 TriggerRule.ALWAYS
一起使用
不允许在任务生成的映射中分配 trigger_rule=TriggerRule.ALWAYS
,因为展开的参数在任务的立即执行中未定义。这是在 DAG 解析时强制执行的,对于任务和映射的任务组,如果您尝试使用它,将引发错误。在最近的示例中,在 consumer
任务中设置 trigger_rule=TriggerRule.ALWAYS
将引发错误,因为 make_list
是任务生成的映射。
重复映射¶
一个映射任务的结果也可以用作下一个映射任务的输入。
with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:
@task
def add_one(x: int):
return x + 1
first = add_one.expand(x=[1, 2, 3])
second = add_one.expand(x=first)
结果将为 [3, 4, 5]
。
添加不展开的参数¶
除了传递在运行时展开的参数之外,还可以传递不更改的参数——为了清楚地区分这两种类型,我们使用不同的函数,expand()
用于映射的参数,partial()
用于未映射的参数。
@task
def add(x: int, y: int):
return x + y
added_values = add.partial(y=10).expand(x=[1, 2, 3])
# This results in add function being expanded to
# add(x=1, y=10)
# add(x=2, y=10)
# add(x=3, y=10)
这将导致值分别为 11、12 和 13。
这也适用于将诸如连接 ID、数据库表名称或存储桶名称之类的内容传递给任务。
映射多个参数¶
除了单个参数外,还可以传递多个参数进行展开。这将具有创建“笛卡尔积”的效果,使用参数的每个组合来调用映射任务。
@task
def add(x: int, y: int):
return x + y
added_values = add.expand(x=[2, 4, 8], y=[5, 10])
# This results in the add function being called with
# add(x=2, y=5)
# add(x=2, y=10)
# add(x=4, y=5)
# add(x=4, y=10)
# add(x=8, y=5)
# add(x=8, y=10)
这将导致 add 任务被调用 6 次。但是请注意,不能保证展开的顺序。
命名映射¶
默认情况下,映射的任务会被分配一个整数索引。可以使用基于任务输入的名称覆盖 Airflow UI 中每个映射任务的整数索引。这是通过为任务提供带有 map_index_template
的 Jinja 模板来实现的。当扩展看起来像 .expand(<property>=...)
时,这通常看起来像 map_index_template="{{ task.<property> }}"
。此模板在执行每个展开的任务后使用任务上下文进行渲染。这意味着您可以像这样引用任务上的属性
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# The two expanded task instances will be named "2024-01-01" and "2024-01-02".
SQLExecuteQueryOperator.partial(
...,
sql="SELECT * FROM data WHERE date = %(date)s",
map_index_template="""{{ task.parameters['date'] }}""",
).expand(
parameters=[{"date": "2024-01-01"}, {"date": "2024-01-02"}],
)
在上面的示例中,展开的任务实例将分别命名为“2024-01-01”和“2024-01-02”。这些名称显示在 Airflow UI 中,而不是“0”和“1”。
由于模板在主执行块之后呈现,因此也可以动态注入到渲染上下文中。当渲染所需名称的逻辑难以用 Jinja 模板语法表达时,尤其是在 taskflow 函数中,这很有用。例如
from airflow.operators.python import get_current_context
@task(map_index_template="{{ my_variable }}")
def my_task(my_value: str):
context = get_current_context()
context["my_variable"] = my_value * 3
... # Normal execution...
# The task instances will be named "aaa" and "bbb".
my_task.expand(my_value=["a", "b"])
使用非 TaskFlow 操作符进行映射¶
也可以将 partial
和 expand
与经典样式操作符一起使用。某些参数不可映射,必须传递给 partial()
,例如 task_id
、queue
、pool
和 BaseOperator
的大多数其他参数。
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://apache.ac.cn/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping with non-TaskFlow operators."""
from __future__ import annotations
from datetime import datetime
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
class AddOneOperator(BaseOperator):
"""A custom operator that adds one to the input."""
def __init__(self, value, **kwargs):
super().__init__(**kwargs)
self.value = value
def execute(self, context):
return self.value + 1
class SumItOperator(BaseOperator):
"""A custom operator that sums the input."""
template_fields = ("values",)
def __init__(self, values, **kwargs):
super().__init__(**kwargs)
self.values = values
def execute(self, context):
total = sum(self.values)
print(f"Total was {total}")
return total
with DAG(
dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
schedule=None,
start_date=datetime(2022, 3, 4),
catchup=False,
):
# map the task to a list of values
add_one_task = AddOneOperator.partial(task_id="add_one").expand(value=[1, 2, 3])
# aggregate (reduce) the mapped tasks results
sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)
注意
只允许将关键字参数传递给 partial()
。
映射经典操作符的结果¶
如果要映射经典操作符的结果,则应显式引用 输出,而不是操作符本身。
# Create a list of data inputs.
extract = ExtractOperator(task_id="extract")
# Expand the operator to transform each input.
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)
# Collect the transformed inputs, expand the operator to load each one of them to the target.
load = LoadOperator.partial(task_id="load").expand(input=transform.output)
混合使用 TaskFlow 和经典操作符¶
在此示例中,您有一个到 S3 存储桶的常规数据交付,并且希望将相同的处理应用于每次到达的每个文件,无论每次到达多少个文件。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
list_filenames = S3ListOperator(
task_id="get_input",
bucket="example-bucket",
prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
)
@task
def count_lines(aws_conn_id, bucket, filename):
hook = S3Hook(aws_conn_id=aws_conn_id)
return len(hook.read_key(filename, bucket).splitlines())
@task
def total(lines):
return sum(lines)
counts = count_lines.partial(aws_conn_id="aws_default", bucket=list_filenames.bucket).expand(
filename=list_filenames.output
)
total(lines=counts)
将多个参数分配给非 TaskFlow 操作符¶
有时,上游需要为下游操作符指定多个参数。为此,您可以使用 expand_kwargs
函数,它接受一个映射序列进行映射。
BashOperator.partial(task_id="bash").expand_kwargs(
[
{"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
{"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
],
)
这将在运行时生成两个任务实例,分别打印 1
和 2
。
此外,还可以将 expand_kwargs
与大多数操作符参数混合使用,例如 PythonOperator 的 op_kwargs
。
def print_args(x, y):
print(x)
print(y)
return x + y
PythonOperator.partial(task_id="task-1", python_callable=print_args).expand_kwargs(
[
{"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
{"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
]
)
与 expand
类似,您还可以映射到返回字典列表的 XCom,或每个都返回字典的 XCom 列表。重用上面的 S3 示例,您可以使用映射任务来执行“分支”并将文件复制到不同的存储桶。
list_filenames = S3ListOperator(...) # Same as the above example.
@task
def create_copy_kwargs(filename):
if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
dest_bucket_name = "my_text_bucket"
else:
dest_bucket_name = "my_other_bucket"
return {
"source_bucket_key": filename,
"dest_bucket_key": filename,
"dest_bucket_name": dest_bucket_name,
}
copy_kwargs = create_copy_kwargs.expand(filename=list_filenames.output)
# Copy files to another bucket, based on the file's extension.
copy_filenames = S3CopyObjectOperator.partial(
task_id="copy_files", source_bucket_name=list_filenames.bucket
).expand_kwargs(copy_kwargs)
映射任务组¶
与 TaskFlow 任务类似,您也可以在用 @task_group
修饰的函数上调用 expand
或 expand_kwargs
来创建映射的任务组。
注意
为简洁起见,本节中省略了各个任务的实现。
@task_group
def file_transforms(filename):
return convert_to_yaml(filename)
file_transforms.expand(filename=["data1.json", "data2.json"])
在上面的示例中,任务 convert_to_yaml
在运行时被扩展为两个任务实例。第一个扩展实例将接收 "data1.json"
作为输入,第二个实例将接收 "data2.json"
。
任务组函数中的值引用¶
任务函数(@task
)和任务组函数(@task_group
)之间的一个重要区别是,由于任务组没有关联的工作进程,因此任务组函数中的代码无法解析传递给它的参数;真实值仅在将引用传递给任务时才会被解析。
例如,此代码将无法工作
@task def my_task(value): print(value) @task_group def my_task_group(value): if not value: # DOES NOT work as you'd expect! task_a = EmptyOperator(...) else: task_a = PythonOperator(...) task_a << my_task(value) my_task_group.expand(value=[0, 1, 2])
当执行 my_task_group
中的代码时,value
仍然只是一个引用,而不是真实值,因此 if not value
分支将不会像您期望的那样工作。但是,如果您将该引用传递给一个任务,它将在任务执行时被解析,因此三个 my_task
实例将分别接收 1、2 和 3。
因此,重要的是要记住,如果您打算对传递到任务组函数的值执行任何逻辑,则必须始终使用任务来运行该逻辑,例如 @task.branch
(或 BranchPythonOperator
)用于条件,以及任务映射方法用于循环。
注意
不允许在映射的任务组中进行任务映射
目前不允许在映射的任务组内部进行嵌套的任务映射。虽然此功能的技术方面并不特别困难,但我们已决定有意省略此功能,因为它增加了相当多的 UI 复杂性,并且对于一般用例可能不是必需的。将来可能会根据用户反馈重新考虑此限制。
深度优先执行¶
如果一个映射的任务组包含多个任务,则组中的所有任务都会针对相同的输入“一起”扩展。例如
@task_group
def file_transforms(filename):
converted = convert_to_yaml(filename)
return replace_defaults(converted)
file_transforms.expand(filename=["data1.json", "data2.json"])
由于组 file_transforms
被扩展为两个,任务 convert_to_yaml
和 replace_defaults
在运行时都将成为两个实例。
可以通过像这样分别扩展两个任务来实现类似的效果
converted = convert_to_yaml.expand(filename=["data1.json", "data2.json"])
replace_defaults.expand(filename=converted)
但是,不同之处在于,任务组允许内部的每个任务仅依赖于其“相关输入”。对于上面的示例,replace_defaults
将仅依赖于同一扩展组的 convert_to_yaml
,而不是同一任务的实例,而是在不同的组中。这种策略称为深度优先执行(与简单的无组广度优先执行相反),它允许更符合逻辑的任务分离、细粒度的依赖规则和准确的资源分配 - 使用上面的示例,第一个 replace_defaults
将能够在 convert_to_yaml("data2.json")
完成之前运行,并且不需要关心它是否成功。
依赖映射的任务组的输出¶
与映射的任务组类似,依赖映射的任务组的输出也会自动聚合该组的结果
@task_group
def add_to(value):
value = add_one(value)
return double(value)
results = add_to.expand(value=[1, 2, 3])
consumer(results) # Will receive [4, 6, 8].
也可以执行与普通映射任务的结果相同的任何操作。
基于映射的任务组的输出进行分支¶
虽然无法对映射任务的结果实现分支逻辑(例如使用 @task.branch
),但可以基于任务组的输入进行分支。以下示例演示了基于映射任务组的输入执行三个任务之一。
inputs = ["a", "b", "c"]
@task_group(group_id="my_task_group")
def my_task_group(input):
@task.branch
def branch(element):
if "a" in element:
return "my_task_group.a"
elif "b" in element:
return "my_task_group.b"
else:
return "my_task_group.c"
@task
def a():
print("a")
@task
def b():
print("b")
@task
def c():
print("c")
branch(input) >> [a(), b(), c()]
my_task_group.expand(input=inputs)
从映射任务中筛选项目¶
映射的任务可以通过返回 None
来删除任何要传递到其下游任务的元素。例如,如果我们只想将具有特定扩展名的文件从 S3 存储桶复制到另一个存储桶,则可以像这样实现 create_copy_kwargs
@task
def create_copy_kwargs(filename):
# Skip files not ending with these suffixes.
if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
return None
return {
"source_bucket_key": filename,
"dest_bucket_key": filename,
"dest_bucket_name": "my_other_bucket",
}
# copy_kwargs and copy_files are implemented the same.
这使得 copy_files
仅针对 .json
和 .yml
文件进行扩展,而忽略其余文件。
转换扩展数据¶
由于通常需要转换任务映射的输出数据格式,特别是对于非 TaskFlow 操作符,其输出格式是预先确定的并且不容易转换(例如上面示例中的 create_copy_kwargs
),因此可以使用特殊的 map()
函数来轻松执行此类转换。因此,上面的示例可以像这样修改
from airflow.exceptions import AirflowSkipException
list_filenames = S3ListOperator(...) # Unchanged.
def create_copy_kwargs(filename):
if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
raise AirflowSkipException(f"skipping {filename!r}; unexpected suffix")
return {
"source_bucket_key": filename,
"dest_bucket_key": filename,
"dest_bucket_name": "my_other_bucket",
}
copy_kwargs = list_filenames.output.map(create_copy_kwargs)
# Unchanged.
copy_filenames = S3CopyObjectOperator.partial(...).expand_kwargs(copy_kwargs)
有几点需要注意
组合上游数据(又名“压缩”)¶
通常还需要将多个输入源组合到一个任务映射迭代器中。这通常被称为“压缩”(如 Python 的内置 zip()
函数),并且也是作为下游任务的预处理执行的。
这对于任务映射中的条件逻辑特别有用。例如,如果您想从 S3 下载文件,但重命名这些文件,则可以执行类似的操作
list_filenames_a = S3ListOperator(
task_id="list_files_in_a",
bucket="bucket",
prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = ["rename_1", "rename_2", "rename_3", ...]
filenames_a_b = list_filenames_a.output.zip(list_filenames_b)
@task
def download_filea_from_a_rename(filenames_a_b):
fn_a, fn_b = filenames_a_b
S3Hook().download_file(fn_a, local_path=fn_b)
download_filea_from_a_rename.expand(filenames_a_b=filenames_a_b)
与内置的 zip()
类似,您可以将任意数量的迭代器压缩在一起,以获取位置参数计数的元组的迭代器。默认情况下,压缩的迭代器的长度与压缩的迭代器中最短的长度相同,并删除多余的项目。可以传递可选的关键字参数 default
以将行为切换为与 Python 的 itertools.zip_longest()
匹配——压缩的迭代器将具有与压缩的迭代器中最长的迭代器相同的长度,并且缺少项目将填充 default
提供的值。
连接多个上游¶
组合输入源的另一种常见模式是对多个迭代器运行相同的任务。当然,简单地为每个迭代器分别运行相同的代码是完全有效的,例如
list_filenames_a = S3ListOperator(
task_id="list_files_in_a",
bucket="bucket",
prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = S3ListOperator(
task_id="list_files_in_b",
bucket="bucket",
prefix="incoming/provider_b/{{ data_interval_start|ds }}",
)
@task
def download_file(filename):
S3Hook().download_file(filename)
# process file...
download_file.override(task_id="download_file_a").expand(filename=list_filenames_a.output)
download_file.override(task_id="download_file_b").expand(filename=list_filenames_b.output)
然而,如果可以将任务组合成一个,DAG 的可扩展性和易于检查性都会更高。这可以使用 concat
来完成。
# Tasks list_filenames_a and list_filenames_b, and download_file stay unchanged.
list_filenames_concat = list_filenames_a.concat(list_filenames_b)
download_file.expand(filename=list_filenames_concat)
这将创建一个单一任务,以针对两个列表进行扩展。你可以使用 concat
将任意数量的可迭代对象连接在一起 (例如,foo.concat(bar, rex)
);或者,由于返回值也是一个 XCom 引用,因此可以链式调用 concat
(例如,foo.concat(bar).concat(rex)
) 来实现相同的结果:一个单一的可迭代对象,按顺序连接所有对象,类似于 Python 的 itertools.chain()
。
哪些数据类型可以被扩展?¶
目前,只能针对字典、列表或作为任务结果存储在 XCom 中的这些类型进行映射。
如果上游任务返回一个不可映射的类型,映射的任务将在运行时失败,并抛出 UnmappableXComTypePushed
异常。例如,你不能让上游任务返回一个纯字符串 – 它必须是一个列表或一个字典。
模板字段和映射的参数如何交互?¶
运算符的所有参数都可以被映射,即使是那些不接受模板化参数的参数。
如果一个字段被标记为模板化并且被映射,它将不会被模板化。
例如,这将打印 {{ ds }}
,而不是日期戳。
@task
def make_list():
return ["{{ ds }}"]
@task
def printer(val):
print(val)
printer.expand(val=make_list())
如果你想插入值,要么自己调用 task.render_template
,要么使用插值。
@task
def make_list(ds=None):
return [ds]
@task
def make_list(**context):
return [context["task"].render_template("{{ ds }}", context)]
对映射的任务设置限制¶
你可以对一个任务设置两个限制:
作为扩展结果可以创建的映射任务实例的数量。
可以同时运行的映射任务的数量。
限制映射的任务数量
[core]
max_map_length
配置选项是expand
可以创建的最大任务数 – 默认值为 1024。如果源任务(在前面的示例中是
make_list
)返回的列表长度超过此值,则会导致该任务失败。限制映射任务的并行副本
如果你不希望大型映射任务消耗所有可用的运行器插槽,你可以在任务上使用
max_active_tis_per_dag
设置来限制可以同时运行的数量。但是请注意,这适用于该任务针对所有活动 DagRuns 的所有副本,而不仅仅是这个特定的 DagRun。
@task(max_active_tis_per_dag=16) def add_one(x: int): return x + 1 BashOperator.partial(task_id="my_task", max_active_tis_per_dag=16).expand(bash_command=commands)
自动跳过零长度映射¶
如果输入为空(零长度),则不会创建新任务,并且映射的任务将被标记为 SKIPPED
。