Google Kubernetes Engine 算子

Google Kubernetes Engine (GKE) 提供一个托管环境,用于利用 Google 基础设施部署、管理和扩展容器化应用程序。GKE 环境由多台机器(具体来说是 Compute Engine 实例)组成,它们组合在一起形成一个集群。

先决条件任务

要使用这些算子,您必须完成以下几项工作

管理 GKE 集群

集群是 GKE 的基础 - 所有工作负载都在集群之上运行。它由集群主节点和工作节点组成。在创建或删除集群时,主节点的生命周期由 GKE 管理。工作节点表示为 Compute Engine 虚拟机实例,GKE 在您创建集群时代表您创建这些实例。

创建 GKE 集群

以下是集群定义的示例

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}}

使用 GKECreateClusterOperator 创建集群时,需要一个像这样的 dict 对象,或一个 Cluster 定义。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
)

您可以为此操作使用 deferrable 模式,以便异步运行算子。当算子知道需要等待时,它可以释放工作进程,并将恢复算子的工作交给 Trigger。因此,当它暂停(延迟)时,不会占用工作进程槽,并且您的集群将不会在空闲的 Operators 或 Sensors 上浪费大量资源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
    deferrable=True,
)

在集群内部安装特定版本的 Kueue

Kueue 是一个云原生 Job 调度器,它与默认的 Kubernetes 调度器、Job 控制器和集群自动扩缩器协同工作,以提供端到端的批处理系统。Kueue 实现 Job 排队,根据配额和公平共享资源的分层结构决定 Job 何时等待以及何时开始。Kueue 支持 Autopilot 集群、带有节点自动预配功能的标准 GKE 和常规的自动扩缩节点池。借助 GKEStartKueueInsideClusterOperator 在您的集群上安装和使用 Kueue,如本示例所示

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py

    add_kueue_cluster = GKEStartKueueInsideClusterOperator(
        task_id="add_kueue_cluster",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        cluster_name=CLUSTER_NAME,
        kueue_version="v0.6.2",
    )

删除 GKE 集群

要删除集群,请使用 GKEDeleteClusterOperator。这也会删除分配给集群的所有节点。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
)

您可以为此操作使用 deferrable 模式,以便异步运行算子。当算子知道需要等待时,它可以释放工作进程,并将恢复算子的工作交给 Trigger。因此,当它暂停(延迟)时,不会占用工作进程槽,并且您的集群将不会在空闲的 Operators 或 Sensors 上浪费大量资源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    deferrable=True,
)

管理 GKE 集群上的工作负载

GKE 与容器化应用程序协同工作,例如在 Docker 上创建的应用程序,并将它们部署到集群上运行。这些称为工作负载,部署在集群上时,它们利用集群的 CPU 和内存资源来有效运行。

在 GKE 集群上运行 Pod

为了在 GKE 集群上运行 Pod,有两个算子可用

GKEStartPodOperator 扩展了 KubernetesPodOperator,以使用 Google Cloud 凭据提供授权。无需管理 kube_config 文件,因为它会自动生成。所有 Kubernetes 参数(config_file 除外)也对 GKEStartPodOperator 有效。有关 KubernetesPodOperator 的更多信息,请参阅:KubernetesPodOperator 指南。

与私有集群一起使用

所有集群都有一个规范端点。该端点是 Kubernetes API 服务器的 IP 地址,Airflow 使用它与您的集群主节点通信。该端点显示在 Cloud Console 中集群“详细信息”选项卡的“端点”字段下,以及 gcloud container clusters describe 命令输出的 endpoint 字段中。

私有集群有两个唯一的端点值:privateEndpoint(内部 IP 地址)和 publicEndpoint(外部 IP 地址)。默认情况下,对私有集群运行 GKEStartPodOperator 会将外部 IP 地址设置为端点。如果您倾向于使用内部 IP 作为端点,则需要将 use_internal_ip 参数设置为 True

与 DNS 端点集群一起使用

要在使用 DNS 端点时运行 GKEStartPodOperator,您需要将 use_dns_endpoint 参数设置为 True

与 Autopilot(无服务器)集群一起使用

在像 GKE Autopilot 这样的无服务器集群上运行时,由于冷启动,Pod 启动有时会花费更长时间。在 Pod 启动期间,会定期短间隔检查状态,如果 Pod 尚未启动,则会发出警告消息。您可以通过 startup_check_interval_seconds 参数增加此间隔长度,建议设置为 60 秒。

XCom 的使用

我们可以在算子上启用 XCom 的使用。其工作原理是使用指定的 Pod 启动一个 Sidecar 容器。指定使用 XCom 时,Sidecar 会自动挂载,其挂载点路径为 /airflow/xcom。要为 XCom 提供值,请确保您的 Pod 将其写入 Sidecar 中一个名为 return.json 的文件。该文件的内容可在您的 DAG 的下游使用。以下是使用示例

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

pod_task_xcom = GKEStartPodOperator(
    task_id="pod_task_xcom",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    do_xcom_push=True,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom",
    in_cluster=False,
    on_finish_action="delete_pod",
)

然后在其他算子中使用它

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom') }}\"",
    task_id="pod_task_xcom_result",
)

您可以为此操作使用 deferrable 模式,以便异步运行算子。当算子知道需要等待时,它可以释放工作进程,并将恢复算子的工作交给 Trigger。因此,当它暂停(延迟)时,不会占用工作进程槽,并且您的集群将不会在空闲的 Operators 或 Sensors 上浪费大量资源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py

pod_task_xcom_async = GKEStartPodOperator(
    task_id="pod_task_xcom_async",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom-async",
    in_cluster=False,
    on_finish_action="delete_pod",
    do_xcom_push=True,
    deferrable=True,
    get_logs=True,
)

在 GKE 集群上运行 Job

为了在 GKE 集群上运行 Job,有两个算子可用

GKEStartJobOperator 扩展了 KubernetesJobOperator,以使用 Google Cloud 凭据提供授权。无需管理 kube_config 文件,因为它会自动生成。所有 Kubernetes 参数(config_file 除外)也对 GKEStartJobOperator 有效。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

job_task = GKEStartJobOperator(
    task_id="job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)

GKEStartJobOperator 也支持 deferrable 模式。请注意,仅当 wait_until_job_complete 参数设置为 True 时,此模式才有意义。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

job_task_def = GKEStartJobOperator(
    task_id="job_task_def",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME_DEF,
    wait_until_job_complete=True,
    deferrable=True,
)

要在启用了 Kueue 的 GKE 集群上运行 Job,请使用 GKEStartKueueJobOperator

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py

kueue_job_task = GKEStartKueueJobOperator(
    task_id="kueue_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    queue_name=QUEUE_NAME,
    namespace="default",
    parallelism=3,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
)

删除 GKE 集群上的 Job

为了删除 GKE 集群上的 Job,有两个算子可用

GKEDeleteJobOperator 扩展了 KubernetesDeleteJobOperator,以使用 Google Cloud 凭据提供授权。无需管理 kube_config 文件,因为它会自动生成。所有 Kubernetes 参数(config_file 除外)也对 GKEDeleteJobOperator 有效。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

delete_job = GKEDeleteJobOperator(
    task_id="delete_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=JOB_NAME,
    namespace=JOB_NAMESPACE,
)

按给定名称检索 Job 的信息

您可以使用 GKEDescribeJobOperator 通过提供 Job 的名称和命名空间来检索现有 Job 的详细描述。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

describe_job_task = GKEDescribeJobOperator(
    task_id="describe_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    job_name=job_task.output["job_name"],
    namespace="default",
    cluster_name=CLUSTER_NAME,
)

检索 Job 列表

您可以使用 GKEListJobsOperator 检索现有 Job 列表。如果提供了 namespace 参数,输出将包含给定命名空间中的 Job。如果未指定 namespace 参数,将输出所有命名空间中的信息。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

list_job_task = GKEListJobsOperator(
    task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)

在 GKE 集群中创建资源

您可以使用 GKECreateCustomResourceOperator 在指定的 Google Kubernetes Engine 集群中创建资源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py

create_resource_task = GKECreateCustomResourceOperator(
    task_id="create_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

删除 GKE 集群中的资源

您可以使用 GKEDeleteCustomResourceOperator 在指定的 Google Kubernetes Engine 集群中删除资源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py

delete_resource_task = GKEDeleteCustomResourceOperator(
    task_id="delete_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

暂停 GKE 集群上的 Job

您可以使用 GKESuspendJobOperator 在指定的 Google Kubernetes Engine 集群中暂停 Job。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

suspend_job = GKESuspendJobOperator(
    task_id="suspend_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)

恢复 GKE 集群上的 Job

您可以使用 GKEResumeJobOperator 在指定的 Google Kubernetes Engine 集群中恢复 Job。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

resume_job = GKEResumeJobOperator(
    task_id="resume_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)

参考资料

欲了解更多信息,请参阅

此条目有帮助吗?