很高兴宣布 Apache Airflow 2.9.0 已正式发布!这次我们带来了数据感知调度的新功能以及一系列与 UI 相关的改进。
Apache Airflow 2.9.0 包含了超过 550 次提交,其中包括 38 个新功能、70 项改进、31 个错误修复和 18 项文档变更。
详情:
📦 PyPI: https://pypi.ac.cn/project/apache-airflow/2.9.0/
📚 文档: https://airflow.org.cn/docs/apache-airflow/2.9.0/
🛠 发布说明: https://airflow.org.cn/docs/apache-airflow/2.9.0/release_notes.html
🐳 Docker 镜像: “docker pull apache/airflow:2.9.0”
🚏 约束: https://github.com/apache/airflow/tree/constraints-2.9.0
Airflow 2.9.0 也是第一个支持 Python 3.12 的版本。然而,Pendulum 2 不支持 Python 3.12,因此如果您升级到 Python 3.12,需要使用 Pendulum 3。
新的数据感知调度选项
DAG 调度的逻辑运算符和条件表达式
在 Airflow 2.4 中添加数据集(Datasets)时,DAG 仅支持对数据集进行逻辑 AND 组合的调度。简单来说,您可以基于多个数据集进行调度,但只有在上次运行后所有数据集都更新了,才会创建一个 DAG 运行。现在,在 Airflow 2.9 中,我们支持逻辑 OR 甚至 AND 和 OR 的任意组合。
例如,您可以根据 dataset_1 或 dataset_2 更新时调度 DAG
with DAG(schedule=(dataset_1 | dataset_2), ...):
...
您可以有任意组合
with DAG(schedule=((dataset_1 | dataset_2) & dataset_3), ...):
...
您可以在数据感知调度文档中阅读更多关于此新功能的详情。
结合数据集和基于时间的调度
Airflow 2.9 提供了一个新的时间表 DatasetOrTimeSchedule,它允许您同时基于数据集事件和时间表来调度 DAG。现在您可以兼顾两者的优势。
例如,当 dataset_1 更新时和每天协调世界时午夜运行时
with DAG(
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
datasets=[dag1_dataset],
),
...
):
...
数据集事件 REST API 端点
引入了新的 REST API 端点,用于创建、列出和删除数据集事件。这使得外部系统能够通知 Airflow 数据集更新,并为更复杂的用例解锁了事件队列的管理功能。
有关更多详细信息,请参阅数据集 API 文档。
数据集 UI 增强
DAG 的图形视图得到了增强,可以显示它基于哪些数据集进行调度以及任务输出中的数据集,从而提供对 DAG 消费和生成的数据集的全面概述。

主数据集视图现在允许您同时按 DAG 和数据集进行过滤

查看数据集时,您现在可以通过点击右上角显示的播放按钮来手动创建数据集事件

动态任务映射的自定义名称
点击索引数字来查找您想查看的动态映射任务的日子一去不复返了!自 Airflow 2.3 添加任务映射以来,这一直是大家要求的功能,我们很高兴它终于来了。
您可以为映射操作符提供一个 map_index_template
BashOperator.partial(
task_id="hello",
bash_command="echo Hello $NAME",
map_index_template="{{ task.env['NAME'] }}",
).expand(
env=[{"NAME": "John"}, {"NAME": "Bob"}, {"NAME": "Fred"}],
)
该模板将在每个任务运行完成后渲染,并在 UI 中填充名称

有关此功能的更多详细信息,包括一个任务流示例,请参阅动态任务映射文档。
对象存储作为 XCom 后端
您现在可以将对象存储配置为 XCom 后端,从而更轻松地将 XCom 结果存入对象存储。部署管理器可以配置他们选择的对象存储、一个大小阈值来决定部分结果存入 Airflow 元数据数据库,部分存入对象存储,甚至可以指定在存储数据之前应用的压缩方法。
以下配置会将超过 1MB 的数据存储到 S3 并使用 gzip 进行压缩
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend
[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip
有关更多详细信息,请参阅对象存储 XCom 后端文档。
DAG 和任务的显示名称
准备好您的表情符号!您现在可以为 DAG 和任务设置显示名称,与 dag_id 和 task_id 分开。这允许您在 UI 中使用本地化的显示名称,或者只是使用一大堆表情符号。
使用 dag_display_name 和 task_display_name,您可以摆脱 ascii 的限制
with DAG("not_a_fun_dag_id", dag_display_name="📣 Best DAG ever 🎉", ...):
BashOperator(task_id="some_task", task_display_name="🥳 Fun task!", ...)

任务日志分组
Airflow 现在支持对任务日志进行任意分组。
默认情况下,执行前和执行后的日志会被分组并折叠,使您更容易查看任务日志

您也可以在任务代码中使用此功能,使您的日志更易于跟踪
@task
def big_hello():
print("::group::Setup our big Hello")
greeting = ""
for c in "Hello Airflow 2.9":
greeting += c
print(f"Adding {c} to our greeting. Current greeting: {greeting}")
print("::endgroup::")
print(greeting)
该自定义组默认是折叠的

如果您想深入查看详情,可以将其展开

UI 现代化
除了上面提到的所有 UI 改进之外,我们在 Airflow 2.9 中还有更多改进!
其余的 DAG 级别视图已迁移到 React 和网格视图界面,从而提供更一致的体验。这包括日历、任务时长、运行时长(取代了落地时间)和审计日志。这些不仅仅是“迁移”,它们也都得到了改进。
这是新的运行时长视图,取代了落地时间。用户可以在落地时间和简单运行时长之间切换

以及新的任务时长视图。用户可以切换是否显示队列等待时间,并查看所显示运行的中间值。

其他新功能
以下是一些有趣的新功能,因为全部列出实在太多了
- REST API 中的所有创建/更新/删除操作现在都会记录在审计日志中
- 新的
on_skipped_callback回调 - 连续 n 次失败后自动暂停 DAG
- 支持使用 Matomo 作为分析工具
- 新的
@task.bashTaskFlow 装饰器 - DAG 暂停和恢复 CLI 命令中的
dag_id支持正则表达式 airflow tasks test现在支持可延迟操作符
贡献者
感谢所有为本次发布做出贡献的人,包括 Amogh Desai, Andrey Anshin, Brent Bovenzi, Daniel Standish, Ephraim Anierobi, Hussein Awala, Jarek Potiuk, Jed Cunningham, Jens Scheffler, Tzu-ping Chung, Vincent Beck, Wei Lee, 以及其他超过 120 位贡献者!
我特别要感谢我们的发布经理 Ephraim,是他促成了本次版本的发布。
希望您喜欢使用 Apache Airflow 2.9.0!
分享