2025 年 Airflow 峰会将于 10 月 07 日至 09 日举行。立即注册享早鸟票优惠!

将 Airflow® 升级到新版本

为何需要升级

较新的 Airflow 版本可能包含数据库迁移,因此您必须运行 airflow db migrate 来使用您正在升级到的 Airflow 版本中的模式更改迁移数据库。别担心,即使没有迁移要执行,运行此命令也是安全的。

Airflow 版本 x 和 y 之间有哪些变化?

发行说明 列出了任何给定 Airflow 版本中包含的更改。

升级准备 - 备份数据库

强烈建议在进行任何迁移之前备份元数据数据库。如果您的数据库没有“热备份”能力,则应在关闭 Airflow 实例后进行备份,以确保数据库备份的一致性。如果您没有进行备份而迁移失败,可能会陷入半迁移状态,此时从备份恢复数据库并重复迁移可能是唯一简便的出路。例如,这可能是由于迁移过程中您的 CLI 与数据库之间的网络连接中断造成的,因此进行备份是避免此类问题的重要预防措施。

何时需要升级

如果您有基于 virtualenv 或 Docker 容器的自定义部署,通常需要在升级过程中手动运行数据库迁移。

在某些情况下,升级会自动进行——这取决于您的部署中是否将升级作为安装后操作内置。例如,当您使用启用了升级后钩子 (post-upgrade hooks) 的 Apache Airflow Helm Chart 时,数据库升级在新软件安装完成后会自动进行。类似地,所有 Airflow 即服务 (Airflow-As-A-Service) 解决方案在您通过其用户界面选择升级 Airflow 时,也会自动为您执行升级。

如何升级

重新安装 Apache Airflow®,指定所需的新版本。

要升级引导启动的本地实例,您可以在重新运行安装命令之前将 AIRFLOW_VERSION 环境变量设置为目标版本。请按补丁版本递增升级:例如,如果要从版本 2.8.2 升级到 2.8.4,请先升级到 2.8.3。有关更详细的指导,请参阅快速入门

要升级 PyPI 包,请在您的环境中重新运行 pip install 命令,使用所需版本作为约束。有关更详细的指导,请参阅从 PyPI 安装

要手动迁移数据库,您应在您的环境中运行 airflow db migrate 命令。它可以在您的虚拟环境中运行,也可以在允许您访问 Airflow CLI 使用命令行接口 以及数据库的容器中运行。

离线 SQL 迁移脚本

如果您想离线运行升级脚本,可以使用 -s--show-sql-only 标志来获取将要执行的 SQL 语句。您还可以使用 --from-version 标志指定起始 Airflow 版本,并使用 -n--to-version 标志指定结束 Airflow 版本。此功能从 Airflow 2.0.0 版本起在 Postgres 和 MySQL 中受支持。

Airflow 2.7.0 或更高版本的示例用法

airflow db migrate -s --from-version "2.4.3" -n "2.7.3" airflow db migrate --show-sql-only --from-version "2.4.3" --to-version "2.7.3"

警告

自 Airflow 2.7.0 版本起,airflow db upgrade 已被 airflow db migrate 替换,前者已被弃用。

处理迁移问题

MySQL 数据库编码错误

如果您使用旧的 Airflow 1.10 作为数据库,该数据库最初是手动创建的,或者使用之前版本的 MySQL 创建的,则根据数据库的原始字符集,您在迁移到较新版本的 Airflow 时可能会遇到问题,并且迁移可能会因奇怪的错误(“key size too big”、“missing indexes”等)而失败。下一章将描述如何手动修复此问题。

为什么可能会出现这个错误?MySQL 8 数据库推荐的字符集/排序规则分别是 utf8mb4utf8mb4_bin。然而,这在不同版本的 MySQL 中一直在变化,而且您可能自定义创建了使用不同字符集的数据库。如果您的数据库是使用旧版本的 Airflow 或 MySQL 创建的,则在创建数据库时或在迁移过程中,编码可能就是错误的或损坏的。

不幸的是,MySQL 限制了索引键的大小,在使用 utf8mb4 时,Airflow 的索引键大小可能对于 MySQL 来说太大了。因此,在 Airflow 中,我们强制所有“ID”键使用 utf8 字符集(在 MySQL 8 中等同于 utf8mb3)。这限制了索引的大小,以便 MySQL 可以处理它们。

以下是在您尝试迁移之前可以遵循的修复步骤(但如果您知道自己在做什么,也可以选择自己的方式进行)。

熟悉 Airflow 的内部数据库结构,您可以在数据库 ERD 模式中找到它,并在数据库迁移参考中找到迁移列表。

  1. 备份您的数据库,以便在出错时可以恢复。

  2. 检查哪些表需要修复。查看这些表

SHOW CREATE TABLE task_reschedule;
SHOW CREATE TABLE xcom;
SHOW CREATE TABLE task_fail;
SHOW CREATE TABLE rendered_task_instance_fields;
SHOW CREATE TABLE task_instance;

确保复制输出。最后一步会用到。您的 dag_idrun_idtask_idkey 列应明确设置为 utf8utf8mb3 字符集,类似于

``task_id`` varchar(250) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,  # correct

``task_id`` varchar(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL,  # correct

问题在于,如果您的字段没有编码

``task_id`` varchar(250),  # wrong !!

或仅将排序规则设置为 utf8mb4

``task_id`` varchar(250) COLLATE utf8mb4_unicode_ci DEFAULT NULL,  # wrong !!

或将字符集和排序规则都设置为 utf8mb4

``task_id`` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,  # wrong !!

您需要修复那些字符集/排序规则设置错误的字段。

3. 删除需要修改的表的外部键索引(您不需要删除所有索引——只需删除需要修改的那些表的索引)。您需要在最后一步重新创建它们(这就是为什么需要保留步骤 2 中 SHOW CREATE TABLE 的输出)。

ALTER TABLE task_reschedule DROP FOREIGN KEY task_reschedule_ti_fkey;
ALTER TABLE xcom DROP FOREIGN KEY xcom_task_instance_fkey;
ALTER TABLE task_fail DROP FOREIGN KEY task_fail_ti_fkey;
ALTER TABLE rendered_task_instance_fields DROP FOREIGN KEY rtif_ti_fkey;

4. 修改您的 ID 字段以拥有正确的字符集/编码。只对编码错误的字段进行修改(这里列出了您可能需要使用的所有潜在命令)

ALTER TABLE task_instance MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_reschedule MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE rendered_task_instance_fields MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_fail MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE sla_miss MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_map MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE xcom MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY key VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
  1. 重新创建在步骤 3 中删除的外部键。

对所有您删除的索引重复此操作。请注意,根据您拥有的 Airflow 版本,索引可能略有不同(例如 map_index 在 2.3.0 中添加),但如果您保留了步骤 2 中准备好的 SHOW CREATE TABLE 输出,您将找到正确的 CONSTRAINT_NAMECONSTRAINT 来使用。

# Here you have to copy the statements from SHOW CREATE TABLE output
ALTER TABLE <TABLE> ADD CONSTRAINT `<CONSTRAINT_NAME>` <CONSTRAINT>

这应该能使数据库达到可以运行到新 Airflow 版本迁移的状态。

升级后警告

通常您只需要成功运行 airflow db migrate 命令即可完成。然而,在某些情况下,迁移可能会在您的数据库中发现一些旧的、过时的且可能错误的数据,并将其移到单独的表中。在这种情况下,您可能会在 Webserver UI 中看到关于找到这些数据的警告。

您可能看到的典型消息

Airflow 在元数据库的 <原始表> 表中发现不兼容的数据,并在数据库升级迁移期间将其移至 <新表>。请检查已移动的数据,决定是否需要保留它们,并手动删除 <新表> 以解除此警告。

当您看到这样的消息时,意味着您的一些数据已损坏,您应该检查它以确定是否要保留或删除部分数据。最有可能的是这些数据是损坏的,并且是某些 bug 留下的,可以安全删除——因为这些数据无论如何都不会在 Airflow 中可见或有用。但是,如果您出于审计或历史原因有特殊需要,可以选择将其存储在其他地方。除非您有特定理由保留数据,否则最有可能的最佳选择是删除它。

您可以通过多种方式检查和删除数据——如果您使用自己的工具(通常是显示数据库对象的图形工具)直接访问数据库,则可以使用这些工具删除、重命名该表或将其移动到另一个数据库。如果您没有此类工具,可以使用 airflow db shell 命令——这会将您带入数据库的数据库 shell 工具,您将能够检查和删除该表。

如何使用 Kubernetes 删除表

  1. 进入任何 Airflow Pods(Webserver 或 Scheduler):kubectl exec -it <your-webserver-pod> python

  2. 在 Python shell 中运行以下命令

from airflow.settings import Session

session = Session()
session.execute("DROP TABLE _airflow_moved__2_2__task_instance")
session.commit()

请在示例中将 <table> 替换为警告消息中显示的实际表名。

检查表

SELECT * FROM <table>;

删除表

DROP TABLE <table>;

迁移最佳实践

根据数据库的大小和实际迁移操作,迁移可能需要相当长的时间,因此如果您有较长的历史记录和庞大的数据库,建议先复制数据库并执行一次测试迁移,以评估迁移所需的时间。通常,“主要”升级可能需要更长时间,因为添加新功能有时需要重构数据库。

此条目有帮助吗?