Google Cloud Dataproc Operator¶
Dataproc 是一项托管式 Apache Spark 和 Apache Hadoop 服务,可让您利用开源数据工具进行批处理、查询、流处理和机器学习。Dataproc 自动化功能可帮助您快速创建集群、轻松管理集群,并在不再需要集群时将其关闭以节省成本。
有关此服务的更多信息,请访问 Dataproc 生产文档
前置任务¶
要使用这些 Operator,您必须做几件事
使用 Cloud Console 选择或创建 Cloud Platform 项目。
为您的项目启用结算功能,如 Google Cloud 文档中所述。
启用 API,如 Cloud Console 文档中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'安装提供了详细信息。
创建集群¶
创建 Dataproc 集群时,可以选择 Compute Engine 作为部署平台。在此配置中,Dataproc 会自动预置运行集群所需的 Compute Engine VM 实例。VM 实例用于主节点、主工作节点和次要工作节点(如果指定)。这些 VM 实例由 Compute Engine 创建和管理,而 Dataproc 负责配置大数据处理任务所需的软件和编排。通过提供节点的配置,您可以描述主节点和次要节点的配置,以及 Compute Engine 实例集群的状态。配置次要工作节点时,您可以指定工作节点的数量及其类型。通过启用“抢占式”选项以对这些节点使用抢占式 VM(等同于 Spot 实例),您可以利用这些实例为您的 Dataproc 工作负载带来的成本节约。主节点通常托管集群主控和各种控制服务,不具备抢占式选项,因为主节点保持稳定性和可用性至关重要。集群创建后,配置设置(包括次要工作节点的抢占性)无法直接修改。
有关创建集群时可传递的可用字段的更多信息,请访问 Dataproc create cluster API。
集群配置示例如下
tests/system/google/cloud/dataproc/example_dataproc_hive.py
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"secondary_worker_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 32,
},
"is_preemptible": True,
"preemptibility": "PREEMPTIBLE",
},
}
使用此配置,我们可以创建集群: DataprocCreateClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_hive.py
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
GKE 上的 Dataproc 将 Dataproc 虚拟集群部署到 GKE 集群上。与 Compute Engine 集群上的 Dataproc 不同,GKE 上的 Dataproc 虚拟集群不包含单独的主 VM 和工作器 VM。相反,当您创建 GKE 上的 Dataproc 虚拟集群时,GKE 上的 Dataproc 会在 GKE 集群内创建节点池。GKE 上的 Dataproc 作业以 Pod 的形式在这些节点池上运行。节点池和 Pod 在节点池上的调度由 GKE 管理。
创建 GKE Dataproc 集群时,您可以指定底层计算资源使用抢占式 VM。GKE 支持使用抢占式 VM 作为节省成本的措施。通过启用抢占式 VM,GKE 将使用抢占式 VM 预置集群节点。或者,您可以将节点创建为 Spot VM 实例,这是旧版抢占式 VM 的最新更新。这有助于在 GKE 上运行 Dataproc 工作负载同时优化成本。
要在 Google Kubernetes Engine 中创建 Dataproc 集群,您可以传递集群配置
tests/system/google/cloud/dataproc/example_dataproc_gke.py
VIRTUAL_CLUSTER_CONFIG = {
"kubernetes_cluster_config": {
"gke_cluster_config": {
"gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
"node_pool_target": [
{
"node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
"roles": ["DEFAULT"],
"node_pool_config": {
"config": {
"preemptible": False,
"machine_type": "e2-standard-4",
}
},
}
],
},
"kubernetes_software_config": {"component_version": {"SPARK": "3"}},
},
"staging_bucket": "test-staging-bucket",
}
使用此配置,我们可以创建集群: DataprocCreateClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_gke.py
create_cluster_in_gke = DataprocCreateClusterOperator(
task_id="create_cluster_in_gke",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
您还可以创建带有可选组件 Presto 的 Dataproc 集群。为此,请使用以下配置。请注意,默认镜像可能不支持所选的可选组件。如果出现这种情况,请指定您可以在文档中找到的正确 image_version
。
tests/system/google/cloud/dataproc/example_dataproc_presto.py
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"software_config": {
"optional_components": [
"PRESTO",
],
"image_version": "2.0",
},
}
您还可以创建带有可选组件 Trino 的 Dataproc 集群。为此,请使用以下配置。请注意,默认镜像可能不支持所选的可选组件。如果出现这种情况,请指定您可以在文档中找到的正确 image_version
。
tests/system/google/cloud/dataproc/example_dataproc_trino.py
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"software_config": {
"optional_components": [
"TRINO",
],
"image_version": "2.1",
},
}
您可以对此操作使用可延迟模式,以便异步运行 Operator
tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
生成集群配置¶
您还可以使用函数式 API 生成 CLUSTER_CONFIG,使用 ClusterGenerator
的 make() 可以轻松完成此操作。您可以按以下方式生成并使用配置
tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py
CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
project_id=PROJECT_ID,
zone=ZONE,
master_machine_type="n1-standard-4",
master_disk_size=32,
worker_machine_type="n1-standard-4",
worker_disk_size=32,
num_workers=2,
storage_bucket=BUCKET_NAME,
init_actions_uris=[GCS_INIT_FILE],
metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
num_preemptible_workers=1,
preemptibility="PREEMPTIBLE",
internal_ip_only=False,
).make()
诊断集群¶
Dataproc 支持收集集群诊断信息,如系统、Spark、Hadoop 和 Dataproc 日志、集群配置文件,这些信息可用于排查 Dataproc 集群或作业的故障。请务必注意,这些信息只能在删除集群之前收集。有关诊断集群时可传递的可用字段的更多信息,请访问 Dataproc diagnose cluster API。
要诊断 Dataproc 集群,请使用: DataprocDiagnoseClusterOperator
。
tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
diagnose_cluster = DataprocDiagnoseClusterOperator(
task_id="diagnose_cluster",
region=REGION,
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
)
您也可以使用可延迟模式以异步运行 Operator
tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
task_id="diagnose_cluster_deferrable",
region=REGION,
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
deferrable=True,
)
更新集群¶
您可以通过提供集群配置和 updateMask 来向上或向下扩展集群。在 updateMask 参数中,您指定了相对于 Cluster 的字段路径以进行更新。有关 updateMask 和其他参数的更多信息,请参阅 Dataproc update cluster API。
新集群配置和 updateMask 示例如下
tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
CLUSTER_UPDATE = {
"config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
"paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}
要更新集群,您可以使用: DataprocUpdateClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
scale_cluster = DataprocUpdateClusterOperator(
task_id="scale_cluster",
cluster_name=CLUSTER_NAME,
cluster=CLUSTER_UPDATE,
update_mask=UPDATE_MASK,
graceful_decommission_timeout=TIMEOUT,
project_id=PROJECT_ID,
region=REGION,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
您可以对此操作使用可延迟模式,以便异步运行 Operator
tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
update_cluster = DataprocUpdateClusterOperator(
task_id="update_cluster",
cluster_name=CLUSTER_NAME,
cluster=CLUSTER_UPDATE,
update_mask=UPDATE_MASK,
graceful_decommission_timeout=TIMEOUT,
project_id=PROJECT_ID,
region=REGION,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
启动集群¶
要启动集群,您可以使用 DataprocStartClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
start_cluster = DataprocStartClusterOperator(
task_id="start_cluster",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
)
停止集群¶
要停止集群,您可以使用 DataprocStopClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
stop_cluster = DataprocStopClusterOperator(
task_id="stop_cluster",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
)
删除集群¶
要删除集群,您可以使用: DataprocDeleteClusterOperator
。
tests/system/google/cloud/dataproc/example_dataproc_hive.py
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
)
您可以对此操作使用可延迟模式,以便异步运行 Operator
tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
trigger_rule=TriggerRule.ALL_DONE,
deferrable=True,
)
向集群提交作业¶
Dataproc 支持提交不同大数据组件的作业。当前列表包括 Spark、PySpark、Hadoop、Trino、Pig、Flink 和 Hive。有关版本和镜像的更多信息,请参阅 Cloud Dataproc 镜像版本列表
要向集群提交作业,您需要提供作业源文件。作业源文件可以位于 GCS、集群或本地文件系统上。您可以指定 file:/// 路径来引用集群主节点上的本地文件。
可以使用以下 Operator 提交作业配置: DataprocSubmitJobOperator
。
tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
pyspark_task = DataprocSubmitJobOperator(
task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)
要提交的作业配置示例¶
我们在下面为每个框架提供了示例。作业中可提供的参数比示例所示更多。有关参数的完整列表,请参阅 DataProc 作业参数
PySpark 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {"main_python_file_uri": GCS_JOB_FILE},
}
SparkSQl 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py
SPARK_SQL_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Spark 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_spark.py
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_job": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
在可延迟模式下运行的 Spark 作业配置示例
tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_job": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
Hive 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_hive.py
HIVE_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Hadoop 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_hadoop.py
HADOOP_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hadoop_job": {
"main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
"args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
},
}
Pig 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_pig.py
PIG_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
SparkR 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_sparkr.py
SPARKR_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_r_job": {"main_r_file_uri": GCS_JOB_FILE},
}
Presto 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_presto.py
PRESTO_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Trino 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_trino.py
TRINO_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Flink 作业的配置示例
tests/system/google/cloud/dataproc/example_dataproc_flink.py
FLINK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"flink_job": {
"main_class": "org.apache.flink.examples.java.wordcount.WordCount",
"jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
},
}
使用工作流模板¶
Dataproc 支持创建可以在稍后触发的工作流模板。
可以使用以下 Operator 创建工作流模板: DataprocCreateWorkflowTemplateOperator
。
tests/system/google/cloud/dataproc/example_dataproc_workflow.py
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
task_id="create_workflow_template",
template=WORKFLOW_TEMPLATE,
project_id=PROJECT_ID,
region=REGION,
)
工作流创建后,用户可以使用 DataprocInstantiateWorkflowTemplateOperator
触发它
tests/system/google/cloud/dataproc/example_dataproc_workflow.py
trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)
对于所有这些操作,您也可以在可延迟模式下使用 Operator
tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow_async",
region=REGION,
project_id=PROJECT_ID,
template_id=WORKFLOW_NAME,
deferrable=True,
)
内联 Operator 是另一种选择。它会创建工作流,运行它,并在之后删除它: DataprocInstantiateInlineWorkflowTemplateOperator
tests/system/google/cloud/dataproc/example_dataproc_workflow.py
instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)
对于所有这些操作,您也可以在可延迟模式下使用 Operator
tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
task_id="instantiate_inline_workflow_template_async",
template=WORKFLOW_TEMPLATE,
region=REGION,
deferrable=True,
)
创建批处理¶
Dataproc 支持创建批处理工作负载。
可以使用以下 Operator 创建批处理: DataprocCreateBatchOperator
。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
create_batch = DataprocCreateBatchOperator(
task_id="create_batch",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
create_batch_2 = DataprocCreateBatchOperator(
task_id="create_batch_2",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_2,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
create_batch_3 = DataprocCreateBatchOperator(
task_id="create_batch_3",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_3,
asynchronous=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
要创建具有持久化历史服务器的批处理,首先您应该使用特定参数创建 Dataproc 集群。有关如何创建集群的文档,您可以在此处找到
tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster_for_phs",
project_id=PROJECT_ID,
cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
集群创建后,您应该将其添加到批处理配置中。
tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py
create_batch = DataprocCreateBatchOperator(
task_id="create_batch_with_phs",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG_WITH_PHS,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
要检查操作是否成功,您可以使用 DataprocBatchSensor
。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
batch_async_sensor = DataprocBatchSensor(
task_id="batch_async_sensor",
region=REGION,
project_id=PROJECT_ID,
batch_id=BATCH_ID_3,
poke_interval=10,
)
对于所有这些操作,您也可以在可延迟模式下使用 Operator
tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py
create_batch = DataprocCreateBatchOperator(
task_id="create_batch",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
deferrable=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
获取批处理¶
要获取批处理,您可以使用: DataprocGetBatchOperator
。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
get_batch = DataprocGetBatchOperator(
task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
列出批处理¶
要获取已存在的批处理列表,您可以使用: DataprocListBatchesOperator
。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
list_batches = DataprocListBatchesOperator(
task_id="list_batches",
project_id=PROJECT_ID,
region=REGION,
)
删除批处理¶
要删除批处理,您可以使用: DataprocDeleteBatchOperator
。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
delete_batch = DataprocDeleteBatchOperator(
task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)
取消批处理操作¶
要取消操作,您可以使用: DataprocCancelOperationOperator
。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
cancel_operation = DataprocCancelOperationOperator(
task_id="cancel_operation",
project_id=PROJECT_ID,
region=REGION,
operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
)
参考¶
如需更多信息,请参阅