Celery 执行器

注意

从 Airflow 2.7.0 开始,您需要安装 celery 提供程序包才能使用此执行器。可以通过安装 apache-airflow-providers-celery>=3.3.0 或通过安装带有 celery 额外功能的 Airflow 来完成:pip install 'apache-airflow[celery]'

CeleryExecutor 是扩展工作进程数量的方式之一。为了使其工作,您需要设置 Celery 后端(RabbitMQRedisRedis Sentinel ...),安装所需的依赖项(例如 librabbitmqredis ...),并将您的 airflow.cfg 中的执行器参数指向 CeleryExecutor 并提供相关的 Celery 设置。

有关设置 Celery 代理的更多信息,请参阅关于此主题的详尽 Celery 文档

Celery 执行器的配置参数可以在 Celery 提供程序的 配置参考 中找到。

以下是您的工作进程的一些必要要求

  • 需要安装 airflow,并且 CLI 需要在路径中

  • Airflow 配置设置在整个集群中应该是同质的

  • 在工作进程上执行的 Operator 需要在该上下文中满足其依赖项。例如,如果您使用 HiveOperator,则需要在该框上安装 Hive CLI,或者如果您使用 MySqlOperator,则需要以某种方式在 PYTHONPATH 中提供所需的 Python 库

  • 工作进程需要访问其 DAGS_FOLDER,并且您需要自行同步文件系统。一种常见的设置是将您的 DAGS_FOLDER 存储在 Git 存储库中,并使用 Chef、Puppet、Ansible 或您在环境中用来配置计算机的任何工具在机器之间同步它。如果您的所有框都有一个公共挂载点,那么在那里共享您的管道文件也应该可以工作

要启动工作进程,您需要设置 Airflow 并启动工作进程子命令

airflow celery worker

您的工作进程应该在任务被发送到其方向后立即开始接收任务。要停止在机器上运行的工作进程,您可以使用

airflow celery stop

它会尝试通过向主 Celery 进程发送 SIGTERM 信号来优雅地停止工作进程,正如 Celery 文档 建议的那样。

请注意,您还可以运行 Celery Flower,这是一个基于 Celery 构建的 Web UI,用于监视您的工作进程。您可以使用快捷命令启动 Flower Web 服务器

airflow celery flower

请注意,您必须在您的系统上安装 flower Python 库。推荐的方法是安装 airflow celery 包。

pip install 'apache-airflow[celery]'

一些注意事项

  • 请确保使用数据库支持的结果后端

  • 请确保在 [celery_broker_transport_options] 中设置的可见性超时时间超过您运行时间最长的任务的 ETA

  • 如果您使用 Redis Sentinel 作为您的代理并且 Redis 服务器受密码保护,请确保在 [celery_broker_transport_options] 部分中指定 Redis 服务器的密码

  • 请确保在 [worker_umask] 中设置 umask,以便为工作进程创建的新文件设置权限。

  • 任务会消耗资源。请确保您的工作进程有足够的资源来运行 worker_concurrency 个任务

  • 队列名称限制为 256 个字符,但每个代理后端可能有其自身的限制

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理

架构

digraph A{ rankdir="TB" node[shape="rectangle", style="rounded"] subgraph cluster { label="Cluster"; {rank = same; dag; database} {rank = same; workers; scheduler; web} workers[label="Workers"] scheduler[label="Scheduler"] web[label="Web server"] database[label="Database"] dag[label="DAG files"] subgraph cluster_queue { label="Celery"; {rank = same; queue_broker; queue_result_backend} queue_broker[label="Queue broker"] queue_result_backend[label="Result backend"] } web->workers[label="1"] web->dag[label="2"] web->database[label="3"] workers->dag[label="4"] workers->database[label="5"] workers->queue_result_backend[label="6"] workers->queue_broker[label="7"] scheduler->dag[label="8"] scheduler->database[label="9"] scheduler->queue_result_backend[label="10"] scheduler->queue_broker[label="11"] } }

Airflow 由多个组件组成

  • 工作进程 - 执行分配的任务

  • 调度器 - 负责将必要的任务添加到队列中

  • Web 服务器 - HTTP 服务器提供对 DAG/任务状态信息的访问

  • 数据库 - 包含有关任务、DAG、变量、连接等状态的信息

  • Celery - 队列机制

请注意,Celery 的队列由两个组件组成

  • 代理 - 存储要执行的命令

  • 结果后端 - 存储已完成命令的状态

组件在许多地方相互通信

  • [1] Web 服务器 –> 工作进程 - 获取任务执行日志

  • [2] Web 服务器 –> DAG 文件 - 显示 DAG 结构

  • [3] Web 服务器 –> 数据库 - 获取任务的状态

  • [4] 工作进程 –> DAG 文件 - 显示 DAG 结构并执行任务

  • [5] 工作进程 –> 数据库 - 获取和存储有关连接配置、变量和 XCOM 的信息。

  • [6] 工作进程 –> Celery 的结果后端 - 保存任务的状态

  • [7] 工作进程 –> Celery 的代理 - 存储要执行的命令

  • [8] 调度器 –> DAG 文件 - 显示 DAG 结构并执行任务

  • [9] 调度器 –> 数据库 - 存储 DAG 运行和相关任务

  • [10] 调度器 –> Celery 的结果后端 - 获取有关已完成任务状态的信息

  • [11] 调度器 –> Celery 的代理 - 放入要执行的命令

任务执行过程

_images/run_task_on_celery_executor.png

序列图 - 任务执行过程

最初,两个进程正在运行

  • SchedulerProcess - 处理任务并使用 CeleryExecutor 运行

  • WorkerProcess - 观察队列,等待新任务出现

  • WorkerChildProcess - 等待新任务

还有两个数据库可用

  • QueueBroker

  • ResultBackend

在此过程中,会创建两个进程

  • LocalTaskJobProcess - 它的逻辑由 LocalTaskJob 描述。它正在监视 RawTaskProcess。使用 TaskRunner 启动新进程。

  • RawTaskProcess - 这是一个带有用户代码的进程,例如 execute()

[1] SchedulerProcess 处理任务,当它发现需要完成的任务时,将其发送到 QueueBroker
[2] SchedulerProcess 也开始定期查询 ResultBackend 以获取任务状态。
[3] QueueBroker 在意识到任务时,会将有关它的信息发送到一个 WorkerProcess。
[4] WorkerProcess 将单个任务分配给一个 WorkerChildProcess
[5] WorkerChildProcess 执行正确的任务处理功能 - execute_command()。它创建一个新进程 - LocalTaskJobProcess
[6] LocalTaskJobProcess 的逻辑由 LocalTaskJob 类描述。它使用 TaskRunner 启动新进程。
[7][8] 进程 RawTaskProcessLocalTaskJobProcess 在完成工作时停止。
[10][12] WorkerChildProcess 将任务结束以及后续任务的可用性通知主进程 - WorkerProcess
[11] WorkerProcess 将状态信息保存在 ResultBackend 中。
[13] 当 SchedulerProcess 再次向 ResultBackend 询问状态时,它将获得有关任务状态的信息。

队列

当使用 CeleryExecutor 时,可以指定任务发送到的 Celery 队列。queue 是 BaseOperator 的属性,因此任何任务都可以分配给任何队列。环境的默认队列在 airflow.cfgoperators -> default_queue 中定义。这定义了未指定时任务分配到的队列,以及 Airflow 工作进程启动时监听的队列。

工作进程可以监听一个或多个任务队列。当启动工作进程(使用命令 airflow celery worker)时,可以给出用逗号分隔的一组队列名称(没有空格)(例如 airflow celery worker -q spark,quark)。然后,此工作进程将仅接收连接到指定队列的任务。

如果您需要专业的工作进程,无论从资源的角度(例如,对于非常轻量级的任务,一个工作进程可以毫无问题地处理数千个任务),还是从环境的角度(您希望工作进程在 Spark 集群本身内运行,因为它需要非常特定的环境和安全权限),这都很有用。

这个条目是否有帮助?