警告

Batch 执行器目前处于 alpha/实验阶段,如有更改,恕不另行通知。

AWS Batch 执行器

这是一个由 Amazon Batch 提供支持的 Airflow 执行器。Airflow 调度的每个任务都在 Batch 调度的单独容器内运行。此类执行器的一些优势包括

  1. 可扩展性和更低的成本:AWS Batch 允许动态配置执行任务所需的资源。根据分配的资源,AWS Batch 可以根据工作负载自动向上或向下扩展,从而确保高效的资源利用率并降低成本。

  2. 作业队列和优先级:AWS Batch 提供了作业队列的概念,允许对任务的执行进行优先级排序和管理。这确保了在同时调度多个任务时,它们会按照所需的优先级顺序执行。

  3. 灵活性:AWS Batch 支持 Fargate (ECS)、EC2 和 EKS 计算环境。这种计算环境范围以及精细定义分配给计算环境的资源的能力,为用户选择最适合其工作负载的执行环境提供了很大的灵活性。

  4. 快速任务执行:通过在 AWS Batch 中维护一个活动的 worker,可以快速执行提交到该服务的任务。借助随时可用的 worker,启动延迟极短,确保任务在提交后立即开始。此功能对于时间敏感型工作负载或需要近实时处理的应用程序特别有利,可提高整体工作流程效率和响应能力。

有关快速入门指南,请参阅此处,它将帮助您启动并运行基本配置。

以下部分提供了有关配置、提供的示例 Dockerfile 和日志记录的更多通用详细信息。

配置选项

有许多配置选项可用,可以在“aws_batch_executor”部分下的 airflow.cfg 文件中直接设置,也可以使用 AIRFLOW__AWS_BATCH_EXECUTOR__<OPTION_NAME> 格式通过环境变量设置,例如 AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE = "myJobQueue"。有关如何设置这些选项的更多信息,请参阅设置配置选项

注意

配置选项在运行 Airflow 组件(调度器、Web 服务器、执行器托管资源等)的所有主机/环境中必须一致。有关设置配置的更多详细信息,请参阅此处

如果发生冲突,则优先级顺序从低到高为

  1. 加载具有默认值的选项的默认值。

  2. 加载通过 airflow.cfg 或环境变量显式提供的任何值。这些将使用 Airflow 的配置优先级进行检查。

  3. 如果提供了 SUBMIT_JOB_KWARGS 选项,则加载其中提供的任何值。

注意

exec_config 是一个可选参数,可以提供给操作器。它是一个字典类型,在 Batch 执行器的上下文中,它表示一个 submit_job_kwargs 配置,然后在上面 Airflow 配置中指定的 submit_job_kwargs 之上更新(如果存在)。它是一个递归更新,本质上是将 Python 更新应用于配置中的每个嵌套字典。大致近似为:submit_job_kwargs.update(exec_config)

必需的配置选项:

  • JOB_QUEUE - 提交作业的作业队列。必需。

  • JOB_DEFINITION - 此作业使用的作业定义。必需。

  • JOB_NAME - AWS Batch 作业的名称。必需。

  • REGION_NAME - 配置了 Amazon Batch 的 AWS 区域的名称。必需。

可选的配置选项:

  • AWS_CONN_ID - Batch 执行器用于对 AWS Batch 进行 API 调用的 Airflow 连接(即凭据)。默认为“aws_default”。

  • SUBMIT_JOB_KWARGS - 一个 JSON 字符串,其中包含要提供给 Batch submit_job API 的参数。

  • MAX_SUBMIT_JOB_ATTEMPTS - Batch 执行器应尝试提交作业的最大次数。这指的是作业无法启动的情况(即 API 故障、容器故障等)

  • CHECK_HEALTH_ON_STARTUP - 是否在启动时检查 Batch 执行器的运行状况

有关可用选项的更详细说明,包括类型提示和示例,请参阅 Amazon 提供程序包中的 config_templates 文件夹。

注意

exec_config 是一个可选参数,可以提供给操作器。它是一个字典类型,在 Batch 执行器的上下文中,它表示一个 submit_job_kwargs 配置,然后在上面 Airflow 配置中指定的 submit_job_kwargs 之上更新(如果存在)。它是一个递归更新,本质上是将 Python 更新应用于配置中的每个嵌套字典。大致近似为:submit_job_kwargs.update(exec_config)

AWS Batch 执行器的 Dockerfile

可以在此处找到一个示例 Dockerfile,它创建了一个映像,AWS Batch 可以使用该映像在 Apache Airflow 中使用 AWS Batch 执行器运行 Airflow 任务。该映像支持 AWS CLI/API 集成,允许您在 Airflow 环境中与 AWS 服务进行交互。它还包括从 S3 存储桶或本地文件夹加载 DAG(有向无环图)的选项。

先决条件

您的系统上必须安装了 Docker。有关安装 Docker 的说明,请参阅此处

构建映像

AWS CLI 将安装在映像中,并且有多种方法可以将 AWS 身份验证信息传递给容器,因此有多种方法可以构建映像。本指南将介绍两种方法。

最安全的方法是使用 IAM 角色。创建 AWS Batch 作业定义时,您可以选择一个作业角色和一个执行角色。执行角色是容器代理用来代表您发出 AWS API 请求的角色。根据 Batch 执行器使用的计算类型,需要将相应的策略附加到执行角色。此外,该角色还需要至少具有 CloudWatchLogsFullAccess(或 CloudWatchLogsFullAccessV2)策略。作业角色是容器用来发出 AWS API 请求的角色。此角色需要根据正在运行的 DAG 中描述的任务具有权限。如果您通过 S3 存储桶加载 DAG,则此角色需要具有读取 S3 存储桶的权限。

要创建新的作业角色或执行角色,请按照以下步骤操作

  1. 导航到 AWS 控制台上的 IAM 页面,然后在左侧选项卡的“访问管理”下选择“角色”。

  2. 在“角色”页面上,单击右上角的“创建角色”。

  3. 在“受信任实体类型”下,选择“AWS 服务”。

  4. 选择适用的用例。

  5. 在“权限”页面中,根据角色是作业角色还是执行角色,选择角色需要的权限。选择所有必需的权限后,单击“下一步”。

  6. 输入新角色的名称和可选的描述。查看受信任实体和角色的权限。根据需要添加任何标签,然后单击“创建角色”。

创建 Batch 的作业定义时(有关更多详细信息,请参阅设置指南),为作业定义选择适当的新创建的作业角色和执行角色。

然后,您可以通过 cd 到包含 Dockerfile 的目录并运行以下命令来构建映像

docker build -t my-airflow-image \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION .

注意:重要的是,映像必须在相同的体系结构下构建和运行。例如,对于 Apple Silicon 上的用户,您可能希望使用 docker buildx 指定体系结构

docker buildx build --platform=linux/amd64 -t my-airflow-image \
  --build-arg aws_default_region=YOUR_DEFAULT_REGION .

有关使用 docker buildx 的更多信息,请参阅此处

第二种方法是使用构建时参数(aws_access_key_idaws_secret_access_keyaws_default_regionaws_session_token)。

注意:不建议在生产环境中使用此方法,因为用户凭据存储在容器中,这可能存在安全漏洞。

要使用这些参数传递 AWS 身份验证信息,请在 Docker 构建过程中使用 --build-arg 选项。例如

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN .

YOUR_ACCESS_KEYYOUR_SECRET_KEYYOUR_SESSION_TOKENYOUR_DEFAULT_REGION 替换为有效的 AWS 凭据。

基础映像

Docker 映像构建在 apache/airflow:latest 映像之上。有关该映像的更多信息,请参阅此处

重要说明:此映像中的 Airflow 和 python 版本必须与运行 Airflow 调度器进程(进而运行执行器)的主机/容器上的 Airflow 和 python 版本一致。可以通过使用以下命令在本地运行容器来验证映像的 Airflow 版本

docker run <image_name> version

同样,可以使用以下命令验证映像的 python 版本

docker run <image_name> python --version

确保这些版本与运行 Airflow 调度器进程(以及 Batch 执行器)的主机/容器上的版本匹配。可以从 Dockerhub 注册表下载具有特定 python 版本的 Apache Airflow 映像,并按python 版本过滤标签。例如,标签 latest-python3.8 指定该映像将安装 python 3.8。

加载 DAG

在 Batch 管理的容器上加载 DAG 有很多方法。此 Dockerfile 预先配置了两种可能的方法:从本地文件夹复制或从 S3 存储桶下载。也可以使用其他方法加载 DAG。

从 S3 存储桶

要从 S3 存储桶加载 DAG,请取消注释 Dockerfile 中的入口点行,以将 DAG 从指定的 S3 存储桶同步到容器内的 /opt/airflow/dags 目录。如果您想将 DAG 存储在 /opt/airflow/dags 以外的目录中,可以选择提供 container_dag_path 作为构建参数。

在 docker 构建命令中添加 --build-arg s3_uri=YOUR_S3_URI。将 YOUR_S3_URI 替换为您的 S3 存储桶的 URI。确保您具有从存储桶读取的适当权限。

请注意,以下命令还将 AWS 凭证作为构建参数传递。

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN \
 --build-arg s3_uri=YOUR_S3_URI .

从本地文件夹

要从本地文件夹加载 DAG,请将您的 DAG 文件放在主机上 docker 构建上下文中的一个文件夹中,并使用 host_dag_path 构建参数提供该文件夹的位置。默认情况下,DAG 将被复制到 /opt/airflow/dags,但这可以通过在 Docker 构建过程中传递 container_dag_path 构建时参数来更改

docker build -t my-airflow-image --build-arg host_dag_path=./dags_on_host --build-arg container_dag_path=/path/on/container .

如果选择将 DAG 加载到 /opt/airflow/dags 以外的路径,则需要在 Airflow 配置中更新新路径。

安装 Python 依赖项

此 Dockerfile 支持通过 piprequirements.txt 文件安装 Python 依赖项。将您的 requirements.txt 文件放在与 Dockerfile 相同的目录中。如果它位于不同的位置,则可以使用 requirements_path 构建参数指定它。复制 requirements.txt 文件时,请记住 Docker 上下文。取消注释 Dockerfile 中将 requirements.txt 文件复制到容器的两行,然后运行 pip install 以在容器上安装依赖项。

为 AWS Batch 执行程序构建镜像

有关如何将您通过此自述文件创建的 Docker 镜像与 Batch 执行程序一起使用的详细说明,请参见此处

日志记录

通过此执行程序执行的 Airflow 任务在已配置 VPC 内的容器中运行。这意味着 Airflow Web 服务器无法直接访问日志,并且当容器在任务完成后停止时,日志将永久丢失。

使用 Batch 执行程序时,应采用远程日志记录来持久化您的 Airflow 任务日志,并使它们可以从 Airflow Web 服务器查看。

配置远程日志记录

配置远程日志记录的方法有很多种,并且支持多种目标。有关 Airflow 任务日志记录的概述,请参见此处。有关配置 S3 远程日志记录的说明,请参见此处,有关 Cloudwatch 远程日志记录的说明,请参见此处。在 Batch 执行程序的上下文中,远程日志记录需要注意的一些重要事项

  • 应在运行 Airflow 的所有主机和容器上配置 Airflow 远程日志记录的配置选项。例如,Web 服务器需要此配置才能从远程位置获取日志,而 Batch 执行程序运行的容器需要此配置才能将日志上传到远程位置。有关如何通过配置文件或环境变量导出设置 Airflow 配置的更多信息,请参见此处

  • 可以通过多种方式将 Airflow 远程日志记录配置添加到容器中。一些示例包括但不限于

    • 在 Dockerfile 中直接作为环境变量导出(请参见上方的 Dockerfile 部分)

    • 更新 airflow.cfg 文件或在 Dockerfile 中复制/挂载/下载自定义 airflow.cfg

    • 在作业定义中作为环境变量添加

  • 您必须在容器中配置凭证才能与日志的远程服务(例如 S3、CloudWatch Logs 等)进行交互。这可以通过多种方式完成。一些示例包括但不限于

    • 将凭证直接导出到 Dockerfile 中(请参见上方的 Dockerfile 部分)

    • 配置 Airflow 连接并将其作为远程日志记录连接 ID提供(通过上述任何方式或您喜欢的方式导出到容器中)。然后,Airflow 将专门使用这些凭证与您选择的远程日志记录目标进行交互。

注意

配置选项在运行 Airflow 组件(调度器、Web 服务器、执行器托管资源等)的所有主机/环境中必须一致。有关设置配置的更多详细信息,请参阅此处

为 Apache Airflow 设置 Batch 执行程序

使 Batch 执行程序在 Apache Airflow 中工作需要执行 3 个步骤

  1. 创建一个 Airflow 和 Batch 执行的任务可以连接到的数据库。

  2. 创建和配置可以从 Airflow 运行任务的 Batch 资源。

  3. 配置 Airflow 以使用 Batch 执行程序和数据库。

选择数据库后端有不同的选项。有关 Airflow 支持的不同选项的更多信息,请参见此处。以下指南将说明如何在 AWS 上设置 PostgreSQL RDS 实例。

为 AWS Batch 执行程序设置 RDS 数据库实例

创建 RDS 数据库实例

  1. 登录到您的 AWS 管理控制台并导航到 RDS 服务。

  2. 单击“创建数据库”以开始创建新的 RDS 实例。

  3. 选择“标准创建”选项,然后选择 PostreSQL。

  4. 选择适当的模板、可用性和持久性。

    • 注意:在撰写本文时,“多可用区数据库**集群**”选项不支持设置数据库名称,这是下面需要的步骤。

  5. 设置数据库实例名称、用户名和密码。

  6. 选择实例配置和存储参数。

  7. 在“连接”部分中,选择“不连接到 EC2 计算资源”。

  8. 选择或创建一个 VPC 和子网,并允许从公网访问数据库。选择或创建安全组并选择可用区。

  9. 打开“其他配置”选项卡,并将数据库名称设置为 airflow_db

  10. 根据需要选择其他设置,然后单击“创建数据库”以创建数据库。

测试连接

为了能够连接到新的 RDS 实例,您需要允许从您的 IP 地址到数据库的入站流量。

  1. 在 RDS 实例的“连接和安全”选项卡的“安全”标题下,找到指向您的新 RDS 数据库实例的 VPC 安全组的链接。

  2. 创建一个入站规则,允许来自 TCP 端口 5432 (PostgreSQL) 上的 IP 地址的流量。

  3. 确认您可以在修改安全组后连接到数据库。这将需要安装 psql。有关安装 psql 的说明,请参见此处

**注意**:在测试连接之前,请确保您的数据库状态为“可用”。

psql -h <endpoint> -p 5432 -U <username> <db_name>

端点可以在“连接和安全”选项卡上找到,用户名(和密码)是创建数据库时使用的凭证。

数据库名称应为 airflow_db(除非在创建数据库时使用了不同的名称)。

如果连接成功,系统将提示您输入密码。

设置 AWS Batch

AWS Batch 可以通过多种方式配置,并根据用例提供不同的编排类型。为简单起见,本指南将介绍如何使用 EC2 设置 Batch。

为了设置 AWS Batch 以便它可以与 Apache Airflow 一起使用,您需要一个配置正确的 Docker 镜像。有关如何执行此操作的说明,请参见Dockerfile部分。

构建镜像后,需要将其放入容器可以拉取的存储库中。有多种方法可以实现这一点。本指南将介绍如何使用 Amazon Elastic Container Registry (ECR) 执行此操作。

创建 ECR 存储库

  1. 登录到您的 AWS 管理控制台并导航到 ECR 服务。

  2. 单击“创建存储库”。

  3. 命名存储库并根据需要填写其他信息。

  4. 单击“创建存储库”。

  5. 创建存储库后,单击该存储库。单击右上角的“查看推送命令”按钮。

  6. 按照说明推送 Docker 镜像,并根据需要替换镜像名称。在推送镜像后刷新页面以确保镜像已上传。

配置 AWS Batch

  1. 登录到您的 AWS 管理控制台并导航到 AWS Batch 登录页面。

  2. 在左侧边栏中,单击“向导”。此向导将指导您创建运行 Batch 作业所需的所有资源。

  3. 选择编排为 Amazon EC2。

  4. 单击“下一步”。

创建计算环境

  1. 为计算环境、标签和任何适当的实例配置选择一个名称。在这里,您可以选择 vCPU 的最小数量、最大数量和所需数量,以及您要使用的 EC2 实例类型。

  2. 对于实例角色,选择创建新的实例配置文件或使用附加了所需 IAM 权限的现有实例配置文件。此实例配置文件允许为您的计算环境创建的 Amazon ECS 容器实例代表您调用所需的 AWS API 操作。

  3. 选择一个允许访问互联网的 VPC,以及一个具有必要权限的安全组。

  4. 单击“下一步”。

创建作业队列

  1. 为作业队列选择一个名称和优先级。计算环境将设置为在上一步中创建的环境。

创建作业定义

  1. 为作业定义选择一个名称。

  2. 选择适当的平台配置。确保已启用 分配 公网 IP

  3. 选择一个执行角色,并确保该角色具有完成其任务所需的权限。

  4. 输入上一节中推送的映像的映像 URI。确保正在使用的角色具有提取映像所需的权限。

  5. 选择合适的作业角色,同时牢记正在运行的任务的要求。

  6. 根据需要配置环境。您可以指定容器可用的 vCPU 数量、内存或 GPU。此外,将以下环境变量添加到容器中

  • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN,其值为 PostgreSQL 连接字符串,格式如下,使用上面数据库部分中设置的值

postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>
  1. 根据需要为 Airflow(请参阅此处)、批处理执行器(请参阅此处)或远程日志记录(请参阅此处)添加其他配置。请注意,任何配置更改都应在整个 Airflow 环境中进行,以保持配置一致。

  2. 单击“下一步”。

  3. 在“审核和创建”页面上,查看所有选择,并在确认一切无误后,单击“创建资源”。

允许容器访问 RDS 数据库

最后一步是为 Batch 管理的容器配置对数据库的访问权限。可以使用许多不同的网络配置,但一种可能的方法是

  1. 登录到您的 AWS 管理控制台并导航到 VPC 控制面板。

  2. 在左侧的“安全”标题下,单击“安全组”。

  3. 选择与您的 RDS 实例关联的安全组,然后单击“编辑入站规则”。

  4. 添加一条新规则,允许 PostgreSQL 类型的流量进入与 Batch 计算环境关联的子网的 CIDR。

配置 Airflow

要将 Airflow 配置为使用 Batch 执行器并利用我们已设置的资源,请确保定义了以下环境变量

AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.batch.batch_executor.AwsBatchExecutor'

AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>

AIRFLOW__AWS_BATCH_EXECUTOR__REGION_NAME=<executor-region>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE=<batch-job-queue>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_DEFINITION=<batch-job-definition>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_NAME=<batch-job-name>

此脚本应在运行 Airflow 调度程序和 Web 服务器的主机上运行,然后再启动这些进程。

该脚本设置环境变量,这些变量将 Airflow 配置为使用 Batch 执行器并提供任务执行所需的信息。所做的任何其他配置更改(例如远程日志记录)都应添加到此示例脚本中,以保持整个 Airflow 环境中配置的一致性。

初始化 Airflow 数据库

Airflow 数据库需要先初始化才能使用,并且需要添加用户才能登录。以下命令添加一个管理员用户(如果数据库尚未初始化,该命令还将初始化数据库)

airflow users create --username admin --password admin --firstname <your first name> --lastname <your last name> --email <your email> --role Admin

此条目有帮助吗?