Apache Airflow 2.4.0 包含总计超过 870 个提交,其中面向用户的提交超过 650 个(不包括对 provider 或 chart 的提交)。这包括 46 项新功能、39 项改进、52 个错误修复以及若干文档变更。
详情:
📦 PyPI: https://pypi.ac.cn/project/apache-airflow/2.4.0/
📚 文档: https://airflow.org.cn/docs/apache-airflow/2.4.0/
🛠️ 发行说明: https://airflow.org.cn/docs/apache-airflow/2.4.0/release_notes.html
🐳 Docker 镜像:docker pull apache/airflow:2.4.0
🚏 约束文件: https://github.com/apache/airflow/tree/constraints-2.4.0
数据感知调度 (AIP-48)
这是一个重要功能。Airflow 现在能够基于其他任务更新数据集来调度 DAG。
这具体意味着什么?这是一个很棒的新功能,它允许 DAG 作者创建更小、更独立的 DAG,这些 DAG 可以串联起来,形成更大的基于数据的流程。如果您目前正在使用 ExternalTaskSensor 或 TriggerDagRunOperator,您应该关注一下数据集 (datasets)——在大多数情况下,您可以使用数据集来替代它们,这将加快调度速度!
闲话少说,让我们来看一个简短的例子。首先,我们编写一个简单的 DAG,其中包含一个名为 my_task 的任务,该任务会产生一个名为 my-dataset 的数据集。
from airflow import Dataset
dataset = Dataset(uri='my-dataset')
with DAG(dag_id='producer', ...)
@task(outlets=[dataset])
def my_task():
...
数据集通过 URI 定义。现在,我们可以创建第二个 DAG(consumer),只要此数据集发生变化,该 DAG 就会被调度执行。
from airflow import Dataset
dataset = Dataset(uri='my-dataset')
with DAG(dag_id='dataset-consumer', schedule=[dataset]):
...
有了这两个 DAG,一旦 my_task 完成,Airflow 将会为 dataset-consumer 流程创建一个 DAG 运行。
我们知道目前的功能还不能满足人们对数据集的所有使用场景需求,在接下来的小版本中(2.5、2.6 等),我们将在此基础上进行扩展和改进。
数据集代表了数据集的抽象概念,并且(目前)不具备任何直接的读写能力——在此版本中,我们添加了这个基础功能,以便将来在其上进行构建——这也是我们推出小版本以便更快地向大家提供新功能目标的一部分!
有关数据集的更多信息,请参阅关于数据感知调度的文档。其中包含以下详细信息:数据集如何通过 URI 标识,如何依赖多个数据集,以及如何理解数据集的概念(提示:不要将“日期分区”包含在数据集中,数据集的概念级别更高)。
使用新的 ExternalPythonOperator 更轻松地管理冲突的 Python 依赖项
尽管我们希望所有的 Python 库都能愉快地协同工作,但这并非现实,有时在 Airflow 安装中尝试安装多个 Python 库时会发生冲突——目前我们经常听到关于 dbt-core 的这类问题。
为了简化此问题,我们引入了 @task.external_python(以及对应的 ExternalPythonOperator),它们允许您在预先配置好的虚拟环境中将 Python 函数作为 Airflow 任务运行,甚至可以使用完全不同的 Python 版本。例如:
@task.external_python(python='/opt/venvs/task_deps/bin/python')
def my_task(data_interval_start, data_interval_env)
print(f'Looking at data between {data_interval_start} and {data_interval_end}')
...
根据您访问的上下文变量,虚拟环境中需要安装的内容会有些微妙之处,因此请务必阅读关于使用 ExternalPythonOperator 的操作指南
动态任务映射的更多改进 (AIP-42)
我们听取了您的意见。动态任务映射现在支持以下功能:
expand_kwargs:用于为非 TaskFlow operator 分配多个参数。zip:用于组合多个对象,而非生成笛卡尔积(cross-product)。map:用于在任务运行前转换参数。
有关动态任务映射的更多信息,请参阅文档中的新章节:转换映射数据、组合上游数据(即“zipping”)以及为非 TaskFlow operator 分配多个参数。
自动注册在上下文管理器中使用的 DAG(不再需要 as dag:)
这是一项小的使用体验改进,我不想承认我曾多少次忘记了 as dag:,或者更糟,重复写了 as dag:。
with DAG(dag_id="example") as dag:
...
@dag
def dag_maker():
...
dag2 = dag_maker()
可以变成
with DAG(dag_id="example"):
...
@dag
def my_dag():
...
my_dag()
如果您出于任何原因想要禁用此行为,可以在 DAG 上设置 auto_register=False
# This dag will not be picked up by Airflow as it's not assigned to a variable
with DAG(dag_id="example", auto_register=False):
...
其他改进
由于有超过 650 个提交,完整的功能、修复和变更列表在此无法一一列举(请查看发行说明获取完整列表),但一些值得注意或有趣的小功能包括:
- 首页自动刷新
- 新增
@task.short_circuitTaskFlow 装饰器 - 新增角色删除命令到 CLI
- 在
ExternalTaskSensor中新增对TaskGroup的支持 - 新增
@task.kubernetesTaskFlow 装饰器 - 新增实验性
parsing_context,以优化 worker 中动态 DAG 的处理 - 整合为单个
schedule参数 - 允许在 Admin -> Configuration 中显示非敏感配置值(而非全部或不显示)
- Operator 名称与类分离(使用 TaskFlow 时不再出现
_PythonDecoratedOperator)
贡献者
感谢所有为本次发布做出贡献的人,包括 Andrey Anshin, Ash Berlin-Taylor, Bartłomiej Hirsz, Brent Bovenzi, Chenglong Yan, D. Ferruzzi, Daniel Standish, Drew Hubl, Elad Kalif, Ephraim Anierobi, Jarek Potiuk, Jed Cunningham, Josh Fell, Mark Norman Francis, Niko, Tzu-ping Chung, Vincent, Wojciech Januszek, chethanuk-plutoflume, pierrejeambrun, 以及所有提交代码的其他人,总计 152 位!正是你们让 Airflow 成为了一个成功的项目!
分享