airflow.providers.cncf.kubernetes.utils.pod_manager¶
启动 PODs。
属性¶
没有 xcom 结果的标记。 |
异常¶
当在 KubernetesPodOperator 中启动 pod 失败时。 |
|
当 pod 在指定的超时时间内未离开 |
|
预期的 pod 在 kube-api 中不存在。 |
类¶
可能的 pod 阶段。 |
|
定义 KubernetesPodOperator 所依赖方法的协议。 |
|
负责从流中拉取 pod 日志并在读取数据前检查容器状态。 |
|
从 fetch_container_logs 退出时返回 pod 的状态和最后日志时间。 |
|
创建、监控并与 Kubernetes pods 交互,供 KubernetesPodOperator 使用。 |
|
当 pod 完成时采取的操作。 |
函数¶
|
检查异常是否指示瞬时错误并值得重试。 |
|
获取容器状态。 |
|
检查 V1Pod |
|
检查 V1Pod |
|
检查 V1Pod |
|
检查 V1Pod |
|
检查 V1Pod |
|
|
|
检查行是否是日志组标记,如 ::group:: 或 ::endgroup::。 |
模块内容¶
- airflow.providers.cncf.kubernetes.utils.pod_manager.EMPTY_XCOM_RESULT = '__airflow_xcom_result_empty__'[源代码]¶
没有 xcom 结果的标记。
- exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchFailedException[源代码]¶
基类:
airflow.exceptions.AirflowException
当在 KubernetesPodOperator 中启动 pod 失败时。
- airflow.providers.cncf.kubernetes.utils.pod_manager.should_retry_start_pod(exception)[源代码]¶
检查异常是否指示瞬时错误并值得重试。
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodPhase[源代码]¶
可能的 pod 阶段。
参见 https://kubernetes.ac.cn/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase。
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodOperatorHookProtocol[源代码]¶
基类:
Protocol
定义 KubernetesPodOperator 所依赖方法的协议。
KubernetesPodOperator 的子类,如 GKEStartPodOperator,可能会使用不继承 KubernetesHook 的 hook。我们使用此协议来记录 KPO 使用的方法,并确保这些方法在其他此类 hook 上存在。
- airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_status(pod, container_name)[源代码]¶
获取容器状态。
- airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running(pod, container_name)[源代码]¶
检查 V1Pod
pod
以确定container_name
是否正在运行。如果该容器存在且正在运行,返回 True。否则返回 False。
- airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_completed(pod, container_name)[源代码]¶
检查 V1Pod
pod
以确定container_name
是否已完成。如果该容器存在且已完成,返回 True。否则返回 False。
- airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_succeeded(pod, container_name)[源代码]¶
检查 V1Pod
pod
以确定container_name
是否已完成并成功。如果该容器存在且已完成并成功,返回 True。否则返回 False。
- airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_wait(pod, container_name)[源代码]¶
检查 V1Pod
pod
以确定container_name
是否正在等待。如果该容器存在且正在等待,返回 True。否则返回 False。
- airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_terminated(pod, container_name)[源代码]¶
检查 V1Pod
pod
以确定container_name
是否已终止。如果该容器存在且已终止,返回 True。否则返回 False。
- airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_termination_message(pod, container_name)[源代码]¶
- airflow.providers.cncf.kubernetes.utils.pod_manager.check_exception_is_kubernetes_api_unauthorized(exc)[源代码]¶
- exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchTimeoutException[源代码]¶
基类:
airflow.exceptions.AirflowException
当 pod 在指定的超时时间内未离开
Pending
阶段时。
- exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodNotFoundException[源代码]¶
基类:
airflow.exceptions.AirflowException
预期的 pod 在 kube-api 中不存在。
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLogsConsumer(response, pod, pod_manager, container_name, post_termination_timeout=120, read_pod_cache_timeout=120)[源代码]¶
负责从流中拉取 pod 日志并在读取数据前检查容器状态。
此类是针对问题 https://github.com/apache/airflow/issues/23497 的变通方法。
- 参数:
response (urllib3.response.HTTPResponse) – 包含日志的 HTTP 响应
pod (kubernetes.client.models.v1_pod.V1Pod) – 来自 Kubernetes 客户端的 Pod 实例
pod_manager (PodManager) – Pod 管理器实例
container_name (str) – 我们正在读取日志的容器名称
post_termination_timeout (int) – (可选) 以秒为单位的时间段,表示容器终止后日志可用的时长。
read_pod_cache_timeout (int) – (可选) 容器状态缓存生命周期。缓存容器状态以减少 API 调用。
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLoggingStatus[源代码]¶
从 fetch_container_logs 退出时返回 pod 的状态和最后日志时间。
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager(kube_client, callbacks=None)[源代码]¶
基类:
airflow.utils.log.logging_mixin.LoggingMixin
创建、监控并与 Kubernetes pods 交互,供 KubernetesPodOperator 使用。
- await_pod_start(pod, startup_timeout=120, startup_check_interval=1)[源代码]¶
等待 pod 达到 `Pending` 之外的阶段。
- fetch_container_logs(pod, container_name, *, follow=False, since_time=None, post_termination_timeout=120)[source]¶
实时跟踪容器日志并流式传输到 Airflow 日志系统。
容器退出时返回。
从 Pod 启动到日志可用之间,由于 CSR 尚未批准和签名,可能会出现延迟。在这种情况下,会抛出 ApiException。这就是为什么我们要对此特定异常进行重试。
- fetch_requested_init_container_logs(pod, init_containers, follow_logs=False)[source]¶
实时跟踪指定 Pod 中容器的日志,并将其发布到 Airflow 日志系统。
所有容器退出时返回。
- fetch_requested_container_logs(pod, containers, follow_logs=False)[source]¶
实时跟踪指定 Pod 中容器的日志,并将其发布到 Airflow 日志系统。
所有容器退出时返回。
- read_pod_logs(pod, container_name, tail_lines=None, timestamps=False, since_seconds=None, follow=True, post_termination_timeout=120, **kwargs)[source]¶
从 Pod 读取日志。