设置数据库后端

Airflow 构建为使用 SqlAlchemy 与其元数据进行交互。

以下文档描述了数据库引擎配置、将其配置用于 Airflow 所需的更改,以及连接到这些数据库的 Airflow 配置的更改。

选择数据库后端

如果您想真正试用 Airflow,您应该考虑设置 PostgreSQLMySQL 数据库后端。默认情况下,Airflow 使用 SQLite,它仅用于开发目的。

Airflow 支持以下数据库引擎版本,请确保您拥有哪个版本。旧版本可能不支持所有 SQL 语句。

  • PostgreSQL:12、13、14、15、16

  • MySQL:8.0、创新版

  • SQLite:3.15.0+

如果您计划运行多个调度程序,则必须满足其他要求。有关详细信息,请参阅 调度程序 HA 数据库要求

警告

尽管 MariaDB 和 MySQL 之间存在很大的相似之处,但我们不支持将 MariaDB 作为 Airflow 的后端。MariaDB 和 MySQL 之间存在已知的问题(例如索引处理),并且我们没有在 MariaDB 上测试我们的迁移脚本或应用程序执行。我们知道有人将 MariaDB 用于 Airflow,这给他们带来了很多操作上的麻烦,因此我们强烈建议不要尝试使用 MariaDB 作为后端,并且用户不能期望社区对其提供任何支持,因为尝试将 MariaDB 用于 Airflow 的用户数量非常少。

数据库 URI

Airflow 使用 SQLAlchemy 连接到数据库,这需要您配置数据库 URL。您可以在 [database] 部分中的 sql_alchemy_conn 选项中执行此操作。使用 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 环境变量配置此选项也很常见。

注意

有关设置配置的更多信息,请参阅 设置配置选项

如果您想检查当前值,可以使用 airflow config get-value database sql_alchemy_conn 命令,如下例所示。

$ airflow config get-value database sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db

确切的格式描述在 SQLAlchemy 文档中进行了描述,请参阅 数据库 URL。我们还将在下面向您展示一些示例。

设置 SQLite 数据库

SQLite 数据库可用于运行 Airflow 进行开发,因为它不需要任何数据库服务器(数据库存储在本地文件中)。使用 SQLite 数据库有很多限制(例如,它仅适用于顺序执行器),并且永远不应将其用于生产。

运行 Airflow 2.0+ 需要最低版本的 sqlite3 - 最低版本为 3.15.0。某些较旧的系统默认安装了较早版本的 sqlite,对于这些系统,您需要手动升级 SQLite 以使用 3.15.0 以上的版本。请注意,这不是 python library 版本,而是需要升级的 SQLite 系统级应用程序。SQLite 的安装方式有很多种,您可以在 SQLite 官方网站以及您的操作系统发行版的文档中找到有关这方面的一些信息。

故障排除

有时,即使您将 SQLite 升级到更高的版本并且您的本地 python 报告了更高的版本,Airflow 使用的 python 解释器可能仍然使用为用于启动 Airflow 的 python 解释器设置的 LD_LIBRARY_PATH 中提供的旧版本。

您可以通过运行以下检查来确保解释器使用的是哪个版本

root@b8a8e73caa2c:/opt/airflow# python
Python 3.8.10 (default, Mar 15 2022, 12:22:08)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.27.2'
>>>

但请注意,为您的 Airflow 部署设置环境变量可能会更改首先找到的 SQLite 库,因此您可能需要确保“足够高”版本的 SQLite 是系统中安装的唯一版本。

sqlite 数据库的 URI 示例

sqlite:////home/airflow/airflow.db

在 AmazonLinux AMI 或容器映像上升级 SQLite

AmazonLinux SQLite 只能使用源仓库升级到 v3.7。Airflow 需要 v3.15 或更高版本。使用以下说明设置带有最新 SQLite3 的基本映像(或 AMI)

先决条件:您将需要 wgettargzipgccmakeexpect 才能使升级过程正常工作。

yum -y install wget tar gzip gcc make expect

https://sqlite.ac.cn/ 下载源,在本地进行 make 和安装。

wget https://www.sqlite.org/src/tarball/sqlite.tar.gz
tar xzf sqlite.tar.gz
cd sqlite/
export CFLAGS="-DSQLITE_ENABLE_FTS3 \
    -DSQLITE_ENABLE_FTS3_PARENTHESIS \
    -DSQLITE_ENABLE_FTS4 \
    -DSQLITE_ENABLE_FTS5 \
    -DSQLITE_ENABLE_JSON1 \
    -DSQLITE_ENABLE_LOAD_EXTENSION \
    -DSQLITE_ENABLE_RTREE \
    -DSQLITE_ENABLE_STAT4 \
    -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT \
    -DSQLITE_SOUNDEX \
    -DSQLITE_TEMP_STORE=3 \
    -DSQLITE_USE_URI \
    -O2 \
    -fPIC"
export PREFIX="/usr/local"
LIBS="-lm" ./configure --disable-tcl --enable-shared --enable-tempstore=always --prefix="$PREFIX"
make
make install

安装后将 /usr/local/lib 添加到库路径

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

设置 PostgreSQL 数据库

您需要创建一个数据库和一个数据库用户,Airflow 将使用它们来访问此数据库。在下面的示例中,将创建一个名为 airflow_db 的数据库和一个用户名为 airflow_user、密码为 airflow_pass 的用户

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
-- PostgreSQL 15 requires additional privileges:
GRANT ALL ON SCHEMA public TO airflow_user;

注意

该数据库必须使用 UTF-8 字符集

您可能需要更新 Postgres pg_hba.conf 以将 airflow 用户添加到数据库访问控制列表;并重新加载数据库配置以加载您的更改。请参阅 Postgres 文档中的 pg_hba.conf 文件以了解更多信息。

警告

当您使用 SQLAlchemy 1.4.0+ 时,您需要在 sql_alchemy_conn 中使用 postgresql:// 作为数据库。在以前版本的 SQLAlchemy 中,可以使用 postgres://,但在 SQLAlchemy 1.4.0+ 中使用会导致

>       raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )
E       sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres

如果您不能立即更改 URL 的前缀,Airflow 将继续使用 SQLAlchemy 1.3 工作,您可以降级 SQLAlchemy,但我们建议更新前缀。

详情请参阅 SQLAlchemy 更改日志

我们建议使用 psycopg2 驱动程序并在您的 SqlAlchemy 连接字符串中指定它。

postgresql+psycopg2://<user>:<password>@<host>/<db>

另请注意,由于 SqlAlchemy 没有提供在数据库 URI 中指定特定 schema 的方法,你需要确保 schema public 在你的 Postgres 用户的 search_path 中。

如果你为 Airflow 创建了一个新的 Postgres 账户

  • 新 Postgres 用户的默认 search_path 是:"$user", public,无需更改。

如果你使用具有自定义 search_path 的现有 Postgres 用户,则可以使用以下命令更改 search_path

ALTER USER airflow_user SET search_path = public;

有关 PostgreSQL 连接设置的更多信息,请参阅 SQLAlchemy 文档中的 PostgreSQL 方言

注意

众所周知,Airflow (尤其是在高性能设置中) 会打开许多到元数据数据库的连接。这可能会导致 Postgres 资源使用问题,因为在 Postgres 中,每个连接都会创建一个新进程,当打开大量连接时,会使 Postgres 非常消耗资源。因此,我们建议对所有 Postgres 生产安装使用 PGBouncer 作为数据库代理。PGBouncer 可以处理来自多个组件的连接池,而且如果你的远程数据库连接可能不稳定,它还可以使你的数据库连接更具弹性,能够应对临时网络问题。你可以在 Apache Airflow 的 Helm Chart 中找到 PGBouncer 部署的示例实现,你可以在其中通过翻转布尔标志来启用预配置的 PGBouncer 实例。你可以查看我们在那里采取的方法,并在准备自己的部署时将其用作灵感,即使你未使用官方 Helm Chart。

另请参阅 Helm Chart 生产指南

注意

对于托管的 Postgres (如 Azure Postgresql、CloudSQL、Amazon RDS),你应该在连接参数中使用 keepalives_idle 并将其设置为小于空闲时间的值,因为这些服务会在一段时间的不活动后(通常为 300 秒)关闭空闲连接,这会导致错误 The error: psycopg2.operationalerror: SSL SYSCALL error: EOF detectedkeepalive 设置可以通过 sql_alchemy_connect_args 配置参数在 配置参考[database] 部分中更改。你可以在你的 local_settings.py 中配置参数,并且 sql_alchemy_connect_args 应该是存储配置参数的字典的完整导入路径。你可以阅读有关 Postgres Keepalives 的信息。keepalives 的一个示例设置 (已观察到可以解决该问题) 可能是

keepalive_kwargs = {
    "keepalives": 1,
    "keepalives_idle": 30,
    "keepalives_interval": 5,
    "keepalives_count": 5,
}

然后,如果将其放置在 airflow_local_settings.py 中,则配置导入路径将为

sql_alchemy_connect_args = airflow_local_settings.keepalive_kwargs

有关如何配置本地设置的详细信息,请参阅 配置本地设置

设置 MySQL 数据库

您需要创建一个数据库和一个数据库用户,Airflow 将使用它们来访问此数据库。在下面的示例中,将创建一个名为 airflow_db 的数据库和一个用户名为 airflow_user、密码为 airflow_pass 的用户

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

注意

数据库必须使用 UTF-8 字符集。你需要注意一个小小的警告,即新版本的 MySQL 中的 utf8 实际上是 utf8mb4,这会导致 Airflow 索引变得过大 (请参阅 https://github.com/apache/airflow/pull/17603#issuecomment-901121618)。因此,从 Airflow 2.2 开始,所有 MySQL 数据库都自动将 sql_engine_collation_for_ids 设置为 utf8mb3_bin (除非你覆盖它)。这可能会导致 Airflow 数据库中 id 字段的排序规则 id 混合,但这没有负面影响,因为 Airflow 中的所有相关 ID 都只使用 ASCII 字符。

我们依赖于 MySQL 更严格的 ANSI SQL 设置,以便具有合理的默认值。请确保在 my.cnf 文件中 [mysqld] 部分下指定了 explicit_defaults_for_timestamp=1 选项。你也可以使用传递给 mysqld 可执行文件的 --explicit-defaults-for-timestamp 开关来激活这些选项。

我们建议使用 mysqlclient 驱动程序并在你的 SqlAlchemy 连接字符串中指定它。

mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

重要

在 Apache Airflow 的持续集成 (CI) 过程中,仅使用 mysqlclient 驱动程序验证了 MySQL 后端的集成。

如果你想使用其他驱动程序,请访问 SQLAlchemy 文档中的 MySQL 方言,了解有关下载和设置 SqlAlchemy 连接的更多信息。

此外,你还应该特别注意 MySQL 的编码。尽管 utf8mb4 字符集在 MySQL 中越来越流行 (实际上,utf8mb4 在 MySQL8.0 中成为默认字符集),但使用 utf8mb4 编码需要在 Airflow 2+ 中进行额外设置 (有关更多详细信息,请参阅 #7570)。如果你使用 utf8mb4 作为字符集,你还应该设置 sql_engine_collation_for_ids=utf8mb3_bin

注意

在严格模式下,MySQL 不允许 0000-00-00 作为有效日期。然后,在某些情况下,你可能会收到类似 "Invalid default value for 'end_date'" 的错误 (某些 Airflow 表使用 0000-00-00 00:00:00 作为时间戳字段的默认值)。为了避免此错误,你可以在 MySQL 服务器上禁用 NO_ZERO_DATE 模式。请阅读 https://stackoverflow.com/questions/9192027/invalid-default-value-for-create-date-timestamp-field,了解如何禁用它。有关更多信息,请参阅 SQL 模式 - NO_ZERO_DATE

MsSQL 数据库

警告

在经过 讨论投票过程之后,Airflow 的 PMC 成员和提交者已达成决议,不再将 MsSQL 维护为受支持的数据库后端。

从 Airflow 2.9.0 开始,已删除对 Airflow 数据库后端 MsSQL 的支持。这不会影响现有的提供程序包 (运算符和钩子),DAG 仍然可以从 MsSQL 访问和处理数据。但是,进一步的使用可能会引发错误,导致 Airflow 的核心功能无法使用。

迁移脱离 MsSQL 服务器

随着 Airflow 2.9.0 结束了对 MSSQL 的支持,迁移脚本可以帮助 Airflow 2.7.x 或 2.8.x 版本迁移脱离 SQL Server。该迁移脚本可在 Github 上的 airflow-mssql-migration 存储库中找到。

请注意,该迁移脚本不提供支持和保证。

其他配置选项

还有更多配置选项用于配置 SQLAlchemy 行为。有关详细信息,请参阅 参考文档[database] 部分中的 sqlalchemy_* 选项。

例如,你可以指定 Airflow 将在其上创建所需表的数据库 schema。如果你希望 Airflow 将其表安装在 PostgreSQL 数据库的 airflow schema 中,请指定以下环境变量

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow"

请注意 SQL_ALCHEMY_CONN 数据库 URL 末尾的 search_path

初始化数据库

在配置数据库并在 Airflow 配置中连接到数据库后,你应该创建数据库 schema。

airflow db migrate

Airflow 中的数据库监控和维护

Airflow 广泛使用关系元数据数据库进行任务调度和执行。监控和正确配置此数据库对于优化 Airflow 性能至关重要。

主要关注点

  1. 性能影响:长时间或过多的查询会严重影响 Airflow 的功能。这些问题可能是由于工作流的特殊性、缺乏优化或代码错误引起的。

  2. 数据库统计信息:数据库引擎的错误优化决策 (通常是由于过时的数据统计信息导致的) 会降低性能。

职责

Airflow 环境中数据库监控和维护的职责因你使用的是自管理数据库和 Airflow 实例还是选择托管服务而异。

自管理环境:

在数据库和 Airflow 都是自管理的设置中,部署管理器负责设置、配置和维护数据库。这包括监控其性能、管理备份、定期清理并确保其与 Airflow 的最佳运行。

托管服务:

  • 托管数据库服务:使用托管数据库服务时,许多维护任务(如备份、修补程序和基本监控)都由提供商处理。但是,部署管理器仍然需要监督 Airflow 的配置并优化特定于其工作流的性能设置、管理定期清理并监控其数据库,以确保与 Airflow 的最佳运行。

  • 托管 Airflow 服务:使用托管 Airflow 服务时,这些服务提供商负责 Airflow 及其数据库的配置和维护。但是,部署管理器需要与服务配置协作,以确保大小和工作流要求与托管服务的大小和配置相匹配。

监控方面

定期监控应包括

  • CPU、I/O 和内存使用情况。

  • 查询频率和数量。

  • 识别和记录运行缓慢或长时间运行的查询。

  • 检测低效的查询执行计划。

  • 分析磁盘交换与内存使用情况以及缓存交换频率。

工具和策略

  • Airflow 不提供用于数据库监控的直接工具。

  • 使用服务器端监控和日志记录来获取指标。

  • 根据定义的阈值启用对长时间运行查询的跟踪。

  • 定期运行内务处理任务(如用于维护的 ANALYZE SQL 命令)。

数据库清理工具

  • Airflow DB Clean 命令:使用 airflow db clean 命令来帮助管理和清理你的数据库。

  • ``airflow.utils.db_cleanup`` 中的 Python 方法:此模块为数据库清理和维护提供了额外的 Python 方法,为特定需求提供更精细的控制和定制。

建议

  • 主动监控:在生产环境中实施监控和日志记录,而不会显著影响性能。

  • 数据库特定指南:请查阅所选数据库的文档,以获取具体的监控设置说明。

  • 托管数据库服务:检查您的数据库提供商是否提供自动维护任务。

SQLAlchemy 日志记录

对于详细的查询分析,请启用 SQLAlchemy 客户端日志记录(在 SQLAlchemy 引擎配置中使用 echo=True)。

  • 此方法更具侵入性,可能会影响 Airflow 的客户端性能。

  • 它会生成大量日志,尤其是在繁忙的 Airflow 环境中。

  • 适用于非生产环境,如暂存系统。

您可以使用 echo=True 作为 sqlalchemy 引擎配置来完成,如 SQLAlchemy 日志记录文档 中所述。

使用 sql_alchemy_engine_args 配置参数将 echo 参数设置为 True。

注意

  • 启用大量日志记录时,请注意对 Airflow 性能和系统资源的影响。

  • 对于生产环境,请优先使用服务器端监控而不是客户端日志记录,以最大程度地减少性能干扰。

下一步是什么?

默认情况下,Airflow 使用 SequentialExecutor,它不提供并行性。您应该考虑配置不同的执行器以获得更好的性能。

此条目是否有帮助?