将 Airflow® 升级到新版本¶
为何需要升级¶
较新的 Airflow 版本可能包含数据库迁移,因此您必须运行 airflow db migrate
来使用您正在升级到的 Airflow 版本中的模式更改迁移数据库。别担心,即使没有迁移要执行,运行此命令也是安全的。
Airflow 版本 x 和 y 之间有哪些变化?¶
发行说明 列出了任何给定 Airflow 版本中包含的更改。
升级准备 - 备份数据库¶
强烈建议在进行任何迁移之前备份元数据数据库。如果您的数据库没有“热备份”能力,则应在关闭 Airflow 实例后进行备份,以确保数据库备份的一致性。如果您没有进行备份而迁移失败,可能会陷入半迁移状态,此时从备份恢复数据库并重复迁移可能是唯一简便的出路。例如,这可能是由于迁移过程中您的 CLI 与数据库之间的网络连接中断造成的,因此进行备份是避免此类问题的重要预防措施。
何时需要升级¶
如果您有基于 virtualenv 或 Docker 容器的自定义部署,通常需要在升级过程中手动运行数据库迁移。
在某些情况下,升级会自动进行——这取决于您的部署中是否将升级作为安装后操作内置。例如,当您使用启用了升级后钩子 (post-upgrade hooks) 的 Apache Airflow Helm Chart 时,数据库升级在新软件安装完成后会自动进行。类似地,所有 Airflow 即服务 (Airflow-As-A-Service) 解决方案在您通过其用户界面选择升级 Airflow 时,也会自动为您执行升级。
如何升级¶
重新安装 Apache Airflow®,指定所需的新版本。
要升级引导启动的本地实例,您可以在重新运行安装命令之前将 AIRFLOW_VERSION
环境变量设置为目标版本。请按补丁版本递增升级:例如,如果要从版本 2.8.2 升级到 2.8.4,请先升级到 2.8.3。有关更详细的指导,请参阅快速入门。
要升级 PyPI 包,请在您的环境中重新运行 pip install
命令,使用所需版本作为约束。有关更详细的指导,请参阅从 PyPI 安装。
要手动迁移数据库,您应在您的环境中运行 airflow db migrate
命令。它可以在您的虚拟环境中运行,也可以在允许您访问 Airflow CLI
使用命令行接口 以及数据库的容器中运行。
离线 SQL 迁移脚本¶
如果您想离线运行升级脚本,可以使用 -s
或 --show-sql-only
标志来获取将要执行的 SQL 语句。您还可以使用 --from-version
标志指定起始 Airflow 版本,并使用 -n
或 --to-version
标志指定结束 Airflow 版本。此功能从 Airflow 2.0.0 版本起在 Postgres 和 MySQL 中受支持。
- Airflow 2.7.0 或更高版本的示例用法
airflow db migrate -s --from-version "2.4.3" -n "2.7.3"
airflow db migrate --show-sql-only --from-version "2.4.3" --to-version "2.7.3"
警告
自 Airflow 2.7.0 版本起,airflow db upgrade
已被 airflow db migrate
替换,前者已被弃用。
处理迁移问题¶
MySQL 数据库编码错误¶
如果您使用旧的 Airflow 1.10 作为数据库,该数据库最初是手动创建的,或者使用之前版本的 MySQL 创建的,则根据数据库的原始字符集,您在迁移到较新版本的 Airflow 时可能会遇到问题,并且迁移可能会因奇怪的错误(“key size too big”、“missing indexes”等)而失败。下一章将描述如何手动修复此问题。
为什么可能会出现这个错误?MySQL 8 数据库推荐的字符集/排序规则分别是 utf8mb4
和 utf8mb4_bin
。然而,这在不同版本的 MySQL 中一直在变化,而且您可能自定义创建了使用不同字符集的数据库。如果您的数据库是使用旧版本的 Airflow 或 MySQL 创建的,则在创建数据库时或在迁移过程中,编码可能就是错误的或损坏的。
不幸的是,MySQL 限制了索引键的大小,在使用 utf8mb4
时,Airflow 的索引键大小可能对于 MySQL 来说太大了。因此,在 Airflow 中,我们强制所有“ID”键使用 utf8
字符集(在 MySQL 8 中等同于 utf8mb3
)。这限制了索引的大小,以便 MySQL 可以处理它们。
以下是在您尝试迁移之前可以遵循的修复步骤(但如果您知道自己在做什么,也可以选择自己的方式进行)。
熟悉 Airflow 的内部数据库结构,您可以在数据库 ERD 模式中找到它,并在数据库迁移参考中找到迁移列表。
备份您的数据库,以便在出错时可以恢复。
检查哪些表需要修复。查看这些表
SHOW CREATE TABLE task_reschedule;
SHOW CREATE TABLE xcom;
SHOW CREATE TABLE task_fail;
SHOW CREATE TABLE rendered_task_instance_fields;
SHOW CREATE TABLE task_instance;
确保复制输出。最后一步会用到。您的 dag_id
、run_id
、task_id
和 key
列应明确设置为 utf8
或 utf8mb3
字符集,类似于
``task_id`` varchar(250) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, # correct
或
``task_id`` varchar(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL, # correct
问题在于,如果您的字段没有编码
``task_id`` varchar(250), # wrong !!
或仅将排序规则设置为 utf8mb4
``task_id`` varchar(250) COLLATE utf8mb4_unicode_ci DEFAULT NULL, # wrong !!
或将字符集和排序规则都设置为 utf8mb4
``task_id`` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, # wrong !!
您需要修复那些字符集/排序规则设置错误的字段。
3. 删除需要修改的表的外部键索引(您不需要删除所有索引——只需删除需要修改的那些表的索引)。您需要在最后一步重新创建它们(这就是为什么需要保留步骤 2 中 SHOW CREATE TABLE
的输出)。
ALTER TABLE task_reschedule DROP FOREIGN KEY task_reschedule_ti_fkey;
ALTER TABLE xcom DROP FOREIGN KEY xcom_task_instance_fkey;
ALTER TABLE task_fail DROP FOREIGN KEY task_fail_ti_fkey;
ALTER TABLE rendered_task_instance_fields DROP FOREIGN KEY rtif_ti_fkey;
4. 修改您的 ID
字段以拥有正确的字符集/编码。只对编码错误的字段进行修改(这里列出了您可能需要使用的所有潜在命令)
ALTER TABLE task_instance MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_reschedule MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY key VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
重新创建在步骤 3 中删除的外部键。
对所有您删除的索引重复此操作。请注意,根据您拥有的 Airflow 版本,索引可能略有不同(例如 map_index
在 2.3.0 中添加),但如果您保留了步骤 2 中准备好的 SHOW CREATE TABLE
输出,您将找到正确的 CONSTRAINT_NAME
和 CONSTRAINT
来使用。
# Here you have to copy the statements from SHOW CREATE TABLE output
ALTER TABLE <TABLE> ADD CONSTRAINT `<CONSTRAINT_NAME>` <CONSTRAINT>
这应该能使数据库达到可以运行到新 Airflow 版本迁移的状态。
升级后警告¶
通常您只需要成功运行 airflow db migrate
命令即可完成。然而,在某些情况下,迁移可能会在您的数据库中发现一些旧的、过时的且可能错误的数据,并将其移到单独的表中。在这种情况下,您可能会在 Webserver UI 中看到关于找到这些数据的警告。
您可能看到的典型消息
Airflow 在元数据库的 <原始表> 表中发现不兼容的数据,并在数据库升级迁移期间将其移至 <新表>。请检查已移动的数据,决定是否需要保留它们,并手动删除 <新表> 以解除此警告。
当您看到这样的消息时,意味着您的一些数据已损坏,您应该检查它以确定是否要保留或删除部分数据。最有可能的是这些数据是损坏的,并且是某些 bug 留下的,可以安全删除——因为这些数据无论如何都不会在 Airflow 中可见或有用。但是,如果您出于审计或历史原因有特殊需要,可以选择将其存储在其他地方。除非您有特定理由保留数据,否则最有可能的最佳选择是删除它。
您可以通过多种方式检查和删除数据——如果您使用自己的工具(通常是显示数据库对象的图形工具)直接访问数据库,则可以使用这些工具删除、重命名该表或将其移动到另一个数据库。如果您没有此类工具,可以使用 airflow db shell
命令——这会将您带入数据库的数据库 shell 工具,您将能够检查和删除该表。
如何使用 Kubernetes 删除表
进入任何 Airflow Pods(Webserver 或 Scheduler):
kubectl exec -it <your-webserver-pod> python
在 Python shell 中运行以下命令
from airflow.settings import Session session = Session() session.execute("DROP TABLE _airflow_moved__2_2__task_instance") session.commit()
请在示例中将 <table>
替换为警告消息中显示的实际表名。
检查表
SELECT * FROM <table>;
删除表
DROP TABLE <table>;
迁移最佳实践¶
根据数据库的大小和实际迁移操作,迁移可能需要相当长的时间,因此如果您有较长的历史记录和庞大的数据库,建议先复制数据库并执行一次测试迁移,以评估迁移所需的时间。通常,“主要”升级可能需要更长时间,因为添加新功能有时需要重构数据库。