Kubernetes 执行器

注意

从 Airflow 2.7.0 起,您需要安装 cncf.kubernetes 提供程序包才能使用此执行器。可以通过安装 apache-airflow-providers-cncf-kubernetes>=7.4.0,或使用 cncf.kubernetes extras 安装 Airflow:pip install 'apache-airflow[cncf.kubernetes]'

Kubernetes 执行器将在 Kubernetes 集群中为每个任务实例运行一个独立的 pod。

KubernetesExecutor 作为 Airflow Scheduler 中的一个进程运行。Scheduler 本身不一定需要运行在 Kubernetes 上,但需要能够访问 Kubernetes 集群。

KubernetesExecutor 需要后端使用非 SQLite 数据库。

当 Dag 提交任务时,KubernetesExecutor 会向 Kubernetes API 请求一个工作者 pod。该 worker pod 随后运行任务、报告结果并终止。

_images/arch-diag-kubernetes.png

下面展示了一个在 Kubernetes 集群中由五个节点组成的分布式 Airflow 部署示例。

_images/arch-diag-kubernetes2.png

与常规 Airflow 架构保持一致,Workers 需要访问 Dag 文件才能执行这些 Dag 中的任务并与元数据仓库交互。同时,需要在 Airflow 配置文件中指定 Kubernetes Executor 的特定配置信息,如 worker 命名空间和镜像信息。

此外,Kubernetes Executor 允许使用 Executor 配置在每个任务级别上指定额外特性。

_images/k8s-happy-path.png

配置

pod_template_file

要自定义用于 k8s executor 工作进程的 pod,您可以创建一个 pod 模板文件。必须在 airflow.cfgkubernetes_executor 部分的 pod_template_file 选项中提供模板文件的路径。

Airflow 对 pod 模板文件有两个严格要求:基础镜像和 pod 名称。

Base image

pod_template_file 必须在 spec.containers[0] 位置有一个名为 base 的容器,并且必须指定其 image

您可以在此必需容器后创建 sidecar 容器,但 Airflow 假设 airflow 工作容器位于容器数组的起始位置,并且容器名称为 base

注意

Airflow 可能会覆盖基础容器的 image,例如通过 pod_override 配置;但它必须在模板文件中存在且不能留空。

Pod name

pod 的 metadata.name 必须在模板文件中设置。此字段将在 pod 启动时动态设置,以确保在所有 pod 中唯一。但同样,它必须包含在模板中,且不能留空。

Example pod templates

考虑到这些要求,以下是一些基本 pod_template_file YAML 文件的示例。

注意

下面的示例在使用默认 Airflow 配置值时应能工作。但许多自定义配置值也需要通过此模板显式传递给 pod,包括但不限于 sql 配置、必需的 Airflow 连接、Dags 文件夹路径和日志设置。详见 Configuration Reference

在镜像中存储 Dags

---
apiVersion: v1
kind: Pod
metadata:
  name: placeholder-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

将 Dags 存储在 persistentVolume

---
apiVersion: v1
kind: Pod
metadata:
  name: placeholder-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

git 拉取 Dags

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  initContainers:
    - name: git-sync
      image: "registry.k8s.io/git-sync/git-sync:v3.6.3"
      env:
        - name: GIT_SYNC_BRANCH
          value: "v2-2-stable"
        - name: GIT_SYNC_REPO
          value: "https://github.com/apache/airflow.git"
        - name: GIT_SYNC_DEPTH
          value: "1"
        - name: GIT_SYNC_ROOT
          value: "/git"
        - name: GIT_SYNC_DEST
          value: "repo"
        - name: GIT_SYNC_ADD_USER
          value: "true"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      volumeMounts:
        - name: airflow-dags
          mountPath: /git
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          subPath: repo/airflow/example_dags
          readOnly: false
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      emptyDir: {}
    - name: airflow-logs
      emptyDir: {}
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

pod_override

使用 KubernetesExecutor 时,Airflow 提供了在每个任务层面覆盖系统默认值的功能。要使用此功能,请创建一个 Kubernetes V1pod 对象并填写您想要的覆盖项。

要覆盖 KubernetesExecutor 启动的 pod 的基础容器,请创建仅包含一个容器的 V1pod,并按以下方式覆盖字段。

/opt/airflow/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py

        executor_config_volume_mount = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[
                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
                            ],
                        )
                    ],
                    volumes=[
                        k8s.V1Volume(
                            name="example-kubernetes-test-volume",
                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                        )
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_volume_mount)
        def test_volume_mount():
            """
            Tests whether the volume has been mounted.
            """

            with open("/foo/volume_mount_test.txt", "w") as foo:
                foo.write("Hello")

            return_code = os.system("cat /foo/volume_mount_test.txt")
            if return_code != 0:
                raise ValueError(f"Error when checking volume mount. Return code {return_code}")

        volume_task = test_volume_mount()

请注意以下字段都会被扩展而不是被覆盖。来自 spec:volumes,和 init_containers。来自 container:volume mounts,environment variables,ports,和 devices。

要向启动的 pod 添加 sidecar 容器,请创建一个 V1pod,其中第一个容器为空且名称为 base,第二个容器包含您想要的 sidecar。

/opt/airflow/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py

        executor_config_sidecar = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                        k8s.V1Container(
                            name="sidecar",
                            image="ubuntu",
                            args=['echo "retrieved from mount" > /shared/test.txt'],
                            command=["bash", "-cx"],
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                    ],
                    volumes=[
                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_sidecar)
        def test_sharedvolume_mount():
            """
            Tests whether the volume has been mounted.
            """
            for i in range(5):
                try:
                    return_code = os.system("cat /shared/test.txt")
                    if return_code != 0:
                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
                except ValueError as e:
                    if i > 4:
                        raise e

        sidecar_task = test_sharedvolume_mount()

您还可以针对每个任务创建自定义 pod_template_file,以便在多个任务之间复用相同的基础值。这将替换 airflow.cfg 中指定的默认 pod_template_file,然后使用 pod_override 覆盖该模板。该 pod_template_file 也将用于在 Airflow UI 中生成可见的 Pod K8s 规范。

以下是一个同时使用这两项功能的任务示例

import os

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME

from kubernetes.client import models as k8s

with DAG(
    dag_id="example_pod_template_file",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example3"],
) as dag:
    executor_config_template = {
        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
    }

    @task(executor_config=executor_config_template)
    def task_with_template():
        print_stuff()

Managing Dags and logs

持久卷的使用是可选的,取决于您的配置。

  • Dags:

要将 Dags 传递给工作节点,您可以

  • 将 Dags 包含在镜像中。

  • 使用 git-sync,在启动 worker 容器之前执行一次 git pull 拉取 Dags 仓库。

  • 将 Dags 存储在持久卷上,可在所有工作节点挂载。

  • 日志:

要从工作节点获取任务日志,您可以

  • 使用在 webserver 和 workers 上都挂载的持久卷。

  • 启用远程日志记录。

注意

如果未启用日志持久化,也未启用远程日志记录,则工作 pod 关闭后日志将会丢失。

Comparison with CeleryExecutor

与 CeleryExecutor 相比,KubernetesExecutor 不需要额外组件如 Redis,但需要能够访问 Kubernetes 集群。

此外,可使用内置的 Kubernetes 监控对 Pod 进行监控。

使用 KubernetesExecutor 时,每个任务在自己的 pod 中运行。pod 在任务排队时创建,任务完成后终止。历史上,在突发工作负载等场景中,这相对于 CeleryExecutor(需要固定数量的长期运行的 Celery worker pod,无论是否有任务)具有资源利用率优势。

然而,官方 Apache Airflow Helm chart 可以根据队列中的任务数量自动将 celery worker 缩减至零,因此使用官方 chart 时,这已不再是优势。

使用 Celery worker 时,由于 worker pod 已经启动并运行,任务排队时延迟更低。另一方面,由于多个任务在同一个 pod 中运行,使用 Celery 时需要更注意任务设计中的资源利用,尤其是内存消耗。

KubernetesExecutor 在长时间运行的任务上可能更有帮助,因为即使在任务运行期间进行部署,任务会一直运行至完成(或超时等)。而在 CeleryExecutor 中,只要设置了宽限期,任务将在宽限期结束后被终止。另一个适合使用 KubernetesExecutor 的场景是任务在资源需求或镜像上不均匀。

最后,需要注意的是并非只能二选一;使用 CeleryKubernetesExecutor 可以在同一集群上同时使用 CeleryExecutor 和 KubernetesExecutor。CeleryKubernetesExecutor 会根据任务的 queue 判断在 Celery 还是 Kubernetes 上运行。默认情况下,任务发送到 Celery workers;若希望任务使用 KubernetesExecutor,只需发送到 kubernetes 队列,它将运行在独立的 pod 中。KubernetesPodOperator 也可以实现类似效果,无论使用何种 executor。

Fault Tolerance

提示

要排查 KubernetesExecutor 的问题,可以使用 airflow kubernetes generate-dag-yaml 命令。此命令会生成将在 Kubernetes 中启动的 pod 并将其导出为 yaml 文件供您检查。

Handling Worker Pod Crashes

在处理分布式系统时,我们需要假设任何组件都可能因 OOM 错误、节点升级等原因随时崩溃。

如果 worker 在向后端数据库报告状态之前死亡,executor 可以使用 Kubernetes watcher 线程来发现失败的 pod。

_images/k8s-failed-pod.png

Kubernetes watcher 是一个线程,可订阅 Kubernetes 数据库中发生的所有更改。当 pod 启动、运行、结束或失败时,它会收到通知。通过监控此流,KubernetesExecutor 能够发现 worker 崩溃并正确将任务标记为失败。

But What About Cases Where the Scheduler Pod Crashes?

在 scheduler 崩溃的情况下,scheduler 将使用 watcher 的 resourceVersion 恢复状态。

在监控 Kubernetes 集群的 watcher 线程时,每个事件都有一个单调递增的编号,称为 resourceVersion。每次 executor 读取 resourceVersion 时,会将最新值存入后端数据库。由于 resourceVersion 已被存储,scheduler 可以重新启动并从上次停止的地方继续读取 watcher 流。由于任务独立于 executor 运行并直接向数据库报告结果,scheduler 故障不会导致任务失败或重新运行。

此条目是否有帮助?