Google Cloud Dataproc Metastore 操作符

Dataproc Metastore 是运行在 Google Cloud 上的一个完全托管、高可用、自动修复的无服务器 Apache Hive Metastore (HMS)。它支持 HMS,作为管理关系实体元数据的关键组件,并为开源数据生态系统中的数据处理应用程序提供互操作性。

有关该服务的更多信息,请访问 Dataproc Metastore 产品文档 <产品文档

创建服务

在创建 Dataproc Metastore 服务之前,您需要定义该服务。有关创建服务时可传递的字段的更多信息,请访问 Dataproc Metastore 创建服务 API。

一个简单的服务配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

SERVICE = {
    "name": "test-service",
}

通过此配置,我们可以创建服务:DataprocMetastoreCreateServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

create_service = DataprocMetastoreCreateServiceOperator(
    task_id="create_service",
    region=REGION,
    project_id=PROJECT_ID,
    service=SERVICE,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

获取服务

要获取服务,您可以使用

DataprocMetastoreGetServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

get_service = DataprocMetastoreGetServiceOperator(
    task_id="get_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
)

更新服务

您可以通过提供服务配置和 updateMask 来更新服务。在 updateMask 参数中,您指定了要更新字段的路径,该路径相对于 Service。有关 updateMask 和其他参数的更多信息,请查阅 Dataproc Metastore 更新服务 API。

一个新的服务配置和 updateMask 示例

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

SERVICE_TO_UPDATE = {
    "labels": {
        "mylocalmachine": "mylocalmachine",
        "systemtest": "systemtest",
    }
}
UPDATE_MASK = FieldMask(paths=["labels"])

要更新服务,您可以使用:DataprocMetastoreUpdateServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

update_service = DataprocMetastoreUpdateServiceOperator(
    task_id="update_service",
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    region=REGION,
    service=SERVICE_TO_UPDATE,
    update_mask=UPDATE_MASK,
    timeout=TIMEOUT,
)

删除服务

要删除服务,您可以使用

DataprocMetastoreDeleteServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

delete_service = DataprocMetastoreDeleteServiceOperator(
    task_id="delete_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

导出服务元数据

要导出元数据,您可以使用

DataprocMetastoreExportMetadataOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

export_metadata = DataprocMetastoreExportMetadataOperator(
    task_id="export_metadata",
    destination_gcs_folder=DESTINATION_GCS_FOLDER,
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

恢复服务

要恢复服务,您可以使用

DataprocMetastoreRestoreServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

restore_service = DataprocMetastoreRestoreServiceOperator(
    task_id="restore_metastore",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    backup_region=REGION,
    backup_project_id=PROJECT_ID,
    backup_service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

创建元数据导入

在创建 Dataproc Metastore 元数据导入之前,您需要定义该元数据导入。有关创建元数据导入时可传递的字段的更多信息,请访问 Dataproc Metastore 创建元数据导入 API。

一个简单的元数据导入配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

METADATA_IMPORT = {
    "name": "test-metadata-import",
    "database_dump": {
        "gcs_uri": GCS_URI,
        "database_type": DB_TYPE,
    },
}

要创建元数据导入,您可以使用:DataprocMetastoreCreateMetadataImportOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

import_metadata = DataprocMetastoreCreateMetadataImportOperator(
    task_id="import_metadata",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    metadata_import=METADATA_IMPORT,
    metadata_import_id=METADATA_IMPORT_ID,
    timeout=TIMEOUT,
)

创建备份

在创建 Dataproc Metastore 服务备份之前,您需要定义该备份。有关创建备份时可传递的字段的更多信息,请访问 Dataproc Metastore 创建备份 API。

一个简单的备份配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

BACKUP = {
    "name": "test-backup",
}

通过此配置,我们可以创建备份:DataprocMetastoreCreateBackupOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

backup_service = DataprocMetastoreCreateBackupOperator(
    task_id="create_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup=BACKUP,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

删除备份

要删除备份,您可以使用

DataprocMetastoreDeleteBackupOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

delete_backup = DataprocMetastoreDeleteBackupOperator(
    task_id="delete_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

列出备份

要列出备份,您可以使用

DataprocMetastoreListBackupsOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

list_backups = DataprocMetastoreListBackupsOperator(
    task_id="list_backups",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
)

检查 Hive 分区是否存在

要检查 Metastore 中是否已为给定表创建了 Hive 分区,您可以使用:MetastoreHivePartitionSensor

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py

hive_partition_sensor = MetastoreHivePartitionSensor(
    task_id="hive_partition_sensor",
    service_id=METASTORE_SERVICE_ID,
    region=REGION,
    table=TABLE_NAME,
    partitions=[PARTITION_1, PARTITION_2],
)

此条目有帮助吗?