Celery 执行器¶
注意
自 Airflow 2.7.0 起,使用此执行器需要安装 celery 提供程序包。可通过安装 apache-airflow-providers-celery>=3.3.0,或使用 celery extras 安装 Airflow:pip install 'apache-airflow[celery]'。
CeleryExecutor 是扩展 worker 数量的一种方式。要使其工作,需要配置 Celery 后端(**RabbitMQ**、**Redis**、**Redis Sentinel** 等),安装所需依赖(如 librabbitmq、redis 等),并在 airflow.cfg 中将 executor 参数指向 CeleryExecutor,同时提供相应的 Celery 配置。
有关设置 Celery broker 的更多信息,请参阅Celery 文档(关于此主题)。
Celery 执行器的配置参数可在 Celery 提供程序的配置参考中找到。
以下是对 worker 的若干关键要求
airflow必须已安装,并且 CLI 必须在系统路径中Airflow 的配置设置应在整个集群中保持一致
在 worker 上执行的 Operators 必须在该环境中满足其依赖。例如,使用
HiveOperator时,需要在该机器上安装 Hive CLI;使用MySqlOperator时,需要在PYTHONPATH中可用相应的 Python 库。worker 必须能够访问其
DAGS_FOLDER,并且需要自行同步文件系统。常见做法是将DAGS_FOLDER存入 Git 仓库,并使用 Chef、Puppet、Ansible 等工具在机器之间同步;如果所有机器拥有统一的挂载点,也可以直接共享管道文件。
要启动 worker,需要先配置好 Airflow 并运行 worker 子命令
airflow celery worker
worker 应在任务被触发后立即开始抢任务。要停止运行在某台机器上的 worker,可使用
airflow celery stop
这将尝试通过向主 Celery 进程发送 SIGTERM 信号来优雅地停止 worker,符合Celery 文档的建议。
请注意,你也可以运行Celery Flower——一个基于 Celery 的 Web UI,用于监控 worker。可以使用快捷命令启动 Flower Web 服务器。
airflow celery flower
请确保系统已安装 flower Python 库。推荐的做法是安装 Airflow 的 celery 捆绑包。
pip install 'apache-airflow[celery]'
一些注意事项
确保使用基于数据库的结果后端
确保在
[celery_broker_transport_options]中设置的可见性超时时间超过最长运行任务的 ETA如果使用 Redis Sentinel 作为 broker 且 Redis 服务器已设置密码,请在
[celery_broker_transport_options]部分指定 Redis 服务器的密码请在
[worker_umask]中设置 umask,以确定 worker 创建新文件的权限任务会消耗资源。请确保你的 worker 拥有足够的资源来运行
worker_concurrency个任务队列名称长度受限于 256 个字符,但每个 broker 后端可能还有其他限制
详情请参阅模块管理,了解 Python 与 Airflow 如何管理模块。
架构¶
Airflow 由多个组件构成
Workers —— 执行分配的任务
Scheduler —— 负责将必要的任务加入队列
Web server —— HTTP 服务器,提供对 Dag/任务状态信息的访问
Database —— 保存任务、Dag、变量、连接等状态信息
Celery —— 队列机制
请注意,Celery 的队列由两个组件组成
Broker —— 存储待执行的命令
Result backend —— 存储已完成命令的状态
这些组件在多个环节相互通信
[1] Web server → Workers —— 获取任务执行日志
[2] Web server → Dag files —— 显示 Dag 结构
[3] Web server → Database —— 获取任务状态
[4] Workers → Dag files —— 显示 Dag 结构并执行任务
[5] Workers → Database —— 获取并存储连接配置、变量和 XCOM 信息
[6] Workers → Celery’s result backend —— 保存任务状态
[7] Workers → Celery’s broker —— 存储待执行的命令
[8] Scheduler → Dag files —— 显示 Dag 结构并执行任务
[9] Scheduler → Database —— 存储 Dag 运行及相关任务
[10] Scheduler → Celery’s result backend —— 获取已完成任务的状态信息
[11] Scheduler → Celery’s broker —— 将待执行的命令放入队列
任务执行过程¶
序列图 - 任务执行过程¶
最初会运行两个进程
SchedulerProcess —— 处理任务并使用 CeleryExecutor 运行
WorkerProcess —— 监听队列,等待新任务出现
WorkerChildProcess —— 等待新任务
还有两个数据库可用
QueueBroker
ResultBackend
在此过程中会创建两个新进程
LocalTaskJobProcess —— 其逻辑由
LocalTaskJob描述。它监控RawTaskProcess,并通过TaskRunner启动新进程。RawTaskProcess —— 执行用户代码的进程,例如
execute()。
execute_command(),并创建新的进程——LocalTaskJobProcess。LocalTaskJob 类描述。它使用 TaskRunner 启动新进程。队列¶
使用 CeleryExecutor 时,可指定任务发送到的 Celery 队列。queue 是 BaseOperator 的属性,任何任务均可指派到任意队列。环境的默认队列在 airflow.cfg 的 operators -> default_queue 中定义。该设置决定未显式指定时任务所属的队列,以及启动时 Airflow worker 所监听的队列。
Worker 可以监听一个或多个任务队列。启动 worker 时(使用命令 airflow celery worker),可提供以逗号分隔且不带空格的队列名称列表(例如 airflow celery worker -q spark,quark)。该 worker 只会抢取绑定到指定队列的任务。
如果需要专用的 worker,这在资源层面(例如非常轻量的任务,一个 worker 能处理成千上万的任务)或环境层面(例如希望在 Spark 集群内部运行 worker,因其需要特定的环境和安全权限)都非常有用。