自 2.2.0 版本以来,Apache Airflow 2.3.0 包含超过 700 次提交,其中包括 50 个新功能、99 项改进、85 个错误修复以及多项文档变更。
详情:
📦 PyPI: https://pypi.ac.cn/project/apache-airflow/2.3.0/
📚 文档: https://airflow.org.cn/docs/apache-airflow/2.3.0/
🛠️ 发布说明: https://airflow.org.cn/docs/apache-airflow/2.3.0/release_notes.html
🐳 Docker 镜像: docker pull apache/airflow:2.3.0
🚏 约束文件: https://github.com/apache/airflow/tree/constraints-2.3.0
由于更新日志内容较多,以下是此版本中一些值得关注的新功能。
动态任务映射 (AIP-42)
Airflow 现在对动态任务提供了头等支持。这意味着您可以在运行时动态生成任务。就像使用 for 循环创建任务列表一样,您无需提前知道任务的确切数量即可在此处创建相同的任务。
您可以让一个 task 生成需要迭代的列表,这在使用 for 循环时是不可能的。
示例如下
@task
def make_list():
# This can also be from an API call, checking a database, -- almost anything you like, as long as the
# resulting list/dictionary can be stored in the current XCom backend.
return [1, 2, {"a": "b"}, "str"]
@task
def consumer(arg):
print(list(arg))
with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
consumer.expand(arg=make_list())
更多信息请参见:动态任务映射
网格视图取代树状视图
在 Airflow 2.3.0 中,网格视图取代了树状视图。
截图: 
从元数据数据库中清除历史记录
Airflow 2.3.0 引入了一个新的 airflow db clean 命令,可用于从元数据数据库中清除旧数据。
如果您想减小元数据数据库的大小,可以使用此命令。
更多信息请参见:从元数据数据库中清除历史记录
LocalKubernetesExecutor
有一个名为 LocalKubernetesExecutor 的新执行器。此执行器可帮助您使用 LocalExecutor 运行一些任务,并根据任务的队列在同一部署中使用 KubernetesExecutor 运行另一组任务。
更多信息请参见:LocalKubernetesExecutor
DagProcessorManager 作为独立进程 (AIP-43)
自 2.3.0 起,您可以将 DagProcessorManager 作为独立进程运行。由于 DagProcessorManager 运行用户代码,将其与调度程序进程分离并在不同的主机中作为独立进程运行是一个好主意。
airflow dag-processor CLI 命令将启动一个新的进程,该进程将在单独的进程中运行 DagProcessorManager。在您将 DagProcessorManager 作为独立进程运行之前,您需要将 [scheduler] standalone_dag_processor 设置为 True。
更多信息请参见:dag-processor CLI 命令
连接的 JSON 序列化
您现在可以使用 json 序列化格式创建连接。
airflow connections add 'my_prod_db' \
--conn-json '{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
在环境变量中设置连接时,您也可以使用 json 序列化格式。
更多信息请参见:连接的 JSON 序列化
Airflow db downgrade 命令和 SQL 脚本的离线生成
Airflow 2.3.0 引入了一个新命令 airflow db downgrade,它将把数据库降级到您选择的版本。
您还可以为数据库生成降级/升级 SQL 脚本,并手动针对数据库运行它们,或者只是查看降级/升级命令将要运行的 SQL 查询。
更多信息请参见:Airflow db downgrade 命令和 SQL 脚本的离线生成
装饰器任务的复用
您现在可以在 DAG 文件中复用装饰器任务。装饰器任务有一个 override 方法,允许您覆盖其参数。
示例如下
@task
def add_task(x, y):
print(f"Task args: x={x}, y={y}")
return x + y
@dag(start_date=datetime(2022, 1, 1))
def mydag():
start = add_task.override(task_id="start")(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"add_start_{i}")(start, i)
更多信息请参见:装饰器 DAG 的复用
其他小功能
这不是一个全面的列表,但一些值得注意或有趣的小功能包括:
- 支持不同的 DAG 文件解析超时值
- 使用
airflow dags reserialize命令重新序列化 DAG - 事件时间表
- SmoothOperator - 除了记录 Sade 的歌曲“Smooth Operator”的 YouTube 链接外,此 Operator 实际上什么都不做。尽情享受吧!
贡献者
感谢为此版本做出贡献的所有人:Ash Berlin-Taylor, Brent Bovenzi, Daniel Standish, Elad, Ephraim Anierobi, Jarek Potiuk, Jed Cunningham, Josh Fell, Kamil Breguła, Kanthi, Kaxil Naik, Khalid Mammadov, Malthe Borch, Ping Zhang, Tzu-ping Chung 以及许多其他一直在让 Airflow 对每个人都变得更好的人。
分享