airflow.providers.cncf.kubernetes.utils.pod_manager

启动 PODs。

属性

EMPTY_XCOM_RESULT

没有 xcom 结果的标记。

异常

PodLaunchFailedException

当在 KubernetesPodOperator 中启动 pod 失败时。

PodLaunchTimeoutException

当 pod 在指定的超时时间内未离开 Pending 阶段时。

PodNotFoundException

预期的 pod 在 kube-api 中不存在。

PodPhase

可能的 pod 阶段。

PodOperatorHookProtocol

定义 KubernetesPodOperator 所依赖方法的协议。

PodLogsConsumer

负责从流中拉取 pod 日志并在读取数据前检查容器状态。

PodLoggingStatus

fetch_container_logs 退出时返回 pod 的状态和最后日志时间。

PodManager

创建、监控并与 Kubernetes pods 交互,供 KubernetesPodOperator 使用。

OnFinishAction

当 pod 完成时采取的操作。

函数

should_retry_start_pod(exception)

检查异常是否指示瞬时错误并值得重试。

get_container_status(pod, container_name)

获取容器状态。

container_is_running(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否正在运行。

container_is_completed(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否已完成。

container_is_succeeded(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否已完成并成功。

container_is_wait(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否正在等待。

container_is_terminated(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否已终止。

get_container_termination_message(pod, container_name)

check_exception_is_kubernetes_api_unauthorized(exc)

is_log_group_marker(line)

检查行是否是日志组标记,如 ::group::::endgroup::

模块内容

airflow.providers.cncf.kubernetes.utils.pod_manager.EMPTY_XCOM_RESULT = '__airflow_xcom_result_empty__'[源代码]

没有 xcom 结果的标记。

exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchFailedException[源代码]

基类: airflow.exceptions.AirflowException

当在 KubernetesPodOperator 中启动 pod 失败时。

airflow.providers.cncf.kubernetes.utils.pod_manager.should_retry_start_pod(exception)[源代码]

检查异常是否指示瞬时错误并值得重试。

class airflow.providers.cncf.kubernetes.utils.pod_manager.PodPhase[源代码]

可能的 pod 阶段。

参见 https://kubernetes.ac.cn/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase

PENDING = 'Pending'[源代码]
RUNNING = 'Running'[源代码]
FAILED = 'Failed'[源代码]
SUCCEEDED = 'Succeeded'[源代码]
terminal_states[源代码]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodOperatorHookProtocol[源代码]

基类: Protocol

定义 KubernetesPodOperator 所依赖方法的协议。

KubernetesPodOperator 的子类,如 GKEStartPodOperator,可能会使用不继承 KubernetesHook 的 hook。我们使用此协议来记录 KPO 使用的方法,并确保这些方法在其他此类 hook 上存在。

property core_v1_client: kubernetes.client.CoreV1Api[源代码]

获取经过身份验证的客户端对象。

property is_in_cluster: bool[源代码]

暴露 hook 是否配置了 load_incluster_config

get_pod(name, namespace)[源代码]

从 kubernetes API 读取 pod 对象。

get_namespace()[源代码]

返回连接中定义的命名空间。

get_xcom_sidecar_container_image()[源代码]

返回连接中定义的 xcom sidecar 镜像。

get_xcom_sidecar_container_resources()[源代码]

返回连接中定义的 xcom sidecar 资源。

airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_status(pod, container_name)[源代码]

获取容器状态。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running(pod, container_name)[源代码]

检查 V1Pod pod 以确定 container_name 是否正在运行。

如果该容器存在且正在运行,返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_completed(pod, container_name)[源代码]

检查 V1Pod pod 以确定 container_name 是否已完成。

如果该容器存在且已完成,返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_succeeded(pod, container_name)[源代码]

检查 V1Pod pod 以确定 container_name 是否已完成并成功。

如果该容器存在且已完成并成功,返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_wait(pod, container_name)[源代码]

检查 V1Pod pod 以确定 container_name 是否正在等待。

如果该容器存在且正在等待,返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_terminated(pod, container_name)[源代码]

检查 V1Pod pod 以确定 container_name 是否已终止。

如果该容器存在且已终止,返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_termination_message(pod, container_name)[源代码]
airflow.providers.cncf.kubernetes.utils.pod_manager.check_exception_is_kubernetes_api_unauthorized(exc)[源代码]
exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchTimeoutException[源代码]

基类: airflow.exceptions.AirflowException

当 pod 在指定的超时时间内未离开 Pending 阶段时。

exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodNotFoundException[源代码]

基类: airflow.exceptions.AirflowException

预期的 pod 在 kube-api 中不存在。

class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLogsConsumer(response, pod, pod_manager, container_name, post_termination_timeout=120, read_pod_cache_timeout=120)[源代码]

负责从流中拉取 pod 日志并在读取数据前检查容器状态。

此类是针对问题 https://github.com/apache/airflow/issues/23497 的变通方法。

参数:
  • response (urllib3.response.HTTPResponse) – 包含日志的 HTTP 响应

  • pod (kubernetes.client.models.v1_pod.V1Pod) – 来自 Kubernetes 客户端的 Pod 实例

  • pod_manager (PodManager) – Pod 管理器实例

  • container_name (str) – 我们正在读取日志的容器名称

  • post_termination_timeout (int) – (可选) 以秒为单位的时间段,表示容器终止后日志可用的时长。

  • read_pod_cache_timeout (int) – (可选) 容器状态缓存生命周期。缓存容器状态以减少 API 调用。

response[源代码]
pod[源代码]
pod_manager[源代码]
container_name[源代码]
post_termination_timeout = 120[源代码]
last_read_pod_at = None[源代码]
read_pod_cache = None[源代码]
read_pod_cache_timeout = 120[源代码]
__iter__()[源代码]

生成以 'n' 符号分隔的日志项。

logs_available()[源代码]
read_pod()[源代码]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLoggingStatus[源代码]

fetch_container_logs 退出时返回 pod 的状态和最后日志时间。

running: bool[源代码]
last_log_time: pendulum.DateTime | None[源代码]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager(kube_client, callbacks=None)[源代码]

基类: airflow.utils.log.logging_mixin.LoggingMixin

创建、监控并与 Kubernetes pods 交互,供 KubernetesPodOperator 使用。

run_pod_async(pod, **kwargs)[源代码]

异步运行 POD。

delete_pod(pod)[源代码]

删除 POD。

create_pod(pod)[源代码]

异步启动 pod。

await_pod_start(pod, startup_timeout=120, startup_check_interval=1)[源代码]

等待 pod 达到 `Pending` 之外的阶段。

参数:
  • pod (kubernetes.client.models.v1_pod.V1Pod)

  • startup_timeout (int) – pod 启动的超时时间(以秒为单位)(如果 pod 挂起时间过长,任务将失败)

  • startup_check_interval (int) – 检查之间的间隔(以秒为单位)

返回值:

返回类型:

None

fetch_container_logs(pod, container_name, *, follow=False, since_time=None, post_termination_timeout=120)[source]

实时跟踪容器日志并流式传输到 Airflow 日志系统。

容器退出时返回。

从 Pod 启动到日志可用之间,由于 CSR 尚未批准和签名,可能会出现延迟。在这种情况下,会抛出 ApiException。这就是为什么我们要对此特定异常进行重试。

fetch_requested_init_container_logs(pod, init_containers, follow_logs=False)[source]

实时跟踪指定 Pod 中容器的日志,并将其发布到 Airflow 日志系统。

所有容器退出时返回。

fetch_requested_container_logs(pod, containers, follow_logs=False)[source]

实时跟踪指定 Pod 中容器的日志,并将其发布到 Airflow 日志系统。

所有容器退出时返回。

await_container_completion(pod, container_name, polling_time=1)[source]

等待给定 Pod 中的指定容器完成。

参数:
  • pod (kubernetes.client.models.v1_pod.V1Pod) – 将要监控的 pod 规格

  • container_name (str) – 要监控的 pod 中容器的名称

  • polling_time (float) – 两次容器状态检查之间的轮询时间。默认为 1 秒。

await_pod_completion(pod, istio_enabled=False, container_name='base')[source]

监控一个 Pod 并返回最终状态。

参数:
  • istio_enabled (bool) – 命名空间中是否启用了 Istio

  • pod (kubernetes.client.models.v1_pod.V1Pod) – 将要监控的 pod 规格

  • container_name (str) – Pod 中容器的名称

返回值:

tuple[State, str | None]

返回类型:

kubernetes.client.models.v1_pod.V1Pod

parse_log_line(line)[source]

解析 K8s 日志行并返回最终状态。

参数:

line (str) – K8s 日志行

返回值:

时间戳和日志消息

返回类型:

tuple[pendulum.DateTime | None, str]

container_is_running(pod, container_name)[source]

读取 Pod 并检查容器是否正在运行。

container_is_terminated(pod, container_name)[source]

读取 Pod 并检查容器是否已终止。

read_pod_logs(pod, container_name, tail_lines=None, timestamps=False, since_seconds=None, follow=True, post_termination_timeout=120, **kwargs)[source]

从 Pod 读取日志。

get_init_container_names(pod)[source]

返回 Pod 中的容器名称,但不包括 airflow-xcom-sidecar 容器。

get_container_names(pod)[source]

返回 Pod 中的容器名称,但不包括 airflow-xcom-sidecar 容器。

read_pod_events(pod)[source]

从 Pod 读取事件。

read_pod(pod)[source]

读取 Pod 信息。

await_xcom_sidecar_container_start(pod, timeout=900, log_interval=30)[source]

在执行 do_xcom_push 之前,检查 sidecar 容器是否已达到 'Running' 状态。

extract_xcom(pod)[source]

检索 XCom 值并杀死 xcom sidecar 容器。

extract_xcom_json(pod)[source]

检索 XCom 值并检查 xcom json 是否有效。

extract_xcom_kill(pod)[source]

杀死 xcom sidecar 容器。

class airflow.providers.cncf.kubernetes.utils.pod_manager.OnFinishAction[source]

基类:str, enum.Enum

当 pod 完成时采取的操作。

KEEP_POD = 'keep_pod'[source]
DELETE_POD = 'delete_pod'[source]
DELETE_SUCCEEDED_POD = 'delete_succeeded_pod'[source]
airflow.providers.cncf.kubernetes.utils.pod_manager.is_log_group_marker(line)[source]

检查行是否是日志组标记,如 ::group::::endgroup::

此条目有帮助吗?