airflow.providers.cncf.kubernetes.operators.pod¶
在 Kubernetes POD 中执行任务。
属性¶
异常¶
当我们期望能够找到 pod 但实际未能找到时抛出。 |
|
当 pod 无法刷新凭证时抛出。 |
|
在重新连接期间发现多个匹配的 pod 时抛出。 |
类¶
kubernetes pod 所发出的事件类型。 |
|
在 Kubernetes Pod 中执行任务。 |
模块内容¶
- airflow.providers.cncf.kubernetes.operators.pod.alphanum_lower = 'abcdefghijklmnopqrstuvwxyz0123456789'[source]¶
- class airflow.providers.cncf.kubernetes.operators.pod.PodEventType[source]¶
基类:
enum.Enumkubernetes pod 所发出的事件类型。
- exception airflow.providers.cncf.kubernetes.operators.pod.PodReattachFailure[source]¶
基类:
airflow.providers.common.compat.sdk.AirflowException当我们期望能够找到 pod 但实际未能找到时抛出。
- exception airflow.providers.cncf.kubernetes.operators.pod.PodCredentialsExpiredFailure[source]¶
基类:
airflow.providers.common.compat.sdk.AirflowException当 pod 无法刷新凭证时抛出。
- exception airflow.providers.cncf.kubernetes.operators.pod.FoundMoreThanOnePodFailure[source]¶
基类:
airflow.providers.common.compat.sdk.AirflowException在重新连接期间发现多个匹配的 pod 时抛出。
- class airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator(*, kubernetes_conn_id=KubernetesHook.default_conn_name, namespace=None, image=None, name=None, random_name_suffix=True, cmds=None, arguments=None, ports=None, volume_mounts=None, volumes=None, env_vars=None, env_from=None, secrets=None, in_cluster=None, cluster_context=None, labels=None, reattach_on_restart=True, startup_timeout_seconds=120, startup_check_interval_seconds=5, schedule_timeout_seconds=None, get_logs=True, base_container_name=None, base_container_status_polling_interval=1, init_container_logs=None, container_logs=None, image_pull_policy=None, annotations=None, container_resources=None, affinity=None, config_file=None, runtime_class_name=None, node_selector=None, image_pull_secrets=None, service_account_name=None, automount_service_account_token=None, hostnetwork=False, host_aliases=None, tolerations=None, security_context=None, container_security_context=None, dnspolicy=None, dns_config=None, hostname=None, subdomain=None, schedulername=None, full_pod_spec=None, init_containers=None, log_events_on_failure=False, do_xcom_push=False, pod_template_file=None, pod_template_dict=None, priority_class_name=None, pod_runtime_info_envs=None, termination_grace_period=None, configmaps=None, skip_on_exit_code=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=2, log_pod_spec_on_failure=True, on_finish_action='delete_pod', on_kill_action='delete_pod', is_delete_operator_pod=None, termination_message_policy='File', active_deadline_seconds=None, callbacks=None, progress_callback=None, logging_interval=None, trigger_kwargs=None, container_name_log_prefix_enabled=True, log_formatter=None, **kwargs)[source]¶
Bases:
airflow.sdk.BaseOperator在 Kubernetes Pod 中执行任务。
另请参阅
想了解如何使用此操作符,请参考指南: KubernetesPodOperator
注意
如果您使用 Google Kubernetes Engine 且 Airflow 未运行在同一集群中,建议使用
GKEStartPodOperator,它简化了授权流程。- 参数:
kubernetes_conn_id (str | None) – Kubernetes 连接 ID,用于指定 Kubernetes 集群。(支持模板)
namespace (str | None) – 要在其中运行的 Kubernetes 命名空间。
image (str | None) – 需要启动的容器镜像。默认使用 hub.docker.com,若提供完整 URL 则指向自定义仓库。(支持模板)
name (str | None) – 运行任务的 pod 名称。如果
random_name_suffix为 True,则会在此基础上添加随机后缀,以生成符合 DNS-1123 子域规范(仅包含 [a-z0-9.-])的 pod ID。(支持模板)random_name_suffix (bool) – 为 True 时会生成随机后缀。
arguments (list[str] | None) – 入口点的参数。如果未提供,则使用容器镜像的 CMD。(支持模板)
ports (list[kubernetes.client.models.V1ContainerPort] | None) – 为启动的 pod 定义的端口。
volume_mounts (list[kubernetes.client.models.V1VolumeMount] | None) – 为启动的 pod 定义的 volumeMounts。(支持模板)
volumes (list[kubernetes.client.models.V1Volume] | None) – 为启动的 pod 定义的 volumes,包含 ConfigMaps 与 PersistentVolumes。(支持模板)
env_vars (list[kubernetes.client.models.V1EnvVar] | dict[str, str] | None) – 在容器中初始化的环境变量。(支持模板)
env_from (list[kubernetes.client.models.V1EnvFromSource] | None) – (可选)用于在容器中填充环境变量的来源列表。(支持模板)
secrets (list[airflow.providers.cncf.kubernetes.secret.Secret] | None) – 要注入到容器中的 Kubernetes secret。它们可以以环境变量或卷中文件的形式暴露。
in_cluster (bool | None) – 使用 in_cluster 配置运行 Kubernetes 客户端。
cluster_context (str | None) – 指向 Kubernetes 集群的上下文。当
in_cluster为 True 时此参数被忽略。若为 None,则使用当前上下文。(支持模板)reattach_on_restart (bool) – 当 worker 在 pod 运行期间死亡时,是否在下次尝试时重新附加并继续监控。若为 False,则每次尝试都会创建新 pod。
labels (dict | None) – 要应用到 Pod 的标签。(支持模板)
startup_timeout_seconds (int) – pod 调度后启动的超时时间(秒)。
startup_check_interval_seconds (int) – 检查 pod 是否已启动的时间间隔(秒)。
schedule_timeout_seconds (int | None) – 在集群中调度 pod 的超时时间(秒)。
get_logs (bool) – 是否获取基础容器的 stdout 并将其记录为任务日志。
init_container_logs (collections.abc.Iterable[str] | str | Literal[True] | None) – 要发布到 stdout 的 init 容器日志列表。可以是容器名序列、单个容器名或 True(表示全部容器日志)。
container_logs (collections.abc.Iterable[str] | str | Literal[True] | None) – 要发布到 stdout 的容器日志列表。可以是容器名序列、单个容器名或 True(表示全部容器日志)。该参数与
get_logs配合使用,默认值为基础容器。image_pull_policy (str | None) – 指定镜像的拉取策略(缓存或始终拉取)。
annotations (dict | None) – 可附加到 Pod 的非标识元数据。可包含标签不允许的字符及更大范围的数据。(支持模板)
container_resources (kubernetes.client.models.V1ResourceRequirements | None) – 启动 pod 所需的资源配置。(支持模板)
affinity (kubernetes.client.models.V1Affinity | None) – 启动 pod 的亲和调度规则。
config_file (str | None) – Kubernetes 配置文件的路径。(可模板化)如果未指定,默认值为
~/.kube/configruntime_class_name (str | None) – 用于 pod 中所有容器的 RuntimeClass 名称。
node_selector (dict | None) – 包含一组调度规则的字典。(支持模板)
image_pull_secrets (list[kubernetes.client.models.V1LocalObjectReference] | None) – 为 pod 提供的镜像拉取 secret。若需要多个 secret,请使用逗号分隔的列表,例如:
secret_a,secret_b。service_account_name (str | None) – ServiceAccount 的名称。
automount_service_account_token (bool | None) – 指示使用此 ServiceAccount 运行的 pod 是否自动挂载 API token。
hostnetwork (bool) – 为 True 时在 pod 上启用主机网络。
host_aliases (list[kubernetes.client.models.V1HostAlias] | None) – 将要应用到 pod 中容器的主机别名列表。
tolerations (list[kubernetes.client.models.V1Toleration] | None) – Kubernetes 容忍度列表。
security_context (kubernetes.client.models.V1PodSecurityContext | dict | None) – pod 应运行的安全选项(PodSecurityContext)。
container_security_context (kubernetes.client.models.V1SecurityContext | dict | None) – 容器应运行的安全选项。
dnspolicy (str | None) – pod 的 DNS 策略。
dns_config (kubernetes.client.models.V1PodDNSConfig | None) – pod 的 DNS 配置(IP 地址、搜索域、选项)。
hostname (str | None) – pod 的主机名。(支持模板)
subdomain (str | None) – pod 的子域名。
schedulername (str | None) – 为 pod 指定的调度器名称。
full_pod_spec (kubernetes.client.models.V1Pod | None) – 完整的 pod 规范。
init_containers (list[kubernetes.client.models.V1Container] | None) – 为启动的 pod 定义的 init 容器。
log_events_on_failure (bool) – 当任务失败时记录 pod 的事件。
do_xcom_push (bool) – 如果为 True,容器完成时,文件 /airflow/xcom/return.json 的内容也会推送到 XCom。
pod_template_file (str | None) – pod 模板文件路径。(支持模板)
pod_template_dict (dict | None) – pod 模板字典。(支持模板)
priority_class_name (str | None) – 为启动的 pod 指定的优先级类名称。
pod_runtime_info_envs (list[kubernetes.client.models.V1EnvVar] | None) –(可选)将在容器中设置的一组环境变量。
termination_grace_period (int | None) – pod 的终止宽限期(秒)。这将设置 pod 的
terminationGracePeriodSeconds,并在任务被 kill 时用于删除 pod 的宽限期。如果未指定,则使用 Kubernetes 默认值(30 秒)。configmaps (list[str] | None) –(可选)要收集 ConfigMap 的名称列表,收集后它们的 Data 字段会被转换为环境变量键值对。该参数会扩展
env_from。skip_on_exit_code (int | collections.abc.Container[int] | None) – 若任务以此退出码结束,则将任务置为
skipped状态(默认:None)。若为None,则任何非零退出码均视为失败。base_container_name (str | None) – pod 中基础容器的名称。如果
get_logs为 True,则该容器的日志会作为任务日志输出。默认值为 None。若为 None,则会使用类变量BASE_CONTAINER_NAME(默认 “base”)作为基础容器名称。(支持模板)base_container_status_polling_interval (float) – 轮询基容器状态的时间间隔(秒)。默认值为 1 秒。
deferrable (bool) – 以可延迟模式运行操作符。
poll_interval (float) – 轮询状态的时间间隔(秒),仅在可延迟模式(deferrable mode)下使用。
log_pod_spec_on_failure (bool) – 当发生错误时记录 Pod 的规格(spec)。
on_finish_action (str) – 当 Pod 达到最终状态或执行被中断时的操作。若取值为 “delete_pod”,则无论状态如何都会删除 Pod;若取值为 “delete_succeeded_pod”,仅删除成功完成的 Pod。可以设置为 “keep_pod” 来保留 Pod;“delete_active_pod” 会删除仍处于 Pending 或 Running 状态的 Pod。
on_kill_action (str) – 当任务被用户手动终止(例如在 Airflow UI 中手动标记为成功/失败)时的操作。默认值 “delete_pod” 会删除 Pod;若设为 “keep_pod”,则保留 Pod。在可延迟模式下,此设置会传递给触发器,用于在取消延迟任务时控制清理行为。
termination_message_policy (str) – 基容器的终止信息策略。默认值为 “File”。
active_deadline_seconds (int | None) – 对应于 V1PodSpec 中的 active_deadline_seconds。
callbacks (list[type[airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback]] | type[airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback] | None) – 包含在 KubernetesPodOperator 不同阶段回调方法的 KubernetesPodOperatorCallback 实例。
logging_interval (int | None) – 任务在延迟状态下保持的最长时间(秒),在此时间后会恢复并获取最新日志。若为
None,任务将一直保持在延迟状态直至 Pod 完成,期间日志不可见。trigger_kwargs (dict | None) – 传递给触发器的额外关键字参数。
container_name_log_prefix_enabled (bool) – 若为 True,则在每行日志前加上容器名称前缀。默认值为 True。
log_formatter (collections.abc.Callable[[str, str], str] | None) – 自定义日志格式化函数,接受两个字符串参数:第一个为 container_name,第二个为 message_to_log。函数需返回已格式化的字符串。若为 None,则使用默认格式。
- template_fields: collections.abc.Sequence[str] = ('image', 'name', 'hostname', 'cmds', 'annotations', 'arguments', 'env_vars', 'labels',...[source]¶
- property pod_manager: airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager[source]¶
- trigger_reentry(context, event)[source]¶
触发器重新进入的入口点。
如果
logging_interval为None,则此时 Pod 已经结束,我们只需获取日志并退出。如果
logging_interval不为None,说明 Pod 可能仍在运行,此时我们仅抓取最新日志并再次交给触发器延迟执行。
- post_complete_action(*, pod, remote_pod, context, result, **kwargs)[source]¶
当 operator 完成 deferrable_execution 逻辑后必须执行的后置操作。
- on_kill()[source]¶
当任务实例被终止时,重写此方法以清理子进程。
在算子内部使用 threading、subprocess 或 multiprocessing 模块时,需要进行清理,否则会留下僵尸进程。