airflow.providers.google.cloud.hooks.dataproc

此模块包含一个 Google Cloud Dataproc Hook。

异常

DataprocResourceIsNotReadyError

在资源未准备好创建 Dataproc 集群时抛出此异常。

DataProcJobBuilder

用于构建 Dataproc 作业的辅助类。

DataprocHook

Google Cloud Dataproc API。

DataprocAsyncHook

与 Google Cloud Dataproc API 的异步交互。

模块内容

exception airflow.providers.google.cloud.hooks.dataproc.DataprocResourceIsNotReadyError[源码]

基类: airflow.exceptions.AirflowException

在资源未准备好创建 Dataproc 集群时抛出此异常。

class airflow.providers.google.cloud.hooks.dataproc.DataProcJobBuilder(project_id, task_id, cluster_name, job_type, properties=None)[源码]

用于构建 Dataproc 作业的辅助类。

job_type[源码]
job: dict[str, Any][源码]
add_labels(labels=None)[源码]

设置 Dataproc 作业的标签。

参数:

labels (dict | None) – 作业查询的标签。

add_variables(variables=None)[源码]

设置 Dataproc 作业的变量。

参数:

variables (dict | None) – 作业查询的变量。

add_args(args=None)[源码]

设置 Dataproc 作业的参数 (args)。

参数:

args (list[str] | None) – 作业查询的参数 (args)。

add_query(query)[源码]

添加 Dataproc 作业的查询。

参数:

query (str | list[str]) – 作业的查询。

add_query_uri(query_uri)[源码]

设置 Dataproc 作业的查询 URI。

参数:

query_uri (str) – 作业查询的 URI。

add_jar_file_uris(jars=None)[源码]

设置 Dataproc 作业的 JAR URI。

参数:

jars (list[str] | None) – JAR URI 列表

add_archive_uris(archives=None)[源码]

设置 Dataproc 作业的 archives URI。

参数:

archives (list[str] | None) – archives URI 列表

add_file_uris(files=None)[源码]

设置 Dataproc 作业的文件 URI。

参数:

files (list[str] | None) – 文件 URI 列表

add_python_file_uris(pyfiles=None)[源码]

设置 Dataproc 作业的 Python 文件 URI。

参数:

pyfiles (list[str] | None) – Python 文件 URI 列表

set_main(main_jar=None, main_class=None)[源码]

设置 Dataproc 主类。

参数:
  • main_jar (str | None) – 主文件的 URI。

  • main_class (str | None) – 主类的名称。

抛出:

ValueError

set_python_main(main)[源码]

设置 Dataproc 主 Python 文件 URI。

参数:

main (str) – Python 主文件的 URI。

set_job_name(name)[源码]

设置 Dataproc 作业名称。

作业名称会被清理,将点替换为下划线。

参数:

name (str) – 作业名称。

build()[源码]

返回 Dataproc 作业。

返回:

Dataproc 作业

返回类型:

dict

class airflow.providers.google.cloud.hooks.dataproc.DataprocHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源码]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

Google Cloud Dataproc API。

在 Hook 中使用 project_id 的所有方法都必须使用关键字参数而不是位置参数调用。

get_cluster_client(region=None)[源码]

创建一个 ClusterControllerClient。

get_template_client(region=None)[源码]

创建一个 WorkflowTemplateServiceClient。

get_job_client(region=None)[源码]

创建一个 JobControllerClient。

get_batch_client(region=None)[源码]

创建一个 BatchControllerClient。

get_operations_client(region)[源码]

创建一个 OperationsClient。

dataproc_options_to_args(options)[源码]

从参数字典返回格式化的集群参数。

参数:

options (dict) – 包含选项的字典

返回:

参数列表

返回类型:

list[str]

wait_for_operation(operation, timeout=None, result_retry=DEFAULT)[源码]

等待长时间运行的操作完成。

create_cluster(region, project_id, cluster_name, cluster_config=None, virtual_cluster_config=None, labels=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[源码]

在指定项目中创建一个集群。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 要创建的集群名称。

  • labels (dict[str, str] | None) – 将分配给所创建集群的标签。

  • cluster_config (dict | google.cloud.dataproc_v1.Cluster | None) – 要创建的集群配置。如果提供的是字典,其形式必须与 protobuf 消息 ClusterConfig 相同。

  • virtual_cluster_config (dict | None) – 虚拟集群配置,用于创建不直接控制底层计算资源的 Dataproc 集群时,例如使用 VirtualClusterConfig 创建 Dataproc-on-GKE 集群时。

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器收到两个 CreateClusterRequest 请求具有相同的 ID,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的某个操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

delete_cluster(region, cluster_name, project_id, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[源码]

在项目中删除一个集群。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 要删除的集群名称。

  • cluster_uuid (str | None) – 如果指定,则如果具有该 UUID 的集群不存在,RPC 应该失败。

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器收到两个 DeleteClusterRequest 请求具有相同的 ID,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的某个操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

diagnose_cluster(region, cluster_name, project_id, tarball_gcs_dir=None, diagnosis_interval=None, jobs=None, yarn_application_ids=None, retry=DEFAULT, timeout=None, metadata=())[源码]

获取集群诊断信息。

操作完成后,响应包含诊断输出报告的 Cloud Storage URI,其中包含收集的诊断信息的摘要。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 集群名称。

  • tarball_gcs_dir (str | None) – 诊断 tarball 的输出 Cloud Storage 目录。如果未指定,将使用集群暂存存储分区中的任务特定目录。

  • diagnosis_interval (dict | google.type.interval_pb2.Interval | None) – 应在集群上执行诊断的时间间隔。

  • jobs (collections.abc.MutableSequence[str] | None) – 指定要执行诊断的作业列表。格式: projects/{project}/regions/{region}/jobs/{job}

  • yarn_application_ids (collections.abc.MutableSequence[str] | None) – 指定要执行诊断的 yarn 应用列表。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

get_cluster(region, cluster_name, project_id, retry=DEFAULT, timeout=None, metadata=())[源码]

获取项目中集群的资源表示。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 集群名称。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

list_clusters(region, filter_, project_id, page_size=None, retry=DEFAULT, timeout=None, metadata=())[源码]

列出项目中的所有 regions/{region}/clusters。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • filter – 用于约束集群。区分大小写。

  • page_size (int | None) – 底层 API 响应中包含的最大资源数。如果按资源执行分页流,则此参数不影响返回值。如果按页执行分页流,则此参数确定一页中的最大资源数。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

update_cluster(cluster_name, cluster, update_mask, project_id, region, graceful_decommission_timeout=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[源码]

在项目中更新一个集群。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 集群名称。

  • cluster (dict | google.cloud.dataproc_v1.Cluster) – 集群的更改。如果提供了字典,则其形式必须与 protobuf 消息 Cluster 相同。

  • update_mask (dict | google.protobuf.field_mask_pb2.FieldMask) –

    指定相对于 Cluster 要更新字段的路径。例如,要将集群中的 worker 数量更改为 5,应指定为 config.worker_config.num_instances,并且 PATCH 请求正文将指定新值。

    {"config": {"workerConfig": {"numInstances": "5"}}}
    

    同样,要将集群中的抢占式 worker 数量更改为 5,应指定为 config.secondary_worker_config.num_instances,并且 PATCH 请求正文将指定新值。

    {"config": {"secondaryWorkerConfig": {"numInstances": "5"}}}
    

    如果提供了字典,则其形式必须与 protobuf 消息 FieldMask 相同。

  • graceful_decommission_timeout (dict | google.protobuf.duration_pb2.Duration | None) –

    优雅 YARN 退役的超时时间。优雅退役允许从集群中移除节点,而不会中断正在进行的作业。超时指定在强制移除节点(并可能中断作业)之前等待正在进行的作业完成的时间。默认超时为 0(表示强制退役),允许的最大超时时间为一天。

    仅支持 Dataproc 映像版本 1.2 及更高版本。

    如果提供了字典,则其形式必须与 protobuf 消息 Duration 相同。

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器接收到两个具有相同 ID 的 UpdateClusterRequest 请求,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

start_cluster(region, project_id, cluster_name, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在一个项目中启动集群。

参数:
  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • cluster_name (str) – 集群名称。

  • cluster_uuid (str | None) – 集群 UUID

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器接收到两个具有相同 ID 的 UpdateClusterRequest 请求,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

返回:

google.api_core.operation.Operation 的一个实例

返回类型:

google.api_core.operation.Operation

stop_cluster(region, project_id, cluster_name, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在一个项目中启动集群。

参数:
  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • cluster_name (str) – 集群名称。

  • cluster_uuid (str | None) – 集群 UUID

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器接收到两个具有相同 ID 的 UpdateClusterRequest 请求,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

返回:

google.api_core.operation.Operation 的一个实例

返回类型:

google.api_core.operation.Operation

create_workflow_template(template, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

创建一个新的工作流模板。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要创建的 Dataproc 工作流模板。如果提供了字典,则其形式必须与 protobuf 消息 WorkflowTemplate 相同。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

instantiate_workflow_template(template_name, project_id, region, version=None, request_id=None, parameters=None, retry=DEFAULT, timeout=None, metadata=())[source]

实例化模板并开始执行。

参数:
  • template_name (str) – 要实例化的模板名称。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • version (int | None) – 要实例化的工作流模板版本。如果指定,仅当工作流模板的当前版本与提供的版本相同时,才会实例化工作流。此选项不能用于实例化工作流模板的先前版本。

  • request_id (str | None) – 用于防止多个具有相同标签的并发工作流实例运行的标签。这可减轻因重试而启动并发实例的风险。

  • parameters (dict[str, str] | None) – 参数名称到应使用的值的映射。值不能超过 100 个字符。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

instantiate_inline_workflow_template(template, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

实例化模板并开始执行。

参数:
  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要实例化的工作流模板。如果提供了字典,则其形式必须与 protobuf 消息 WorkflowTemplate 相同。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • request_id (str | None) – 用于防止多个具有相同标签的并发工作流实例运行的标签。这可减轻因重试而启动并发实例的风险。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

wait_for_job(job_id, project_id, region, wait_time=10, timeout=None)[source]

轮询作业以检查其是否已完成。

参数:
  • job_id (str) – Dataproc 作业 ID。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • wait_time (int) – 两次检查之间的秒数。

  • timeout (int | None) – 等待作业就绪的秒数。

get_job(job_id, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

获取项目中作业的资源表示。

参数:
  • job_id (str) – Dataproc 作业 ID。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

submit_job(job, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

向集群提交作业。

参数:
  • job (dict | google.cloud.dataproc_v1.Job) – 作业资源。如果提供了字典,则其形式必须与 protobuf 消息 Job 相同。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • request_id (str | None) – 用于防止多个具有相同标签的并发工作流实例运行的标签。这可减轻因重试而启动并发实例的风险。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

cancel_job(job_id, project_id, region=None, retry=DEFAULT, timeout=None, metadata=())[source]

启动作业取消请求。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str | None) – 处理请求的 Cloud Dataproc 区域。

  • job_id (str) – 作业 ID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

create_batch(region, project_id, batch, batch_id=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

创建批处理工作负载。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • batch (dict | google.cloud.dataproc_v1.Batch) – 要创建的批处理。

  • batch_id (str | None) – 要用于批处理的 ID,它将成为批处理资源名称的最后一个组成部分。此值的长度必须为 4-63 个字符。有效字符为 [a-z][0-9]-

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器接收到两个具有相同 ID 的 CreateBatchRequest 请求,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

delete_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

删除批处理工作负载资源。

参数:
  • batch_id (str) – 批处理 ID。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

get_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

获取批处理工作负载资源的表示。

参数:
  • batch_id (str) – 批处理 ID。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

list_batches(region, project_id, page_size=None, page_token=None, retry=DEFAULT, timeout=None, metadata=(), filter=None, order_by=None)[source]

列出批处理工作负载。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • page_size (int | None) – 每个响应中返回的最大批处理数量。服务返回的数量可能少于此值。默认页面大小为 20;最大页面大小为 1000。

  • page_token (str | None) – 从先前的 ListBatches 调用接收到的页面令牌。提供此令牌以检索后续页面。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

  • filter (str | None) – 如 ListBatchesRequest 中指定的筛选结果。

  • order_by (str | None) – 如 ListBatchesRequest 中指定的排序结果方式。

wait_for_batch(batch_id, region, project_id, wait_check_interval=10, retry=DEFAULT, timeout=None, metadata=())[source]

等待批处理作业完成。

提交批处理作业后,Operator 会等待作业完成。然而,当 Airflow 重启或任务 PID 因任何原因被终止时,此 hook 非常有用。在这种情况下,创建会再次发生,捕获抛出的 AlreadyExists 异常,并失败到此函数以等待完成。

参数:
  • batch_id (str) – 批处理 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • wait_check_interval (int) – 两次检查作业完成情况之间暂停的时间。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

check_error_for_resource_is_not_ready_msg(error_msg)[source]

检查错误原因是否为资源尚未准备就绪。

class airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

与 Google Cloud Dataproc API 的异步交互。

在 Hook 中使用 project_id 的所有方法都必须使用关键字参数而不是位置参数调用。

get_cluster_client(region=None)[source]

创建 ClusterControllerAsyncClient。

get_template_client(region=None)[source]

创建 WorkflowTemplateServiceAsyncClient。

get_job_client(region=None)[source]

创建 JobControllerAsyncClient。

get_batch_client(region=None)[source]

创建 BatchControllerAsyncClient。

get_operations_client(region)[source]

创建一个 OperationsClient。

async create_cluster(region, project_id, cluster_name, cluster_config=None, virtual_cluster_config=None, labels=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在一个项目中创建集群。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 要创建的集群名称。

  • labels (dict[str, str] | None) – 将分配给所创建集群的标签。

  • cluster_config (dict | google.cloud.dataproc_v1.Cluster | None) – 要创建的集群配置。如果提供的是字典,其形式必须与 protobuf 消息 ClusterConfig 相同。

  • virtual_cluster_config (dict | None) – 虚拟集群配置,用于创建不直接控制底层计算资源的 Dataproc 集群时,例如使用 VirtualClusterConfig 创建 Dataproc-on-GKE 集群时。

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器收到两个 CreateClusterRequest 请求具有相同的 ID,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的某个操作。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async delete_cluster(region, cluster_name, project_id, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在项目中删除一个集群。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 要删除的集群名称。

  • cluster_uuid (str | None) – 如果指定,则如果具有该 UUID 的集群不存在,RPC 应该失败。

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器收到两个 DeleteClusterRequest 请求具有相同的 ID,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的某个操作。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async diagnose_cluster(region, cluster_name, project_id, tarball_gcs_dir=None, diagnosis_interval=None, jobs=None, yarn_application_ids=None, retry=DEFAULT, timeout=None, metadata=())[source]

获取集群诊断信息。

操作完成后,响应包含诊断输出报告的 Cloud Storage URI,其中包含收集的诊断信息的摘要。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 集群名称。

  • tarball_gcs_dir (str | None) – 诊断 tarball 的输出 Cloud Storage 目录。如果未指定,将使用集群暂存存储分区中的任务特定目录。

  • diagnosis_interval (dict | google.type.interval_pb2.Interval | None) – 应在集群上执行诊断的时间间隔。

  • jobs (collections.abc.MutableSequence[str] | None) – 指定要执行诊断的作业列表。格式: projects/{project}/regions/{region}/jobs/{job}

  • yarn_application_ids (collections.abc.MutableSequence[str] | None) – 指定要执行诊断的 yarn 应用列表。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async get_cluster(region, cluster_name, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

获取项目中集群的资源表示。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 集群名称。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async list_clusters(region, filter_, project_id, page_size=None, retry=DEFAULT, timeout=None, metadata=())[source]

列出项目中的所有 regions/{region}/clusters。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • filter – 用于约束集群。区分大小写。

  • page_size (int | None) – 底层 API 响应中包含的最大资源数。如果按资源执行分页流,则此参数不影响返回值。如果按页执行分页流,则此参数确定一页中的最大资源数。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async update_cluster(cluster_name, cluster, update_mask, project_id, region, graceful_decommission_timeout=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在项目中更新一个集群。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • cluster_name (str) – 集群名称。

  • cluster (dict | google.cloud.dataproc_v1.Cluster) – 集群的更改。如果提供了字典,则其形式必须与 protobuf 消息 Cluster 相同。

  • update_mask (dict | google.protobuf.field_mask_pb2.FieldMask) –

    指定相对于 Cluster 要更新字段的路径。例如,要将集群中的 worker 数量更改为 5,应指定为 config.worker_config.num_instances,并且 PATCH 请求正文将指定新值。

    {"config": {"workerConfig": {"numInstances": "5"}}}
    

    同样,要将集群中的抢占式 worker 数量更改为 5,应指定为 config.secondary_worker_config.num_instances,并且 PATCH 请求正文将指定新值。

    {"config": {"secondaryWorkerConfig": {"numInstances": "5"}}}
    

    如果提供了字典,则其形式必须与 protobuf 消息 FieldMask 相同。

  • graceful_decommission_timeout (dict | google.protobuf.duration_pb2.Duration | None) –

    优雅 YARN 退役的超时时间。优雅退役允许从集群中移除节点,而不会中断正在进行的作业。超时指定在强制移除节点(并可能中断作业)之前等待正在进行的作业完成的时间。默认超时为 0(表示强制退役),允许的最大超时时间为一天。

    仅支持 Dataproc 映像版本 1.2 及更高版本。

    如果提供了字典,则其形式必须与 protobuf 消息 Duration 相同。

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器接收到两个具有相同 ID 的 UpdateClusterRequest 请求,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的操作。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async create_workflow_template(template, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

创建一个新的工作流模板。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要创建的 Dataproc 工作流模板。如果提供了字典,则其形式必须与 protobuf 消息 WorkflowTemplate 相同。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async instantiate_workflow_template(template_name, project_id, region, version=None, request_id=None, parameters=None, retry=DEFAULT, timeout=None, metadata=())[source]

实例化模板并开始执行。

参数:
  • template_name (str) – 要实例化的模板名称。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • version (int | None) – 要实例化的工作流模板版本。如果指定,仅当工作流模板的当前版本与提供的版本相同时,才会实例化工作流。此选项不能用于实例化工作流模板的先前版本。

  • request_id (str | None) – 用于防止多个具有相同标签的并发工作流实例运行的标签。这可减轻因重试而启动并发实例的风险。

  • parameters (dict[str, str] | None) – 参数名称到应使用的值的映射。值不能超过 100 个字符。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async instantiate_inline_workflow_template(template, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

实例化模板并开始执行。

参数:
  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要实例化的工作流模板。如果提供了字典,则其形式必须与 protobuf 消息 WorkflowTemplate 相同。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • request_id (str | None) – 用于防止多个具有相同标签的并发工作流实例运行的标签。这可减轻因重试而启动并发实例的风险。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async get_operation(region, operation_name)[source]
async get_job(job_id, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

获取项目中作业的资源表示。

参数:
  • job_id (str) – Dataproc 作业 ID。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async submit_job(job, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

向集群提交作业。

参数:
  • job (dict | google.cloud.dataproc_v1.Job) – 作业资源。如果提供了字典,则其形式必须与 protobuf 消息 Job 相同。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • request_id (str | None) – 用于防止多个具有相同标签的并发工作流实例运行的标签。这可减轻因重试而启动并发实例的风险。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async cancel_job(job_id, project_id, region=None, retry=DEFAULT, timeout=None, metadata=())[source]

启动作业取消请求。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str | None) – 处理请求的 Cloud Dataproc 区域。

  • job_id (str) – 作业 ID。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async create_batch(region, project_id, batch, batch_id=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

创建批处理工作负载。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • batch (dict | google.cloud.dataproc_v1.Batch) – 要创建的批处理。

  • batch_id (str | None) – 要用于批处理的 ID,它将成为批处理资源名称的最后一个组成部分。此值的长度必须为 4-63 个字符。有效字符为 [a-z][0-9]-

  • request_id (str | None) – 用于标识请求的唯一 ID。如果服务器接收到两个具有相同 ID 的 CreateBatchRequest 请求,则第二个请求将被忽略,并返回为第一个请求创建并存储在后端的操作。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async delete_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

删除批处理工作负载资源。

参数:
  • batch_id (str) – 批处理 ID。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async get_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

获取批处理工作负载资源的表示。

参数:
  • batch_id (str) – 批处理 ID。

  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

async list_batches(region, project_id, page_size=None, page_token=None, retry=DEFAULT, timeout=None, metadata=(), filter=None, order_by=None)[source]

列出批处理工作负载。

参数:
  • project_id (str) – 集群所属的 Google Cloud 项目 ID。

  • region (str) – 用于处理请求的 Cloud Dataproc 区域。

  • page_size (int | None) – 每个响应中返回的最大批处理数量。服务返回的数量可能少于此值。默认页面大小为 20;最大页面大小为 1000。

  • page_token (str | None) – 从先前的 ListBatches 调用接收到的页面令牌。提供此令牌以检索后续页面。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用于重试请求的重试对象。如果为 None,则不会重试请求。

  • timeout (float | None) – 等待请求完成的时间量(以秒为单位)。如果指定了 retry,则超时适用于每次单独尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供给方法的附加元数据。

  • filter (str | None) – 如 ListBatchesRequest 中指定的筛选结果。

  • order_by (str | None) – 如 ListBatchesRequest 中指定的排序结果方式。

此条目有帮助吗?