设置数据库后端

Airflow 构建为使用 SqlAlchemy 与其元数据进行交互。

以下文档描述了数据库引擎配置、要与 Airflow 一起使用需要对其配置进行的必要更改,以及对 Airflow 配置的更改以连接到这些数据库。

选择数据库后端

如果您想真正试用 Airflow,您应该考虑将数据库后端设置为 PostgreSQLMySQL。默认情况下,Airflow 使用 SQLite,它仅用于开发目的。

Airflow 支持以下数据库引擎版本,因此请确保您拥有哪个版本。旧版本可能不支持所有 SQL 语句。

  • PostgreSQL:12、13、14、15、16

  • MySQL:8.0、创新版

  • SQLite:3.15.0+

如果您计划运行多个调度程序,则必须满足其他要求。有关详细信息,请参阅 调度程序 HA 数据库要求

警告

尽管 MariaDB 和 MySQL 之间有很大的相似之处,但我们不支持将 MariaDB 作为 Airflow 的后端。MariaDB 和 MySQL 之间存在已知问题(例如索引处理),我们不会在 MariaDB 上测试我们的迁移脚本或应用程序执行。我们知道有些人将 MariaDB 用于 Airflow,这给他们带来了很多运营上的麻烦,因此我们强烈建议不要尝试将 MariaDB 用作后端,用户不能期望为此获得任何社区支持,因为尝试使用 MariaDB 的用户数量非常少用于气流。

数据库 URI

Airflow 使用 SQLAlchemy 连接到数据库,这需要您配置数据库 URL。您可以在 [database] 部分中的选项 sql_alchemy_conn 中执行此操作。通常也使用 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 环境变量配置此选项。

注意

有关设置配置的更多信息,请参阅 设置配置选项

如果要检查当前值,可以使用 airflow config get-value database sql_alchemy_conn 命令,如下例所示。

$ airflow config get-value database sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db

确切的格式描述在 SQLAlchemy 文档中进行了描述,请参阅 数据库 URL。我们还将在下面展示一些示例。

设置 SQLite 数据库

SQLite 数据库可用于运行 Airflow 以进行开发,因为它不需要任何数据库服务器(数据库存储在本地文件中)。使用 SQLite 数据库有很多限制(例如,它只能与 Sequential Executor 一起使用),并且它永远不应该用于生产环境。

运行 Airflow 2.0+ 需要最低版本的 sqlite3 - 最低版本为 3.15.0。一些较旧的系统默认安装了早期版本的 sqlite,对于这些系统,您需要手动升级 SQLite 以使用 3.15.0 以上的版本。请注意,这不是 python library 版本,而是需要升级的 SQLite 系统级应用程序。安装 SQLite 的方式有很多种,您可以在 SQLite 官方网站 和您的操作系统发行版特定的文档中找到有关此信息。

故障排除

有时,即使您将 SQLite 升级到更高版本并且您的本地 python 报告了更高版本,Airflow 使用的 python 解释器可能仍然使用为用于启动 Airflow 的 python 解释器设置的 LD_LIBRARY_PATH 中可用的旧版本。

您可以通过运行以下检查来确保解释器使用哪个版本

root@b8a8e73caa2c:/opt/airflow# python
Python 3.8.10 (default, Mar 15 2022, 12:22:08)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.27.2'
>>>

但请注意,为您的 Airflow 部署设置环境变量可能会更改首先找到哪个 SQLite 库,因此您可能希望确保“足够高”版本的 SQLite 是系统中安装的唯一版本。

sqlite 数据库的示例 URI

sqlite:////home/airflow/airflow.db

在 AmazonLinux AMI 或容器镜像上升级 SQLite

AmazonLinux SQLite 只能使用源存储库升级到 v3.7。Airflow 需要 v3.15 或更高版本。使用以下说明使用最新的 SQLite3 设置基本镜像(或 AMI)

先决条件:您需要 wgettargzipgccmakeexpect 才能使升级过程正常工作。

yum -y install wget tar gzip gcc make expect

https://sqlite.ac.cn/ 下载源代码,在本地进行 make 和 install。

wget https://www.sqlite.org/src/tarball/sqlite.tar.gz
tar xzf sqlite.tar.gz
cd sqlite/
export CFLAGS="-DSQLITE_ENABLE_FTS3 \
    -DSQLITE_ENABLE_FTS3_PARENTHESIS \
    -DSQLITE_ENABLE_FTS4 \
    -DSQLITE_ENABLE_FTS5 \
    -DSQLITE_ENABLE_JSON1 \
    -DSQLITE_ENABLE_LOAD_EXTENSION \
    -DSQLITE_ENABLE_RTREE \
    -DSQLITE_ENABLE_STAT4 \
    -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT \
    -DSQLITE_SOUNDEX \
    -DSQLITE_TEMP_STORE=3 \
    -DSQLITE_USE_URI \
    -O2 \
    -fPIC"
export PREFIX="/usr/local"
LIBS="-lm" ./configure --disable-tcl --enable-shared --enable-tempstore=always --prefix="$PREFIX"
make
make install

安装后将 /usr/local/lib 添加到库路径

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

设置 PostgreSQL 数据库

您需要创建一个数据库和一个数据库用户,Airflow 将使用它们来访问此数据库。在下面的示例中,将创建一个数据库 airflow_db 和用户名为 airflow_user、密码为 airflow_pass 的用户

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
-- PostgreSQL 15 requires additional privileges:
GRANT ALL ON SCHEMA public TO airflow_user;

注意

数据库必须使用 UTF-8 字符集

您可能需要更新您的 Postgres pg_hba.conf 以将 airflow 用户添加到数据库访问控制列表中;并重新加载数据库配置以加载您的更改。请参阅 Postgres 文档中的 pg_hba.conf 文件 以了解更多信息。

警告

当您使用 SQLAlchemy 1.4.0+ 时,您需要在 sql_alchemy_conn 中使用 postgresql:// 作为数据库。在 SQLAlchemy 的先前版本中,可以使用 postgres://,但在 SQLAlchemy 1.4.0+ 中使用它会导致

>       raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )
E       sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres

如果您不能立即更改 URL 的前缀,Airflow 将继续使用 SQLAlchemy 1.3,您可以降级 SQLAlchemy,但我们建议更新前缀。

详细信息请参阅 SQLAlchemy 更新日志

我们建议使用 psycopg2 驱动程序并在您的 SqlAlchemy 连接字符串中指定它。

postgresql+psycopg2://<user>:<password>@<host>/<db>

另请注意,由于 SqlAlchemy 没有公开在数据库 URI 中定位特定模式的方法,因此您需要确保模式 public 在您的 Postgres 用户的 search_path 中。

如果您为 Airflow 创建了一个新的 Postgres 帐户

  • 新 Postgres 用户的默认 search_path 为:"$user", public,无需更改。

如果您使用具有自定义 search_path 的当前 Postgres 用户,则可以使用以下命令更改 search_path

ALTER USER airflow_user SET search_path = public;

有关设置 PostgreSQL 连接的更多信息,请参阅 SQLAlchemy 文档中的 PostgreSQL 方言

注意

众所周知,Airflow,尤其是在高性能设置中,会打开许多到元数据数据库的连接。这可能会导致 Postgres 资源使用出现问题,因为在 Postgres 中,每个连接都会创建一个新进程,当打开大量连接时,这会使 Postgres 资源占用过多。因此,我们建议对所有 Postgres 生产安装使用 PGBouncer 作为数据库代理。PGBouncer 可以处理来自多个组件的连接池,而且,如果您有连接可能不稳定的远程数据库,它将使您的数据库连接对临时网络问题的适应能力更强。PGBouncer 部署的示例实现可以在 Apache Airflow 的 Helm Chart 中找到,您可以在其中通过翻转布尔标志来启用预先配置的 PGBouncer 实例。在您准备自己的部署时,即使您不使用官方 Helm Chart,也可以参考我们采用的方法并将其作为灵感。

另请参阅 Helm Chart 生产指南

注意

对于托管 Postgres(例如 Azure Postgresql、CloudSQL、Amazon RDS),您应该在连接参数中使用 keepalives_idle 并将其设置为小于空闲时间,因为这些服务会在一段时间不活动(通常为 300 秒)后关闭空闲连接,这会导致错误 错误: psycopg2.operationalerror: SSL SYSCALL 错误: 检测到 EOFkeepalive 设置可以通过 [database] 部分中的 sql_alchemy_connect_args 配置参数 配置参考 进行更改。例如,您可以在 local_settings.py 中配置 args,并且 sql_alchemy_connect_args 应该是存储配置参数的字典的完整导入路径。您可以阅读有关 Postgres Keepalives 的信息。已观察到可以解决此问题的 keepalives 示例设置可能是

keepalive_kwargs = {
    "keepalives": 1,
    "keepalives_idle": 30,
    "keepalives_interval": 5,
    "keepalives_count": 5,
}

然后,如果将其放置在 airflow_local_settings.py 中,则配置导入路径将为

sql_alchemy_connect_args = airflow_local_settings.keepalive_kwargs

有关如何配置本地设置的详细信息,请参阅 配置本地设置

设置 MySQL 数据库

您需要创建一个数据库和一个数据库用户,Airflow 将使用它们来访问此数据库。在下面的示例中,将创建一个数据库 airflow_db 和用户名为 airflow_user、密码为 airflow_pass 的用户

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

注意

数据库必须使用 UTF-8 字符集。您必须注意的一个小问题是,较新版本 MySQL 中的 utf8 实际上是 utf8mb4,这会导致 Airflow 索引变得过大(请参阅 https://github.com/apache/airflow/pull/17603#issuecomment-901121618)。因此,从 Airflow 2.2 开始,所有 MySQL 数据库都将 sql_engine_collation_for_ids 自动设置为 utf8mb3_bin(除非您覆盖它)。这可能会导致 Airflow 数据库中 id 字段的排序规则 id 混合,但这不会产生负面后果,因为 Airflow 中所有相关的 ID 都只使用 ASCII 字符。

为了获得合理的默认值,我们依赖于 MySQL 更严格的 ANSI SQL 设置。确保在 my.cnf 文件的 [mysqld] 部分下指定了 explicit_defaults_for_timestamp=1 选项。您也可以使用传递给 mysqld 可执行文件的 --explicit-defaults-for-timestamp 开关来激活这些选项

我们建议使用 mysqlclient 驱动程序并在您的 SqlAlchemy 连接字符串中指定它。

mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

重要

MySQL 后端的集成仅在 Apache Airflow 的持续集成 (CI) 过程中使用 mysqlclient 驱动程序进行了验证。

如果您想使用其他驱动程序,请访问 SQLAlchemy 文档中的 MySQL Dialect,以获取有关 SqlAlchemy 连接的下载和设置的更多信息。

此外,您还应特别注意 MySQL 的编码。尽管 utf8mb4 字符集在 MySQL 中越来越流行(实际上,utf8mb4 成为 MySQL8.0 中的默认字符集),但在 Airflow 2+ 中使用 utf8mb4 编码需要进行其他设置(有关详细信息,请参阅 #7570。)。如果您使用 utf8mb4 作为字符集,则还应设置 sql_engine_collation_for_ids=utf8mb3_bin

注意

在严格模式下,MySQL 不允许将 0000-00-00 作为有效日期。然后,在某些情况下(某些 Airflow 表使用 0000-00-00 00:00:00 作为时间戳字段默认值),您可能会收到类似 “'end_date' 的默认值无效” 的错误。为了避免此错误,您可以在 MySQL 服务器上禁用 NO_ZERO_DATE 模式。阅读 https://stackoverflow.com/questions/9192027/invalid-default-value-for-create-date-timestamp-field 了解如何禁用它。有关详细信息,请参阅 SQL 模式 - NO_ZERO_DATE

MsSQL 数据库

警告

经过 讨论投票,Airflow 的 PMC 和提交者已达成决议,不再维护 MsSQL 作为受支持的数据库后端。

从 Airflow 2.9.0 开始,Airflow 数据库后端不再支持 MsSQL。这不会影响现有的提供程序包(运算符和钩子),DAG 仍然可以访问和处理来自 MsSQL 的数据。

从 MsSQL Server 迁移

由于 Airflow 2.9.0 已停止支持 MSSQL,因此可以使用迁移脚本帮助从 Airflow 版本 2.7.x 或 2.8.x 迁移到 SQL-Server。迁移脚本可在 Github 上的 airflow-mssql-migration 存储库 中找到。

请注意,迁移脚本不提供支持和保修。

其他配置选项

还有更多用于配置 SQLAlchemy 行为的配置选项。有关详细信息,请参阅 [database] 部分中 sqlalchemy_* 选项的 参考文档

例如,您可以指定一个数据库架构,Airflow 将在其中创建其所需的表。如果希望 Airflow 在 PostgreSQL 数据库的 airflow 架构中安装其表,请指定以下环境变量

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow"

请注意 SQL_ALCHEMY_CONN 数据库 URL 末尾的 search_path

初始化数据库

在配置数据库并在 Airflow 配置中连接到数据库后,您应该创建数据库架构。

airflow db migrate

Airflow 中的数据库监控和维护

Airflow 广泛利用关系元数据数据库进行任务调度和执行。监控和正确配置此数据库对于实现最佳 Airflow 性能至关重要。

关键问题

  1. 性能影响:长时间或过多的查询会严重影响 Airflow 的功能。这些可能是由于工作流的特殊性、缺乏优化或代码错误造成的。

  2. 数据库统计信息:数据库引擎做出的不正确的优化决策(通常是由于数据统计信息过时)会导致性能下降。

职责

Airflow 环境中数据库监控和维护的职责因您是使用自管数据库和 Airflow 实例还是选择托管服务而异。

自管环境:

在数据库和 Airflow 都是自管的设置中,部署管理员负责设置、配置和维护数据库。这包括监控其性能、管理备份、定期清理以及确保其与 Airflow 的最佳运行。

托管服务:

  • 托管数据库服务:使用托管数据库服务时,许多维护任务(如备份、修补和基本监控)都由提供商处理。但是,部署管理员仍然需要监督 Airflow 的配置并优化特定于其工作流的性能设置,管理定期清理并监控其数据库以确保与 Airflow 的最佳运行。

  • 托管 Airflow 服务:使用托管 Airflow 服务时,这些服务提供商负责 Airflow 及其数据库的配置和维护。但是,部署管理员需要与服务配置协作,以确保规模和工作流要求与托管服务的规模和配置相匹配。

监控方面

定期监控应包括

  • CPU、I/O 和内存使用率。

  • 查询频率和次数。

  • 识别和记录缓慢或长时间运行的查询。

  • 检测低效的查询执行计划。

  • 磁盘交换空间与内存使用率和缓存交换频率分析。

工具和策略

  • Airflow 不提供用于数据库监控的直接工具。

  • 使用服务器端监控和日志记录来获取指标。

  • 根据定义的阈值启用对长时间运行查询的跟踪。

  • 定期运行维护任务(如 ANALYZE SQL 命令)。

数据库清理工具

  • Airflow DB 清理命令:使用 airflow db clean 命令帮助管理和清理数据库。

  • ``airflow.utils.db_cleanup`` 中的 Python 方法:此模块提供了用于数据库清理和维护的其他 Python 方法,为特定需求提供了更精细的控制和定制。

建议

  • 主动监控:在生产环境中实施监控和日志记录,而不会显著影响性能。

  • 特定于数据库的指导:查阅所选数据库的文档,了解具体的监控设置说明。

  • 托管数据库服务:检查您的数据库提供商是否提供自动维护任务。

SQLAlchemy 日志记录

要进行详细的查询分析,请启用 SQLAlchemy 客户端日志记录(在 SQLAlchemy 引擎配置中使用 echo=True)。

  • 此方法侵入性更强,可能会影响 Airflow 的客户端性能。

  • 它会生成大量日志,尤其是在繁忙的 Airflow 环境中。

  • 适用于非生产环境,例如测试系统。

您可以按照 SQLAlchemy 日志记录文档 中的说明,使用 echo=True 作为 sqlalchemy 引擎配置来实现。

使用 sql_alchemy_engine_args 配置参数将 echo 参数设置为 True。

注意

  • 启用大量日志记录时,请注意对 Airflow 性能和系统资源的影响。

  • 对于生产环境,优先选择服务器端监控而不是客户端日志记录,以最大程度地减少对性能的干扰。

下一步是什么?

默认情况下,Airflow 使用 SequentialExecutor,它不提供并行性。为了获得更好的性能,您应该考虑配置不同的 执行器

此条目有帮助吗?