我们很自豪地宣布发布 Apache Airflow 3.2.0!Airflow 3.1 将人为因素置于自动化工作流的中心。3.2 将同样的精确性带入数据:资产分区用于细粒度的流水线编排,多团队部署实现企业规模,同步的截止日期警报回调,以及向完整 Task SDK 分离的持续进展。

详情:

📦 PyPI: https://pypi.ac.cn/project/apache-airflow/3.2.0/
📚 Docs: https://airflow.org.cn/docs/apache-airflow/3.2.0/
🛠️ Release Notes: https://airflow.org.cn/docs/apache-airflow/3.2.0/release_notes.html
🐳 Docker Image: docker pull apache/airflow:3.2.0
🚏 Constraints: https://github.com/apache/airflow/tree/constraints-3.2.0

🗂️ 资产分区 (AIP-76):仅触发正确的工作

资产分区一直是对数据感知调度需求最高的特性之一。如果你使用按日期分区的 S3 路径、Hive 表分区、BigQuery 分区,或任何分区的数据存储,你一定会遇到这种情形:上游任务更新了某个分区,而所有下游 DAG 都会被触发,且不管实际变化的是哪一片分区。这既浪费资源,也会在大规模部署时产生大量的运营噪声。

3.2 中的资产分区实现了细粒度控制。下游 DAG 仅在它们关心的特定分区被更新时才触发。这是自引入资产以来对数据感知调度的最大改动,它把基于分区的编排转变为 Airflow 原生支持的功能,而不再是需要自行绕行的方案。

Asset Partitioning

关键功能

  • 基于分区的调度:仅在特定分区更新时触发 DAG,而不是在每次资产更改时触发
  • CronPartitionTimetable:使用 cron 表达式根据分区调度 DAG。Task SDK 也提供此功能
  • 分区 DAG 的回填:回填历史分区而不重新触发所有下游任务(#61464)
  • 多资产分区:单个 DAG 可以监听多个资产的分区,当下游工作依赖于多个来源对齐时非常重要(#60577)

对于更高级的使用场景,提供了时间和范围分区映射器(#61522,#55247),用于将时间范围和取值范围映射到分区键;在 Dag Run 引用中加入了分区键字段(#61725),便于精准检查是哪一个分区触发了运行;以及 PartitionedAssetTimetable,能够完整控制来自多个资产的分区事件如何统一解析为一次触发。

示例:三个上游摄取 DAG 按小时写入各自的独立资产。只有当这三个资产在同一小时分区上全部更新后,下游 DAG 才会触发。由于这三个资产本身不共享分区键,需要通过映射器将它们统一为同一个键。

from __future__ import annotations

from airflow.sdk import (
    DAG,
    Asset,
    CronPartitionTimetable,
    PartitionedAssetTimetable,
    StartOfHourMapper,
    asset,
    task,
)

team_a_player_stats = Asset(uri="file://incoming/player-stats/team_a.csv", name="team_a_player_stats")
combined_player_stats = Asset(uri="file://curated/player-stats/combined.csv", name="combined_player_stats")


with DAG(
    dag_id="ingest_team_a_player_stats",
    schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
    tags=["player-stats", "ingestion"],
):

    @task(outlets=[team_a_player_stats])
    def ingest_team_a_stats():
        """Materialize Team A player statistics for the current hourly partition."""
        pass

    ingest_team_a_stats()


@asset(schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"))
def team_b_player_stats():
    pass


with DAG(
    dag_id="clean_and_combine_player_stats",
    schedule=PartitionedAssetTimetable(
        assets=team_a_player_stats & team_b_player_stats,
        default_partition_mapper=StartOfHourMapper(),
    ),
    catchup=False,
):

    @task(outlets=[combined_player_stats])
    def combine_player_stats(dag_run=None):
        """Merge the aligned hourly partitions into a combined dataset."""
        print(dag_run.partition_key)

    combine_player_stats()

请参阅 example_asset_partition.py 以及 Task SDK API 文档中的 PartitionedAssetTimetable 和分区映射器。

🏢 多团队部署 (AIP-67):企业级 Airflow

⚠️ 实验性:多团队支持在 Airflow 3.2 中仍属实验性质,未来版本可能根据用户反馈进行更改。

Airflow 3.2 引入多团队支持,使组织能够在同一个 Airflow 实例中运行多个相互隔离的团队。每个团队拥有自己的 DAG、连接、变量、池和执行器——实现真正的资源与权限隔离,而无需为每个团队单独部署 Airflow 实例。

这对提供共享基础设施、为多个数据工程或数据科学团队服务的平台团队尤为重要,同时还能在团队资源和访问权限之间保持强大的边界。

关键功能

  • 每团队资源隔离:每个团队拥有自己的 DAG、连接、变量和池
  • 每团队执行器:不同团队可使用不同的执行器(如 Celery、Kubernetes、Local、AWS ECS 等),并可单独配置——#57837,#57910
  • 团队作用域授权:Keycloak 与 Simple 认证管理器支持团队作用域的访问控制(#61351,#61861)
  • 团队作用域密钥:使用 AIRFLOW_VAR__{TEAM}___{KEY} 环境变量或 AIRFLOW_CONN__<TEAM>___<CONN_ID> 模式来保存团队专属的密钥(#62588)
  • CLI 管理:新增用于管理团队的 CLI 命令(#55283)
  • UI 团队选择器:在连接、变量和池的创建/编辑表单中加入团队选择器(#60237,#60474,#61082)
  • 完整 API 支持:在 Connection、Variable、Pool API 中添加 team_name 字段(#59336,#57102,#60952)

启用多团队功能

$$# In airflow.cfg: [core] multi_team = True # Or via environment variable: export AIRFLOW__CORE__MULTI_TEAM=True$$

⏰ 截止日期警报:现在支持同步回调 (AIP-86)

⚠️ 实验性:截止日期警报在 Airflow 3.2 中仍属实验性质,未来版本可能根据用户反馈进行更改。

基于 Airflow 3.1 引入的 Deadline Alerts 系统,本次发布加入了同步回调支持。3.1 中的回调只能通过 triggerer(仅异步)执行,限制了集成方式。同步回调直接通过 executor 在工作节点上执行,并可通过 executor 参数指定特定的执行器。

3.2 新特性

  • SyncCallback 支持:不同于在 triggerer 上运行的 AsyncCallbackSyncCallback 直接在 worker 上通过 executor 执行,可选地指定具体的 executor
  • 单个 DAG 的多条截止警报:deadline 参数可接受列表,以在同一 DAG 上配置多个阈值
  • Grid API 中的 missed-deadline 元数据:Dag Run API 现在会返回 missed-deadline 信息,便于程序化监控
  • 自定义 DeadlineReferences 的体验改进:在定义自定义截止参考点时提供更简洁的开发者体验(#57222)
with DAG(
    dag_id="sync_deadline",
    deadline=DeadlineAlert(
        reference=DeadlineReference.FIXED_DATETIME(datetime(1980, 8, 10, 2)),
        interval=timedelta(0),
        callback=SyncCallback(
            SlackWebhookNotifier,
            {"text": "Sync Callback; Alert should trigger immediately!"},
        )
    )
):
    EmptyOperator(task_id='empty_task')

🖥️ UI 增强

  • HITL 审批历史:Human-in-the-Loop 审批界面现在会显示任何任务的完整审批与驳回审计轨迹(#56760,#55952)
  • XCom 管理:现在可以直接在 UI 中添加、编辑、删除 XCom 值(#58921)
  • 分段状态条:折叠的任务组和映射任务现在会展示分段状态条,一眼即可了解状态(#61854)
  • 统一 Tooltip:Grid 与 Graph 视图的 tooltip 现在同时展示日期、持续时间以及子任务状态(#62119)
  • DAG 代码标签页显示文件名:代码标签页中新增文件标识(#60759)
  • 日志复制按钮:一键复制日志内容(#61185)
  • 日期范围过滤器:可按日期范围过滤 DAG 执行记录(#60772)
  • 任务上游/下游过滤器:在 Graph 与 Grid 视图中按上游或下游任务进行过滤(#57237)
  • 数据脱敏:敏感字段在 UI 与公共 API 中均已脱敏处理(#59873)
  • 自定义主题支持:通过 globalCss 和主题配置实现白标/自定义部署(#61161,#58411)
  • React 插件继承核心 UI 主题:插件 UI 现在自动匹配核心 Airflow 主题(#60256)
  • 甘特图中的任务显示名称:在甘特图中展示 task_display_name,提升可读性(#61438)

🚀 性能改进

渲染任务实例字段清理:约提升 42 倍。渲染任务实例字段的清理作业已重写,对包含大量映射任务的 DAG 提升约 42 倍。保留策略改为基于最近 N 次 DAG Run,而非最近 N 次任务执行,这既更直观也大幅提升性能。配置项重命名:max_num_rendered_ti_fields_per_tasknum_dag_runs_to_retain_rendered_fields(旧名称仍可使用但会出现弃用警告)。(#60951)

Scheduler 改进。针对大规模部署,3.2 解决了若干已知瓶颈。

  • Scheduler 不再一次性将所有 TaskInstance 加载到内存,避免了大规模部署时的内存峰值(#60956)
  • 任务出队循环更快(#61376)
  • 队列查询现在直接强制执行 max_active_tasks,防止过度排队(#54103)

API 服务器改进

  • 取消在任务启动时加载 SerializedDag,降低内存占用(#60803)
  • serialized_dag 数据列在 PostgreSQL 上改为使用 JSONB(#55979)

🔧 Task SDK 演进与开发者体验

Task SDK 解耦持续进行

Airflow 3.2 继续将组件从 airflow-core 移至 Task SDK,朝着完整的客户端‑服务器分离目标前进。这使得 DAG 编写者能够在不升级 Airflow Core 的前提下独立升级 Task SDK,从而降低了 DAG 编写者与运维团队之间的协同成本。

本次发布中迁移至 Task SDK 的模块(旧的导入路径仍可使用,但会提示弃用警告)

  • 异常类AirflowSkipExceptionTaskDeferred 等 → airflow.sdk.exceptions(#59780)
  • Serdeairflow.serialization.serdeairflow.sdk.serde;serializer 移至 airflow.sdk.serde.serializers.*(#58900)
  • SkipMixin / BranchMixIn:已迁移至 Task SDK;原有导入通过 common-compat 仍可工作(#62749,#62776)
  • Lineage 模块:迁移至 Task SDK,实现客户端‑服务器分离(#60968,#61157)
  • Listeners 模块:迁移至共享库(#59883)
  • XCom API:已从 XComEncoder 中解耦(#58900)

PythonOperator 异步支持

PythonOperator 现已支持异步可调用对象。你可以将 async 函数传递给 python_callable,运算符会自动 await,轻松实现异步 I/O 模式,无需自定义运算符。(#60268)

@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed

    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    print("number of files:", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:
        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                print("downloading:", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                    buffer.seek(0)
                return buffer

        for batch in chunked(files, cpu_count() * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]

            # Wait for this batch to finish before starting the next
            for task in asyncio.as_completed(tasks):
                result = await task
                # Do something with result or accumulate it and return it as an XCom

更新的安全模型

我们正在改进 Airflow 部署的隔离性和安全性,并为用户提供更明确的安全期望。为此,安全模型已更新以反映 Airflow 3.2.0 中实现的更改,并阐述了我们在该领域的未来改进计划。详情请参阅:Airflow Security Model

🙏 社区致谢

此版本凝聚了全球数百位贡献者的协作努力。特别感谢我们的发布经理以及所有开发者、文档编写者、测试人员和社区成员,正是他们让 Apache Airflow 3.2.0 成为可能。

感谢像你这样的贡献者,Airflow 项目才能持续蓬勃发展。无论是提交问题、PR、改进文档,还是帮助社区中的其他人,每一份贡献都至关重要。

🔗 参与项目

  • 尝试新版本:升级你的开发环境,探索全新特性
  • 加入讨论:通过 Slack开发者邮件列表 与我们联系
  • 贡献代码:查看我们的 贡献指南
  • 提供反馈:在 GitHub 分享你的使用体验和建议

Apache Airflow 3.2.0 标志着数据感知、分区驱动工作流编排的新篇章。我们迫不及待想看到大家用它构建的精彩项目!

分享

阅读更多

Apache Airflow 3.1.0:以人为本的工作流

Kaxil Naik

Apache Airflow 3.1.0 引入了人机交互(Human‑in‑the‑Loop)工作流、支持 17 种语言的国际化、截止日期警报,以及面向数据编排团队的 React 插件系统。

Apache Airflow® 3 已正式发布!

Kaxil Naik , Vikram Koka

我们自豪地宣布 Apache Airflow 3.0.0 已正式发布。