我们很自豪地宣布发布 Apache Airflow 3.2.0!Airflow 3.1 将人为因素置于自动化工作流的中心。3.2 将同样的精确性带入数据:资产分区用于细粒度的流水线编排,多团队部署实现企业规模,同步的截止日期警报回调,以及向完整 Task SDK 分离的持续进展。
详情:
📦 PyPI: https://pypi.ac.cn/project/apache-airflow/3.2.0/
📚 Docs: https://airflow.org.cn/docs/apache-airflow/3.2.0/
🛠️ Release Notes: https://airflow.org.cn/docs/apache-airflow/3.2.0/release_notes.html
🐳 Docker Image: docker pull apache/airflow:3.2.0
🚏 Constraints: https://github.com/apache/airflow/tree/constraints-3.2.0
🗂️ 资产分区 (AIP-76):仅触发正确的工作
资产分区一直是对数据感知调度需求最高的特性之一。如果你使用按日期分区的 S3 路径、Hive 表分区、BigQuery 分区,或任何分区的数据存储,你一定会遇到这种情形:上游任务更新了某个分区,而所有下游 DAG 都会被触发,且不管实际变化的是哪一片分区。这既浪费资源,也会在大规模部署时产生大量的运营噪声。
3.2 中的资产分区实现了细粒度控制。下游 DAG 仅在它们关心的特定分区被更新时才触发。这是自引入资产以来对数据感知调度的最大改动,它把基于分区的编排转变为 Airflow 原生支持的功能,而不再是需要自行绕行的方案。

关键功能
- 基于分区的调度:仅在特定分区更新时触发 DAG,而不是在每次资产更改时触发
- CronPartitionTimetable:使用 cron 表达式根据分区调度 DAG。Task SDK 也提供此功能
- 分区 DAG 的回填:回填历史分区而不重新触发所有下游任务(#61464)
- 多资产分区:单个 DAG 可以监听多个资产的分区,当下游工作依赖于多个来源对齐时非常重要(#60577)
对于更高级的使用场景,提供了时间和范围分区映射器(#61522,#55247),用于将时间范围和取值范围映射到分区键;在 Dag Run 引用中加入了分区键字段(#61725),便于精准检查是哪一个分区触发了运行;以及 PartitionedAssetTimetable,能够完整控制来自多个资产的分区事件如何统一解析为一次触发。
示例:三个上游摄取 DAG 按小时写入各自的独立资产。只有当这三个资产在同一小时分区上全部更新后,下游 DAG 才会触发。由于这三个资产本身不共享分区键,需要通过映射器将它们统一为同一个键。
from __future__ import annotations
from airflow.sdk import (
DAG,
Asset,
CronPartitionTimetable,
PartitionedAssetTimetable,
StartOfHourMapper,
asset,
task,
)
team_a_player_stats = Asset(uri="file://incoming/player-stats/team_a.csv", name="team_a_player_stats")
combined_player_stats = Asset(uri="file://curated/player-stats/combined.csv", name="combined_player_stats")
with DAG(
dag_id="ingest_team_a_player_stats",
schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
tags=["player-stats", "ingestion"],
):
@task(outlets=[team_a_player_stats])
def ingest_team_a_stats():
"""Materialize Team A player statistics for the current hourly partition."""
pass
ingest_team_a_stats()
@asset(schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"))
def team_b_player_stats():
pass
with DAG(
dag_id="clean_and_combine_player_stats",
schedule=PartitionedAssetTimetable(
assets=team_a_player_stats & team_b_player_stats,
default_partition_mapper=StartOfHourMapper(),
),
catchup=False,
):
@task(outlets=[combined_player_stats])
def combine_player_stats(dag_run=None):
"""Merge the aligned hourly partitions into a combined dataset."""
print(dag_run.partition_key)
combine_player_stats()
请参阅 example_asset_partition.py 以及 Task SDK API 文档中的 PartitionedAssetTimetable 和分区映射器。
🏢 多团队部署 (AIP-67):企业级 Airflow
⚠️ 实验性:多团队支持在 Airflow 3.2 中仍属实验性质,未来版本可能根据用户反馈进行更改。
Airflow 3.2 引入多团队支持,使组织能够在同一个 Airflow 实例中运行多个相互隔离的团队。每个团队拥有自己的 DAG、连接、变量、池和执行器——实现真正的资源与权限隔离,而无需为每个团队单独部署 Airflow 实例。
这对提供共享基础设施、为多个数据工程或数据科学团队服务的平台团队尤为重要,同时还能在团队资源和访问权限之间保持强大的边界。
关键功能
- 每团队资源隔离:每个团队拥有自己的 DAG、连接、变量和池
- 每团队执行器:不同团队可使用不同的执行器(如 Celery、Kubernetes、Local、AWS ECS 等),并可单独配置——#57837,#57910
- 团队作用域授权:Keycloak 与 Simple 认证管理器支持团队作用域的访问控制(#61351,#61861)
- 团队作用域密钥:使用
AIRFLOW_VAR__{TEAM}___{KEY}环境变量或AIRFLOW_CONN__<TEAM>___<CONN_ID>模式来保存团队专属的密钥(#62588) - CLI 管理:新增用于管理团队的 CLI 命令(#55283)
- UI 团队选择器:在连接、变量和池的创建/编辑表单中加入团队选择器(#60237,#60474,#61082)
- 完整 API 支持:在 Connection、Variable、Pool API 中添加
team_name字段(#59336,#57102,#60952)
启用多团队功能
⏰ 截止日期警报:现在支持同步回调 (AIP-86)
⚠️ 实验性:截止日期警报在 Airflow 3.2 中仍属实验性质,未来版本可能根据用户反馈进行更改。
基于 Airflow 3.1 引入的 Deadline Alerts 系统,本次发布加入了同步回调支持。3.1 中的回调只能通过 triggerer(仅异步)执行,限制了集成方式。同步回调直接通过 executor 在工作节点上执行,并可通过 executor 参数指定特定的执行器。
3.2 新特性
- SyncCallback 支持:不同于在 triggerer 上运行的
AsyncCallback,SyncCallback直接在 worker 上通过 executor 执行,可选地指定具体的 executor - 单个 DAG 的多条截止警报:deadline 参数可接受列表,以在同一 DAG 上配置多个阈值
- Grid API 中的 missed-deadline 元数据:Dag Run API 现在会返回 missed-deadline 信息,便于程序化监控
- 自定义 DeadlineReferences 的体验改进:在定义自定义截止参考点时提供更简洁的开发者体验(#57222)
with DAG(
dag_id="sync_deadline",
deadline=DeadlineAlert(
reference=DeadlineReference.FIXED_DATETIME(datetime(1980, 8, 10, 2)),
interval=timedelta(0),
callback=SyncCallback(
SlackWebhookNotifier,
{"text": "Sync Callback; Alert should trigger immediately!"},
)
)
):
EmptyOperator(task_id='empty_task')
🖥️ UI 增强
- HITL 审批历史:Human-in-the-Loop 审批界面现在会显示任何任务的完整审批与驳回审计轨迹(#56760,#55952)
- XCom 管理:现在可以直接在 UI 中添加、编辑、删除 XCom 值(#58921)
- 分段状态条:折叠的任务组和映射任务现在会展示分段状态条,一眼即可了解状态(#61854)
- 统一 Tooltip:Grid 与 Graph 视图的 tooltip 现在同时展示日期、持续时间以及子任务状态(#62119)
- DAG 代码标签页显示文件名:代码标签页中新增文件标识(#60759)
- 日志复制按钮:一键复制日志内容(#61185)
- 日期范围过滤器:可按日期范围过滤 DAG 执行记录(#60772)
- 任务上游/下游过滤器:在 Graph 与 Grid 视图中按上游或下游任务进行过滤(#57237)
- 数据脱敏:敏感字段在 UI 与公共 API 中均已脱敏处理(#59873)
- 自定义主题支持:通过
globalCss和主题配置实现白标/自定义部署(#61161,#58411) - React 插件继承核心 UI 主题:插件 UI 现在自动匹配核心 Airflow 主题(#60256)
- 甘特图中的任务显示名称:在甘特图中展示
task_display_name,提升可读性(#61438)
🚀 性能改进
渲染任务实例字段清理:约提升 42 倍。渲染任务实例字段的清理作业已重写,对包含大量映射任务的 DAG 提升约 42 倍。保留策略改为基于最近 N 次 DAG Run,而非最近 N 次任务执行,这既更直观也大幅提升性能。配置项重命名:max_num_rendered_ti_fields_per_task → num_dag_runs_to_retain_rendered_fields(旧名称仍可使用但会出现弃用警告)。(#60951)
Scheduler 改进。针对大规模部署,3.2 解决了若干已知瓶颈。
- Scheduler 不再一次性将所有 TaskInstance 加载到内存,避免了大规模部署时的内存峰值(#60956)
- 任务出队循环更快(#61376)
- 队列查询现在直接强制执行
max_active_tasks,防止过度排队(#54103)
API 服务器改进
- 取消在任务启动时加载 SerializedDag,降低内存占用(#60803)
serialized_dag数据列在 PostgreSQL 上改为使用 JSONB(#55979)
🔧 Task SDK 演进与开发者体验
Task SDK 解耦持续进行
Airflow 3.2 继续将组件从 airflow-core 移至 Task SDK,朝着完整的客户端‑服务器分离目标前进。这使得 DAG 编写者能够在不升级 Airflow Core 的前提下独立升级 Task SDK,从而降低了 DAG 编写者与运维团队之间的协同成本。
本次发布中迁移至 Task SDK 的模块(旧的导入路径仍可使用,但会提示弃用警告)
- 异常类:
AirflowSkipException、TaskDeferred等 →airflow.sdk.exceptions(#59780) - Serde:
airflow.serialization.serde→airflow.sdk.serde;serializer 移至airflow.sdk.serde.serializers.*(#58900) - SkipMixin / BranchMixIn:已迁移至 Task SDK;原有导入通过
common-compat仍可工作(#62749,#62776) - Lineage 模块:迁移至 Task SDK,实现客户端‑服务器分离(#60968,#61157)
- Listeners 模块:迁移至共享库(#59883)
- XCom API:已从
XComEncoder中解耦(#58900)
PythonOperator 异步支持
PythonOperator 现已支持异步可调用对象。你可以将 async 函数传递给 python_callable,运算符会自动 await,轻松实现异步 I/O 模式,无需自定义运算符。(#60268)
@task(show_return_value_in_logs=False)
async def load_xml_files(files):
import asyncio
from io import BytesIO
from more_itertools import chunked
from os import cpu_count
from tenacity import retry, stop_after_attempt, wait_fixed
from airflow.providers.sftp.hooks.sftp import SFTPClientPool
print("number of files:", len(files))
async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:
@retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
async def download_file(file):
async with pool.get_sftp_client() as sftp:
print("downloading:", file)
buffer = BytesIO()
async with sftp.open(file, encoding=xml_encoding) as remote_file:
data = await remote_file.read()
buffer.write(data.encode(xml_encoding))
buffer.seek(0)
return buffer
for batch in chunked(files, cpu_count() * 2):
tasks = [asyncio.create_task(download_file(f)) for f in batch]
# Wait for this batch to finish before starting the next
for task in asyncio.as_completed(tasks):
result = await task
# Do something with result or accumulate it and return it as an XCom
更新的安全模型
我们正在改进 Airflow 部署的隔离性和安全性,并为用户提供更明确的安全期望。为此,安全模型已更新以反映 Airflow 3.2.0 中实现的更改,并阐述了我们在该领域的未来改进计划。详情请参阅:Airflow Security Model。
🙏 社区致谢
此版本凝聚了全球数百位贡献者的协作努力。特别感谢我们的发布经理以及所有开发者、文档编写者、测试人员和社区成员,正是他们让 Apache Airflow 3.2.0 成为可能。
感谢像你这样的贡献者,Airflow 项目才能持续蓬勃发展。无论是提交问题、PR、改进文档,还是帮助社区中的其他人,每一份贡献都至关重要。
🔗 参与项目
Apache Airflow 3.2.0 标志着数据感知、分区驱动工作流编排的新篇章。我们迫不及待想看到大家用它构建的精彩项目!
分享