Google Cloud Dataproc 算子

Dataproc 是一种托管的 Apache Spark 和 Apache Hadoop 服务,可让您利用开源数据工具进行批量处理、查询、流式处理和机器学习。Dataproc 自动化功能可帮助您快速创建集群、轻松管理它们,并在不需要集群时关闭它们,从而节省资金。

有关该服务的更多信息,请访问 Dataproc 产品文档 <产品文档

先决条件任务

要使用这些算子,您必须执行以下几项操作

创建集群

创建 Dataproc 集群时,您可以选择 Compute Engine 作为部署平台。在此配置中,Dataproc 会自动配置运行集群所需的 Compute Engine 虚拟机实例。虚拟机实例用于主节点、主工作节点和辅助工作节点(如果指定)。这些虚拟机实例由 Compute Engine 创建和管理,而 Dataproc 则负责配置大数据处理任务所需的软件和编排。通过提供节点的配置,您可以描述主节点和辅助节点的配置以及 Compute Engine 实例集群的状态。配置辅助工作节点时,您可以指定工作节点的数量及其类型。通过启用“抢占式”选项,对这些节点使用抢占式虚拟机(相当于 Spot 实例),您可以利用这些实例为您的 Dataproc 工作负载带来的成本节约。主节点通常托管集群主服务和各种控制服务,它没有“抢占式”选项,因为主节点必须保持稳定性和可用性。创建集群后,配置设置(包括辅助工作节点的抢占性)无法直接修改。

有关创建集群时可传递的可用字段的更多信息,请访问 Dataproc 创建集群 API。

集群配置可能如下所示

tests/system/google/cloud/dataproc/example_dataproc_hive.py[源代码]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 32,
        },
        "is_preemptible": True,
        "preemptibility": "PREEMPTIBLE",
    },
}

通过此配置,我们可以创建集群: DataprocCreateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_hive.py[源代码]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

GKE 上的 Dataproc 在 GKE 集群上部署 Dataproc 虚拟集群。与 Compute Engine 集群上的 Dataproc 不同,GKE 上的 Dataproc 虚拟集群不包括单独的主虚拟机和工作虚拟机。相反,当您在 GKE 上创建 Dataproc 虚拟集群时,GKE 上的 Dataproc 会在 GKE 集群内创建节点池。GKE 上的 Dataproc 作业将作为这些节点池上的 Pod 运行。节点池以及 Pod 在节点池上的调度由 GKE 管理。

创建 GKE Dataproc 集群时,您可以指定基础计算资源是否使用抢占式虚拟机。GKE 支持使用抢占式虚拟机作为一种节省成本的措施。通过启用抢占式虚拟机,GKE 将使用抢占式虚拟机配置集群节点。或者,您可以将节点创建为 Spot 虚拟机实例,这是对传统抢占式虚拟机的最新更新。这对于在 GKE 上运行 Dataproc 工作负载,同时优化成本非常有用。

要在 Google Kubernetes Engine 中创建 Dataproc 集群,您可以传递集群配置

tests/system/google/cloud/dataproc/example_dataproc_gke.py[源代码]


VIRTUAL_CLUSTER_CONFIG = {
    "kubernetes_cluster_config": {
        "gke_cluster_config": {
            "gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
            "node_pool_target": [
                {
                    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
                    "roles": ["DEFAULT"],
                    "node_pool_config": {
                        "config": {
                            "preemptible": False,
                            "machine_type": "e2-standard-4",
                        }
                    },
                }
            ],
        },
        "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
    },
    "staging_bucket": "test-staging-bucket",
}

通过此配置,我们可以创建集群: DataprocCreateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_gke.py[源代码]

create_cluster_in_gke = DataprocCreateClusterOperator(
    task_id="create_cluster_in_gke",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

您还可以使用可选组件 Presto 创建 Dataproc 集群。为此,请使用以下配置。请注意,默认映像可能不支持所选的可选组件。如果是这种情况,请指定正确的 image_version,您可以在文档中找到

tests/system/google/cloud/dataproc/example_dataproc_presto.py[源代码]

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "PRESTO",
        ],
        "image_version": "2.0",
    },
}

您还可以使用可选组件 Trino 创建 Dataproc 集群。为此,请使用以下配置。请注意,默认映像可能不支持所选的可选组件。如果是这种情况,请指定正确的 image_version,您可以在文档中找到

tests/system/google/cloud/dataproc/example_dataproc_trino.py[源代码]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "TRINO",
        ],
        "image_version": "2.1",
    },
}

您可以为此操作使用可延迟模式,以便异步运行算子

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[源代码]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

生成集群配置

您还可以使用功能性 API 生成 CLUSTER_CONFIG,这可以使用 ClusterGeneratormake() 轻松完成。您可以生成并按如下方式使用配置

tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py[源代码]

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    master_disk_size=32,
    worker_machine_type="n1-standard-4",
    worker_disk_size=32,
    num_workers=2,
    storage_bucket=BUCKET_NAME,
    init_actions_uris=[GCS_INIT_FILE],
    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
    num_preemptible_workers=1,
    preemptibility="PREEMPTIBLE",
    internal_ip_only=False,
).make()

诊断集群

Dataproc 支持收集集群诊断信息,如系统、Spark、Hadoop 和 Dataproc 日志,以及可用于排查 Dataproc 集群或作业的集群配置文件。请务必注意,此信息只能在删除集群之前收集。有关诊断集群时可传递的可用字段的更多信息,请访问 Dataproc 诊断集群 API。

要诊断 Dataproc 集群,请使用: DataprocDiagnoseClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[源代码]

    diagnose_cluster = DataprocDiagnoseClusterOperator(
        task_id="diagnose_cluster",
        region=REGION,
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
    )

您还可以使用可延迟模式,以便异步运行算子

tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[源代码]

    diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
        task_id="diagnose_cluster_deferrable",
        region=REGION,
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        deferrable=True,
    )

更新集群

您可以通过提供集群配置和 updateMask 来扩大或缩小集群。在 updateMask 参数中,您可以指定要更新的字段的路径,该路径相对于集群。有关 updateMask 和其他参数的更多信息,请查看 Dataproc 更新集群 API。

新集群配置和 updateMask 的示例

tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py[源代码]

CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}

要更新集群,您可以使用: DataprocUpdateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py[源代码]

scale_cluster = DataprocUpdateClusterOperator(
    task_id="scale_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)

您可以为此操作使用可延迟模式,以便异步运行算子

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[源代码]

update_cluster = DataprocUpdateClusterOperator(
    task_id="update_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    deferrable=True,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)

启动集群

要启动集群,您可以使用 DataprocStartClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py[源代码]

start_cluster = DataprocStartClusterOperator(
    task_id="start_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

停止集群

要停止集群,您可以使用 DataprocStopClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py[源代码]

stop_cluster = DataprocStopClusterOperator(
    task_id="stop_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

删除集群

要删除集群,您可以使用: DataprocDeleteClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_hive.py[源代码]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
)

您可以为此操作使用可延迟模式,以便异步运行算子

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[源代码]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,
    deferrable=True,
)

向集群提交作业

Dataproc 支持提交不同大数据组件的作业。当前列表包括 Spark、PySpark、Hadoop、Trino、Pig、Flink 和 Hive。有关版本和映像的更多信息,请查看 Cloud Dataproc 映像版本列表

要向集群提交作业,您需要提供作业源文件。作业源文件可以位于 GCS、集群或本地文件系统上。您可以指定 file:/// 路径来引用集群主节点上的本地文件。

可以使用以下方式提交作业配置:DataprocSubmitJobOperator

tests/system/google/cloud/dataproc/example_dataproc_pyspark.py[源代码]

pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)

提交作业配置的示例

我们为下面的每个框架提供了一个示例。作业中可以提供的参数比示例中显示的更多。有关完整的参数列表,请查看 DataProc 作业参数

PySpark 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_pyspark.py[源代码]

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": GCS_JOB_FILE},
}

SparkSQl 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py[源代码]

SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Spark 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_spark.py[源代码]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

可延迟模式 下运行的 Spark 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py[源代码]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Hive 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_hive.py[源代码]

HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Hadoop 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_hadoop.py[源代码]

HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}

Pig 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_pig.py[源代码]

PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}

SparkR 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_sparkr.py[源代码]

SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": GCS_JOB_FILE},
}

Presto 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_presto.py[源代码]

PRESTO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Trino 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_trino.py[源代码]

TRINO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Flink 作业的配置示例

tests/system/google/cloud/dataproc/example_dataproc_flink.py[源代码]

FLINK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "flink_job": {
        "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
        "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
    },
}

使用工作流模板

Dataproc 支持创建稍后可以触发的工作流模板。

可以使用以下方式创建工作流模板:DataprocCreateWorkflowTemplateOperator

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[源代码]

create_workflow_template = DataprocCreateWorkflowTemplateOperator(
    task_id="create_workflow_template",
    template=WORKFLOW_TEMPLATE,
    project_id=PROJECT_ID,
    region=REGION,
)

创建工作流后,用户可以使用 DataprocInstantiateWorkflowTemplateOperator 触发它

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[源代码]

trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)

此外,对于所有这些操作,您可以在可延迟模式下使用运算符

tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[源代码]

trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow_async",
    region=REGION,
    project_id=PROJECT_ID,
    template_id=WORKFLOW_NAME,
    deferrable=True,
)

内联运算符是另一种选择。它创建一个工作流,运行它,然后将其删除: DataprocInstantiateInlineWorkflowTemplateOperator

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[源代码]

instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)

此外,对于所有这些操作,您可以在可延迟模式下使用运算符

tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[源代码]

instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template_async",
    template=WORKFLOW_TEMPLATE,
    region=REGION,
    deferrable=True,
)

创建批处理

Dataproc 支持创建批处理工作负载。

可以使用以下方式创建批处理:DataprocCreateBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[源代码]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

create_batch_3 = DataprocCreateBatchOperator(
    task_id="create_batch_3",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_3,
    asynchronous=True,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

要创建带有持久历史服务器的批处理,首先您应该创建一个带有特定参数的 Dataproc 集群。有关如何创建集群的文档,您可以在此处找到

tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py[源代码]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster_for_phs",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

创建集群后,您应该将其添加到批处理配置中。

tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py[源代码]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch_with_phs",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_WITH_PHS,
    batch_id=BATCH_ID,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

要检查操作是否成功,您可以使用 DataprocBatchSensor

tests/system/google/cloud/dataproc/example_dataproc_batch.py[源代码]

batch_async_sensor = DataprocBatchSensor(
    task_id="batch_async_sensor",
    region=REGION,
    project_id=PROJECT_ID,
    batch_id=BATCH_ID_3,
    poke_interval=10,
)

此外,对于所有这些操作,您可以在可延迟模式下使用运算符

tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py[源代码]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    deferrable=True,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

获取批处理

要获取批处理,您可以使用:DataprocGetBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[源代码]

get_batch = DataprocGetBatchOperator(
    task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)

列出批处理

要获取现有批处理的列表,您可以使用:DataprocListBatchesOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[源代码]

list_batches = DataprocListBatchesOperator(
    task_id="list_batches",
    project_id=PROJECT_ID,
    region=REGION,
)

删除批处理

要删除批处理,您可以使用:DataprocDeleteBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[源代码]

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
    task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
    task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
    task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)

取消批处理操作

要取消操作,您可以使用:DataprocCancelOperationOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[源代码]

cancel_operation = DataprocCancelOperationOperator(
    task_id="cancel_operation",
    project_id=PROJECT_ID,
    region=REGION,
    operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
)

参考

有关更多信息,请查看

此条目是否有帮助?