池¶
当太多进程同时访问某些系统时,这些系统可能会不堪重负。Airflow 池可用于限制任意任务集上的执行并行性。池列表在 UI 中进行管理(菜单 -> 管理 -> 池
),方法是为池命名并为其分配一定数量的工作槽。您还可以在此处决定池是否应在其已占用槽的计算中包括延迟任务。
当创建任务时,可以通过使用 pool
参数将任务与某个现有池关联
aggregate_db_message_job = BashOperator(
task_id="aggregate_db_message_job",
execution_timeout=timedelta(hours=3),
pool="ep_data_pipeline_db_msg_agg",
bash_command=aggregate_db_message_job_cmd,
dag=dag,
)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
任务将按常规进行调度,直到填满槽位。可通过 pool_slots
配置任务占据的槽位数(请参阅以下部分)。达到容量后,可运行任务将排队,并且其状态将在 UI 中显示为排队。当槽位释放时,排队的任务将基于任务及其后代的 优先级权重 开始运行。
请注意,如果任务未分配池,则会将它们分配给默认池 default_pool
,该池使用 128 个槽位进行初始化,并且可以通过 UI 或 CLI 进行修改(但无法移除)。
使用多个池槽位¶
默认情况下,Airflow 任务将各占据一个池槽位,但如果需要,可使用 pool_slots
参数配置它们占据更多槽位。当属于同一池的多个任务没有相同的“计算权重”时,这特别有用。
例如,考虑一个具有 2 个槽位的池 Pool(pool='maintenance', slots=2)
,以及以下任务
BashOperator(
task_id="heavy_task",
bash_command="bash backup_data.sh",
pool_slots=2,
pool="maintenance",
)
BashOperator(
task_id="light_task1",
bash_command="bash check_files.sh",
pool_slots=1,
pool="maintenance",
)
BashOperator(
task_id="light_task2",
bash_command="bash remove_files.sh",
pool_slots=1,
pool="maintenance",
)
由于将重型任务配置为使用 2 个池槽位,因此它在运行时会耗尽池。因此,任何轻型任务都必须排队,并等待重型任务完成才能执行。在此,就资源使用而言,重型任务相当于同时运行的两个轻型任务。
此实现可以防止系统资源不堪重负,(在此示例中)当重型任务和轻型任务同时运行时可能会发生这种情况。另一方面,由于两个轻型任务各只占据一个池槽位,因此它们可以同时运行,而重型任务必须等待两个池槽位可用才能执行。
警告
池和 SubDAG 的交互方式可能与你的第一印象不同。SubDAG 不会遵守你在顶层为它们设置的任何池;必须直接在 SubDAG 内部的任务上设置池。