集群策略
如果您想在整个集群层面检查或修改 DAG 或任务,则可以使用集群策略来实现。它们还可以根据 dag_id 或其他属性对 DAG 应用集群范围的设置。
常见使用场景包括
检查 DAG/任务是否符合特定标准
为 DAG/任务设置默认参数
执行自定义路由逻辑
集群策略主要有三种类型
dag_policy:接受一个名为dag的DAG参数。在从DagBag加载 DAG 时运行。task_policy:接受一个名为task的BaseOperator参数。该策略在从 DagBag 解析任务并在加载时创建任务时执行。这意味着可以在task_policy中修改整个任务定义。它并不针对在 DagRun 中运行的特定任务。定义的task_policy将应用于所有将来要执行的任务实例。task_instance_mutation_hook:接受一个名为task_instance的TaskInstance参数。task_instance_mutation_hook作用于与特定 DagRun 关联的任务实例,而不是任务本身。它在“worker”上执行,而不是在 DAG 文件处理器中,在任务实例执行之前运行。该策略仅适用于该任务当前执行的运行(即实例)。
DAG 和任务的集群策略可以抛出 AirflowClusterPolicyViolation 异常,以表明传入的 DAG/任务不符合要求,且不应被加载。
它们也可以在需要有意跳过该 DAG 时抛出 AirflowClusterPolicySkipDag 异常。与 AirflowClusterPolicyViolation 不同,此异常不会在 Airflow Web UI 上显示(内部也不会记录在 meta 数据库的 import_error 表中)。
任何由集群策略设置的额外属性都会优先于在 DAG 文件中定义的属性;例如,如果您在 DAG 文件的任务上设置了 sla,而集群策略也设置了 sla,则集群策略的值将优先。
如何定义策略函数
配置集群策略有两种方式
在 Python 搜索路径中的任意位置(例如 $AIRFLOW_HOME 下的
config/文件夹是一个很好的默认位置)创建一个airflow_local_settings.py文件,然后在该文件中添加与上述一个或多个集群策略名称匹配的可调用对象(例如dag_policy)。
有关如何配置本地设置的详细信息,请参阅 配置本地设置。
通过在自定义模块中使用 setuptools entrypoint,结合 Pluggy 接口实现。
在 2.6 版本中加入。
此方法更为高级,适用于熟悉 Python 打包的用户。
首先在模块中编写您的策略函数
from airflow.policies import hookimpl @hookimpl def task_policy(task) -> None: # Mutate task in place # ... print(f"Hello from {__file__}")
然后将 entrypoint 添加到项目配置中。例如,使用
pyproject.toml和setuptools。[build-system] requires = ["setuptools", "wheel"] build-backend = "setuptools.build_meta" [project] name = "my-airflow-plugin" version = "0.0.1" # ... dependencies = ["apache-airflow>=2.6"] [project.entry-points.'airflow.policy'] _ = 'my_airflow_plugin.policies'
entrypoint 组必须为
airflow.policy,名称在每个 entry 中必须唯一,否则 Pluggy 会忽略重复的 entry。value 应为使用@hookimpl标记的模块(或类)。完成上述步骤并将您的发行版安装到 Airflow 环境后,策略函数将由各个 Airflow 组件调用。(调用顺序未定义,因此若有多个插件请不要依赖特定的调用顺序。)
需要注意的一点是(无论使用哪种方式定义策略函数),参数名称必须严格匹配下文所述的名称。
可用的策略函数
- airflow.policies.task_policy(task)[source]
允许在任务被加载到 DagBag 后进行修改。
它允许管理员重新配置某些任务的参数。或者可以抛出
AirflowClusterPolicyViolation异常,以阻止 DAG 被执行。以下是一些此功能的实用示例
您可以强制为使用
SparkOperator的任务指定特定队列(例如spark队列),以确保这些任务被分配到正确的工作节点。您可以实施任务超时策略,确保没有任务运行超过 48 小时。
- 参数:
task – 需要被修改的任务
- airflow.policies.dag_policy(dag)[source]
允许在 DAG 被加载到 DagBag 后进行修改。
它允许管理员重新配置某些 DAG 的参数。或者可以抛出
AirflowClusterPolicyViolation异常,以阻止 DAG 被执行。以下是一些此功能的实用示例
您可以为 DAG 强制设置默认用户
检查每个 DAG 是否配置了标签
- 参数:
dag – 需要被修改的 DAG
- airflow.policies.task_instance_mutation_hook(task_instance)[source]
允许在任务实例被 Airflow 调度器排队之前进行修改。
例如,可用于在重试期间修改任务实例。
- 参数:
task_instance (airflow.models.taskinstance.TaskInstance) – 需要被修改的任务实例
- airflow.policies.pod_mutation_hook(pod)[source]
在调度之前修改 pod。
此设置允许在将
kubernetes.client.models.V1Pod对象传递给 Kubernetes 客户端进行调度之前进行修改。例如,可用于向 KubernetesExecutor 或 KubernetesPodOperator 启动的每个工作 pod 添加 sidecar 或 init 容器。
- airflow.policies.get_airflow_context_vars(context)[source]
将 airflow 上下文变量注入默认的 airflow 上下文变量中。
此设置允许获取 airflow 上下文变量,这些变量是键值对。它们随后被注入到默认的 airflow 上下文变量中,最终在运行任务时作为环境变量可用,dag_id、task_id、logical_date、dag_run_id、try_number 为保留键。
- 参数:
context – 所关注的 task_instance 的上下文。
示例
DAG 策略
此策略检查每个 DAG 是否至少定义了一个标签
def dag_policy(dag: DAG):
"""Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
if not dag.tags:
raise AirflowClusterPolicyViolation(
f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
)
if "only_for_beta" in dag.tags:
raise AirflowClusterPolicySkipDag(
f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
)
注意
为避免导入循环,如果在集群策略的类型注解中使用 DAG,请确保从 airflow.models 导入,而不是从 airflow 导入。
注意
DAG 策略在 DAG 完全加载后才会应用,因此覆盖 default_args 参数不会产生效果。如需覆盖默认的 operator 设置,请改用任务策略。
任务策略
以下示例演示如何对每个任务强制执行最大超时策略
class TimedOperator(BaseOperator, ABC):
timeout: timedelta
def task_policy(task: TimedOperator):
if task.task_type == "HivePartitionSensor":
task.queue = "sensor_queue"
if task.timeout > timedelta(hours=48):
task.timeout = timedelta(hours=48)
您也可以实现此策略以防止常见错误,而非仅作为技术安全控制。例如,不运行没有 Airflow 所有者的任务。
def task_must_have_owners(task: BaseOperator):
if task.owner and not isinstance(task.owner, str):
raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")
if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
raise AirflowClusterPolicyViolation(
f"""Task must have non-None non-default owner. Current value: {task.owner}"""
)
如果有多个检查要应用,最佳实践是将这些规则整理在单独的 Python 模块中,并使用单个策略/任务变异钩子执行这些自定义检查并聚合各种错误信息,以便在 UI(以及数据库的 import_errors 表)中仅报告一次 AirflowClusterPolicyViolation。
例如,您的 airflow_local_settings.py 可以遵循如下模式
TASK_RULES: list[Callable[[BaseOperator], None]] = [
task_must_have_owners,
]
def _check_task_rules(current_task: BaseOperator):
"""Check task rules for given task."""
notices = []
for rule in TASK_RULES:
try:
rule(current_task)
except AirflowClusterPolicyViolation as ex:
notices.append(str(ex))
if notices:
notices_list = " * " + "\n * ".join(notices)
raise AirflowClusterPolicyViolation(
f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
f"Notices:\n"
f"{notices_list}"
)
def example_task_policy(task: BaseOperator):
"""Ensure Tasks have non-default owners."""
_check_task_rules(task)
有关如何配置本地设置的详细信息,请参阅 配置本地设置。
任务实例变异
以下示例展示如何将第二次(或更高)重试的任务重新路由到不同的队列
def task_instance_mutation_hook(task_instance: TaskInstance):
if task_instance.try_number >= 1:
task_instance.queue = "retry_queue"
请注意,由于优先级权重是使用权重规则动态确定的,在变异钩子中无法更改任务实例的 priority_weight。