Google Kubernetes Engine 运算符¶
Google Kubernetes Engine (GKE) 提供了一个托管环境,用于使用 Google 基础设施部署、管理和扩展容器化应用程序。GKE 环境由多台机器(具体来说是 Compute Engine 实例)组合在一起形成一个集群。
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档中的说明,为您的项目启用结算功能。
按照 Cloud Console 文档中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'安装提供了详细信息。
管理 GKE 集群¶
集群是 GKE 的基础 - 所有工作负载都在集群之上运行。它由集群主节点和工作节点组成。在创建或删除集群时,主节点的生命周期由 GKE 管理。工作节点表示为 Compute Engine VM 实例,当创建集群时,GKE 会代表您创建这些实例。
创建 GKE 集群¶
这是一个集群定义的示例
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}}
使用 GKECreateClusterOperator
创建集群时,需要一个像这样的 dict 对象或 Cluster
定义。
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
)
您可以使用可延迟模式来执行此操作,以便异步运行运算符。它将使您有可能在知道必须等待时释放工作进程,并将恢复运算符的任务移交给触发器。因此,当它被暂停(延迟)时,它不会占用工作进程插槽,并且您的集群将减少大量资源浪费在空闲的运算符或传感器上
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
deferrable=True,
)
在集群内安装特定版本的 Kueue¶
Kueue 是一个云原生作业调度器,可与默认的 Kubernetes 调度器、作业控制器和集群自动缩放器配合使用,以提供端到端的批处理系统。Kueue 实现作业排队,根据配额和团队之间公平共享资源的层次结构来决定作业何时应该等待以及何时应该开始。Kueue 支持 Autopilot 集群、具有节点自动配置的标准 GKE 和常规自动缩放的节点池。要借助 GKEStartKueueInsideClusterOperator
在您的集群上安装和使用 Kueue,如本例所示
add_kueue_cluster = GKEStartKueueInsideClusterOperator(
task_id="add_kueue_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
kueue_version="v0.6.2",
)
删除 GKE 集群¶
要删除集群,请使用 GKEDeleteClusterOperator
。 这也将删除分配给该集群的所有节点。
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
)
您可以使用可延迟模式来执行此操作,以便异步运行运算符。它将使您有可能在知道必须等待时释放工作进程,并将恢复运算符的任务移交给触发器。因此,当它被暂停(延迟)时,它不会占用工作进程插槽,并且您的集群将减少大量资源浪费在空闲的运算符或传感器上
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
deferrable=True,
)
管理 GKE 集群上的工作负载¶
GKE 可以与容器化的应用程序(例如在 Docker 上创建的应用程序)一起工作,并将它们部署在集群上运行。这些被称为工作负载,当部署在集群上时,它们会利用集群的 CPU 和内存资源来有效运行。
在 GKE 集群上运行 Pod¶
有两个运算符可用于在 GKE 集群上运行 Pod
GKEStartPodOperator
扩展了 KubernetesPodOperator
,以提供使用 Google Cloud 凭据的授权。无需管理 kube_config
文件,因为它会自动生成。所有 Kubernetes 参数(config_file
除外)也适用于 GKEStartPodOperator
。KubernetesPodOperator
的更多信息,请查看:KubernetesPodOperator 指南。
与私有集群一起使用¶
所有集群都有一个规范的端点。端点是 Kubernetes API 服务器的 IP 地址,Airflow 使用该地址与您的集群主节点通信。端点显示在 Cloud Console 中集群“详细信息”选项卡的“端点”字段下,以及 gcloud container clusters describe
的输出中的端点字段中。
私有集群有两个唯一的端点值:privateEndpoint
(它是内部 IP 地址)和 publicEndpoint
(它是外部 IP 地址)。对私有集群运行 GKEStartPodOperator
默认将外部 IP 地址设置为端点。如果您希望使用内部 IP 作为端点,则需要将 use_internal_ip
参数设置为 True
。
与 Autopilot(无服务器)集群一起使用¶
在 GKE Autopilot 等无服务器集群上运行时,由于冷启动,Pod 启动有时会花费更长的时间。在 Pod 启动期间,会定期短时间间隔检查状态,如果 Pod 尚未启动,则会发出警告消息。您可以通过 startup_check_interval_seconds
参数增加此间隔长度,建议为 60 秒。
XCom 的使用¶
我们可以在运算符上启用 XCom 的使用。这通过启动带有指定 Pod 的边车容器来实现。当指定 XCom 使用时,边车会自动挂载,其挂载点是路径 /airflow/xcom
。要为 XCom 提供值,请确保您的 Pod 将其写入边车中名为 return.json
的文件中。然后,可以在您的 DAG 中下游使用这些内容。这是一个正在使用的示例
pod_task_xcom = GKEStartPodOperator(
task_id="pod_task_xcom",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
do_xcom_push=True,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom",
in_cluster=False,
on_finish_action="delete_pod",
)
然后在其他运算符中使用它
pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
task_id="pod_task_xcom_result",
)
您可以使用可延迟模式来执行此操作,以便异步运行运算符。它将使您有可能在知道必须等待时释放工作进程,并将恢复运算符的任务移交给触发器。因此,当它被暂停(延迟)时,它不会占用工作进程插槽,并且您的集群将减少大量资源浪费在空闲的运算符或传感器上
pod_task_xcom_async = GKEStartPodOperator(
task_id="pod_task_xcom_async",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom-async",
in_cluster=False,
on_finish_action="delete_pod",
do_xcom_push=True,
deferrable=True,
get_logs=True,
)
在 GKE 集群上运行作业¶
有两个运算符可用于在 GKE 集群上运行作业
GKEStartJobOperator
扩展了 KubernetesJobOperator
,以提供使用 Google Cloud 凭据的授权。无需管理 kube_config
文件,因为它会自动生成。所有 Kubernetes 参数(config_file
除外)也适用于 GKEStartJobOperator
。
job_task = GKEStartJobOperator(
task_id="job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME,
)
GKEStartJobOperator
也支持可延迟模式。请注意,只有当 wait_until_job_complete
参数设置为 True
时,它才有意义。
job_task_def = GKEStartJobOperator(
task_id="job_task_def",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME_DEF,
wait_until_job_complete=True,
deferrable=True,
)
要在启用了 Kueue 的 GKE 集群上运行作业,请使用 GKEStartKueueJobOperator
。
kueue_job_task = GKEStartKueueJobOperator(
task_id="kueue_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
queue_name=QUEUE_NAME,
namespace="default",
parallelism=3,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name="test-pi",
suspend=True,
container_resources=k8s.V1ResourceRequirements(
requests={
"cpu": 1,
"memory": "200Mi",
},
),
)
删除 GKE 集群上的作业¶
有两个运算符可用于删除 GKE 集群上的作业
GKEDeleteJobOperator
扩展了 KubernetesDeleteJobOperator
,以提供使用 Google Cloud 凭据的授权。无需管理 kube_config
文件,因为它会自动生成。所有 Kubernetes 参数(config_file
除外)也适用于 GKEDeleteJobOperator
。
delete_job = GKEDeleteJobOperator(
task_id="delete_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=JOB_NAME,
namespace=JOB_NAMESPACE,
)
检索有关给定名称的作业的信息¶
您可以使用 GKEDescribeJobOperator
,通过提供其名称和命名空间来检索有关现有作业的详细说明。
describe_job_task = GKEDescribeJobOperator(
task_id="describe_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
job_name=job_task.output["job_name"],
namespace="default",
cluster_name=CLUSTER_NAME,
)
检索作业列表¶
您可以使用 GKEListJobsOperator
来检索现有 Job 的列表。如果提供了 namespace
参数,则输出将包括给定命名空间中的 Job。如果未指定 namespace
参数,则将输出所有命名空间中的信息。
list_job_task = GKEListJobsOperator(
task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)
在 GKE 集群中创建资源¶
您可以使用 GKECreateCustomResourceOperator
在指定的 Google Kubernetes Engine 集群中创建资源。
create_resource_task = GKECreateCustomResourceOperator(
task_id="create_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
在 GKE 集群中删除资源¶
您可以使用 GKEDeleteCustomResourceOperator
在指定的 Google Kubernetes Engine 集群中删除资源。
delete_resource_task = GKEDeleteCustomResourceOperator(
task_id="delete_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
暂停 GKE 集群上的 Job¶
您可以使用 GKESuspendJobOperator
在指定的 Google Kubernetes Engine 集群中暂停 Job。
suspend_job = GKESuspendJobOperator(
task_id="suspend_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)
恢复 GKE 集群上的 Job¶
您可以使用 GKEResumeJobOperator
在指定的 Google Kubernetes Engine 集群中恢复 Job。
resume_job = GKEResumeJobOperator(
task_id="resume_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)