Apache Airflow 2.3.0 包含自 2.2.0 以来的 700 多次提交,带来了 50 项新特性、99 项改进、85 项 Bug 修复以及若干文档变更。

详情:

📦 PyPI: https://pypi.ac.cn/project/apache-airflow/2.3.0/
📚 文档: https://airflow.org.cn/docs/apache-airflow/2.3.0/
🛠️ 发布说明: https://airflow.org.cn/docs/apache-airflow/2.3.0/release_notes.html
🐳 Docker 镜像: docker pull apache/airflow:2.3.0
🚏 约束文件: https://github.com/apache/airflow/tree/constraints-2.3.0

由于变更日志非常庞大,以下列出本次发布中一些值得注意的新特性。

动态任务映射(AIP-42)

现在 Airflow 原生支持动态任务。这意味着你可以在运行时动态生成任务。就像使用 for 循环创建任务列表一样,这里你可以在事先不知道任务数量的情况下创建同样的任务。

你可以让 task 本身生成要遍历的列表,而这在传统的 for 循环中是做不到的。

下面是一个示例

@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    print(list(arg))


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

更多信息请参阅:Dynamic Task Mapping

网格视图取代树视图

在 Airflow 2.3.0 中,网格视图取代了原来的树视图。

截图: 全新的网格视图

从元数据库清除历史记录

Airflow 2.3.0 引入了一个新的 airflow db clean 命令,可用于从元数据库中清除旧数据。

如果你想减小元数据库的体积,可以使用该命令。

更多信息请参阅:Purge history from metadata database

LocalKubernetesExecutor

新增了 LocalKubernetesExecutor。该执行器可以在同一次部署中,根据任务的队列,将部分任务交由 LocalExecutor 执行,另一部分任务交由 KubernetesExecutor 执行。

更多信息请参阅:LocalKubernetesExecutor

DagProcessorManager 作为独立进程(AIP-43)

从 2.3.0 起,你可以将 DagProcessorManager 以独立进程的方式运行。因为 DagProcessorManager 需要执行用户代码,将其从调度器进程中分离并在另一台机器上独立运行是更好的做法。

airflow dag-processor CLI 命令会启动一个新进程,在该进程中运行 DagProcessorManager。要想把 DagProcessorManager 作为独立进程运行,需要将 [scheduler] standalone_dag_processor 设置为 True

更多信息请参阅:dag-processor CLI command

连接的 JSON 序列化

现在可以使用 json 序列化格式来创建连接。

airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'

在环境变量中设置连接时,也可以使用 json 序列化格式。

更多信息请参阅:JSON serialization for connections

Airflow db downgrade 与离线生成 SQL 脚本

Airflow 2.3.0 引入了新命令 airflow db downgrade,可将数据库降级至指定版本。

你还可以为数据库生成降级/升级的 SQL 脚本,手动执行或仅仅查看降级/升级命令将会运行的 SQL 查询。

更多信息请参阅:Airflow db downgrade and Offline generation of SQL scripts

装饰任务的复用

现在可以在不同的 DAG 文件之间复用装饰任务。装饰任务提供了一个 override 方法,允许你覆盖其参数。

下面是一个示例

@task
def add_task(x, y):
    print(f"Task args: x={x}, y={y}")
    return x + y


@dag(start_date=datetime(2022, 1, 1))
def mydag():
    start = add_task.override(task_id="start")(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"add_start_{i}")(start, i)

更多信息请参阅:Reuse of decorated DAGs

其他小特性

以下并非完整列表,但列出了一些值得关注的小特性:

  • 支持为 DAG 文件解析设置不同的超时时间
  • airflow dags reserialize 命令用于重新序列化 DAG
  • 事件时间表
  • SmoothOperator —— 这个算子基本不做任何事,只会在日志中打印一条指向 Sade 的 “Smooth Operator” 的 YouTube 链接,尽情享受吧!

贡献者

感谢所有为本次发布做出贡献的人员:Ash Berlin‑Taylor、Brent Bovenzi、Daniel Standish、Elad、Ephraim Anierobi、Jarek Potiuk、Jed Cunningham、Josh Fell、Kamil Breguła、Kanthi、Kaxil Naik、Khalid Mammadov、Malthe Borch、Ping Zhang、Tzu‑ping Chung 以及其他众多让 Airflow 越来越好的贡献者。

分享

阅读更多

Apache Airflow 2.8.0 来了

Ephraim Anierobi

全新推出 Apache Airflow 2.8.0:新增功能丰富、改进幅度显著

Apache Airflow 3.2.0:大规模数据感知工作流

Rahul Vats

Apache Airflow 3.2.0 引入了资产分区,用于细粒度的管道编排;支持面向企业规模的多团队部署;提供同步的截止日期警报回调;并在实现完整 Task SDK 分离方面取得持续进展。