airflow.providers.cncf.kubernetes.operators.spark_kubernetes

SparkKubernetesOperator

在 Kubernetes 集群中创建 sparkApplication 对象。

模块内容

class airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator(*, image=None, code_path=None, namespace='default', name=None, application_file=None, template_spec=None, get_logs=True, do_xcom_push=False, success_run_history_limit=1, startup_timeout_seconds=600, log_events_on_failure=False, reattach_on_restart=True, delete_on_termination=True, kubernetes_conn_id='kubernetes_default', random_name_suffix=True, **kwargs)[source]

基类: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator

在 Kubernetes 集群中创建 sparkApplication 对象。

另请参阅

有关 Spark Application 对象的更多详细信息,请参阅参考资料: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication

参数:
  • image (str | None) – 希望启动的 Docker 镜像。默认为 hub.docker.com,

  • code_path (str | None) – 镜像中 spark 代码的路径,

  • namespace (str) – 放置 sparkApplication 的 Kubernetes 命名空间

  • name (str | None) – 任务运行所在的 Pod 名称,将用于(如果 random_name_suffix 为 True,则加上一个随机后缀)生成 Pod ID (DNS-1123 子域名,仅包含 [a-z0-9.-])。

  • application_file (str | None) – sparkApplication 的 Kubernetes custom_resource_definition 文件路径

  • template_spec – Kubernetes sparkApplication 规范

  • get_logs (bool) – 将容器的标准输出作为任务日志获取。

  • do_xcom_push (bool) – 如果为 True,容器中 /airflow/xcom/return.json 文件的内容也会在容器完成后推送到 XCom。

  • success_run_history_limit (int) – 要保留的应用程序过去成功运行的次数。

  • startup_timeout_seconds – 启动 Pod 的超时时间(秒)。

  • log_events_on_failure (bool) – 如果发生故障,则记录 Pod 的事件

  • reattach_on_restart (bool) – 如果调度器在 Pod 运行时终止,则重新连接并监控

  • delete_on_termination (bool) – 当 Pod 达到最终状态或执行中断时如何处理。如果为 True(默认),删除 Pod;如果为 False,保留 Pod。

  • kubernetes_conn_id (str) – 到 Kubernetes 集群的连接

  • random_name_suffix (bool) – 如果为 True,则在 Pod 名称后添加随机后缀

template_fields = ['application_file', 'namespace', 'template_spec', 'kubernetes_conn_id'][source]
template_fields_renderers[source]
template_ext = ('yaml', 'yml', 'json')[source]
ui_color = '#f4a460'[source]
BASE_CONTAINER_NAME = 'spark-kubernetes-driver'[source]
image = None[source]
code_path = None[source]
application_file = None[source]
template_spec = None[source]
kubernetes_conn_id = 'kubernetes_default'[source]
startup_timeout_seconds = 600[source]
reattach_on_restart = True[source]
delete_on_termination = True[source]
do_xcom_push = False[source]
namespace = 'default'[source]
get_logs = True[source]
log_events_on_failure = False[source]
success_run_history_limit = 1[source]
random_name_suffix = True[source]
base_container_name: str[source]
container_logs: list[str][source]
manage_template_specs()[source]
create_job_name()[source]
property pod_manager: airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager[source]
property template_body[source]

用于 CustomObjectLauncher 的模板化主体。

find_spark_job(context, exclude_checked=True)[source]
get_or_create_spark_crd(launcher, context)[source]
process_pod_deletion(pod, *, reraise=True)[source]
property hook: airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook[source]
property client: kubernetes.client.CoreV1Api[source]
property custom_obj_api: kubernetes.client.CustomObjectsApi[source]
execute(context)[source]

根据 deferrable 参数异步或同步运行 Pod。

on_kill()[source]

重写此方法,以便在任务实例被终止时清理子进程。

Operator 中任何使用 threading、subprocess 或 multiprocessing 模块的地方都需要清理,否则会留下僵尸进程。

patch_already_checked(pod, *, reraise=True)[source]

添加一个“已检查”注解,确保重试时不会重新连接。

dry_run()[source]

打印出该 Operator 将创建的 Spark Job。

此条目有帮助吗?