连接池¶
当太多进程同时访问某些系统时,这些系统可能会不堪重负。Airflow 连接池可用于限制任意任务集的并行执行。通过 UI (菜单 -> 管理员 -> 连接池
) 管理连接池列表,为连接池命名并分配工作槽位数量。在这里,您还可以决定连接池在计算已占用槽位时是否应包含延迟任务。
然后,在创建任务时,可以使用 pool
参数将任务与现有连接池之一关联起来
aggregate_db_message_job = BashOperator(
task_id="aggregate_db_message_job",
execution_timeout=timedelta(hours=3),
pool="ep_data_pipeline_db_msg_agg",
bash_command=aggregate_db_message_job_cmd,
dag=dag,
)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
槽位被占用时,任务将照常调度。任务占用的槽位数量可通过 pool_slots
进行配置(详见下文)。一旦达到容量限制,可运行任务将被排队,其状态将在 UI 中显示为排队状态。随着槽位释放,排队任务将根据任务及其后代的优先级权重开始运行。
请注意,如果未给任务指定连接池,它们将被分配给默认连接池 default_pool
,该连接池初始化时有 128 个槽位,可通过 UI 或 CLI 修改(但不能删除)。
使用多个连接池槽位¶
Airflow 任务默认每个占用一个连接池槽位,但如果需要,可以通过 pool_slots
参数将其配置为占用更多槽位。当属于同一连接池的多个任务具有不同的“计算权重”时,这尤其有用。
例如,考虑一个具有 2 个槽位的连接池,Pool(pool='maintenance', slots=2)
,以及以下任务
BashOperator(
task_id="heavy_task",
bash_command="bash backup_data.sh",
pool_slots=2,
pool="maintenance",
)
BashOperator(
task_id="light_task1",
bash_command="bash check_files.sh",
pool_slots=1,
pool="maintenance",
)
BashOperator(
task_id="light_task2",
bash_command="bash remove_files.sh",
pool_slots=1,
pool="maintenance",
)
由于繁重任务配置为使用 2 个连接池槽位,因此运行时会耗尽连接池。因此,任何轻量任务都必须排队等待繁重任务完成才能执行。在这里,就资源使用而言,一个繁重任务相当于两个轻量任务同时运行。
这种实现可以防止系统资源过载,例如(在此示例中)当一个繁重任务和一个轻量任务同时运行时可能发生过载。另一方面,两个轻量任务可以同时运行,因为它们每个只占用一个连接池槽位,而繁重任务则必须等待两个连接池槽位可用后才能执行。