边缘执行器¶
EdgeExecutor
是一个选项,如果您想将任务分发到分布在不同位置的工作节点上。如果需要,您也可以将其与其他执行器并行使用。修改您的 airflow.cfg
文件,将 executor 参数指向 EdgeExecutor
并提供相关设置。
Edge 执行器的配置参数可以在 Edge provider 的 配置参考 中找到。
以下是您的工作节点的几个强制性要求
airflow
需要安装,并且 Airflow CLI 需要在 PATH 中Airflow 配置设置在整个集群和边缘站点上应保持一致
在边缘工作节点上执行的操作符需要满足其在该环境中的依赖项。请查阅各自 provider 包的文档
工作节点需要访问其
DAGS_FOLDER
,您需要自行同步文件系统。一种常见的设置是将DAGS_FOLDER
存储在 Git 仓库中,并使用 Chef、Puppet、Ansible 或您在环境中用于配置机器的任何工具跨机器同步。如果您的所有机器都有一个共同的挂载点,在那里共享您的 pipeline 文件应该也行得通
边缘工作节点运行所需的最低配置如下
Section
[core]
executor
: Executor 必须设置为或添加为airflow.providers.edge3.executors.EdgeExecutor
internal_api_secret_key
: 必须在 webserver 和边缘工作节点组件上设置一个加密密钥作为共享密钥来验证流量。它应该是一个随机字符串,类似于 fernet key(但最好不是同一个)。
Section
[edge]
api_enabled
: 必须设置为 true。默认情况下有意禁用此项,以免暴露端点。这是工作节点连接的端点。在未来的版本中,可以启动一个专用的 API 服务器。api_url
: 必须设置为暴露 web 端点的 URL
要启动一个工作节点,您需要设置 Airflow 并启动 worker 子命令
airflow edge worker
您的工作节点应该在任务分配给它后立即开始接收任务。要停止机器上运行的工作节点,您可以使用
airflow edge stop
它将尝试通过向主进程发送 SIGINT
信号来优雅地停止工作节点,并等待所有正在运行的任务完成。
如果您想监控远程活动和工作节点,请使用 provider 包中包含的 UI 插件,将其安装在 webserver 上,并使用“Admin” - “Edge Worker Hosts”和“Edge Worker Jobs”页面。(注意:在撰写本文时,该插件尚未移植到 Airflow 3.0 web UI)
如果您想通过 CLI 检查工作节点的状态,可以使用命令
airflow edge status
一些注意事项
任务会消耗资源。请确保您的工作节点有足够的资源来运行
worker_concurrency
任务请确保任务的
pool_slots
与工作节点的worker_concurrency
匹配队列名称限制为 256 个字符
有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理。
边缘执行器的当前限制¶
如果您计划在当前阶段使用边缘执行器/工作节点,您需要确保在使用前进行适当测试。以下功能已初步测试并正在运行
一些核心操作符
BashOperator
PythonOperator
@task
装饰器@task.branch
装饰器@task.virtualenv
装饰器@task.bash
装饰器动态映射任务
XCom 读/写
变量和连接访问
Setup 和 Teardown 任务
一些已知限制
需要访问数据库的任务将失败 - 远程站点无法连接数据库(这是 Airflow 3.0 中的默认设置)
这也意味着无法通过 Python 直接使用某些 Airflow API(例如 airflow.models.*)
日志上传仅在使用单个 web server 实例或需要共享日志文件卷时才有效。日志以块的形式上传并通过 API 传输。如果您使用多个 webserver 但没有共享日志卷,则日志将分散在不同的 webserver 实例上。
性能:尚未进行广泛的性能评估和扩展测试。边缘执行器包已针对稳定性进行了优化。这将在未来的版本中逐步改进。截至目前,有设置报告称在约 50 个工作节点下运行稳定。请注意,执行的任务需要更多的 webserver API 容量。
架构¶
Airflow 由多个组件组成
工作节点(Workers) - 执行分配的任务 - 大多数标准设置都有本地或集中式工作节点,例如通过 Celery
边缘工作节点(Edge Workers) - 特殊工作节点,通过 HTTP 拉取任务,此功能由本 provider 包提供
调度器(Scheduler) - 负责将必要的任务添加到队列
Web 服务器(Web server) - HTTP 服务器提供 DAG/任务状态信息访问
数据库(Database) - 包含任务、DAG、变量、连接等的状态信息
队列¶
使用 EdgeExecutor 时,可以指定任务发送到的工作节点。queue
是 BaseOperator 的一个属性,因此任何任务都可以分配给任何队列。环境的默认队列在 airflow.cfg
的 operators -> default_queue
中定义。这定义了未指定时任务分配到的队列,以及 Airflow 工作节点启动时监听的队列。
工作节点可以监听一个或多个任务队列。启动工作节点时(使用命令 airflow edge worker
),可以指定一组逗号分隔(无空格)的队列名称(例如 airflow edge worker -q remote,wisconsin_site
)。然后该工作节点将只接收指定队列中的任务。
这在您需要专用工作节点时非常有用,无论是从资源角度(例如,对于非常轻量级的任务,一个工作节点可以处理数千个任务而没有问题),还是从环境角度(您希望工作节点在具备所需基础设施的特定位置运行)。
并发槽处理¶
一些任务可能比其他任务需要更多资源,为了处理这些用例,边缘工作节点支持并发槽处理。其背后的逻辑与池槽(pool slot)功能相同,请参阅 池(Pools)。边缘工作节点复用 task_instance 的 pool_slots
值,以尽量减少任务实例参数的数量。pool_slots
值与在启动工作节点时定义的 worker_concurrency
值协同工作。如果一个任务需要更多资源,可以增加 pool_slots
的值以减少并行运行的任务数量。该值可用于阻止其他任务在同一工作节点上并行执行。如果 pool_slots
为 2 且 worker_concurrency
为 3,则执行此任务的工作节点只能并行执行一个 pool_slots
为 1 的任务。如果未为任务定义 pool_slots
,则默认值为 1。pool_slots
值仅支持整数值。
以下是为一个任务设置 pool_slots
的示例
import os
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME
with DAG(
dag_id="example_edge_pool_slots",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
@task(executor="EdgeExecutor", pool_slots=2)
def task_with_template():
print_stuff()
task_with_template()
工作节点维护¶
有时基础设施需要维护。边缘工作节点提供了维护模式,以便 - 停止接受新任务 - 优雅地排空所有正在进行的任务
工作节点状态可以在 web UI 的“Admin” - “Edge Worker Hosts”页面中检查。

注意
截至撰写本文时,用于查看边缘作业和管理工作节点的 web UI 尚未移植到 Airflow 3.0
工作节点维护也可以通过 CLI 命令触发
airflow edge maintenance --comments "Some comments for the maintenance" on
这将阻止工作节点接受新任务,并会完成正在运行的任务。如果您添加命令行参数 --wait
,CLI 将等待所有正在运行的任务完成后才返回。
如果您想在等待维护期间了解工作节点的状态,可以使用命令 .. code-block:: bash
airflow edge status
这将以 JSON 格式显示工作节点的状态以及在其上运行的任务。
状态和维护注释也将显示在 web UI 的“Admin” - “Edge Worker Hosts”页面中。

工作节点可以通过命令启动以获取新任务
airflow edge maintenance off
这将再次启动工作节点,它将再次开始接受任务。
从 MVP 到发布就绪的功能待办事项¶
当前版本的 EdgeExecutor 是一个 MVP(Minimum Viable Product,最小可行产品)。它将随着时间的推移而成熟。
以下功能已知缺失,将逐步实现
每个工作节点的 API token:目前只有全局 API token 可用
边缘工作节点插件
队列/每个队列的作业概览
允许将边缘工作节点 REST API 与 webserver 分开启动
添加关于如何设置额外工作节点的一些提示
边缘工作节点 CLI
使用 WebSockets 代替 HTTP 调用进行通信
如果使用外部日志服务,也将日志发送到 TaskFileHandler
集成到遥测系统,发送远程站点的指标
通过心跳发布系统指标(CPU、磁盘空间、内存、负载)
更宽松,例如在补丁版本上。目前需要版本完全匹配(在当前状态下,如果版本不匹配,工作节点将在作业完成后优雅关闭,不会启动新作业)
测试
Github 中的集成测试
在 Windows 上测试/支持边缘工作节点
扩展测试 - 检查并定义工作节点/作业的边界。目前已知可扩展到 50 个工作节点的范围。这并非硬性限制,仅为报告的经验。
负载测试 - 扩展执行和代码优化的影响
任务执行期间的增量日志可以在 webserver 上无需共享日志磁盘即可提供
文档
详细描述部署选项和调优细节
提供将边缘组件作为服务(systemd)安装的脚本和指南
扩展 Helm-Chart 以提供所需支持
提供工作节点设置的 docker compose 示例