优先级权重
priority_weight 定义执行器队列中的优先级。默认的 priority_weight 为 1,可以提升为任意整数;数值越大优先级越高。此外,每个任务都有一个真实的 priority_weight,它是根据其 weight_rule 计算得到的,该规则定义了任务有效总优先级权重的加权方式。
下面是加权方法。默认情况下,Airflow 的加权方法是 downstream。
downstream任务的有效权重是其所有下游子任务的累计总和。因此,在使用正权重值时,上游任务的权重会更高,并且会更积极地被调度。此方式在您拥有多个 Dag 运行实例并希望在每个 Dag 继续处理下游任务之前,先让所有上游任务在所有运行中完成时非常有用。
upstream任务的有效权重是其所有上游祖先任务的累计总和。这恰好相反:在使用正权重值时,下游任务拥有更高的权重,并会更积极地被调度。当您拥有多个 Dag 运行实例并且希望每个 Dag 完成后再启动其他 Dag 运行的上游任务时,此方式非常有用。
absolute任务的有效权重就是直接指定的 priority_weight,不进行额外加权。如果您明确知道每个任务应该拥有的优先级权重,可以使用此方式。此外,将 weight_rule 设置为 absolute 时,还会显著加速非常大型 Dag 的任务创建过程。
priority_weight 参数可以与 资源池 结合使用。
注意
由于大多数数据库引擎使用 32 位整数,任何计算或定义的 priority_weight 的最大值为 2,147,483,647,最小值为 -2,147,483,648。
自定义权重规则
添加于版本 2.9.0。
您可以通过扩展 PriorityWeightStrategy 类并在插件中注册,来实现自定义的加权方法。
class DecreasingPriorityStrategy(PriorityWeightStrategy):
"""A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
def get_weight(self, ti: TaskInstance) -> int:
try_number = ti.try_number or 0
return max(3 - try_number + 1, 1)
class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
name = "decreasing_priority_weight_strategy_plugin"
priority_weight_strategies = [DecreasingPriorityStrategy]
要检查自定义优先级权重策略是否已在 Airflow 中可用,您可以运行 bash 命令 airflow plugins。随后,要使用它,您可以创建自定义类的实例并在任务的 weight_rule 参数中提供该实例,或提供自定义类的路径。
with DAG(
dag_id="example_custom_weight",
schedule="0 0 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
) as dag:
start = EmptyOperator(
task_id="start",
)
# provide the class instance
task_1 = BashOperator(task_id="task_1", bash_command="echo 1", weight_rule=DecreasingPriorityStrategy())
# or provide the path of the class
task_2 = BashOperator(
task_id="task_2",
bash_command="echo 1",
weight_rule="airflow.example_dags.plugins.decreasing_priority_weight_strategy.DecreasingPriorityStrategy",
)
task_non_custom = BashOperator(task_id="task_non_custom", bash_command="echo 1", priority_weight=2)
start >> [task_1, task_2, task_non_custom]
Dag 运行后,您可以检查任务上的 priority_weight 参数,以验证它是否使用了自定义的优先级策略规则。
这是一个 实验性特性。