资源池

某些系统在同一时间接收到过多进程时可能会不堪重负。Airflow 资源池可用于限制任意任务集合的执行并行度。资源池列表在 UI 中管理(Menu -> Admin -> Pools),通过为资源池命名并分配工作槽位数量。您还可以决定该资源池是否在计算已占用槽位时包括 延迟任务

随后,可在创建任务时使用 pool 参数将任务关联到现有的某个资源池。

aggregate_db_message_job = BashOperator(
    task_id="aggregate_db_message_job",
    execution_timeout=timedelta(hours=3),
    pool="ep_data_pipeline_db_msg_agg",
    bash_command=aggregate_db_message_job_cmd,
    dag=dag,
)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

任务将像往常一样被调度,直到槽位被占满。任务占用的槽位数量可以通过 pool_slots(见下文)进行配置。达到容量后,可运行的任务会排队,UI 中的状态会相应显示。随着槽位释放,排队任务会根据任务及其后代的优先级权重开始运行。

请注意,如果任务未指定资源池,它们会被分配到默认资源池 default_pool,该池默认拥有 128 个槽位,可通过 UI 或 CLI 修改(但不能删除)。

使用多个资源池槽位

Airflow 任务默认各占用一个资源池槽位,但如果需要也可以通过 pool_slots 参数配置占用多个槽位。这在同一资源池内的若干任务“计算负载”不一致时尤为有用。

例如,考虑一个拥有 2 个槽位的资源池 Pool(pool='maintenance', slots=2),以及下面的任务:

BashOperator(
    task_id="heavy_task",
    bash_command="bash backup_data.sh",
    pool_slots=2,
    pool="maintenance",
)

BashOperator(
    task_id="light_task1",
    bash_command="bash check_files.sh",
    pool_slots=1,
    pool="maintenance",
)

BashOperator(
    task_id="light_task2",
    bash_command="bash remove_files.sh",
    pool_slots=1,
    pool="maintenance",
)

由于重任务被配置为占用 2 个资源池槽位,运行时会耗尽整个池。因此,任何轻任务都必须排队,等待重任务完成后才能执行。在资源使用上,重任务等价于同时运行的两个轻任务。

此实现可以防止系统资源被过度消耗(在本例中,即防止重任务与轻任务同时运行导致的资源争抢)。另一方面,两个轻任务可以并行运行,因为它们各只占用一个槽位,而重任务则必须等待两个槽位都空闲后才能执行。

此条目是否有帮助?