airflow.providers.cncf.kubernetes.operators.pod

在 Kubernetes POD 中执行任务。

模块内容

PodEventType

kubernetes pod 发出的事件类型。

KubernetesPodOperator

在 Kubernetes Pod 中执行任务。

属性

alphanum_lower

KUBE_CONFIG_ENV_VAR

airflow.providers.cncf.kubernetes.operators.pod.alphanum_lower[源]
airflow.providers.cncf.kubernetes.operators.pod.KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'[源]
class airflow.providers.cncf.kubernetes.operators.pod.PodEventType[源]

基类: enum.Enum

kubernetes pod 发出的事件类型。

WARNING = 'Warning'[源]
NORMAL = 'Normal'[源]
exception airflow.providers.cncf.kubernetes.operators.pod.PodReattachFailure[源]

基类: airflow.exceptions.AirflowException

当我们期望能够找到一个 pod 但找不到时。

exception airflow.providers.cncf.kubernetes.operators.pod.PodCredentialsExpiredFailure[源]

基类: airflow.exceptions.AirflowException

当 pod 无法刷新凭据时。

class airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator(*, kubernetes_conn_id=KubernetesHook.default_conn_name, namespace=None, image=None, name=None, random_name_suffix=True, cmds=None, arguments=None, ports=None, volume_mounts=None, volumes=None, env_vars=None, env_from=None, secrets=None, in_cluster=None, cluster_context=None, labels=None, reattach_on_restart=True, startup_timeout_seconds=120, startup_check_interval_seconds=5, get_logs=True, base_container_name=None, init_container_logs=None, container_logs=None, image_pull_policy=None, annotations=None, container_resources=None, affinity=None, config_file=None, node_selector=None, image_pull_secrets=None, service_account_name=None, hostnetwork=False, host_aliases=None, tolerations=None, security_context=None, container_security_context=None, dnspolicy=None, dns_config=None, hostname=None, subdomain=None, schedulername=None, full_pod_spec=None, init_containers=None, log_events_on_failure=False, do_xcom_push=False, pod_template_file=None, pod_template_dict=None, priority_class_name=None, pod_runtime_info_envs=None, termination_grace_period=None, configmaps=None, skip_on_exit_code=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=2, log_pod_spec_on_failure=True, on_finish_action='delete_pod', is_delete_operator_pod=None, termination_message_policy='File', active_deadline_seconds=None, callbacks=None, progress_callback=None, logging_interval=None, **kwargs)[source]

基类: airflow.models.BaseOperator

在 Kubernetes Pod 中执行任务。

另请参阅

有关如何使用此操作符的更多信息,请查看指南: KubernetesPodOperator

注意

如果您使用 Google Kubernetes Engine 并且 Airflow 没有在同一集群中运行,请考虑使用 GKEStartPodOperator,它简化了授权过程。

参数
  • kubernetes_conn_id (str | None) – Kubernetes 集群的 kubernetes 连接 ID

  • namespace (str | None) – 在 Kubernetes 中运行的命名空间。

  • image (str | None) – 您希望启动的 Docker 镜像。默认为 hub.docker.com,但完全限定的 URL 将指向自定义存储库。(模板化)

  • name (str | None) – 任务将在其中运行的 pod 的名称,将使用该名称(如果 random_name_suffix 为 True,则加上随机后缀)来生成 pod ID (DNS-1123 子域,仅包含 [a-z0-9.-])。

  • random_name_suffix (bool) – 如果为 True,将生成随机后缀。

  • cmds (list[str] | None) – 容器的入口点。(已模板化)如果未提供,则使用 Docker 镜像的入口点。

  • arguments (list[str] | None) – 入口点的参数。(已模板化)如果未提供,则使用 Docker 镜像的 CMD。

  • ports (list[kubernetes.client.models.V1ContainerPort] | None) – 启动 Pod 的端口。

  • volume_mounts (list[kubernetes.client.models.V1VolumeMount] | None) – 启动 Pod 的 volumeMounts。

  • volumes (list[kubernetes.client.models.V1Volume] | None) – 启动 Pod 的卷。包括 ConfigMaps 和 PersistentVolumes。

  • env_vars (list[kubernetes.client.models.V1EnvVar] | dict[str, str] | None) – 在容器中初始化的环境变量。(已模板化)

  • env_from (list[kubernetes.client.models.V1EnvFromSource] | None) – (可选)用于在容器中填充环境变量的源列表。

  • secrets (list[airflow.providers.cncf.kubernetes.secret.Secret] | None) – 要注入容器中的 Kubernetes 密钥。它们可以作为环境变量或卷中的文件公开。

  • in_cluster (bool | None) – 使用 in_cluster 配置运行 kubernetes 客户端。

  • cluster_context (str | None) – 指向 Kubernetes 集群的上下文。当 in_cluster 为 True 时忽略。如果为 None,则使用 current-context。(已模板化)

  • reattach_on_restart (bool) – 如果 worker 在 Pod 运行时崩溃,则在下次尝试时重新附加并监控。如果为 False,则每次尝试都始终创建一个新的 Pod。

  • labels (dict | None) – 要应用于 Pod 的标签。(已模板化)

  • startup_timeout_seconds (int) – 启动 Pod 的超时时间(秒)。

  • startup_check_interval_seconds (int) – 检查 Pod 是否已启动的间隔时间(秒)

  • get_logs (bool) – 获取基本容器的 stdout 作为任务的日志。

  • init_container_logs (collections.abc.Iterable[str] | str | Literal[True] | None) – 将其日志发布到 stdout 的初始化容器列表。接受容器序列、单个容器名称或 True。如果为 True,则发布所有容器的日志。

  • container_logs (collections.abc.Iterable[str] | str | Literal[True] | None) – 将其日志发布到 stdout 的容器列表。接受容器序列、单个容器名称或 True。如果为 True,则发布所有容器的日志。与 get_logs 参数结合使用。默认值是基本容器。

  • image_pull_policy (str | None) – 指定缓存或始终拉取镜像的策略。

  • annotations (dict | None) – 可以附加到 Pod 的非标识元数据。可以是大量数据,并且可以包含标签不允许的字符。(已模板化)

  • container_resources (kubernetes.client.models.V1ResourceRequirements | None) – 启动 Pod 的资源。(已模板化)

  • affinity (kubernetes.client.models.V1Affinity | None) – 启动 Pod 的亲和性调度规则。

  • config_file (str | None) – Kubernetes 配置文件的路径。(已模板化)如果未指定,则默认值为 ~/.kube/config

  • node_selector (dict | None) – 包含一组调度规则的字典。(已模板化)

  • image_pull_secrets (list[kubernetes.client.models.V1LocalObjectReference] | None) – 要提供给 Pod 的任何镜像拉取密钥。如果需要多个密钥,请提供逗号分隔的列表:secret_a,secret_b

  • service_account_name (str | None) – 服务帐户的名称

  • hostnetwork (bool) – 如果为 True,则在 Pod 上启用主机网络。

  • host_aliases (list[kubernetes.client.models.V1HostAlias] | None) – 要应用于 Pod 中容器的主机别名列表。

  • tolerations (list[kubernetes.client.models.V1Toleration] | None) – Kubernetes 容忍列表。

  • security_context (kubernetes.client.models.V1PodSecurityContext | dict | None) – Pod 应运行的安全选项 (PodSecurityContext)。

  • container_security_context (kubernetes.client.models.V1SecurityContext | dict | None) – 容器应运行的安全选项。

  • dnspolicy (str | None) – Pod 的 dnspolicy。

  • dns_config (kubernetes.client.models.V1PodDNSConfig | None) – Pod 的 dns 配置(IP 地址、搜索、选项)。

  • hostname (str | None) – Pod 的主机名。

  • subdomain (str | None) – Pod 的子域。

  • schedulername (str | None) – 为 Pod 指定一个 schedulername

  • full_pod_spec (kubernetes.client.models.V1Pod | None) – 完整的 podSpec

  • init_containers (list[kubernetes.client.models.V1Container] | None) – 启动 Pod 的初始化容器

  • log_events_on_failure (bool) – 如果发生故障,记录 Pod 的事件

  • do_xcom_push (bool) – 如果为 True,当容器完成时,容器中 /airflow/xcom/return.json 文件的内容也将被推送到 XCom。

  • pod_template_file (str | None) – Pod 模板文件的路径(已模板化)

  • pod_template_dict (dict | None) – Pod 模板字典(已模板化)

  • priority_class_name (str | None) – 启动 Pod 的优先级类名称

  • pod_runtime_info_envs (list[kubernetes.client.models.V1EnvVar] | None) – (可选)要在容器中设置的环境变量列表。

  • termination_grace_period (int | None) – 如果任务在 UI 中被杀死,则终止宽限期,默认为 Kubernetes 默认值

  • configmaps (list[str] | None) – (可选)从中收集 ConfigMaps 以使用环境变量填充环境的配置映射的名称列表。目标 ConfigMap 的 Data 字段的内容将表示为环境变量的键值对。扩展 env_from。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果任务以此退出代码退出,则将任务保留在 skipped 状态(默认值:None)。如果设置为 None,则任何非零退出代码都将被视为失败。

  • base_container_name (str | None) – Pod 中基础容器的名称。如果 get_logs 为 True,此容器的日志将作为此任务日志的一部分显示。默认为 None。如果为 None,将查阅类变量 BASE_CONTAINER_NAME(默认为 “base”)以获取要使用的基础容器名称。

  • deferrable (bool) – 在可延迟模式下运行操作符。

  • poll_interval (float) – 检查状态的轮询间隔(以秒为单位)。仅在可延迟模式下使用。

  • log_pod_spec_on_failure (bool) – 如果发生故障,则记录 Pod 的规范。

  • on_finish_action (str) – 当 Pod 达到最终状态或执行中断时执行的操作。如果为“delete_pod”,则无论其状态如何,都将删除 Pod;如果为“delete_succeeded_pod”,则仅删除成功的 Pod。可以设置为“keep_pod”以保留 Pod。

  • termination_message_policy (str) – 基础容器的终止消息策略。默认值为“File”。

  • active_deadline_seconds (int | None) – active_deadline_seconds,它会转换为 V1PodSpec 中的 active_deadline_seconds。

  • callbacks (type[airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback] | None) – KubernetesPodOperatorCallback 实例包含 KubernetesPodOperator 不同步骤的回调方法。

  • logging_interval (int | None) – 任务在恢复获取最新日志之前应处于延迟状态的最长时间(以秒为单位)。如果为 None,则任务将保持延迟状态,直到 Pod 完成,并且在此之前不会显示任何日志。

BASE_CONTAINER_NAME = 'base'[source]
ISTIO_CONTAINER_NAME = 'istio-proxy'[source]
KILL_ISTIO_PROXY_SUCCESS_MSG = 'HTTP/1.1 200'[source]
POD_CHECKED_KEY = 'already_checked'[source]
POST_TERMINATION_TIMEOUT = 120[source]
template_fields: collections.abc.Sequence[str] = ('image', 'cmds', 'annotations', 'arguments', 'env_vars', 'labels', 'config_file',...[source]
template_fields_renderers[source]
pod_manager()[source]
hook()[source]
client()[source]
find_pod(namespace, context, *, exclude_checked=True)[source]

如果存在,则返回此任务实例的已运行 Pod。

log_matching_pod(pod, context)[source]
get_or_create_pod(pod_request_obj, context)[source]
await_pod_start(pod)[source]
extract_xcom(pod)[source]

检索 xcom 值并终止 xcom sidecar 容器。

execute(context)[source]

根据 deferrable 参数异步或同步运行 Pod。

execute_sync(context)[source]
await_init_containers_completion(pod)[源代码]
await_pod_completion(pod)[源代码]
execute_async(context)[源代码]
convert_config_file_to_dict()[源代码]

将传递的 config_file 转换为字典表示形式。

invoke_defer_method(last_log_time=None)[源代码]

重新定义子类中正在使用的触发器。

trigger_reentry(context, event)[源代码]

从触发器重新进入的点。

如果 logging_interval 为 None,则此时 pod 应该已完成,我们将只获取日志并退出。

如果 logging_interval 不为 None,则可能是 pod 仍在运行,我们将只获取最新的日志并再次推迟到触发器。

post_complete_action(*, pod, remote_pod, **kwargs)[源代码]

在运算符完成 deferrable_execution 的逻辑后必须执行的操作。

cleanup(pod, remote_pod)[源代码]
is_istio_enabled(pod)[源代码]

通过检查命名空间标签来检查 pod 的命名空间是否启用了 istio。

kill_istio_sidecar(pod)[源代码]
process_pod_deletion(pod, *, reraise=True)[源代码]
patch_already_checked(pod, *, reraise=True)[源代码]

添加一个 “already checked” 标签,以确保我们不会在重试时重新附加。

on_kill()[源代码]

重写此方法以在任务实例被终止时清理子进程。

在运算符中使用 threading、subprocess 或 multiprocessing 模块的任何使用都需要清理,否则它会留下孤立进程。

build_pod_request_obj(context=None)[源代码]

根据 pod 模板文件、完整的 pod 规范和其他运算符参数返回 V1Pod 对象。

V1Pod 属性(按优先级)从运算符参数、完整的 pod 规范、pod 模板文件派生。

dry_run()[源代码]

打印此运算符将创建的 pod 定义。

不包括特定于任务实例的标签(因为在 dry_run 中没有),并排除所有空元素。

process_duplicate_label_pods(pod_list)[源代码]

修补或删除具有重复标签的现有 pod。

这用于处理仅当 reattach_on_restart 标志为 False 时才会发生的边缘情况,并且之前的运行尝试失败,因为任务进程已由集群或其他进程在外部终止。

如果任务进程在外部被终止,它会中断代码执行并立即退出任务。因此,先前尝试中创建的 pod 将不会通过 cleanup() 方法正确删除或修补。

返回新创建的 pod 以用于下一次运行尝试。

此条目是否有帮助?