KubernetesPodOperator¶
KubernetesPodOperator
允许您在 Kubernetes 集群上创建和运行 Pod。
注意
如果您使用托管 Kubernetes,请考虑使用专门的 KPO Operator,因为它简化了 Kubernetes 授权过程
GKEStartPodOperator Operator for Google Kubernetes Engine,
EksPodOperator Operator for AWS Elastic Kubernetes Engine.
注意
使用此 Operator **不**需要Kubernetes executor。
此 Operator 如何工作?¶
KubernetesPodOperator
使用 Kubernetes API 在 Kubernetes 集群中启动一个 pod。通过提供镜像 URL 和带有可选参数的命令,Operator 使用 Kube Python Client 生成 Kubernetes API 请求,动态启动这些单独的 pod。用户可以使用 config_file
参数指定 kubeconfig 文件,否则 Operator 将默认使用 ~/.kube/config
。
KubernetesPodOperator
支持任务级别的资源配置,对于通过公共 PyPI 仓库无法获得的自定义 Python 依赖非常有用。它还允许用户使用 pod_template_file
参数提供模板 YAML 文件。最终,它使 Airflow 能够充当作业编排器 - 无论这些作业是用何种语言编写的。
调试 KubernetesPodOperator¶
通过在 Operator 实例上调用 dry_run()
,您可以打印出运行时将创建的 pod 的 Kubernetes manifest。
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
k = KubernetesPodOperator(
name="hello-dry-run",
image="debian",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
)
k.dry_run()
参数优先级¶
当 KPO 定义 pod 对象时,KubernetesPodOperator
参数之间可能存在重叠。一般来说,优先级顺序是 KPO 字段特定的参数(例如 secrets
、cmds
、affinity
),然后是更通用的模板 full_pod_spec
、pod_template_file
、pod_template_dict
,最后默认是 V1Pod
。
对于 namespace
,如果未通过任何这些方法提供 namespace,那么我们将首先尝试获取当前 namespace(如果任务已在 Kubernetes 中运行),如果失败,我们将使用 default
namespace。
对于 pod 名称,如果未明确提供,我们将使用 task_id。默认情况下会添加一个随机后缀,因此 pod 名称通常不太重要。
如何在 Pod 中使用集群 ConfigMaps、Secrets 和 Volumes?¶
要添加 ConfigMaps、Volumes 和其他 Kubernetes 原生对象,我们建议您像这样导入 Kubernetes 模型 API
from kubernetes.client import models as k8s
通过此 API 对象,您可以以 Python 类形式访问所有 Kubernetes API 对象。使用此方法将确保正确性和类型安全。虽然我们几乎删除了所有 Kubernetes 便捷类,但我们保留了 Secret
类,以简化生成 secret volumes/env variables 的过程。
tests/system/cncf/kubernetes/example_kubernetes.py
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
configmaps = [
k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1")),
k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2")),
]
volume = k8s.V1Volume(
name="test-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
port = k8s.V1ContainerPort(name="http", container_port=80)
init_container_volume_mounts = [
k8s.V1VolumeMount(mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True)
]
init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")]
init_container = k8s.V1Container(
name="init-container",
image="ubuntu:16.04",
env=init_environments,
volume_mounts=init_container_volume_mounts,
command=["bash", "-cx"],
args=["echo 10"],
)
affinity = k8s.V1Affinity(
node_affinity=k8s.V1NodeAffinity(
preferred_during_scheduling_ignored_during_execution=[
k8s.V1PreferredSchedulingTerm(
weight=1,
preference=k8s.V1NodeSelectorTerm(
match_expressions=[
k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
]
),
)
]
),
pod_affinity=k8s.V1PodAffinity(
required_during_scheduling_ignored_during_execution=[
k8s.V1WeightedPodAffinityTerm(
weight=1,
pod_affinity_term=k8s.V1PodAffinityTerm(
label_selector=k8s.V1LabelSelector(
match_expressions=[
k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
]
),
topology_key="failure-domain.beta.kubernetes.io/zone",
),
)
]
),
)
tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
KubernetesPodOperator
与 Kubernetes 对象规范的区别¶
KubernetesPodOperator
可以被视为 Kubernetes 对象规范定义的替代品,它能够在 DAG 上下文中的 Airflow 调度器中运行。如果使用此 Operator,则无需为您想要运行的 Pod 创建等效的 YAML/JSON 对象规范。仍然可以使用 pod_template_file
提供 YAML 文件,甚至可以通过 full_pod_spec
参数在 Python 中构建 Pod Spec,该参数需要一个 Kubernetes V1Pod
。
如何使用私有镜像(容器 registry)?¶
默认情况下,KubernetesPodOperator
会查找在 Dockerhub 上公开托管的镜像。要从私有 registry(例如 ECR、GCR、Quay 或其他)拉取镜像,您必须创建一个 Kubernetes Secret,它代表访问私有 registry 中镜像的凭据,最终在 image_pull_secrets
参数中指定该 Secret。
使用 kubectl
创建 Secret
kubectl create secret docker-registry testquay \
--docker-server=quay.io \
--docker-username=<Profile name> \
--docker-password=<password>
然后在您的 pod 中像这样使用它
tests/system/cncf/kubernetes/example_kubernetes.py
quay_k8s = KubernetesPodOperator(
namespace="default",
image="quay.io/apache/bash",
image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
cmds=["bash", "-cx"],
arguments=["echo", "10", "echo pwd"],
labels={"foo": "bar"},
name="airflow-private-image-pod",
on_finish_action="delete_pod",
in_cluster=True,
task_id="task-two",
get_logs=True,
)
此外,您还可以此操作中使用可延迟模式的 Operator
tests/system/cncf/kubernetes/example_kubernetes_async.py
quay_k8s_async = KubernetesPodOperator(
task_id="kubernetes_private_img_task_async",
namespace="default",
image="quay.io/apache/bash",
image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
cmds=["bash", "-cx"],
arguments=["echo", "10", "echo pwd"],
labels={"foo": "bar"},
name="airflow-private-image-pod",
on_finish_action="delete_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
)
定期获取并显示容器日志的示例
tests/system/cncf/kubernetes/example_kubernetes_async.py
kubernetes_task_async_log = KubernetesPodOperator(
task_id="kubernetes_task_async_log",
namespace="kubernetes_task_async_log",
in_cluster=False,
name="astro_k8s_test_pod",
image="ubuntu",
cmds=[
"bash",
"-cx",
(
"i=0; "
"while [ $i -ne 100 ]; "
"do i=$(($i+1)); "
"echo $i; "
"sleep 1; "
"done; "
"mkdir -p /airflow/xcom/; "
'echo \'{"message": "good afternoon!"}\' > /airflow/xcom/return.json'
),
],
do_xcom_push=True,
deferrable=True,
get_logs=True,
logging_interval=5,
)
XCom 如何工作?¶
KubernetesPodOperator
处理 XCom 值的方式与其他 Operator 不同。为了从您的 Pod 中传递 XCom 值,您必须将 do_xcom_push
指定为 True
。这将创建一个与 Pod 一起运行的 sidecar 容器。Pod 必须将 XCom 值写入 /airflow/xcom/return.json
路径下的此位置。
注意
无效的 json 内容将导致失败,例如 echo 'hello' > /airflow/xcom/return.json
会失败,而 echo '\"hello\"' > /airflow/xcom/return.json
会成功
请参阅以下示例,了解这是如何发生的
tests/system/cncf/kubernetes/example_kubernetes.py
write_xcom = KubernetesPodOperator(
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="write-xcom",
do_xcom_push=True,
on_finish_action="delete_pod",
in_cluster=True,
task_id="write-xcom",
get_logs=True,
)
pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
task_id="pod_task_xcom_result",
)
write_xcom >> pod_task_xcom_result
注意
XCOMs 只会为标记为 State.SUCCESS
的任务推送。
此外,您还可以此操作中使用可延迟模式的 Operator
tests/system/cncf/kubernetes/example_kubernetes_async.py
write_xcom_async = KubernetesPodOperator(
task_id="kubernetes_write_xcom_task_async",
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="write-xcom",
do_xcom_push=True,
on_finish_action="delete_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
)
pod_task_xcom_result_async = BashOperator(
task_id="pod_task_xcom_result_async",
bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
)
write_xcom_async >> pod_task_xcom_result_async
在电子邮件警报中包含错误消息¶
写入 /dev/termination-log
的任何内容都将被 Kubernetes 检索,并在任务失败时包含在异常消息中。
k = KubernetesPodOperator(
task_id="test_error_message",
image="alpine",
cmds=["/bin/sh"],
arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
name="test-error-message",
email="airflow@example.com",
email_on_failure=True,
)
在此处阅读有关 termination-log 的更多信息:此处。
KubernetesPodOperator 回调¶
KubernetesPodOperator
支持不同的回调,可在 pod 的生命周期中触发操作。要使用它们,您需要创建一个 KubernetesPodOperatorCallback
的子类,并重写您想要使用的回调方法。然后,您可以使用 callbacks
参数将您的回调类传递给 Operator。
支持以下回调
on_sync_client_creation: 在创建同步客户端后调用
on_pod_creation: 在创建 pod 后调用
on_pod_starting: 在 pod 启动后调用
on_pod_completion: 在 pod 完成时调用
on_pod_cleanup: 在清理/删除 pod 后调用
on_operator_resuming: 从延迟状态恢复任务时调用
progress_callback: 在容器日志的每一行上调用
目前,回调方法在异步模式下不被调用,未来将添加此支持。
示例:¶
import kubernetes.client as k8s
import kubernetes_asyncio.client as async_k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
class MyCallback(KubernetesPodOperatorCallback):
@staticmethod
def on_pod_creation(*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs) -> None:
client.create_namespaced_service(
namespace=pod.metadata.namespace,
body=k8s.V1Service(
metadata=k8s.V1ObjectMeta(
name=pod.metadata.name,
labels=pod.metadata.labels,
owner_references=[
k8s.V1OwnerReference(
api_version=pod.api_version,
kind=pod.kind,
name=pod.metadata.name,
uid=pod.metadata.uid,
controller=True,
block_owner_deletion=True,
)
],
),
spec=k8s.V1ServiceSpec(
selector=pod.metadata.labels,
ports=[
k8s.V1ServicePort(
name="http",
port=80,
target_port=80,
)
],
),
),
)
k = KubernetesPodOperator(
task_id="test_callback",
image="alpine",
cmds=["/bin/sh"],
arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
name="test-callback",
callbacks=MyCallback,
)
传递 Secrets¶
切勿使用环境变量将 Secrets(例如连接认证信息)传递给 Kubernetes Pod Operator。此类环境变量对任何有权查看和描述 Kubernetes 中的 POD 的人都是可见的。相反,请通过原生的 Kubernetes Secrets
传递您的 secrets,或使用 Airflow 的 Connections 和 Variables。对于后者,您需要在镜像中安装与运行 Kubernetes Pod Operator 的 airflow 版本相同的 apache-airflow
包)。
参考¶
有关更多信息,请参阅
SparkKubernetesOperator¶
SparkKubernetesOperator
允许您在 Kubernetes 集群上创建和运行 Spark 作业。它基于 spark-on-k8s-operator 项目。
此 Operator 简化了接口并接受不同的参数来配置和在 Kubernetes 上运行 Spark 应用程序。与 KubernetesOperator 类似,我们添加了提交作业后等待、管理错误处理、从 driver pod 检索日志以及删除 Spark 作业的能力的逻辑。它还支持开箱即用的 Kubernetes 功能,例如处理 volumes、config maps、secrets 等。
此 Operator 如何工作?¶
该 Operator 通过在 Kubernetes 中生成 SparkApplication Custom Resource Definition (CRD) 来启动 Spark 任务。此 SparkApplication 任务随后使用用户指定的参数生成 driver pod 和所需的 executor pod。Operator 持续监控任务进度,直到成功或失败。它从 driver pod 检索日志并在 Airflow UI 中显示。
使用示例¶
为了创建 SparkKubernetesOperator 任务,您必须提供一个包含 Spark 配置和 Kubernetes 相关资源配置的基本模板。此模板可以是 YAML 或 JSON 格式,作为 Operator 的起点。下面是一个可供您使用的示例模板
spark_job_template.yaml
spark:
apiVersion: sparkoperator.k8s.io/v1beta2
version: v1beta2
kind: SparkApplication
apiGroup: sparkoperator.k8s.io
metadata:
namespace: ds
spec:
type: Python
pythonVersion: "3"
mode: cluster
sparkVersion: 3.0.0
successfulRunHistoryLimit: 1
restartPolicy:
type: Never
imagePullPolicy: Always
hadoopConf: {}
imagePullSecrets: []
dynamicAllocation:
enabled: false
initialExecutors: 1
minExecutors: 1
maxExecutors: 1
labels: {}
driver:
serviceAccount: default
container_resources:
gpu:
name: null
quantity: 0
cpu:
request: null
limit: null
memory:
request: null
limit: null
executor:
instances: 1
container_resources:
gpu:
name: null
quantity: 0
cpu:
request: null
limit: null
memory:
request: null
limit: null
kubernetes:
# example:
# env_vars:
# - name: TEST_NAME
# value: TEST_VALUE
env_vars: []
# example:
# env_from:
# - name: test
# valueFrom:
# secretKeyRef:
# name: mongo-secret
# key: mongo-password
env_from: []
# example:
# node_selector:
# karpenter.sh/provisioner-name: spark
node_selector: {}
# example: https://kubernetes.ac.cn/docs/concepts/scheduling-eviction/assign-pod-node/
# affinity:
# nodeAffinity:
# requiredDuringSchedulingIgnoredDuringExecution:
# nodeSelectorTerms:
# - matchExpressions:
# - key: beta.kubernetes.io/instance-type
# operator: In
# values:
# - r5.xlarge
affinity:
nodeAffinity: {}
podAffinity: {}
podAntiAffinity: {}
# example: https://kubernetes.ac.cn/docs/concepts/scheduling-eviction/taint-and-toleration/
# type: list
# tolerations:
# - key: "key1"
# operator: "Equal"
# value: "value1"
# effect: "NoSchedule"
tolerations: []
# example:
# config_map_mounts:
# snowflake-default: /mnt/tmp
config_map_mounts: {}
# example:
# volume_mounts:
# - name: config
# mountPath: /airflow
volume_mounts: []
# https://kubernetes.ac.cn/docs/concepts/storage/volumes/
# example:
# volumes:
# - name: config
# persistentVolumeClaim:
# claimName: airflow
volumes: []
# read config map into an env variable
# example:
# from_env_config_map:
# - configmap_1
# - configmap_2
from_env_config_map: []
# load secret into an env variable
# example:
# from_env_secret:
# - secret_1
# - secret_2
from_env_secret: []
in_cluster: true
conn_id: kubernetes_default
kube_config_file: null
cluster_context: null
重要提示
模板文件包含两个主要类别:
spark
和kubernetes
。spark: 此部分包含任务的 Spark 配置,镜像了 Spark API 模板的结构。
kubernetes: 此部分包含任务的 Kubernetes 资源配置,直接对应 Kubernetes API 文档。每种资源类型在模板中都包含一个示例。
指定使用的基础镜像为
gcr.io/spark-operator/spark-py:v3.1.1
。确保 Spark 代码嵌入在镜像中、使用 persistentVolume 挂载,或从外部位置(如 S3 存储桶)访问。
接下来,使用以下方式创建任务
SparkKubernetesOperator(
task_id="spark_task",
image="gcr.io/spark-operator/spark-py:v3.1.1", # OR custom image using that
code_path="local://path/to/spark/code.py",
application_file="spark_job_template.yaml", # OR spark_job_template.json
dag=dag,
)
注意:application_file 也可以是 JSON 文件。请参阅以下示例
spark_job_template.json
{
"spark": {
"apiVersion": "sparkoperator.k8s.io/v1beta2",
"version": "v1beta2",
"kind": "SparkApplication",
"apiGroup": "sparkoperator.k8s.io",
"metadata": {
"namespace": "ds"
},
"spec": {
"type": "Python",
"pythonVersion": "3",
"mode": "cluster",
"sparkVersion": "3.0.0",
"successfulRunHistoryLimit": 1,
"restartPolicy": {
"type": "Never"
},
"imagePullPolicy": "Always",
"hadoopConf": {},
"imagePullSecrets": [],
"dynamicAllocation": {
"enabled": false,
"initialExecutors": 1,
"minExecutors": 1,
"maxExecutors": 1
},
"labels": {},
"driver": {
"serviceAccount": "default",
"container_resources": {
"gpu": {
"name": null,
"quantity": 0
},
"cpu": {
"request": null,
"limit": null
},
"memory": {
"request": null,
"limit": null
}
}
},
"executor": {
"instances": 1,
"container_resources": {
"gpu": {
"name": null,
"quantity": 0
},
"cpu": {
"request": null,
"limit": null
},
"memory": {
"request": null,
"limit": null
}
}
}
}
},
"kubernetes": {
"env_vars": [],
"env_from": [],
"node_selector": {},
"affinity": {
"nodeAffinity": {},
"podAffinity": {},
"podAntiAffinity": {}
},
"tolerations": [],
"config_map_mounts": {},
"volume_mounts": [
{
"name": "config",
"mountPath": "/airflow"
}
],
"volumes": [
{
"name": "config",
"persistentVolumeClaim": {
"claimName": "hsaljoog-airflow"
}
}
],
"from_env_config_map": [],
"from_env_secret": [],
"in_cluster": true,
"conn_id": "kubernetes_default",
"kube_config_file": null,
"cluster_context": null
}
}
除了使用 YAML 或 JSON 文件外,另一种方法是直接传递 template_spec
字段,而不是 application_file,如果您不想使用文件进行配置。
参考¶
有关更多信息,请参阅
KubernetesJobOperator¶
KubernetesJobOperator
允许您在 Kubernetes 集群上创建和运行 Job。
注意
如果您使用托管 Kubernetes,请考虑使用专门的 KJO Operator,因为它简化了 Kubernetes 授权过程
Google Kubernetes Engine 的
GKEStartJobOperator
Operator。
注意
使用此 Operator **不**需要Kubernetes executor。
此 Operator 如何工作?¶
KubernetesJobOperator
使用 Kubernetes API 在 Kubernetes 集群中启动一个 Job。Operator 使用 Kube Python Client 生成 Kubernetes API 请求,动态启动此 Job。用户可以使用 config_file
参数指定 kubeconfig 文件,否则 Operator 将默认使用 ~/.kube/config
。它还允许用户使用 job_template_file
参数提供模板 YAML 文件。
tests/system/cncf/kubernetes/example_kubernetes_job.py
k8s_job = KubernetesJobOperator(
task_id="job-task",
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME,
)
KubernetesJobOperator
也支持可延迟模式
tests/system/cncf/kubernetes/example_kubernetes_job.py
k8s_job_def = KubernetesJobOperator(
task_id="job-task-def",
namespace="default",
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME + "-def",
wait_until_job_complete=True,
deferrable=True,
)
KubernetesPodOperator
与 KubernetesJobOperator
的区别¶
KubernetesJobOperator
是用于创建 Job 的 Operator。Job 会创建一个或多个 Pod,并会持续重试执行 Pod,直到指定数量的 Pod 成功终止。随着 Pod 成功完成,Job 会跟踪成功完成的数量。当达到指定数量的成功完成时,Job 就完成了。用户可以使用 activeDeadlineSeconds
和 backoffLimit
等配置参数限制 Job 重试执行的次数。此 Operator 使用 KubernetesPodOperator
来创建 Pod,而不是使用 template
参数。这意味着用户可以在 KubernetesJobOperator
中使用 KubernetesPodOperator
的所有参数。
此处有关 Jobs 的更多信息:Kubernetes Job Documentation
KubernetesDeleteJobOperator¶
KubernetesDeleteJobOperator
允许您在 Kubernetes 集群上删除 Job。
tests/system/cncf/kubernetes/example_kubernetes_job.py
delete_job_task = KubernetesDeleteJobOperator(
task_id="delete_job_task",
name=k8s_job.output["job_name"],
namespace=JOB_NAMESPACE,
wait_for_completion=True,
delete_on_status="Complete",
poll_interval=1.0,
)
KubernetesPatchJobOperator¶
KubernetesPatchJobOperator
允许您在 Kubernetes 集群上更新 Job。
tests/system/cncf/kubernetes/example_kubernetes_job.py
update_job = KubernetesPatchJobOperator(
task_id="update-job-task",
namespace="default",
name=k8s_job.output["job_name"],
body={"spec": {"suspend": False}},
)
KubernetesInstallKueueOperator¶
KubernetesInstallKueueOperator
允许您在 Kubernetes 集群中安装 Kueue 组件
tests/system/cncf/kubernetes/example_kubernetes_kueue.py
install_kueue = KubernetesInstallKueueOperator(
task_id="install_kueue",
kueue_version="v0.9.1",
)
参考¶
有关更多信息,请参阅
KubernetesStartKueueJobOperator¶
KubernetesStartKueueJobOperator
允许您在 Kubernetes 集群中启动 Kueue 作业
tests/system/cncf/kubernetes/example_kubernetes_kueue.py
start_kueue_job = KubernetesStartKueueJobOperator(
task_id="kueue_job",
queue_name=QUEUE_NAME,
namespace="default",
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name="test-pi",
suspend=True,
container_resources=k8s.V1ResourceRequirements(
requests={
"cpu": 1,
"memory": "200Mi",
},
),
wait_until_job_complete=True,
)
有关更多信息,请参阅