airflow.providers.cncf.kubernetes.pod_generator

Pod 生成器。

此模块提供之前的 Pod API 和输出 kubernetes.client.models.V1Pod 之间的接口。 优点是支持完整的 Kubernetes API,并且不需要编写序列化。

模块内容

PodGenerator

包含 Kubernetes Airflow 工作器配置逻辑。

函数

make_safe_label_value(string)

规范化提供的标签,使其具有有效的长度和字符。

datetime_to_label_safe_datestring(datetime_obj)

将日期时间字符串转换为用作标签。

label_safe_datestring_to_datetime(string)

将标签转换回日期时间对象。

merge_objects(base_obj, client_obj)

合并对象。

extend_object_field(base_obj, client_obj, field_name)

将字段值添加到现有对象。

属性

log

MAX_LABEL_LEN

airflow.providers.cncf.kubernetes.pod_generator.log[源代码]
airflow.providers.cncf.kubernetes.pod_generator.MAX_LABEL_LEN = 63[源代码]
exception airflow.providers.cncf.kubernetes.pod_generator.PodMutationHookException[源代码]

基类:airflow.exceptions.AirflowException

在 Pod 突变 Hook 执行期间发生异常时引发。

exception airflow.providers.cncf.kubernetes.pod_generator.PodReconciliationError[源代码]

基类:airflow.exceptions.AirflowException

尝试合并 pod 配置时遇到错误时引发。

airflow.providers.cncf.kubernetes.pod_generator.make_safe_label_value(string)[源代码]

规范化提供的标签,使其具有有效的长度和字符。

有效的标签值必须小于等于 63 个字符,并且必须为空,或者以字母数字字符 ([a-z0-9A-Z]) 开头和结尾,中间使用破折号 (-)、下划线 (_)、点 (.) 和字母数字字符。

如果标签值在安全处理后大于 63 个字符,或者与发送给此函数的原始值有任何不同,则我们需要截断为 53 个字符,并附加一个唯一的哈希值。

airflow.providers.cncf.kubernetes.pod_generator.datetime_to_label_safe_datestring(datetime_obj)[源代码]

将日期时间字符串转换为用作标签。

Kubernetes 不喜欢标签中的“:”,因为 ISO 日期时间格式使用“:”但不是“_”,让我们将“:”替换为“_”

参数

datetime_obj (datetime.datetime) – datetime.datetime 对象

返回

表示日期时间的类 ISO 字符串

返回类型

str

airflow.providers.cncf.kubernetes.pod_generator.label_safe_datestring_to_datetime(string)[源代码]

将标签转换回日期时间对象。

Kubernetes 不允许标签中出现“:”。 ISO 日期时间格式使用“:”而不是“_”,让我们将“:”替换为“_”

参数

string (str) – str

返回

datetime.datetime 对象

返回类型

datetime.datetime

class airflow.providers.cncf.kubernetes.pod_generator.PodGenerator(pod=None, pod_template_file=None, extract_xcom=True)[源代码]

包含 Kubernetes Airflow 工作器配置逻辑。

表示 kubernetes pod 并管理单个 pod 的执行。 任何特定于容器的配置都会应用于容器列表中的第一个容器。

参数
  • pod (kubernetes.client.models.V1Pod | None) – 完全指定的 pod。 与 pod_template_file 互斥

  • pod_template_file (str | None) – YAML 文件的路径。 与 pod 互斥

  • extract_xcom (bool) – 是否启动一个用于 xcom 的容器

static from_obj(obj)[源代码]

从 obj 转换为 pod。

static reconcile_pods(base_pod, client_pod)[源代码]

合并 Kubernetes Pod 对象。

参数
  • base_pod (kubernetes.client.models.V1Pod) – 具有基本属性,如果它们存在于客户端 pod 中则会被覆盖,如果它们不存在于客户端 pod 中则会保留

  • client_pod (kubernetes.client.models.V1Pod | None) – 客户端想要创建的 pod。

返回

合并后的 pod

返回类型

kubernetes.client.models.V1Pod

这不能递归完成,因为某些字段会被覆盖,而某些字段会被连接。

static reconcile_metadata(base_meta, client_meta)[源代码]

合并 Kubernetes Metadata 对象。

参数
  • base_meta – 具有基本属性,如果它们存在于 client_meta 中则会被覆盖,如果它们不存在于 client_meta 中则会保留

  • client_meta – 客户端想要创建的 spec。

返回

合并后的 spec

static reconcile_specs(base_spec, client_spec)[源代码]

合并 Kubernetes PodSpec 对象。

参数
  • base_spec (kubernetes.client.models.V1PodSpec | None) – 具有基本属性,如果它们存在于 client_spec 中则会被覆盖,如果它们不存在于 client_spec 中则会保留

  • client_spec (kubernetes.client.models.V1PodSpec | None) – 客户端想要创建的 spec。

返回

合并后的 spec

返回类型

kubernetes.client.models.V1PodSpec | None

静态 reconcile_containers(base_containers, client_containers)[源代码]

合并 Kubernetes 容器对象。

参数
  • base_containers (list[kubernetes.client.models.V1Container]) – 包含基本属性,如果它们存在于 client_containers 中,则会被覆盖;如果它们不存在于 client_containers 中,则会保留。

  • client_containers (list[kubernetes.client.models.V1Container]) – 客户端想要创建的容器。

返回

合并后的容器

返回类型

list[kubernetes.client.models.V1Container]

递归遍历容器列表。

类方法 construct_pod(dag_id, task_id, pod_id, try_number, kube_image, date, args, pod_override_object, base_worker_pod, namespace, scheduler_job_id, run_id=None, map_index=-1, *, with_mutation_hook=False)[源代码]

创建一个 Pod。

通过从以下 3 个位置收集和整合配置来构建 Pod:
  • airflow.cfg

  • executor_config

  • 动态参数

静态 serialize_pod(pod)[源代码]

将 k8s.V1Pod 转换为 JSON 可序列化的字典。

参数

pod (kubernetes.client.models.V1Pod) – k8s.V1Pod 对象

返回

Pod 的序列化版本,以 dict 形式返回

返回类型

dict

静态 deserialize_model_file(path)[源代码]

从文件生成 Pod。

参数

path (str) – 文件路径

返回

kubernetes.client.models.V1Pod

返回类型

kubernetes.client.models.V1Pod

静态 deserialize_model_dict(pod_dict)[源代码]

将 Python 字典反序列化为 k8s.V1Pod。

不幸的是,我们需要访问 Kubernetes 客户端的私有方法 _ApiClient__deserialize_model。 此问题在此处跟踪: https://github.com/kubernetes-client/python/issues/977

参数

pod_dict (dict | None) – k8s.V1Pod 对象的序列化字典

返回

反序列化的 k8s.V1Pod

返回类型

kubernetes.client.models.V1Pod

airflow.providers.cncf.kubernetes.pod_generator.merge_objects(base_obj, client_obj)[源代码]

合并对象。

参数
  • base_obj – 包含基本属性,如果它们存在于 client_obj 中,则会被覆盖;如果它们不存在于 client_obj 中,则会保留。

  • client_obj – 客户端想要创建的对象。

返回

合并后的对象

airflow.providers.cncf.kubernetes.pod_generator.extend_object_field(base_obj, client_obj, field_name)[源代码]

将字段值添加到现有对象。

参数
  • base_obj – 具有属性 field_name(为列表)的对象

  • client_obj – 具有属性 field_name(为列表)的对象。 返回此对象的副本,其中 field_name 已修改

  • field_name – 列表字段的名称

返回

client_obj,其属性 field_name 为两个属性的附加结果

此条目是否有帮助?