Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册可享早鸟票优惠!

连接池

当太多进程同时访问某些系统时,这些系统可能会不堪重负。Airflow 连接池可用于限制任意任务集的并行执行。通过 UI (菜单 -> 管理员 -> 连接池) 管理连接池列表,为连接池命名并分配工作槽位数量。在这里,您还可以决定连接池在计算已占用槽位时是否应包含延迟任务

然后,在创建任务时,可以使用 pool 参数将任务与现有连接池之一关联起来

aggregate_db_message_job = BashOperator(
    task_id="aggregate_db_message_job",
    execution_timeout=timedelta(hours=3),
    pool="ep_data_pipeline_db_msg_agg",
    bash_command=aggregate_db_message_job_cmd,
    dag=dag,
)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

槽位被占用时,任务将照常调度。任务占用的槽位数量可通过 pool_slots 进行配置(详见下文)。一旦达到容量限制,可运行任务将被排队,其状态将在 UI 中显示为排队状态。随着槽位释放,排队任务将根据任务及其后代的优先级权重开始运行。

请注意,如果未给任务指定连接池,它们将被分配给默认连接池 default_pool,该连接池初始化时有 128 个槽位,可通过 UI 或 CLI 修改(但不能删除)。

使用多个连接池槽位

Airflow 任务默认每个占用一个连接池槽位,但如果需要,可以通过 pool_slots 参数将其配置为占用更多槽位。当属于同一连接池的多个任务具有不同的“计算权重”时,这尤其有用。

例如,考虑一个具有 2 个槽位的连接池,Pool(pool='maintenance', slots=2),以及以下任务

BashOperator(
    task_id="heavy_task",
    bash_command="bash backup_data.sh",
    pool_slots=2,
    pool="maintenance",
)

BashOperator(
    task_id="light_task1",
    bash_command="bash check_files.sh",
    pool_slots=1,
    pool="maintenance",
)

BashOperator(
    task_id="light_task2",
    bash_command="bash remove_files.sh",
    pool_slots=1,
    pool="maintenance",
)

由于繁重任务配置为使用 2 个连接池槽位,因此运行时会耗尽连接池。因此,任何轻量任务都必须排队等待繁重任务完成才能执行。在这里,就资源使用而言,一个繁重任务相当于两个轻量任务同时运行。

这种实现可以防止系统资源过载,例如(在此示例中)当一个繁重任务和一个轻量任务同时运行时可能发生过载。另一方面,两个轻量任务可以同时运行,因为它们每个只占用一个连接池槽位,而繁重任务则必须等待两个连接池槽位可用后才能执行。

此条目有帮助吗?