边缘执行器

EdgeExecutor 是一个选项,如果您想将任务分发到分布在不同位置的工作节点上。如果需要,您也可以将其与其他执行器并行使用。修改您的 airflow.cfg 文件,将 executor 参数指向 EdgeExecutor 并提供相关设置。

Edge 执行器的配置参数可以在 Edge provider 的 配置参考 中找到。

以下是您的工作节点的几个强制性要求

  • airflow 需要安装,并且 Airflow CLI 需要在 PATH 中

  • Airflow 配置设置在整个集群和边缘站点上应保持一致

  • 在边缘工作节点上执行的操作符需要满足其在该环境中的依赖项。请查阅各自 provider 包的文档

  • 工作节点需要访问其 DAGS_FOLDER,您需要自行同步文件系统。一种常见的设置是将 DAGS_FOLDER 存储在 Git 仓库中,并使用 Chef、Puppet、Ansible 或您在环境中用于配置机器的任何工具跨机器同步。如果您的所有机器都有一个共同的挂载点,在那里共享您的 pipeline 文件应该也行得通

边缘工作节点运行所需的最低配置如下

  • Section [core]

    • executor: Executor 必须设置为或添加为 airflow.providers.edge3.executors.EdgeExecutor

    • internal_api_secret_key: 必须在 webserver 和边缘工作节点组件上设置一个加密密钥作为共享密钥来验证流量。它应该是一个随机字符串,类似于 fernet key(但最好不是同一个)。

  • Section [edge]

    • api_enabled: 必须设置为 true。默认情况下有意禁用此项,以免暴露端点。这是工作节点连接的端点。在未来的版本中,可以启动一个专用的 API 服务器。

    • api_url: 必须设置为暴露 web 端点的 URL

要启动一个工作节点,您需要设置 Airflow 并启动 worker 子命令

airflow edge worker

您的工作节点应该在任务分配给它后立即开始接收任务。要停止机器上运行的工作节点,您可以使用

airflow edge stop

它将尝试通过向主进程发送 SIGINT 信号来优雅地停止工作节点,并等待所有正在运行的任务完成。

如果您想监控远程活动和工作节点,请使用 provider 包中包含的 UI 插件,将其安装在 webserver 上,并使用“Admin” - “Edge Worker Hosts”和“Edge Worker Jobs”页面。(注意:在撰写本文时,该插件尚未移植到 Airflow 3.0 web UI)

如果您想通过 CLI 检查工作节点的状态,可以使用命令

airflow edge status

一些注意事项

  • 任务会消耗资源。请确保您的工作节点有足够的资源来运行 worker_concurrency 任务

  • 请确保任务的 pool_slots 与工作节点的 worker_concurrency 匹配

  • 队列名称限制为 256 个字符

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理

边缘执行器的当前限制

如果您计划在当前阶段使用边缘执行器/工作节点,您需要确保在使用前进行适当测试。以下功能已初步测试并正在运行

  • 一些核心操作符

    • BashOperator

    • PythonOperator

    • @task 装饰器

    • @task.branch 装饰器

    • @task.virtualenv 装饰器

    • @task.bash 装饰器

    • 动态映射任务

    • XCom 读/写

    • 变量和连接访问

    • Setup 和 Teardown 任务

  • 一些已知限制

    • 需要访问数据库的任务将失败 - 远程站点无法连接数据库(这是 Airflow 3.0 中的默认设置)

    • 这也意味着无法通过 Python 直接使用某些 Airflow API(例如 airflow.models.*)

    • 日志上传仅在使用单个 web server 实例或需要共享日志文件卷时才有效。日志以块的形式上传并通过 API 传输。如果您使用多个 webserver 但没有共享日志卷,则日志将分散在不同的 webserver 实例上。

    • 性能:尚未进行广泛的性能评估和扩展测试。边缘执行器包已针对稳定性进行了优化。这将在未来的版本中逐步改进。截至目前,有设置报告称在约 50 个工作节点下运行稳定。请注意,执行的任务需要更多的 webserver API 容量。

架构

digraph A{ rankdir="TB" node[shape="rectangle", style="rounded"] subgraph cluster { label="Cluster"; {rank = same; dag; database} {rank = same; workers; scheduler; web} workers[label="(Central) Workers"] scheduler[label="Scheduler"] web[label="Web server"] database[label="Database"] dag[label="DAG files"] web->workers web->database workers->dag workers->database scheduler->dag scheduler->database } subgraph edge_worker_subgraph { label="Edge site"; edge_worker[label="Edge Worker"] edge_dag[label="DAG files (Remote)"] edge_worker->edge_dag } edge_worker->web[label="HTTP(s)"] }

Airflow 由多个组件组成

  • 工作节点(Workers) - 执行分配的任务 - 大多数标准设置都有本地或集中式工作节点,例如通过 Celery

  • 边缘工作节点(Edge Workers) - 特殊工作节点,通过 HTTP 拉取任务,此功能由本 provider 包提供

  • 调度器(Scheduler) - 负责将必要的任务添加到队列

  • Web 服务器(Web server) - HTTP 服务器提供 DAG/任务状态信息访问

  • 数据库(Database) - 包含任务、DAG、变量、连接等的状态信息

队列

使用 EdgeExecutor 时,可以指定任务发送到的工作节点。queue 是 BaseOperator 的一个属性,因此任何任务都可以分配给任何队列。环境的默认队列在 airflow.cfgoperators -> default_queue 中定义。这定义了未指定时任务分配到的队列,以及 Airflow 工作节点启动时监听的队列。

工作节点可以监听一个或多个任务队列。启动工作节点时(使用命令 airflow edge worker),可以指定一组逗号分隔(无空格)的队列名称(例如 airflow edge worker -q remote,wisconsin_site)。然后该工作节点将只接收指定队列中的任务。

这在您需要专用工作节点时非常有用,无论是从资源角度(例如,对于非常轻量级的任务,一个工作节点可以处理数千个任务而没有问题),还是从环境角度(您希望工作节点在具备所需基础设施的特定位置运行)。

并发槽处理

一些任务可能比其他任务需要更多资源,为了处理这些用例,边缘工作节点支持并发槽处理。其背后的逻辑与池槽(pool slot)功能相同,请参阅 池(Pools)。边缘工作节点复用 task_instance 的 pool_slots 值,以尽量减少任务实例参数的数量。pool_slots 值与在启动工作节点时定义的 worker_concurrency 值协同工作。如果一个任务需要更多资源,可以增加 pool_slots 的值以减少并行运行的任务数量。该值可用于阻止其他任务在同一工作节点上并行执行。如果 pool_slots 为 2 且 worker_concurrency 为 3,则执行此任务的工作节点只能并行执行一个 pool_slots 为 1 的任务。如果未为任务定义 pool_slots,则默认值为 1。pool_slots 值仅支持整数值。

以下是为一个任务设置 pool_slots 的示例

import os

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME

with DAG(
    dag_id="example_edge_pool_slots",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task(executor="EdgeExecutor", pool_slots=2)
    def task_with_template():
        print_stuff()

    task_with_template()

工作节点维护

有时基础设施需要维护。边缘工作节点提供了维护模式,以便 - 停止接受新任务 - 优雅地排空所有正在进行的任务

工作节点状态可以在 web UI 的“Admin” - “Edge Worker Hosts”页面中检查。

_images/worker_hosts.png

注意

截至撰写本文时,用于查看边缘作业和管理工作节点的 web UI 尚未移植到 Airflow 3.0

工作节点维护也可以通过 CLI 命令触发

airflow edge maintenance --comments "Some comments for the maintenance" on

这将阻止工作节点接受新任务,并会完成正在运行的任务。如果您添加命令行参数 --wait,CLI 将等待所有正在运行的任务完成后才返回。

如果您想在等待维护期间了解工作节点的状态,可以使用命令 .. code-block:: bash

airflow edge status

这将以 JSON 格式显示工作节点的状态以及在其上运行的任务。

状态和维护注释也将显示在 web UI 的“Admin” - “Edge Worker Hosts”页面中。

_images/worker_maintenance.png

工作节点可以通过命令启动以获取新任务

airflow edge maintenance off

这将再次启动工作节点,它将再次开始接受任务。

从 MVP 到发布就绪的功能待办事项

当前版本的 EdgeExecutor 是一个 MVP(Minimum Viable Product,最小可行产品)。它将随着时间的推移而成熟。

以下功能已知缺失,将逐步实现

  • 每个工作节点的 API token:目前只有全局 API token 可用

  • 边缘工作节点插件

    • 队列/每个队列的作业概览

    • 允许将边缘工作节点 REST API 与 webserver 分开启动

    • 添加关于如何设置额外工作节点的一些提示

  • 边缘工作节点 CLI

    • 使用 WebSockets 代替 HTTP 调用进行通信

    • 如果使用外部日志服务,也将日志发送到 TaskFileHandler

    • 集成到遥测系统,发送远程站点的指标

    • 通过心跳发布系统指标(CPU、磁盘空间、内存、负载)

    • 更宽松,例如在补丁版本上。目前需要版本完全匹配(在当前状态下,如果版本不匹配,工作节点将在作业完成后优雅关闭,不会启动新作业)

  • 测试

    • Github 中的集成测试

    • 在 Windows 上测试/支持边缘工作节点

  • 扩展测试 - 检查并定义工作节点/作业的边界。目前已知可扩展到 50 个工作节点的范围。这并非硬性限制,仅为报告的经验。

  • 负载测试 - 扩展执行和代码优化的影响

  • 任务执行期间的增量日志可以在 webserver 上无需共享日志磁盘即可提供

  • 文档

    • 详细描述部署选项和调优细节

    • 提供将边缘组件作为服务(systemd)安装的脚本和指南

    • 扩展 Helm-Chart 以提供所需支持

    • 提供工作节点设置的 docker compose 示例

此条目是否有帮助?