airflow.providers.cncf.kubernetes.pod_generator

Pod 生成器。

此模块提供了一个接口,连接了之前的 Pod API 并输出 `kubernetes.client.models.V1Pod`。其优点在于支持完整的 Kubernetes API,并且无需编写序列化代码。

属性

log

MAX_LABEL_LEN

PodGenerator

包含 Kubernetes Airflow Worker 的配置逻辑。

函数

make_safe_label_value(string)

规范化提供的标签,使其具有有效长度和字符。

datetime_to_label_safe_datestring(datetime_obj)

将日期时间字符串转换为可用作标签的格式。

label_safe_datestring_to_datetime(string)

将标签转换回日期时间对象。

merge_objects(base_obj, client_obj)

合并对象。

extend_object_field(base_obj, client_obj, field_name)

将字段值添加到现有对象。

模块内容

airflow.providers.cncf.kubernetes.pod_generator.log[source]
airflow.providers.cncf.kubernetes.pod_generator.MAX_LABEL_LEN = 63[source]
airflow.providers.cncf.kubernetes.pod_generator.make_safe_label_value(string)[source]

规范化提供的标签,使其具有有效长度和字符。

有效的标签值必须小于或等于 63 个字符,并且必须为空或以字母数字字符 ([a-z0-9A-Z]) 开头和结尾,中间可以使用短划线 (-)、下划线 (_)、点 (.) 和字母数字。

如果标签值在规范化后超过 63 个字符,或与发送到此函数的原始值有任何不同,则我们需要将其截断至 53 个字符,并在末尾附加一个唯一的哈希值。

airflow.providers.cncf.kubernetes.pod_generator.datetime_to_label_safe_datestring(datetime_obj)[source]

将日期时间字符串转换为可用作标签的格式。

Kubernetes 不喜欢标签中使用“:”,由于 ISO 日期时间格式使用“:”而非“_”,我们将“:”替换为“_”。

参数:

datetime_obj (datetime.datetime) – datetime.datetime 对象

返回:

表示日期时间的类似 ISO 格式的字符串

返回类型:

str

airflow.providers.cncf.kubernetes.pod_generator.label_safe_datestring_to_datetime(string)[source]

将标签转换回日期时间对象。

Kubernetes 不允许标签中使用“:”。ISO 日期时间格式使用“:”而非“_”,我们将“:”替换为“_”。

参数:

string (str) – 字符串

返回:

datetime.datetime 对象

返回类型:

datetime.datetime

class airflow.providers.cncf.kubernetes.pod_generator.PodGenerator(pod=None, pod_template_file=None, extract_xcom=True)[source]

包含 Kubernetes Airflow Worker 的配置逻辑。

表示一个 Kubernetes pod,并管理单个 pod 的执行。任何特定于容器的配置都会应用于容器列表中的第一个容器。

参数:
  • pod (kubernetes.client.models.V1Pod | None) – 完整的 pod 定义。与 pod_template_file 互斥。

  • pod_template_file (str | None) – YAML 文件路径。与 pod 互斥。

  • extract_xcom (bool) – 是否为 xcom 启动一个容器。

extract_xcom =True[source]
static from_obj(obj)[source]

从对象转换为 pod。

static reconcile_pods(base_pod, client_pod)[source]

合并 Kubernetes Pod 对象。

参数:
  • base_pod (kubernetes.client.models.V1Pod) – 包含基础属性,如果客户端 pod 中存在,则会被覆盖;如果客户端 pod 中不存在,则保留。

  • client_pod (kubernetes.client.models.V1Pod | None) – 客户端想要创建的 pod。

返回:

合并后的 pods

返回类型:

kubernetes.client.models.V1Pod

这无法递归完成,因为某些字段会被覆盖,而另一些则会被连接。

static reconcile_metadata(base_meta, client_meta)[source]

合并 Kubernetes Metadata 对象。

参数:
  • base_meta – 包含基础属性,如果客户端 meta 中存在,则会被覆盖;如果客户端 meta 中不存在,则保留。

  • client_meta – 客户端想要创建的元数据。

返回:

合并后的元数据

static reconcile_specs(base_spec, client_spec)[source]

合并 Kubernetes PodSpec 对象。

参数:
  • base_spec (kubernetes.client.models.V1PodSpec | None) – 包含基础属性,如果客户端 spec 中存在,则会被覆盖;如果客户端 spec 中不存在,则保留。

  • client_spec (kubernetes.client.models.V1PodSpec | None) – 客户端想要创建的 spec。

返回:

合并后的元数据

返回类型:

kubernetes.client.models.V1PodSpec | None

static reconcile_containers(base_containers, client_containers)[source]

合并 Kubernetes Container 对象。

参数:
  • base_containers (list[kubernetes.client.models.V1Container]) – 包含基础属性,如果客户端 containers 中存在,则会被覆盖;如果客户端 containers 中不存在,则保留。

  • client_containers (list[kubernetes.client.models.V1Container]) – 客户端想要创建的容器。

返回:

合并后的容器

返回类型:

list[kubernetes.client.models.V1Container]

这会递归地作用于容器列表。

classmethod construct_pod(dag_id, task_id, pod_id, try_number, kube_image, date, args, pod_override_object, base_worker_pod, namespace, scheduler_job_id, run_id=None, map_index=-1, content_json_for_volume='', *, with_mutation_hook=False)[source]

创建一个 Pod。

通过从以下 3 个位置收集和合并配置来构造一个 pod
  • airflow.cfg

  • executor_config

  • 动态参数

classmethod build_selector_for_k8s_executor_pod(*, dag_id, task_id, try_number, map_index=None, logical_date=None, run_id=None, airflow_worker=None, include_version=False)[source]

为 Kubernetes Executor Pod 生成选择器。

classmethod build_labels_for_k8s_executor_pod(*, dag_id, task_id, try_number, airflow_worker=None, map_index=None, logical_date=None, run_id=None, include_version=True)[source]

为 Kubernetes Executor Pod 生成标签。

static serialize_pod(pod)[source]

将 k8s.V1Pod 转换为 JSON 可序列化的字典。

参数:

pod (kubernetes.client.models.V1Pod) – k8s.V1Pod 对象

返回:

Pod 的序列化版本,作为字典返回

返回类型:

dict

static deserialize_model_file(path)[source]

从文件生成一个 Pod。

参数:

path (str) – 文件路径

返回:

一个 kubernetes.client.models.V1Pod

返回类型:

kubernetes.client.models.V1Pod

static deserialize_model_dict(pod_dict)[source]

将 Python 字典反序列化为 k8s.V1Pod。

遗憾的是,我们需要访问 Kubernetes 客户端的私有方法 _ApiClient__deserialize_model。此问题在此处跟踪:https://github.com/kubernetes-client/python/issues/977

参数:

pod_dict (dict | None) – k8s.V1Pod 对象的序列化字典

返回:

反序列化的 k8s.V1Pod

返回类型:

kubernetes.client.models.V1Pod

airflow.providers.cncf.kubernetes.pod_generator.merge_objects(base_obj, client_obj)[source]

合并对象。

参数:
  • base_obj – 包含基础属性,如果客户端 obj 中存在,则会被覆盖;如果客户端 obj 中不存在,则保留。

  • client_obj – 客户端想要创建的对象。

返回:

合并后的对象

airflow.providers.cncf.kubernetes.pod_generator.extend_object_field(base_obj, client_obj, field_name)[source]

将字段值添加到现有对象。

参数:
  • base_obj – 一个具有属性 field_name 且该属性是一个列表的对象

  • client_obj – 一个具有属性 field_name 且该属性是一个列表的对象。返回此对象的一个副本,其 field_name 已修改。

  • field_name – 列表字段的名称

返回:

client_obj,其属性 field_name 是两个属性的拼接结果

此条目是否有帮助?