KubernetesPodOperator

KubernetesPodOperator 允许您在 Kubernetes 集群上创建和运行 Pod。

注意

如果您使用托管 Kubernetes,请考虑使用专门的 KPO 操作符,因为它简化了 Kubernetes 授权过程

注意

使用此操作符**不需要** Kubernetes 执行器

此操作符如何工作?

KubernetesPodOperator 使用 Kubernetes API 在 Kubernetes 集群中启动 Pod。通过提供镜像 URL 和带有可选参数的命令,操作符使用 Kube Python 客户端生成一个 Kubernetes API 请求,该请求动态启动这些单独的 Pod。用户可以使用 config_file 参数指定 kubeconfig 文件,否则操作符将默认为 ~/.kube/config

KubernetesPodOperator 支持任务级别的资源配置,并且对于通过公共 PyPI 仓库不可用的自定义 Python 依赖项来说是最佳的。它还允许用户使用 pod_template_file 参数提供模板 YAML 文件。最终,它允许 Airflow 充当作业编排器 - 无论这些作业是用什么语言编写的。

调试 KubernetesPodOperator

您可以通过在操作符实例上调用 dry_run() 来打印出在运行时将创建的 Pod 的 Kubernetes 清单。

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k = KubernetesPodOperator(
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    do_xcom_push=True,
)

k.dry_run()

参数优先级

当 KPO 定义 Pod 对象时,KubernetesPodOperator 参数之间可能存在重叠。通常,优先级顺序为 KPO 字段特定参数(例如,secretscmdsaffinity),更通用的模板 full_pod_specpod_template_filepod_template_dict,然后是默认的 V1Pod

对于 namespace,如果未通过任何这些方法提供命名空间,那么我们将首先尝试获取当前命名空间(如果任务已在 Kubernetes 中运行),如果失败,我们将使用 default 命名空间。

对于 Pod 名称,如果未明确提供,我们将使用 task_id。默认情况下会添加一个随机后缀,因此 Pod 名称通常没有太大意义。

如何在 Pod 中使用集群 ConfigMap、Secret 和 Volume?

要添加 ConfigMap、Volume 和其他 Kubernetes 原生对象,我们建议您像这样导入 Kubernetes 模型 API

from kubernetes.client import models as k8s

使用此 API 对象,您可以访问所有 Kubernetes API 对象,形式为 Python 类。使用此方法将确保正确性和类型安全。虽然我们已经删除了几乎所有的 Kubernetes 便利类,但我们保留了 Secret 类,以简化生成 Secret Volume/环境变量的过程。

tests/system/cncf/kubernetes/example_kubernetes.py[源代码]

secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
    name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)

configmaps = [
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1")),
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2")),
]

volume = k8s.V1Volume(
    name="test-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)

port = k8s.V1ContainerPort(name="http", container_port=80)

init_container_volume_mounts = [
    k8s.V1VolumeMount(mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True)
]

init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")]

init_container = k8s.V1Container(
    name="init-container",
    image="ubuntu:16.04",
    env=init_environments,
    volume_mounts=init_container_volume_mounts,
    command=["bash", "-cx"],
    args=["echo 10"],
)

affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=[
                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
                    ]
                ),
            )
        ]
    ),
    pod_affinity=k8s.V1PodAffinity(
        required_during_scheduling_ignored_during_execution=[
            k8s.V1WeightedPodAffinityTerm(
                weight=1,
                pod_affinity_term=k8s.V1PodAffinityTerm(
                    label_selector=k8s.V1LabelSelector(
                        match_expressions=[
                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
                        ]
                    ),
                    topology_key="failure-domain.beta.kubernetes.io/zone",
                ),
            )
        ]
    ),
)

tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]

KubernetesPodOperator 和 Kubernetes 对象规范之间的区别

KubernetesPodOperator 可以被认为是能够在 DAG 上下文中的 Airflow 调度程序中运行的 Kubernetes 对象规范定义的替代品。如果使用该操作符,则无需为您想要运行的 Pod 创建等效的 YAML/JSON 对象规范。YAML 文件仍然可以使用 pod_template_file 提供,甚至可以使用 full_pod_spec 参数在 Python 中构造 Pod 规范,该参数需要 Kubernetes V1Pod

如何使用私有镜像(容器注册表)?

默认情况下,KubernetesPodOperator 将查找在 Dockerhub 上公开托管的镜像。要从私有注册表(例如 ECR、GCR、Quay 或其他注册表)拉取镜像,您必须创建一个 Kubernetes Secret,该 Secret 表示从私有注册表访问镜像的凭据,最终在 image_pull_secrets 参数中指定。

使用 kubectl 创建 Secret

kubectl create secret docker-registry testquay \
    --docker-server=quay.io \
    --docker-username=<Profile name> \
    --docker-password=<password>

然后在您的 Pod 中像这样使用它

tests/system/cncf/kubernetes/example_kubernetes.py[源代码]

    quay_k8s = KubernetesPodOperator(
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        on_finish_action="delete_pod",
        in_cluster=True,
        task_id="task-two",
        get_logs=True,
    )

此外,对于此操作,您可以使用可延迟模式下的操作符

tests/system/cncf/kubernetes/example_kubernetes_async.py[源代码]

    quay_k8s_async = KubernetesPodOperator(
        task_id="kubernetes_private_img_task_async",
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        on_finish_action="delete_pod",
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

定期获取和显示容器日志的示例

tests/system/cncf/kubernetes/example_kubernetes_async.py[源代码]

    kubernetes_task_async_log = KubernetesPodOperator(
        task_id="kubernetes_task_async_log",
        namespace="kubernetes_task_async_log",
        in_cluster=False,
        name="astro_k8s_test_pod",
        image="ubuntu",
        cmds=[
            "bash",
            "-cx",
            (
                "i=0; "
                "while [ $i -ne 100 ]; "
                "do i=$(($i+1)); "
                "echo $i; "
                "sleep 1; "
                "done; "
                "mkdir -p /airflow/xcom/; "
                'echo \'{"message": "good afternoon!"}\' > /airflow/xcom/return.json'
            ),
        ],
        do_xcom_push=True,
        deferrable=True,
        get_logs=True,
        logging_interval=5,
    )

XCom 如何工作?

KubernetesPodOperator 处理 XCom 值的方式与其他操作符不同。为了从您的 Pod 中传递 XCom 值,您必须将 do_xcom_push 指定为 True。这将创建一个与 Pod 并行运行的 sidecar 容器。Pod 必须将 XCom 值写入 /airflow/xcom/return.json 路径下的此位置。

注意

无效的 JSON 内容将失败,例如 echo 'hello' > /airflow/xcom/return.json 失败,而 echo '\"hello\"' > /airflow/xcom/return.json 则可以工作

请参阅以下示例,了解此过程如何发生

tests/system/cncf/kubernetes/example_kubernetes.py[源代码]

    write_xcom = KubernetesPodOperator(
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

注意

XCOM 将仅为标记为 State.SUCCESS 的任务推送。

此外,对于此操作,您可以使用可延迟模式下的操作符

tests/system/cncf/kubernetes/example_kubernetes_async.py[源代码]

    write_xcom_async = KubernetesPodOperator(
        task_id="kubernetes_write_xcom_task_async",
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

    pod_task_xcom_result_async = BashOperator(
        task_id="pod_task_xcom_result_async",
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
    )

    write_xcom_async >> pod_task_xcom_result_async

在电子邮件警报中包含错误消息

写入 /dev/termination-log 的任何内容都将由 Kubernetes 检索,并包含在任务失败时的异常消息中。

k = KubernetesPodOperator(
    task_id="test_error_message",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-error-message",
    email="[email protected]",
    email_on_failure=True,
)

在此处阅读有关 termination-log 的更多信息:此处

KubernetesPodOperator 回调

KubernetesPodOperator 支持不同的回调函数,这些回调函数可用于在 Pod 的生命周期中触发操作。要使用它们,您需要创建 KubernetesPodOperatorCallback 的子类,并重写您想要使用的回调方法。然后,您可以使用 callbacks 参数将回调类传递给运算符。

支持以下回调函数:

  • on_sync_client_creation: 在创建同步客户端后调用

  • on_pod_creation: 在创建 Pod 后调用

  • on_pod_starting: 在 Pod 启动后调用

  • on_pod_completion: 在 Pod 完成时调用

  • on_pod_cleanup: 在清理/删除 Pod 后调用

  • on_operator_resuming: 从延迟状态恢复任务时调用

  • progress_callback: 在容器日志的每一行调用

目前,回调方法不在异步模式下调用,此支持将在未来添加。

示例:

import kubernetes.client as k8s
import kubernetes_asyncio.client as async_k8s

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback


class MyCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_creation(*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs) -> None:
        client.create_namespaced_service(
            namespace=pod.metadata.namespace,
            body=k8s.V1Service(
                metadata=k8s.V1ObjectMeta(
                    name=pod.metadata.name,
                    labels=pod.metadata.labels,
                    owner_references=[
                        k8s.V1OwnerReference(
                            api_version=pod.api_version,
                            kind=pod.kind,
                            name=pod.metadata.name,
                            uid=pod.metadata.uid,
                            controller=True,
                            block_owner_deletion=True,
                        )
                    ],
                ),
                spec=k8s.V1ServiceSpec(
                    selector=pod.metadata.labels,
                    ports=[
                        k8s.V1ServicePort(
                            name="http",
                            port=80,
                            target_port=80,
                        )
                    ],
                ),
            ),
        )


k = KubernetesPodOperator(
    task_id="test_callback",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-callback",
    callbacks=MyCallback,
)

传递密钥

切勿使用环境变量将密钥(例如连接身份验证信息)传递给 Kubernetes Pod Operator。这些环境变量对于任何有权查看和描述 Kubernetes 中 POD 的人都是可见的。相反,请通过 Kubernetes 原生 Secrets 传递密钥,或使用 Airflow 中的连接和变量。对于后者,您需要在您的镜像中安装与运行 Kubernetes Pod Operator 的 Airflow 版本相同的 apache-airflow 包。

SparkKubernetesOperator

SparkKubernetesOperator 允许您在 Kubernetes 集群上创建和运行 Spark 作业。它基于 spark-on-k8s-operator 项目。

此运算符简化了接口,并接受不同的参数来配置和在 Kubernetes 上运行 Spark 应用程序。与 KubernetesOperator 类似,我们添加了在提交后等待作业、管理错误处理、从驱动程序 Pod 检索日志以及删除 Spark 作业的逻辑。它还支持开箱即用的 Kubernetes 功能,例如处理卷、配置映射、密钥等。

此运算符如何工作?

该运算符通过在 Kubernetes 中生成 SparkApplication 自定义资源定义 (CRD) 来启动 Spark 任务。此 SparkApplication 任务随后使用用户指定的参数生成驱动程序和所需的执行程序 Pod。运算符会持续监视任务的进度,直到任务成功或失败。它会从驱动程序 Pod 检索日志,并在 Airflow UI 中显示它们。

使用示例

为了创建 SparkKubernetesOperator 任务,您必须提供一个基本模板,其中包括 Spark 配置和 Kubernetes 相关资源配置。此模板可以是 YAML 或 JSON 格式,用作运算符的起点。以下是一个您可以使用的示例模板

spark_job_template.yaml

spark:
  apiVersion: sparkoperator.k8s.io/v1beta2
  version: v1beta2
  kind: SparkApplication
  apiGroup: sparkoperator.k8s.io
  metadata:
    namespace: ds
  spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    sparkVersion: 3.0.0
    successfulRunHistoryLimit: 1
    restartPolicy:
      type: Never
    imagePullPolicy: Always
    hadoopConf: {}
    imagePullSecrets: []
    dynamicAllocation:
      enabled: false
      initialExecutors: 1
      minExecutors: 1
      maxExecutors: 1
    labels: {}
    driver:
      serviceAccount: default
      container_resources:
        gpu:
          name: null
          quantity: 0
        cpu:
          request: null
          limit: null
        memory:
          request: null
          limit: null
    executor:
      instances: 1
      container_resources:
        gpu:
          name: null
          quantity: 0
        cpu:
          request: null
          limit: null
        memory:
          request: null
          limit: null
kubernetes:
  # example:
  # env_vars:
  # - name: TEST_NAME
  #   value: TEST_VALUE
  env_vars: []

  # example:
  # env_from:
  # - name: test
  #   valueFrom:
  #     secretKeyRef:
  #       name: mongo-secret
  #       key: mongo-password
  env_from: []

  # example:
  # node_selector:
  #   karpenter.sh/provisioner-name: spark
  node_selector: {}

  # example: https://kubernetes.ac.cn/docs/concepts/scheduling-eviction/assign-pod-node/
  # affinity:
  #   nodeAffinity:
  #     requiredDuringSchedulingIgnoredDuringExecution:
  #       nodeSelectorTerms:
  #       - matchExpressions:
  #         - key: beta.kubernetes.io/instance-type
  #           operator: In
  #           values:
  #           - r5.xlarge
  affinity:
    nodeAffinity: {}
    podAffinity: {}
    podAntiAffinity: {}

  # example: https://kubernetes.ac.cn/docs/concepts/scheduling-eviction/taint-and-toleration/
  # type: list
  # tolerations:
  # - key: "key1"
  #   operator: "Equal"
  #   value: "value1"
  #   effect: "NoSchedule"
  tolerations: []

  # example:
  # config_map_mounts:
  #   snowflake-default: /mnt/tmp
  config_map_mounts: {}

  # example:
  # volume_mounts:
  # - name: config
  #   mountPath: /airflow
  volume_mounts: []

  # https://kubernetes.ac.cn/docs/concepts/storage/volumes/
  # example:
  # volumes:
  # - name: config
  #   persistentVolumeClaim:
  #     claimName: airflow
  volumes: []

  # read config map into an env variable
  # example:
  # from_env_config_map:
  # - configmap_1
  # - configmap_2
  from_env_config_map: []

  # load secret into an env variable
  # example:
  # from_env_secret:
  # - secret_1
  # - secret_2
  from_env_secret: []

  in_cluster: true
  conn_id: kubernetes_default
  kube_config_file: null
  cluster_context: null

重要

  • 模板文件由两个主要类别组成:sparkkubernetes

    • spark:此部分包含任务的 Spark 配置,与 Spark API 模板的结构相呼应。

    • kubernetes:此部分包含任务的 Kubernetes 资源配置,与 Kubernetes API 文档直接对应。每种资源类型都包含模板中的示例。

  • 要使用的指定基本镜像为 gcr.io/spark-operator/spark-py:v3.1.1

  • 确保 Spark 代码已嵌入到镜像中,使用 persistentVolume 安装,或可从 S3 存储桶等外部位置访问。

接下来,使用以下命令创建任务

SparkKubernetesOperator(
    task_id="spark_task",
    image="gcr.io/spark-operator/spark-py:v3.1.1",  # OR custom image using that
    code_path="local://path/to/spark/code.py",
    application_file="spark_job_template.yaml",  # OR spark_job_template.json
    dag=dag,
)

注意:或者,application_file 也可以是 json 文件。请参见以下示例

spark_job_template.json

{
  "spark": {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "version": "v1beta2",
    "kind": "SparkApplication",
    "apiGroup": "sparkoperator.k8s.io",
    "metadata": {
      "namespace": "ds"
    },
    "spec": {
      "type": "Python",
      "pythonVersion": "3",
      "mode": "cluster",
      "sparkVersion": "3.0.0",
      "successfulRunHistoryLimit": 1,
      "restartPolicy": {
        "type": "Never"
      },
      "imagePullPolicy": "Always",
      "hadoopConf": {},
      "imagePullSecrets": [],
      "dynamicAllocation": {
        "enabled": false,
        "initialExecutors": 1,
        "minExecutors": 1,
        "maxExecutors": 1
      },
      "labels": {},
      "driver": {
        "serviceAccount": "default",
        "container_resources": {
          "gpu": {
            "name": null,
            "quantity": 0
          },
          "cpu": {
            "request": null,
            "limit": null
          },
          "memory": {
            "request": null,
            "limit": null
          }
        }
      },
      "executor": {
        "instances": 1,
        "container_resources": {
          "gpu": {
            "name": null,
            "quantity": 0
          },
          "cpu": {
            "request": null,
            "limit": null
          },
          "memory": {
            "request": null,
            "limit": null
          }
        }
      }
    }
  },
  "kubernetes": {
    "env_vars": [],
    "env_from": [],
    "node_selector": {},
    "affinity": {
      "nodeAffinity": {},
      "podAffinity": {},
      "podAntiAffinity": {}
    },
    "tolerations": [],
    "config_map_mounts": {},
    "volume_mounts": [
      {
        "name": "config",
        "mountPath": "/airflow"
      }
    ],
    "volumes": [
      {
        "name": "config",
        "persistentVolumeClaim": {
          "claimName": "hsaljoog-airflow"
        }
      }
    ],
    "from_env_config_map": [],
    "from_env_secret": [],
    "in_cluster": true,
    "conn_id": "kubernetes_default",
    "kube_config_file": null,
    "cluster_context": null
  }
}

除了使用 YAML 或 JSON 文件之外,另一种方法是直接传递 template_spec 字段,而不是 application_file,如果您不想使用文件进行配置。

KubernetesJobOperator

KubernetesJobOperator 允许您在 Kubernetes 集群上创建和运行作业。

注意

如果您使用托管的 Kubernetes,请考虑使用专门的 KJO 运算符,因为它简化了 Kubernetes 授权过程

注意

使用此操作符**不需要** Kubernetes 执行器

此运算符如何工作?

KubernetesJobOperator 使用 Kubernetes API 在 Kubernetes 集群中启动作业。该运算符使用 Kube Python 客户端生成一个 Kubernetes API 请求,该请求动态启动此作业。用户可以使用 config_file 参数指定 kubeconfig 文件,否则该运算符将默认使用 ~/.kube/config。它还允许用户使用 job_template_file 参数提供模板 YAML 文件。

tests/system/cncf/kubernetes/example_kubernetes_job.py[source]

k8s_job = KubernetesJobOperator(
    task_id="job-task",
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)

KubernetesJobOperator 还支持可延迟模式

tests/system/cncf/kubernetes/example_kubernetes_job.py[source]

k8s_job_def = KubernetesJobOperator(
    task_id="job-task-def",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME + "-def",
    wait_until_job_complete=True,
    deferrable=True,
)

KubernetesPodOperatorKubernetesJobOperator 之间的区别

KubernetesJobOperator 是用于创建作业的运算符。作业创建一个或多个 Pod,并将继续重试 Pod 的执行,直到指定数量的 Pod 成功终止。当 Pod 成功完成时,作业会跟踪成功的完成情况。当达到指定数量的成功完成次数时,作业完成。用户可以使用 activeDeadlineSecondsbackoffLimit 等配置参数限制作业重试执行的次数。此运算符不使用用于创建 Pod 的 template 参数,而是使用 KubernetesPodOperator。这意味着用户可以使用 KubernetesPodOperator 中的所有参数,在 KubernetesJobOperator 中使用。

有关作业的更多信息,请访问此处:Kubernetes 作业文档

KubernetesDeleteJobOperator

KubernetesDeleteJobOperator 允许您在 Kubernetes 集群上删除作业。

tests/system/cncf/kubernetes/example_kubernetes_job.py[source]

delete_job_task = KubernetesDeleteJobOperator(
    task_id="delete_job_task",
    name=k8s_job.output["job_name"],
    namespace=JOB_NAMESPACE,
    wait_for_completion=True,
    delete_on_status="Complete",
    poll_interval=1.0,
)

KubernetesPatchJobOperator

KubernetesPatchJobOperator 允许您更新 Kubernetes 集群上的 Job。

tests/system/cncf/kubernetes/example_kubernetes_job.py[source]

update_job = KubernetesPatchJobOperator(
    task_id="update-job-task",
    namespace="default",
    name=k8s_job.output["job_name"],
    body={"spec": {"suspend": False}},
)

KubernetesInstallKueueOperator

KubernetesInstallKueueOperator 允许您在 Kubernetes 集群中安装 Kueue 组件。

tests/system/cncf/kubernetes/example_kubernetes_kueue.py[源代码]

install_kueue = KubernetesInstallKueueOperator(
    task_id="install_kueue",
    kueue_version="v0.9.1",
)

参考

更多信息,请查看

KubernetesStartKueueJobOperator

KubernetesStartKueueJobOperator 允许您在 Kubernetes 集群中启动 Kueue 作业。

tests/system/cncf/kubernetes/example_kubernetes_kueue.py[源代码]

start_kueue_job = KubernetesStartKueueJobOperator(
    task_id="kueue_job",
    queue_name=QUEUE_NAME,
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
    wait_until_job_complete=True,
)

更多信息,请查看

此条目是否有帮助?