Celery 执行器

注意

自 Airflow 2.7.0 起,使用此执行器需要安装 celery 提供程序包。可通过安装 apache-airflow-providers-celery>=3.3.0,或使用 celery extras 安装 Airflow:pip install 'apache-airflow[celery]'

CeleryExecutor 是扩展 worker 数量的一种方式。要使其工作,需要配置 Celery 后端(**RabbitMQ**、**Redis**、**Redis Sentinel** 等),安装所需依赖(如 librabbitmqredis 等),并在 airflow.cfg 中将 executor 参数指向 CeleryExecutor,同时提供相应的 Celery 配置。

有关设置 Celery broker 的更多信息,请参阅Celery 文档(关于此主题)

Celery 执行器的配置参数可在 Celery 提供程序的配置参考中找到。

以下是对 worker 的若干关键要求

  • airflow 必须已安装,并且 CLI 必须在系统路径中

  • Airflow 的配置设置应在整个集群中保持一致

  • 在 worker 上执行的 Operators 必须在该环境中满足其依赖。例如,使用 HiveOperator 时,需要在该机器上安装 Hive CLI;使用 MySqlOperator 时,需要在 PYTHONPATH 中可用相应的 Python 库。

  • worker 必须能够访问其 DAGS_FOLDER,并且需要自行同步文件系统。常见做法是将 DAGS_FOLDER 存入 Git 仓库,并使用 Chef、Puppet、Ansible 等工具在机器之间同步;如果所有机器拥有统一的挂载点,也可以直接共享管道文件。

要启动 worker,需要先配置好 Airflow 并运行 worker 子命令

airflow celery worker

worker 应在任务被触发后立即开始抢任务。要停止运行在某台机器上的 worker,可使用

airflow celery stop

这将尝试通过向主 Celery 进程发送 SIGTERM 信号来优雅地停止 worker,符合Celery 文档的建议。

请注意,你也可以运行Celery Flower——一个基于 Celery 的 Web UI,用于监控 worker。可以使用快捷命令启动 Flower Web 服务器。

airflow celery flower

请确保系统已安装 flower Python 库。推荐的做法是安装 Airflow 的 celery 捆绑包。

pip install 'apache-airflow[celery]'

一些注意事项

  • 确保使用基于数据库的结果后端

  • 确保在 [celery_broker_transport_options] 中设置的可见性超时时间超过最长运行任务的 ETA

  • 如果使用 Redis Sentinel 作为 broker 且 Redis 服务器已设置密码,请在 [celery_broker_transport_options] 部分指定 Redis 服务器的密码

  • 请在 [worker_umask] 中设置 umask,以确定 worker 创建新文件的权限

  • 任务会消耗资源。请确保你的 worker 拥有足够的资源来运行 worker_concurrency 个任务

  • 队列名称长度受限于 256 个字符,但每个 broker 后端可能还有其他限制

详情请参阅模块管理,了解 Python 与 Airflow 如何管理模块。

架构

digraph A{ rankdir="TB" node[shape="rectangle", style="rounded"] subgraph cluster { label="Cluster"; {rank = same; dag; database} {rank = same; workers; scheduler; web} workers[label="Workers"] scheduler[label="Scheduler"] web[label="Web server"] database[label="Database"] dag[label="Dag files"] subgraph cluster_queue { label="Celery"; {rank = same; queue_broker; queue_result_backend} queue_broker[label="Queue broker"] queue_result_backend[label="Result backend"] } web->workers[label="1"] web->dag[label="2"] web->database[label="3"] workers->dag[label="4"] workers->database[label="5"] workers->queue_result_backend[label="6"] workers->queue_broker[label="7"] scheduler->dag[label="8"] scheduler->database[label="9"] scheduler->queue_result_backend[label="10"] scheduler->queue_broker[label="11"] } }

Airflow 由多个组件构成

  • Workers —— 执行分配的任务

  • Scheduler —— 负责将必要的任务加入队列

  • Web server —— HTTP 服务器,提供对 Dag/任务状态信息的访问

  • Database —— 保存任务、Dag、变量、连接等状态信息

  • Celery —— 队列机制

请注意,Celery 的队列由两个组件组成

  • Broker —— 存储待执行的命令

  • Result backend —— 存储已完成命令的状态

这些组件在多个环节相互通信

  • [1] Web serverWorkers —— 获取任务执行日志

  • [2] Web serverDag files —— 显示 Dag 结构

  • [3] Web serverDatabase —— 获取任务状态

  • [4] WorkersDag files —— 显示 Dag 结构并执行任务

  • [5] WorkersDatabase —— 获取并存储连接配置、变量和 XCOM 信息

  • [6] WorkersCelery’s result backend —— 保存任务状态

  • [7] WorkersCelery’s broker —— 存储待执行的命令

  • [8] SchedulerDag files —— 显示 Dag 结构并执行任务

  • [9] SchedulerDatabase —— 存储 Dag 运行及相关任务

  • [10] SchedulerCelery’s result backend —— 获取已完成任务的状态信息

  • [11] SchedulerCelery’s broker —— 将待执行的命令放入队列

任务执行过程

_images/run_task_on_celery_executor.png

序列图 - 任务执行过程

最初会运行两个进程

  • SchedulerProcess —— 处理任务并使用 CeleryExecutor 运行

  • WorkerProcess —— 监听队列,等待新任务出现

  • WorkerChildProcess —— 等待新任务

还有两个数据库可用

  • QueueBroker

  • ResultBackend

在此过程中会创建两个新进程

  • LocalTaskJobProcess —— 其逻辑由 LocalTaskJob 描述。它监控 RawTaskProcess,并通过 TaskRunner 启动新进程。

  • RawTaskProcess —— 执行用户代码的进程,例如 execute()

[1] SchedulerProcess 处理任务,当发现需要执行的任务时,将其发送至 QueueBroker
[2] SchedulerProcess 还会定期查询 ResultBackend 以获取任务状态。
[3] QueueBroker 认识到任务后,将任务信息发送给某个 WorkerProcess
[4] WorkerProcess 将单个任务分配给一个 WorkerChildProcess
[5] WorkerChildProcess 执行任务处理函数——execute_command(),并创建新的进程——LocalTaskJobProcess
[6] LocalTaskJobProcess 的逻辑由 LocalTaskJob 类描述。它使用 TaskRunner 启动新进程。
[7][8] 当 RawTaskProcessLocalTaskJobProcess 完成工作后即被停止。
[10][12] WorkerChildProcess 向主进程 WorkerProcess 通知任务结束以及后续任务的可用性。
[11] WorkerProcess 将状态信息保存至 ResultBackend
[13] 当 SchedulerProcess 再次查询 ResultBackend 状态时,会获取到任务的状态信息。

队列

使用 CeleryExecutor 时,可指定任务发送到的 Celery 队列。queue 是 BaseOperator 的属性,任何任务均可指派到任意队列。环境的默认队列在 airflow.cfgoperators -> default_queue 中定义。该设置决定未显式指定时任务所属的队列,以及启动时 Airflow worker 所监听的队列。

Worker 可以监听一个或多个任务队列。启动 worker 时(使用命令 airflow celery worker),可提供以逗号分隔且不带空格的队列名称列表(例如 airflow celery worker -q spark,quark)。该 worker 只会抢取绑定到指定队列的任务。

如果需要专用的 worker,这在资源层面(例如非常轻量的任务,一个 worker 能处理成千上万的任务)或环境层面(例如希望在 Spark 集群内部运行 worker,因其需要特定的环境和安全权限)都非常有用。

此条目是否有帮助?