airflow.providers.cncf.kubernetes.operators.spark_kubernetes

模块内容

SparkKubernetesOperator

在 Kubernetes 集群中创建 sparkApplication 对象。

class airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator(*, image=None, code_path=None, namespace='default', name=None, application_file=None, template_spec=None, get_logs=True, do_xcom_push=False, success_run_history_limit=1, startup_timeout_seconds=600, log_events_on_failure=False, reattach_on_restart=True, delete_on_termination=True, kubernetes_conn_id='kubernetes_default', random_name_suffix=True, **kwargs)[源]

基类: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator

在 Kubernetes 集群中创建 sparkApplication 对象。

另请参阅

有关 Spark Application Object 的更多详细信息,请参阅参考资料: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication

参数
  • image (str | None) – 你希望启动的 Docker 镜像。默认为 hub.docker.com,

  • code_path (str | None) – 镜像中 spark 代码的路径,

  • namespace (str) – 将 sparkApplication 放置到的 Kubernetes 命名空间

  • name (str | None) – 任务将在其中运行的 Pod 的名称,将用于(如果 random_name_suffix 为 True,则加上随机后缀)生成 Pod ID(DNS-1123 子域,仅包含 [a-z0-9.-])。

  • application_file (str | None) – sparkApplication 的 Kubernetes custom_resource_definition 的文件路径

  • template_spec – Kubernetes sparkApplication 规范

  • get_logs (bool) – 获取容器的 stdout 作为任务的日志。

  • do_xcom_push (bool) – 如果为 True,容器完成时,容器中文件 /airflow/xcom/return.json 的内容也将被推送到 XCom。

  • success_run_history_limit (int) – 要保留的应用的过去成功运行次数。

  • startup_timeout_seconds – 启动 Pod 的超时时间(秒)。

  • log_events_on_failure (bool) – 如果发生故障,则记录 Pod 的事件

  • reattach_on_restart (bool) – 如果调度程序在 Pod 运行时死亡,则重新附加并监控

  • delete_on_termination (bool) – 当 Pod 达到其最终状态或执行中断时,该如何处理。 如果为 True(默认值),则删除 Pod;如果为 False,则保留 Pod。

  • kubernetes_conn_id (str) – 与 Kubernetes 集群的连接

  • random_name_suffix (bool) – 如果为 True,则在 Pod 名称中添加随机后缀

property template_body[源]

CustomObjectLauncher 的模板化主体。

template_fields = ['application_file', 'namespace', 'template_spec', 'kubernetes_conn_id'][源]
template_fields_renderers[源]
template_ext = ('yaml', 'yml', 'json')[源]
ui_color = '#f4a460'[源]
BASE_CONTAINER_NAME = 'spark-kubernetes-driver'[源]
manage_template_specs()[源]
create_job_name()[source]
static create_labels_for_pod(context=None, include_try_number=True)[source]

生成 Pod 的标签,以便在 Operator 崩溃时跟踪 Pod。

参数
  • include_try_number ( bool) – 将尝试次数添加到标签中

  • context ( dict | None) – airflow DAG 提供的任务上下文

返回

dict。

返回类型

dict

pod_manager()[source]
find_spark_job(context)[source]
get_or_create_spark_crd(launcher, context)[source]
process_pod_deletion(pod, *, reraise=True)[source]
hook()[source]
client()[source]
custom_obj_api()[source]
execute(context)[source]

根据 deferrable 参数,异步或同步运行 Pod。

on_kill()[source]

当任务实例被终止时,覆盖此方法以清理子进程。

Operator 中对 threading、subprocess 或 multiprocessing 模块的任何使用都需要清理,否则会留下僵尸进程。

patch_already_checked(pod, *, reraise=True)[source]

添加一个“已检查”的注释,以确保我们在重试时不会重新附加。

dry_run()[source]

打印出此 Operator 将创建的 Spark 作业。

此条目是否有帮助?