Kubernetes 执行器

注意

从 Airflow 2.7.0 版本开始,你需要安装 cncf.kubernetes 提供程序包才能使用此执行器。可以通过安装 apache-airflow-providers-cncf-kubernetes>=7.4.0 或使用 cncf.kubernetes 附加功能安装 Airflow 来完成:pip install 'apache-airflow[cncf.kubernetes]'

Kubernetes 执行器在 Kubernetes 集群中自己的 Pod 中运行每个任务实例。

KubernetesExecutor 作为 Airflow 调度器中的一个进程运行。调度器本身不一定需要在 Kubernetes 上运行,但需要访问 Kubernetes 集群。

KubernetesExecutor 需要后端使用非 SQLite 数据库。

当 DAG 提交一个任务时,KubernetesExecutor 会从 Kubernetes API 请求一个工作 Pod。然后,工作 Pod 运行任务,报告结果并终止。

_images/arch-diag-kubernetes.png

下面显示了一个在 Kubernetes 集群中分布在五个节点上运行的 Airflow 部署示例。

_images/arch-diag-kubernetes2.png

与常规 Airflow 架构一致,工作节点需要访问 DAG 文件才能执行这些 DAG 中的任务并与元数据存储库交互。此外,需要在 Airflow 配置文件中指定 Kubernetes 执行器特定的配置信息,例如工作节点命名空间和镜像信息。

此外,Kubernetes 执行器允许使用执行器配置在每个任务的基础上指定其他功能。

_images/k8s-happy-path.png

配置

pod_template_file

要自定义用于 k8s 执行器工作进程的 Pod,你可以创建一个 Pod 模板文件。你必须在 airflow.cfgkubernetes_executor 部分的 pod_template_file 选项中提供模板文件的路径。

Airflow 对 Pod 模板文件有两个严格的要求:基础镜像和 Pod 名称。

基础镜像

pod_template_file 必须在 spec.containers[0] 位置有一个名为 base 的容器,并且必须指定其 image

你可以自由地在此必需容器之后创建 sidecar 容器,但 Airflow 假设 airflow 工作容器存在于容器数组的开头,并假设该容器名为 base

注意

Airflow 可能会覆盖基础容器 image,例如通过 pod_override 配置;但它必须存在于模板文件中,并且不能为空。

Pod 名称

Pod 的 metadata.name 必须在模板文件中设置。此字段将始终在 Pod 启动时动态设置,以保证所有 Pod 的唯一性。但是,它必须再次包含在模板中,并且不能为空。

Pod 模板示例

考虑到这些要求,以下是一些基本的 pod_template_file YAML 文件示例。

注意

以下示例在使用默认 Airflow 配置值时应该可以正常工作。但是,许多自定义配置值也需要通过此模板显式传递给 Pod。这包括但不限于 SQL 配置、所需的 Airflow 连接、DAG 文件夹路径和日志设置。有关详细信息,请参阅 配置参考

将 DAG 存储在镜像中

---
apiVersion: v1
kind: Pod
metadata:
  name: placeholder-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

将 DAG 存储在 persistentVolume

---
apiVersion: v1
kind: Pod
metadata:
  name: placeholder-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

git 拉取 DAG

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  initContainers:
    - name: git-sync
      image: "registry.k8s.io/git-sync/git-sync:v3.6.3"
      env:
        - name: GIT_SYNC_BRANCH
          value: "v2-2-stable"
        - name: GIT_SYNC_REPO
          value: "https://github.com/apache/airflow.git"
        - name: GIT_SYNC_DEPTH
          value: "1"
        - name: GIT_SYNC_ROOT
          value: "/git"
        - name: GIT_SYNC_DEST
          value: "repo"
        - name: GIT_SYNC_ADD_USER
          value: "true"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      volumeMounts:
        - name: airflow-dags
          mountPath: /git
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          subPath: repo/airflow/example_dags
          readOnly: false
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      emptyDir: {}
    - name: airflow-logs
      emptyDir: {}
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

pod_override

使用 KubernetesExecutor 时,Airflow 允许在每个任务的基础上覆盖系统默认值。要利用此功能,请创建一个 Kubernetes V1pod 对象并填写你想要的覆盖。

要覆盖 KubernetesExecutor 启动的 Pod 的基础容器,请创建一个带有单个容器的 V1pod,并按如下方式覆盖字段

airflow/example_dags/example_kubernetes_executor.py[源代码]

        executor_config_volume_mount = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[
                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
                            ],
                        )
                    ],
                    volumes=[
                        k8s.V1Volume(
                            name="example-kubernetes-test-volume",
                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                        )
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_volume_mount)
        def test_volume_mount():
            """
            Tests whether the volume has been mounted.
            """

            with open("/foo/volume_mount_test.txt", "w") as foo:
                foo.write("Hello")

            return_code = os.system("cat /foo/volume_mount_test.txt")
            if return_code != 0:
                raise ValueError(f"Error when checking volume mount. Return code {return_code}")

        volume_task = test_volume_mount()

请注意,以下字段将全部扩展而不是覆盖。从spec:volumes 和 init_containers。从container:卷挂载、环境变量、端口和设备。

要向启动的 Pod 添加 sidecar 容器,请创建一个带有空第一个容器(名称为 base)和一个包含你想要的 sidecar 的第二个容器的 V1pod。

airflow/example_dags/example_kubernetes_executor.py[源代码]

        executor_config_sidecar = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                        k8s.V1Container(
                            name="sidecar",
                            image="ubuntu",
                            args=['echo "retrieved from mount" > /shared/test.txt'],
                            command=["bash", "-cx"],
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                    ],
                    volumes=[
                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_sidecar)
        def test_sharedvolume_mount():
            """
            Tests whether the volume has been mounted.
            """
            for i in range(5):
                try:
                    return_code = os.system("cat /shared/test.txt")
                    if return_code != 0:
                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
                except ValueError as e:
                    if i > 4:
                        raise e

        sidecar_task = test_sharedvolume_mount()

你还可以在每个任务的基础上创建自定义的 pod_template_file,以便可以在多个任务之间重复使用相同的基本值。这将替换 airflow.cfg 中命名的默认 pod_template_file,然后使用 pod_override 覆盖该模板。

以下是一个同时具有这两个功能的任务示例

import os

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME

from kubernetes.client import models as k8s

with DAG(
    dag_id="example_pod_template_file",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example3"],
) as dag:
    executor_config_template = {
        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
    }

    @task(executor_config=executor_config_template)
    def task_with_template():
        print_stuff()

管理 DAG 和日志

使用持久卷是可选的,具体取决于你的配置。

  • DAG:

要将 DAG 获取到工作节点中,你可以

  • 将 DAG 包含在镜像中。

  • 使用 git-sync,它会在启动工作容器之前运行 DAG 存储库的 git pull

  • 将 DAG 存储在持久卷上,该持久卷可以安装在所有工作节点上。

  • 日志:

要将任务日志从工作节点中取出,你可以

  • 使用一个同时安装在 Web 服务器和工作节点上的持久卷。

  • 启用远程日志记录。

注意

如果你不启用日志持久性,并且你没有启用远程日志记录,则日志将在工作 Pod 关闭后丢失。

与 CeleryExecutor 的比较

与 CeleryExecutor 相比,KubernetesExecutor 不需要 Redis 等其他组件,但需要访问 Kubernetes 集群。

此外,可以使用内置的 Kubernetes 监控来监控 Pod。

使用 KubernetesExecutor,每个任务都在自己的 Pod 中运行。Pod 在任务排队时创建,并在任务完成时终止。从历史上看,在诸如突发工作负载之类的场景中,这比 CeleryExecutor 具有资源利用率优势,在 CeleryExecutor 中,你需要固定数量的长期运行的 Celery 工作 Pod,无论是否有任务要运行。

但是,官方 Apache Airflow Helm chart 可以根据队列中的任务数量自动将 Celery 工作节点缩减为零,因此,当使用官方 chart 时,这不再是一个优势。

使用 Celery 工作节点,你往往会有更少的任务延迟,因为工作 Pod 在任务排队时已经启动并运行。另一方面,由于多个任务在同一个 Pod 中运行,因此使用 Celery 时,你可能需要更加注意任务设计中的资源利用率,尤其是内存消耗。

KubernetesExecutor 可以提供帮助的一种场景是,如果你有长期运行的任务,因为如果你在任务运行时进行部署,该任务将继续运行直到完成(或超时等)。但是,使用 CeleryExecutor,只要你设置了宽限期,任务将只运行到宽限期过去,此时任务将被终止。KubernetesExecutor 可以很好地工作的另一个场景是,你的任务在资源需求或镜像方面不是很统一。

最后,请注意,它不一定是两者择一;使用 CeleryKubernetesExecutor,可以在同一群集上同时使用 CeleryExecutor 和 KubernetesExecutor。CeleryKubernetesExecutor 将查看任务的 queue 来确定是在 Celery 上还是在 Kubernetes 上运行。默认情况下,任务会被发送到 Celery 工作节点,但如果你希望任务使用 KubernetesExecutor 运行,则将其发送到 kubernetes 队列,它将在自己的 Pod 中运行。无论你使用什么执行器,都可以使用 KubernetesPodOperator 实现类似的效果。

容错

提示

要排查 KubernetesExecutor 的问题,你可以使用 airflow kubernetes generate-dag-yaml 命令。此命令会生成将要在 Kubernetes 中启动的 Pod,并将它们转储到 YAML 文件中供你检查。

处理工作 Pod 崩溃

在处理分布式系统时,我们需要一个假设任何组件都可能在任何时刻崩溃的系统,原因包括 OOM 错误到节点升级。

如果工作节点在向后端数据库报告其状态之前死亡,则执行器可以使用 Kubernetes 监视器线程来发现失败的 Pod。

_images/k8s-failed-pod.png

Kubernetes 观察器是一个线程,它可以订阅 Kubernetes 数据库中发生的每一个更改。当 Pod 启动、运行、结束和失败时,它会收到警报。通过监控这个流,KubernetesExecutor 可以发现工作节点崩溃并正确地将任务报告为失败。

但是如果调度器 Pod 崩溃了怎么办?

在调度器崩溃的情况下,调度器将使用观察器的 resourceVersion 恢复其状态。

当监控 Kubernetes 集群的观察器线程时,每个事件都有一个单调递增的数字,称为 resourceVersion。每次执行器读取 resourceVersion 时,执行器都会将最新的值存储在后端数据库中。由于 resourceVersion 被存储,调度器可以重新启动并从上次停止的位置继续读取观察器流。由于任务是独立于执行器运行的,并将结果直接报告给数据库,因此调度器故障不会导致任务失败或重新运行。

此条目是否有帮助?