警告

ECS 执行器目前处于 alpha/实验阶段,如有更改,恕不另行通知。

AWS ECS 执行器

这是一个由 Amazon Elastic Container Service (ECS) 提供支持的 Airflow 执行器。 Airflow 调度执行的每个任务都在其自己的 ECS 容器中运行。此类执行器的一些优点包括

  1. 任务隔离:任何任务都不会成为其他任务的嘈杂邻居。 CPU、内存和磁盘等资源对每个单独的任务都是隔离的。 任何影响网络或导致整个容器失败的操作或故障只会影响在其中运行的单个任务。 任何单个用户都无法通过触发太多任务来使环境过载,因为没有共享工作器。

  2. 自定义环境:您可以构建不同的容器镜像,其中包含运行任务所需的特定依赖项(例如系统级依赖项)、二进制文件或数据。

  3. 经济高效:计算资源仅在 Airflow 任务本身的生命周期内存在。 这通过不需要始终准备好持久/长期运行的工作器来节省成本,这些工作器也需要维护和修补。

有关快速入门指南,请参阅此处,它将帮助您启动并运行基本配置。

以下部分提供了有关配置、提供的示例 Dockerfile 和日志记录的更多一般性详细信息。

配置选项

有许多配置选项可用,可以在“aws_ecs_executor”部分的 airflow.cfg 文件中直接设置,也可以使用 AIRFLOW__AWS_ECS_EXECUTOR__<OPTION_NAME> 格式通过环境变量设置,例如 AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME = "myEcsContainer"。 有关如何设置这些选项的更多信息,请参阅设置配置选项

注意

配置选项在运行 Airflow 组件(调度器、Web 服务器、ECS 任务容器等)的所有主机/环境中必须一致。 有关设置配置的更多详细信息,请参阅此处

如果发生冲突,则优先级从低到高的顺序为

  1. 加载具有默认值的选项的默认值。

  2. 加载通过 airflow.cfg 或环境变量明确提供的任何值。 这些将使用 Airflow 的配置优先级进行检查。

  3. 如果提供了 RUN_TASK_KWARGS 选项,则加载其中提供的任何值。

注意

exec_config 是一个可选参数,可以提供给操作符。 它是一个字典类型,在 ECS 执行器的上下文中,它表示一个 run_task_kwargs 配置,然后在上面 Airflow 配置中指定的 run_task_kwargs(如果存在)之上更新。 它是一个递归更新,本质上是将 Python 更新应用于配置中的每个嵌套字典。 大致近似为:run_task_kwargs.update(exec_config)

必需的配置选项:

  • CLUSTER - Amazon ECS 集群的名称。 必填。

  • CONTAINER_NAME - 将用于通过 ECS 执行器执行 Airflow 任务的容器的名称。 应在 ECS 任务定义中指定容器。 必填。

  • REGION_NAME - 配置了 Amazon ECS 的 AWS 区域的名称。 必填。

可选的配置选项:

  • ASSIGN_PUBLIC_IP - 是否为 ECS 执行器启动的容器分配公共 IP 地址。 默认为“False”。

  • AWS_CONN_ID - ECS 执行器用于对 AWS ECS 进行 API 调用的 Airflow 连接(即凭据)。 默认为“aws_default”。

  • LAUNCH_TYPE - 启动类型可以是“FARGATE”或“EC2”。 默认为“FARGATE”。

  • PLATFORM_VERSION - 如果使用 FARGATE 启动类型,则 ECS 任务使用的平台版本。 默认为“LATEST”。

  • RUN_TASK_KWARGS - 包含要提供给 ECS run_task API 的参数的 JSON 字符串。

  • SECURITY_GROUPS - 与 ECS 任务关联的最多 5 个逗号分隔的安全组 ID。 默认为 VPC 默认值。

  • SUBNETS - 与 ECS 任务或服务关联的最多 16 个逗号分隔的子网 ID。 默认为 VPC 默认值。

  • TASK_DEFINITION - 要运行的 ECS 任务定义的系列和修订版(系列:修订版)或完整 ARN。 默认为最新的 ACTIVE 修订版。

  • MAX_RUN_TASK_ATTEMPTS - Ecs 执行器应尝试运行任务的最大次数。 这是指任务无法启动的情况(即 ECS API 故障、容器故障等)

  • CHECK_HEALTH_ON_STARTUP - 是否在启动时检查 ECS 执行器的运行状况

有关可用选项的更详细说明,包括类型提示和示例,请参阅 Amazon 提供程序包中的 config_templates 文件夹。

注意

exec_config 是一个可选参数,可以提供给操作符。 它是一个字典类型,在 ECS 执行器的上下文中,它表示一个 run_task_kwargs 配置,然后在上面 Airflow 配置中指定的 run_task_kwargs(如果存在)之上更新。 它是一个递归更新,本质上是将 Python 更新应用于配置中的每个嵌套字典。 大致近似为:run_task_kwargs.update(exec_config)

AWS ECS 执行器的 Dockerfile

可以在此处找到示例 Dockerfile,它创建了一个镜像,AWS ECS 可以使用该镜像在 Apache Airflow 中使用 AWS ECS 执行器运行 Airflow 任务。 该镜像支持 AWS CLI/API 集成,允许您在 Airflow 环境中与 AWS 服务交互。 它还包括从 S3 存储桶或本地文件夹加载 DAG(有向无环图)的选项。

先决条件

您的系统上必须安装了 Docker。 有关安装 Docker 的说明,请参阅此处

构建镜像

AWS CLI 将安装在镜像中,并且有多种方法可以将 AWS 身份验证信息传递给容器,因此有多种方法可以构建镜像。 本指南将介绍 2 种方法。

最安全的方法是使用 IAM 角色。 创建 ECS 任务定义时,您可以选择任务角色和任务执行角色。 任务执行角色是容器代理用来代表您发出 AWS API 请求的角色。 为了使用 ECS 执行器,此角色至少需要具有 AmazonECSTaskExecutionRolePolicy 以及 CloudWatchLogsFullAccess(或 CloudWatchLogsFullAccessV2)策略。 任务角色是容器用来发出 AWS API 请求的角色。 此角色需要的权限取决于正在运行的 DAG 中描述的任务。 如果您要通过 S3 存储桶加载 DAG,则此角色需要具有读取 S3 存储桶的权限。

要创建新的任务角色或任务执行角色,请按照以下步骤操作

  1. 导航到 AWS 控制台上的 IAM 页面,然后在左侧选项卡的“访问管理”下选择“角色”。

  2. 在“角色”页面上,单击右上角的“创建角色”。

  3. 在“受信任实体类型”下,选择“AWS 服务”。

  4. 在“用例”下选择“Elastic Container Service”,然后选择“Elastic Container Service 任务”作为特定用例。 单击“下一步”。

  5. 在“权限”页面中,选择角色需要的权限,具体取决于角色是任务角色还是任务执行角色。 选择所有必需的权限后,单击“下一步”。

  6. 输入新角色的名称和可选的描述。 查看受信任实体以及角色的权限。 根据需要添加任何标签,然后单击“创建角色”。

为 ECS 集群创建任务定义时(有关更多详细信息,请参阅设置指南),请选择新创建的适当任务角色和任务执行角色作为任务定义。

然后,您可以通过 cd 到包含 Dockerfile 的目录并运行以下命令来构建镜像

docker build -t my-airflow-image \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION .

注意:重要的是,镜像必须在相同的架构下构建和运行。 例如,对于 Apple Silicon 上的用户,您可能希望使用 docker buildx 指定架构

docker buildx build --platform=linux/amd64 -t my-airflow-image \
  --build-arg aws_default_region=YOUR_DEFAULT_REGION .

有关使用 docker buildx 的更多信息,请参阅此处

第二种方法是使用构建时参数(aws_access_key_idaws_secret_access_keyaws_default_regionaws_session_token)。

注意:不建议在生产环境中使用此方法,因为用户凭据存储在容器中,这可能存在安全漏洞。

要使用这些参数传递 AWS 身份验证信息,请在 Docker 构建过程中使用 --build-arg 选项。 例如

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN .

YOUR_ACCESS_KEYYOUR_SECRET_KEYYOUR_SESSION_TOKENYOUR_DEFAULT_REGION 替换为有效的 AWS 凭据。

基础镜像

Docker 镜像是基于 apache/airflow:latest 镜像构建的。 有关该镜像的更多信息,请参阅此处

重要提示:此镜像中的 Airflow 和 Python 版本必须与运行 Airflow 调度器进程(进而运行执行器)的主机/容器上的 Airflow 和 Python 版本一致。可以通过使用以下命令在本地运行容器来验证镜像的 Airflow 版本

docker run <image_name> version

类似地,可以使用以下命令验证镜像的 Python 版本

docker run <image_name> python --version

确保这些版本与运行 Airflow 调度器进程(以及 ECS 执行器)的主机/容器上的版本相匹配。可以从 Dockerhub 注册表下载具有特定 Python 版本的 Apache Airflow 镜像,并按 Python 版本 过滤标签。例如,标签 latest-python3.8 指定镜像将安装 Python 3.8。

加载 DAG

在 ECS 管理的容器上加载 DAG 有很多方法。此 Dockerfile 已预先配置了两种可能的方法:从本地文件夹复制或从 S3 存储桶下载。其他加载 DAG 的方法也是可行的。

从 S3 存储桶

要从 S3 存储桶加载 DAG,请取消注释 Dockerfile 中的入口点行,以将指定 S3 存储桶中的 DAG 同步到容器内的 /opt/airflow/dags 目录。如果要将 DAG 存储在 /opt/airflow/dags 以外的目录中,可以选择提供 container_dag_path 作为构建参数。

在 docker build 命令中添加 --build-arg s3_uri=YOUR_S3_URI。将 YOUR_S3_URI 替换为您的 S3 存储桶的 URI。确保您具有从存储桶读取的适当权限。

请注意,以下命令还将 AWS 凭证作为构建参数传递。

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN \
 --build-arg s3_uri=YOUR_S3_URI .

从本地文件夹

要从本地文件夹加载 DAG,请将您的 DAG 文件放在主机上 docker 构建上下文中的文件夹中,并使用 host_dag_path 构建参数提供文件夹的位置。默认情况下,DAG 将被复制到 /opt/airflow/dags,但这可以通过在 Docker 构建过程中传递 container_dag_path 构建时参数来更改

docker build -t my-airflow-image --build-arg host_dag_path=./dags_on_host --build-arg container_dag_path=/path/on/container .

如果选择将 DAG 加载到 /opt/airflow/dags 以外的路径,则需要在 Airflow 配置中更新新路径。

安装 Python 依赖项

此 Dockerfile 支持通过 piprequirements.txt 文件安装 Python 依赖项。将您的 requirements.txt 文件放在与 Dockerfile 相同的目录中。如果它位于不同的位置,则可以使用 requirements_path 构建参数指定它。复制 requirements.txt 文件时,请记住 Docker 上下文。取消注释 Dockerfile 中将 requirements.txt 文件复制到容器的两行,并运行 pip install 以在容器上安装依赖项。

为 AWS ECS 执行器构建镜像

有关如何将您通过此自述文件创建的 Docker 镜像与 ECS 执行器一起使用的详细说明,请参见此处

日志记录

通过此执行器执行的 Airflow 任务在已配置 VPC 内的容器中运行。这意味着 Airflow Web 服务器无法直接访问日志,并且当任务完成后容器停止时,日志将永久丢失。

使用 ECS 执行器时,应采用远程日志记录来持久保存您的 Airflow 任务日志,并使其可从 Airflow Web 服务器查看。

配置远程日志记录

配置远程日志记录和几个受支持的目标有很多方法。可以在此处找到 Airflow 任务日志记录的概述。有关配置 S3 远程日志记录的说明,请参见此处,有关 Cloudwatch 远程日志记录的说明,请参见此处。在 ECS 执行器的上下文中,远程日志记录需要注意的一些重要事项

  • 应在运行 Airflow 的所有主机和容器上配置 Airflow 远程日志记录的配置选项。例如,Web 服务器需要此配置才能从远程位置获取日志,而 ECS 容器需要此配置才能将日志上传到远程位置。有关如何通过配置文件或环境变量导出设置 Airflow 配置的更多信息,请参见此处

  • 可以通过多种方式将 Airflow 远程日志记录配置添加到容器中。一些示例包括但不限于

    • 在 Dockerfile 中直接作为环境变量导出(请参见上方的 Dockerfile 部分)

    • 更新 airflow.cfg 文件或在 Dockerfile 中复制/挂载/下载自定义 airflow.cfg

    • 在 ECS 任务定义中以纯文本形式添加,或通过Secrets/System Manager添加

    • 或者,使用ECS 任务环境文件

  • 您必须在容器中配置凭证才能与日志的远程服务(例如 S3、CloudWatch Logs 等)进行交互。这可以通过多种方式完成。一些示例包括但不限于

    • 将凭证直接导出到 Dockerfile 中(请参见上方的 Dockerfile 部分)

    • 配置 Airflow 连接并将其作为远程日志记录连接 ID提供(通过上面列出的任何方式或您喜欢的方式导出到容器中)。然后,Airflow 将专门使用这些凭证与您选择的远程日志记录目标进行交互。

注意

配置选项在运行 Airflow 组件(调度器、Web 服务器、ECS 任务容器等)的所有主机/环境中必须一致。 有关设置配置的更多详细信息,请参阅此处

ECS 任务日志记录

可以将 ECS 配置为使用 awslogs 日志驱动程序将日志信息发送到 ECS 任务本身的 CloudWatch Logs。这些日志将包括 Airflow 任务操作员日志记录以及在容器中运行的进程生命周期中发生的任何其他日志记录(在本例中为 Airflow CLI 命令 airflow tasks run ...)。这有助于调试远程日志记录问题或测试远程日志记录配置。有关启用此日志记录的信息,请参见此处

注意:这些日志将无法从 Airflow Web 服务器 UI 查看。

性能和调优

虽然 ECS 执行器由于容器启动时间而为每个 Airflow 任务执行增加了大约 50-60 秒的延迟,但它允许更高程度的并行性和隔离性。我们已经使用超过 1,000 个并行调度的任务测试了此执行器,并观察到最多可以同时并行运行 500 个任务。500 个任务的限制符合ECS 服务配额

在运行此执行器以及通常在较大规模上运行 Airflow 时,需要考虑一些配置选项。以下许多配置将限制可以并发运行的任务数或调度器的性能。

为 Apache Airflow 设置 ECS 执行器

使 ECS 执行器在 Apache Airflow 中工作涉及 3 个步骤

  1. 创建一个 Airflow 和在 ECS 中运行的任务可以连接到的数据库。

  2. 创建和配置一个可以从 Airflow 运行任务的 ECS 集群。

  3. 将 Airflow 配置为使用 ECS 执行器和数据库。

选择数据库后端有不同的选项。有关 Airflow 支持的不同选项的更多信息,请参见此处。以下指南将说明如何在 AWS 上设置 PostgreSQL RDS 实例。该指南还将介绍如何设置 ECS 集群。ECS 执行器支持各种启动类型,但本指南将说明如何设置 ECS Fargate 集群。

为 AWS ECS 执行器设置 RDS 数据库实例

创建 RDS 数据库实例

  1. 登录到您的 AWS 管理控制台并导航到 RDS 服务。

  2. 单击“创建数据库”以开始创建新的 RDS 实例。

  3. 选择“标准创建”选项,然后选择 PostreSQL。

  4. 选择适当的模板、可用性和持久性。

    • 注意:在撰写本文时,“多可用区数据库**集群**”选项不支持设置数据库名称,这是下面的必需步骤。

  5. 设置数据库实例名称、用户名和密码。

  6. 选择实例配置和存储参数。

  7. 在“连接”部分中,选择“不连接到 EC2 计算资源”。

  8. 选择或创建一个 VPC 和子网,并允许公共访问数据库。选择或创建安全组并选择可用区。

  9. 打开“其他配置”选项卡,并将数据库名称设置为 airflow_db

  10. 根据需要选择其他设置,然后单击“创建数据库”以创建数据库。

测试连接

为了能够连接到新的 RDS 实例,您需要允许从您的 IP 地址到数据库的入站流量。

  1. 在 RDS 实例的“连接和安全”选项卡的“安全”标题下,找到指向您的新 RDS 数据库实例的 VPC 安全组的链接。

  2. 创建一个入站规则,允许来自您的 IP 地址在 TCP 端口 5432(PostgreSQL)上的流量。

  3. 修改安全组后,确认您可以连接到数据库。这将需要安装 psql。有关安装 psql 的说明,请参见此处

**注意**:在测试连接之前,请确保您的数据库状态为“可用”。

psql -h <endpoint> -p 5432 -U <username> <db_name>

端点可以在“连接和安全”选项卡上找到,用户名(和密码)是创建数据库时使用的凭据。

db_name 应该是 airflow_db(除非在创建数据库时使用了不同的名称)。

如果连接成功,系统将提示您输入密码。

使用 Fargate 创建 ECS 集群和任务定义

为了为将与 Apache Airflow 配合使用的 ECS 集群创建任务定义,您需要一个配置正确的 Docker 映像。有关如何执行此操作的说明,请参见Dockerfile 部分。

构建映像后,需要将其放入 ECS 可以拉取的存储库中。有多种方法可以实现这一点。本指南将介绍如何使用 Amazon Elastic Container Registry (ECR) 执行此操作。

创建 ECR 存储库

  1. 登录到您的 AWS 管理控制台并导航到 ECR 服务。

  2. 单击“创建存储库”。

  3. 命名存储库并根据需要填写其他信息。

  4. 单击“创建存储库”。

  5. 创建存储库后,单击该存储库。单击右上角的“查看推送命令”按钮。

  6. 按照说明推送 Docker 映像,并根据需要替换映像名称。推送映像后,刷新页面以确保映像已上传。

创建 ECS 集群

  1. 登录到您的 AWS 管理控制台并导航到 Amazon Elastic Container Service。

  2. 单击“集群”,然后单击“创建集群”。

  3. 确保在“基础设施”下选择了“AWS Fargate(无服务器)”。

  4. 根据需要选择其他选项,然后单击“创建”以创建集群。

创建任务定义

  1. 单击左侧栏上的“任务定义”,然后单击“创建新任务定义”。

  2. 选择“任务定义系列”名称。为“启动类型”选择“AWS Fargate”。

  3. 选择或创建“任务角色”和“任务执行角色”,并确保这些角色具有完成各自任务所需的权限。您可以选择创建一个新的任务执行角色,该角色将具有任务运行所需的基本最低权限。

  4. 为容器选择一个名称,并使用上一节中推送的映像的映像 URI。确保正在使用的角色具有拉取映像所需的权限。

  5. 将以下环境变量添加到容器中

  • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN,其值为 PostgreSQL 连接字符串,格式如下,使用上面数据库部分中设置的值

postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>
  • AIRFLOW__ECS_EXECUTOR__SECURITY_GROUPS,其值为与用于 RDS 实例的 VPC 关联的安全组 ID 的逗号分隔列表。

  • AIRFLOW__ECS_EXECUTOR__SUBNETS,其值为与 RDS 实例关联的子网的子网 ID 的逗号分隔列表。

  1. 通常为 Airflow 添加其他配置(请参见此处)、ECS 执行器(请参见此处)或远程日志记录(请参见此处)。请注意,应在整个 Airflow 环境中进行任何配置更改,以保持配置一致。

  2. 单击“创建”。

允许 ECS 容器访问 RDS 数据库

最后一步是为 ECS 容器配置对数据库的访问权限。可以使用许多不同的网络配置,但一种可能的方法是

  1. 登录到您的 AWS 管理控制台并导航到 VPC 仪表板。

  2. 在左侧的“安全”标题下,单击“安全组”。

  3. 选择与您的 RDS 实例关联的安全组,然后单击“编辑入站规则”。

  4. 添加一个新规则,允许将 PostgreSQL 类型的流量发送到与 Ecs 集群关联的子网的 CIDR。

配置 Airflow

要将 Airflow 配置为利用 ECS 执行器并利用我们已设置的资源,请创建一个脚本(例如,ecs_executor_config.sh),其内容如下

export AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor'

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>

export AIRFLOW__AWS_ECS_EXECUTOR__REGION_NAME=<executor-region>

export AIRFLOW__AWS_ECS_EXECUTOR__CLUSTER=<ecs-cluster-name>

export AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME=<ecs-container-name>

export AIRFLOW__AWS_ECS_EXECUTOR__TASK_DEFINITION=<task-definition-name>

export AIRFLOW__AWS_ECS_EXECUTOR__LAUNCH_TYPE='FARGATE'

export AIRFLOW__AWS_ECS_EXECUTOR__PLATFORM_VERSION='LATEST'

export AIRFLOW__AWS_ECS_EXECUTOR__ASSIGN_PUBLIC_IP='True'

export AIRFLOW__AWS_ECS_EXECUTOR__SECURITY_GROUPS=<security-group-id-for-rds>

export AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS=<subnet-id-for-rds>

此脚本应在运行 Airflow 调度程序和 Web 服务器的主机上运行,然后再启动这些进程。

该脚本设置环境变量,将 Airflow 配置为使用 Batch 执行器,并提供任务执行所需的信息。任何其他配置更改(例如远程日志记录)都应添加到此示例脚本中,以在整个 Airflow 环境中保持配置一致。

初始化 Airflow 数据库

Airflow 数据库需要先初始化才能使用,并且需要添加一个用户才能登录。以下命令添加一个管理员用户(如果数据库尚未初始化,该命令还将初始化数据库)

airflow users create --username admin --password admin --firstname <your first name> --lastname <your last name> --email <your email> --role Admin

此条目有帮助吗?