Google Kubernetes Engine 算子¶
Google Kubernetes Engine (GKE) 提供一个托管环境,用于利用 Google 基础设施部署、管理和扩展容器化应用程序。GKE 环境由多台机器(具体来说是 Compute Engine 实例)组成,它们组合在一起形成一个集群。
先决条件任务¶
要使用这些算子,您必须完成以下几项工作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用结算功能,具体方法请参阅 Google Cloud 文档。
启用 API,具体方法请参阅 Cloud Console 文档。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅安装。
管理 GKE 集群¶
集群是 GKE 的基础 - 所有工作负载都在集群之上运行。它由集群主节点和工作节点组成。在创建或删除集群时,主节点的生命周期由 GKE 管理。工作节点表示为 Compute Engine 虚拟机实例,GKE 在您创建集群时代表您创建这些实例。
创建 GKE 集群¶
以下是集群定义的示例
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}}
使用 GKECreateClusterOperator
创建集群时,需要一个像这样的 dict 对象,或一个 Cluster
定义。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
)
您可以为此操作使用 deferrable 模式,以便异步运行算子。当算子知道需要等待时,它可以释放工作进程,并将恢复算子的工作交给 Trigger。因此,当它暂停(延迟)时,不会占用工作进程槽,并且您的集群将不会在空闲的 Operators 或 Sensors 上浪费大量资源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
deferrable=True,
)
在集群内部安装特定版本的 Kueue¶
Kueue 是一个云原生 Job 调度器,它与默认的 Kubernetes 调度器、Job 控制器和集群自动扩缩器协同工作,以提供端到端的批处理系统。Kueue 实现 Job 排队,根据配额和公平共享资源的分层结构决定 Job 何时等待以及何时开始。Kueue 支持 Autopilot 集群、带有节点自动预配功能的标准 GKE 和常规的自动扩缩节点池。借助 GKEStartKueueInsideClusterOperator
在您的集群上安装和使用 Kueue,如本示例所示
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py
add_kueue_cluster = GKEStartKueueInsideClusterOperator(
task_id="add_kueue_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
kueue_version="v0.6.2",
)
删除 GKE 集群¶
要删除集群,请使用 GKEDeleteClusterOperator
。这也会删除分配给集群的所有节点。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
)
您可以为此操作使用 deferrable 模式,以便异步运行算子。当算子知道需要等待时,它可以释放工作进程,并将恢复算子的工作交给 Trigger。因此,当它暂停(延迟)时,不会占用工作进程槽,并且您的集群将不会在空闲的 Operators 或 Sensors 上浪费大量资源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
deferrable=True,
)
管理 GKE 集群上的工作负载¶
GKE 与容器化应用程序协同工作,例如在 Docker 上创建的应用程序,并将它们部署到集群上运行。这些称为工作负载,部署在集群上时,它们利用集群的 CPU 和内存资源来有效运行。
在 GKE 集群上运行 Pod¶
为了在 GKE 集群上运行 Pod,有两个算子可用
GKEStartPodOperator
扩展了 KubernetesPodOperator
,以使用 Google Cloud 凭据提供授权。无需管理 kube_config
文件,因为它会自动生成。所有 Kubernetes 参数(config_file
除外)也对 GKEStartPodOperator
有效。有关 KubernetesPodOperator
的更多信息,请参阅:KubernetesPodOperator 指南。
与私有集群一起使用¶
所有集群都有一个规范端点。该端点是 Kubernetes API 服务器的 IP 地址,Airflow 使用它与您的集群主节点通信。该端点显示在 Cloud Console 中集群“详细信息”选项卡的“端点”字段下,以及 gcloud container clusters describe
命令输出的 endpoint 字段中。
私有集群有两个唯一的端点值:privateEndpoint
(内部 IP 地址)和 publicEndpoint
(外部 IP 地址)。默认情况下,对私有集群运行 GKEStartPodOperator
会将外部 IP 地址设置为端点。如果您倾向于使用内部 IP 作为端点,则需要将 use_internal_ip
参数设置为 True
。
与 DNS 端点集群一起使用¶
要在使用 DNS 端点时运行 GKEStartPodOperator
,您需要将 use_dns_endpoint
参数设置为 True
。
与 Autopilot(无服务器)集群一起使用¶
在像 GKE Autopilot 这样的无服务器集群上运行时,由于冷启动,Pod 启动有时会花费更长时间。在 Pod 启动期间,会定期短间隔检查状态,如果 Pod 尚未启动,则会发出警告消息。您可以通过 startup_check_interval_seconds
参数增加此间隔长度,建议设置为 60 秒。
XCom 的使用¶
我们可以在算子上启用 XCom 的使用。其工作原理是使用指定的 Pod 启动一个 Sidecar 容器。指定使用 XCom 时,Sidecar 会自动挂载,其挂载点路径为 /airflow/xcom
。要为 XCom 提供值,请确保您的 Pod 将其写入 Sidecar 中一个名为 return.json
的文件。该文件的内容可在您的 DAG 的下游使用。以下是使用示例
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
pod_task_xcom = GKEStartPodOperator(
task_id="pod_task_xcom",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
do_xcom_push=True,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom",
in_cluster=False,
on_finish_action="delete_pod",
)
然后在其他算子中使用它
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom') }}\"",
task_id="pod_task_xcom_result",
)
您可以为此操作使用 deferrable 模式,以便异步运行算子。当算子知道需要等待时,它可以释放工作进程,并将恢复算子的工作交给 Trigger。因此,当它暂停(延迟)时,不会占用工作进程槽,并且您的集群将不会在空闲的 Operators 或 Sensors 上浪费大量资源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
pod_task_xcom_async = GKEStartPodOperator(
task_id="pod_task_xcom_async",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom-async",
in_cluster=False,
on_finish_action="delete_pod",
do_xcom_push=True,
deferrable=True,
get_logs=True,
)
在 GKE 集群上运行 Job¶
为了在 GKE 集群上运行 Job,有两个算子可用
GKEStartJobOperator
扩展了 KubernetesJobOperator
,以使用 Google Cloud 凭据提供授权。无需管理 kube_config
文件,因为它会自动生成。所有 Kubernetes 参数(config_file
除外)也对 GKEStartJobOperator
有效。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
job_task = GKEStartJobOperator(
task_id="job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME,
)
GKEStartJobOperator
也支持 deferrable 模式。请注意,仅当 wait_until_job_complete
参数设置为 True
时,此模式才有意义。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
job_task_def = GKEStartJobOperator(
task_id="job_task_def",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME_DEF,
wait_until_job_complete=True,
deferrable=True,
)
要在启用了 Kueue 的 GKE 集群上运行 Job,请使用 GKEStartKueueJobOperator
。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py
kueue_job_task = GKEStartKueueJobOperator(
task_id="kueue_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
queue_name=QUEUE_NAME,
namespace="default",
parallelism=3,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name="test-pi",
suspend=True,
container_resources=k8s.V1ResourceRequirements(
requests={
"cpu": 1,
"memory": "200Mi",
},
),
)
删除 GKE 集群上的 Job¶
为了删除 GKE 集群上的 Job,有两个算子可用
GKEDeleteJobOperator
扩展了 KubernetesDeleteJobOperator
,以使用 Google Cloud 凭据提供授权。无需管理 kube_config
文件,因为它会自动生成。所有 Kubernetes 参数(config_file
除外)也对 GKEDeleteJobOperator
有效。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
delete_job = GKEDeleteJobOperator(
task_id="delete_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=JOB_NAME,
namespace=JOB_NAMESPACE,
)
按给定名称检索 Job 的信息¶
您可以使用 GKEDescribeJobOperator
通过提供 Job 的名称和命名空间来检索现有 Job 的详细描述。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
describe_job_task = GKEDescribeJobOperator(
task_id="describe_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
job_name=job_task.output["job_name"],
namespace="default",
cluster_name=CLUSTER_NAME,
)
检索 Job 列表¶
您可以使用 GKEListJobsOperator
检索现有 Job 列表。如果提供了 namespace
参数,输出将包含给定命名空间中的 Job。如果未指定 namespace
参数,将输出所有命名空间中的信息。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
list_job_task = GKEListJobsOperator(
task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)
在 GKE 集群中创建资源¶
您可以使用 GKECreateCustomResourceOperator
在指定的 Google Kubernetes Engine 集群中创建资源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py
create_resource_task = GKECreateCustomResourceOperator(
task_id="create_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
删除 GKE 集群中的资源¶
您可以使用 GKEDeleteCustomResourceOperator
在指定的 Google Kubernetes Engine 集群中删除资源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py
delete_resource_task = GKEDeleteCustomResourceOperator(
task_id="delete_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
暂停 GKE 集群上的 Job¶
您可以使用 GKESuspendJobOperator
在指定的 Google Kubernetes Engine 集群中暂停 Job。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
suspend_job = GKESuspendJobOperator(
task_id="suspend_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)
恢复 GKE 集群上的 Job¶
您可以使用 GKEResumeJobOperator
在指定的 Google Kubernetes Engine 集群中恢复 Job。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
resume_job = GKEResumeJobOperator(
task_id="resume_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)
参考资料¶
欲了解更多信息,请参阅