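Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about tasks, see the Dataplex product documentation.

Create a task

Before you create a Dataplex task, you need to define its body. For more information about the available fields you can pass when creating a task, see the Dataplex create task API.

A simple task configuration looks like this: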

tests/system/google/cloud/dataplex/example_dataplex.py

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
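As a sketch of how a body like the one above might be assembled programmatically, the hypothetical helper `build_task_body` below returns a dictionary with the same three keys. It assumes that the trigger type is one of the Dataplex `TriggerSpec` values `ON_DEMAND` or `RECURRING`, that a `RECURRING` task needs a cron `schedule`, and that the Spark script is a Cloud Storage (`gs://`) URI; these assumptions should be checked against the Dataplex create task API.

```python
from typing import Optional

# Assumption: the Dataplex TriggerSpec types usable for a new task.
VALID_TRIGGER_TYPES = {"ON_DEMAND", "RECURRING"}


def build_task_body(
    service_account: str,
    python_script_uri: str,
    trigger_type: str = "ON_DEMAND",
    schedule: Optional[str] = None,
) -> dict:
    """Hypothetical helper returning a dict shaped like EXAMPLE_TASK_BODY."""
    if trigger_type not in VALID_TRIGGER_TYPES:
        raise ValueError(f"unsupported trigger type: {trigger_type!r}")
    # Assumption: recurring tasks are scheduled with a cron expression.
    if trigger_type == "RECURRING" and not schedule:
        raise ValueError("a RECURRING task needs a cron schedule")
    # Assumption: the main Spark script lives in Cloud Storage.
    if not python_script_uri.startswith("gs://"):
        raise ValueError("python_script_file is expected to be a gs:// URI")

    trigger_spec = {"type_": trigger_type}
    if schedule:
        trigger_spec["schedule"] = schedule
    return {
        "trigger_spec": trigger_spec,
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": python_script_uri},
    }


body = build_task_body("runner@my-project.iam.gserviceaccount.com", "gs://my-bucket/job.py")
```

The resulting dictionary can be passed as `body=` in the same way as EXAMPLE_TASK_BODY.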

With this configuration, we can create the task both synchronously and asynchronously: DataplexCreateTaskOperator

tests/system/google/cloud/dataplex/example_dataplex.py

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

tests/system/google/cloud/dataplex/example_dataplex.py

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)

Delete a task

To delete a task, you can use

DataplexDeleteTaskOperator

tests/system/google/cloud/dataplex/example_dataplex.py

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)

List tasks

To list tasks, you can use

DataplexListTasksOperator

tests/system/google/cloud/dataplex/example_dataplex.py

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)

Get a task

To get a task, you can use

DataplexGetTaskOperator

tests/system/google/cloud/dataplex/example_dataplex.py

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a task

To wait for a task that was created asynchronously, you can use

DataplexTaskStateSensor

tests/system/google/cloud/dataplex/example_dataplex.py

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
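Conceptually, a sensor such as DataplexTaskStateSensor re-checks ("pokes") the task's state until it reaches a desired value. The loop below is a framework-free illustration of that idea, not Airflow's implementation. The function `wait_for_state` and its parameters are hypothetical, and the default target `ACTIVE` and failure state `DELETING` are an assumption about which Dataplex task states the sensor treats as success and failure.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    target: str = "ACTIVE",
    failure_states: Iterable[str] = ("DELETING",),
    poke_interval: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_state() until it returns target, a failure state, or time runs out."""
    failures = set(failure_states)
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state == target:
            return state
        if state in failures:
            raise RuntimeError(f"task entered failure state {state}")
        if clock() >= deadline:
            raise TimeoutError(f"task still {state} after {timeout} seconds")
        sleep(poke_interval)


# Usage with a stubbed state source that becomes ACTIVE on the third poke.
states = iter(["CREATING", "CREATING", "ACTIVE"])
final_state = wait_for_state(lambda: next(states), poke_interval=0, sleep=lambda _: None)
```

Injecting `sleep` and `clock` keeps the loop testable without real waiting; in Airflow, the scheduler handles poke intervals and timeouts for you.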

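Create a lake

Before you create a Dataplex lake, you need to define its body.

For more information about the available fields you can pass when creating a lake, see the Dataplex create lake API.

A simple lake configuration looks like this: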

tests/system/google/cloud/dataplex/example_dataplex.py

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": {},  # labels are key/value pairs, so an empty mapping rather than a list
    "description": "test_description",
    "metastore": {"service": ""},
}

With this configuration, we can create the lake with

DataplexCreateLakeOperator

tests/system/google/cloud/dataplex/example_dataplex.py

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)

Delete a lake

To delete a lake, you can use

DataplexDeleteLakeOperator

tests/system/google/cloud/dataplex/example_dataplex.py

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)
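The `trigger_rule=TriggerRule.ALL_DONE` argument makes this cleanup task run once every upstream task has finished, whether it succeeded or not, so the lake is removed even if an earlier step failed. The sketch below contrasts that rule with Airflow's default "all success" behaviour in plain Python. The state names are a simplified, assumed subset of Airflow's task states, and the functions are illustrative rather than Airflow code.

```python
from typing import Iterable

# Simplified, assumed subset of Airflow task states that count as "finished".
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def all_done(upstream_states: Iterable[str]) -> bool:
    """ALL_DONE: run once every upstream task has finished, regardless of outcome."""
    return all(state in FINISHED_STATES for state in upstream_states)


def all_success(upstream_states: Iterable[str]) -> bool:
    """ALL_SUCCESS (the default rule): run only if every upstream task succeeded."""
    return all(state == "success" for state in upstream_states)


upstream = ["success", "failed"]  # e.g. create_lake succeeded, a later task failed
print(all_done(upstream), all_success(upstream))  # prints: True False
```

This is why the delete operators in this guide use ALL_DONE: teardown should not be skipped just because a test step failed.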

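Create or update a Data Quality scan

Before you create a Dataplex Data Quality scan, you need to define its body. For more information about the available fields you can pass when creating a Data Quality scan, see the Dataplex create data quality API.

A simple Data Quality scan configuration looks like this: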

tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)
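The strings in EXAMPLE_DATA_SCAN follow fixed patterns: a Dataplex entity path and a BigQuery table resource URI. The hypothetical helpers below rebuild those strings, plus a range-expectation rule shaped like the one above, as plain Python values. Note that the Dataplex `DataSource` message appears to treat `entity` and `resource` as alternatives (a oneof), so in practice only one of them may take effect; treat that, and the min-not-above-max check, as assumptions to verify in the Dataplex API reference.

```python
from typing import Tuple


def data_scan_sources(
    project_id: str, region: str, lake_id: str, zone_id: str, dataset: str, table: str
) -> Tuple[str, str]:
    """Return (entity, resource) strings in the same format as the example."""
    entity = (
        f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
        f"/zones/{zone_id}/entities/{table}"
    )
    resource = f"//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"
    return entity, resource


def range_rule(column: str, min_value: float, max_value: float, dimension: str = "VALIDITY") -> dict:
    """Build a range-expectation rule dict like the one passed to DataQualitySpec."""
    if min_value > max_value:
        raise ValueError("min_value must not exceed max_value")
    return {
        # The example passes the bounds as strings, so they are converted here.
        "range_expectation": {"min_value": str(min_value), "max_value": str(max_value)},
        "column": column,
        "dimension": dimension,
    }


entity, resource = data_scan_sources("my-project", "us-central1", "lake", "zone", "dataset", "table_1")
rule = range_rule("value", 0, 10000)
```

A list of such rules can then be wrapped as `{"rules": [rule]}` when building the Data Quality spec.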

With this configuration, we can create or update the Data Quality scan with

DataplexCreateOrUpdateDataQualityScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Quality scan

To get a Data Quality scan, you can use

DataplexGetDataQualityScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Quality scan

To delete a Data Quality scan, you can use

DataplexDeleteDataQualityScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

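Run a Data Quality scan

You can run a Dataplex Data Quality scan in asynchronous mode and then check its status with a sensor. To start the scan, use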

DataplexRunDataQualityScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check whether the Dataplex Data Quality scan run succeeded, you can use

DataplexDataQualityJobStatusSensor.

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
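The `job_id` above is a Jinja template that, at run time, pulls the value returned by `run_data_scan_async` from XCom (presumably the ID of the started job). The sketch below shows how such a template string can be built for any upstream task ID, and how a single poke might interpret a Data Scan job state. Both helpers are hypothetical, and the state names `SUCCEEDED`, `FAILED`, and `CANCELLED` are an assumption based on the Dataplex `DataScanJob.State` enum.

```python
def xcom_job_id_template(upstream_task_id: str) -> str:
    """Build a template like the job_id argument above (hypothetical helper)."""
    # String concatenation avoids escaping the double braces in an f-string.
    return "{{ task_instance.xcom_pull('" + upstream_task_id + "') }}"


# Assumed terminal failure states of a Data Scan job.
FAILED_JOB_STATES = {"FAILED", "CANCELLED"}


def job_finished_successfully(state: str) -> bool:
    """Return True when done, False to keep waiting; raise on a failed job."""
    if state == "SUCCEEDED":
        return True
    if state in FAILED_JOB_STATES:
        raise RuntimeError(f"data scan job ended in state {state}")
    return False  # e.g. PENDING or RUNNING: poke again later


template = xcom_job_id_template("run_data_scan_async")
```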

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Quality scan job

To get the result of a Data Quality scan job, you can use

DataplexGetDataQualityScanResultOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

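Create a zone

Before you create a Dataplex zone, you need to define its body.

For more information about the available fields you can pass when creating a zone, see the Dataplex create zone API.

A simple zone configuration looks like this: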

tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}
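A zone body needs a zone type and a location type. The hypothetical helper below builds EXAMPLE_ZONE-shaped dictionaries and rejects unknown values. It assumes the allowed zone types are `RAW` and `CURATED` and the allowed location types are `SINGLE_REGION` and `MULTI_REGION`; confirm these against the Dataplex create zone API.

```python
# Assumed enum values from the Dataplex Zone resource.
ZONE_TYPES = {"RAW", "CURATED"}
LOCATION_TYPES = {"SINGLE_REGION", "MULTI_REGION"}


def build_zone_body(zone_type: str = "RAW", location_type: str = "SINGLE_REGION") -> dict:
    """Return a zone body in the same shape as EXAMPLE_ZONE."""
    if zone_type not in ZONE_TYPES:
        raise ValueError(f"unknown zone type: {zone_type!r}")
    if location_type not in LOCATION_TYPES:
        raise ValueError(f"unknown location type: {location_type!r}")
    return {"type_": zone_type, "resource_spec": {"location_type": location_type}}


zone_body = build_zone_body()
curated = build_zone_body("CURATED", "MULTI_REGION")
```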

With this configuration, we can create the zone with

DataplexCreateZoneOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)

Delete a zone

To delete a zone, you can use

DataplexDeleteZoneOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

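Create an asset

Before you create a Dataplex asset, you need to define its body.

For more information about the available fields you can pass when creating an asset, see the Dataplex create asset API.

A simple asset configuration looks like this: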

tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}
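The asset's `resource_spec.name` is a relative resource name whose format depends on the resource type. The hypothetical helper below builds EXAMPLE_ASSET-shaped bodies for a BigQuery dataset or a Cloud Storage bucket. The `STORAGE_BUCKET` type and its `projects/{project}/buckets/{bucket}` pattern are assumptions taken from the Dataplex asset reference, not from the example above.

```python
# Assumed name patterns per asset resource type.
ASSET_NAME_PATTERNS = {
    "BIGQUERY_DATASET": "projects/{project}/datasets/{name}",
    "STORAGE_BUCKET": "projects/{project}/buckets/{name}",
}


def build_asset_body(project_id: str, resource_type: str, name: str, discovery: bool = True) -> dict:
    """Return an asset body shaped like EXAMPLE_ASSET."""
    try:
        pattern = ASSET_NAME_PATTERNS[resource_type]
    except KeyError:
        raise ValueError(f"unsupported asset type: {resource_type!r}") from None
    return {
        "resource_spec": {"name": pattern.format(project=project_id, name=name), "type_": resource_type},
        "discovery_spec": {"enabled": discovery},
    }


asset_body = build_asset_body("my-project", "BIGQUERY_DATASET", "my_dataset")
```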

With this configuration, we can create the asset with

DataplexCreateAssetOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)

Delete an asset

To delete an asset, you can use

DataplexDeleteAssetOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

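Create or update a Data Profile scan

Before you create a Dataplex Data Profile scan, you need to define its body. For more information about the available fields you can pass when creating a Data Profile scan, see the Dataplex create data profile API.

A simple Data Profile scan configuration looks like this: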

tests/system/google/cloud/dataplex/example_dataplex_dp.py

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})

With this configuration, we can create or update the Data Profile scan with

DataplexCreateOrUpdateDataProfileScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py

create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Profile scan

To get a Data Profile scan, you can use

DataplexGetDataProfileScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan = DataplexGetDataProfileScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Profile scan

To delete a Data Profile scan, you can use

DataplexDeleteDataProfileScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py

delete_data_scan = DataplexDeleteDataProfileScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

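Run a Data Profile scan

You can run a Dataplex Data Profile scan in asynchronous mode and then check its status with a sensor. To start the scan, use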

DataplexRunDataProfileScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py

run_data_scan_async = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check whether the Dataplex Data Profile scan run succeeded, you can use

DataplexDataProfileJobStatusSensor.

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

run_data_scan_def = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Profile scan job

To get the result of a Data Profile scan job, you can use

DataplexGetDataProfileScanResultOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

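Google Dataplex Catalog Operators

Dataplex Catalog provides a unified inventory of Google Cloud resources, such as BigQuery, and of other resources, such as on-premises resources. Dataplex Catalog automatically retrieves metadata for Google Cloud resources, and you can bring metadata for third-party resources into Dataplex Catalog.

For more information about Dataplex Catalog, see the Dataplex Catalog product documentation.

Create an EntryGroup

To create an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryGroupOperator. For more information about the available fields you can pass when creating an Entry Group, see Entry Group resource configuration.

A simple Entry Group configuration looks like this: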

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"}
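The operators in this section identify an Entry Group by `project_id`, `location`, and `entry_group_id`. The corresponding full resource name follows the same pattern that ENTRY_BODY uses for its `name` field (`projects/.../locations/.../entryGroups/...`). The hypothetical helpers below build and parse that name, which can be handy when checking the `name` field of a retrieved Entry Group.

```python
import re
from typing import Dict

_ENTRY_GROUP_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)/entryGroups/(?P<entry_group>[^/]+)$"
)


def entry_group_name(project_id: str, location: str, entry_group_id: str) -> str:
    """Build the full resource name of an Entry Group."""
    return f"projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}"


def parse_entry_group_name(name: str) -> Dict[str, str]:
    """Split an Entry Group resource name back into its components."""
    match = _ENTRY_GROUP_RE.match(name)
    if match is None:
        raise ValueError(f"not an Entry Group resource name: {name!r}")
    return match.groupdict()


name = entry_group_name("my-project", "us-central1", "my-entry-group")
parts = parse_entry_group_name(name)
```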

With this configuration, you can create the Entry Group resource with

DataplexCatalogCreateEntryGroupOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry_group = DataplexCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration=ENTRY_GROUP_BODY,
    validate_request=False,
)

Delete an EntryGroup

To delete an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryGroupOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryGroups

To list all Entry Groups in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntryGroupsOperator. This operator also supports filtering and ordering the results of the operation.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry_group = DataplexCatalogListEntryGroupsOperator(
    task_id="list_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)
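The `filter_by` value is a filter expression such as `display_name = "Display Name"`. When the compared value comes from a variable, quoting it by hand is easy to get wrong. The hypothetical helpers below build equality clauses and join them with `AND`. They assume the list APIs accept AIP-160 style expressions with double-quoted, backslash-escaped string literals, which should be confirmed in the Dataplex API reference.

```python
import json


def equals_clause(field: str, value: str) -> str:
    """Return a clause such as: display_name = "Display Name"."""
    # json.dumps wraps the value in double quotes and escapes quotes/backslashes
    # (assumption: the API accepts these escapes in string literals).
    return f"{field} = {json.dumps(value, ensure_ascii=False)}"


def and_filters(*clauses: str) -> str:
    """Combine clauses so that all of them must match."""
    return " AND ".join(clauses)


filter_by = equals_clause("display_name", "Display Name")
```

The resulting string can be passed as `filter_by=` in place of the hard-coded expression.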

Get an EntryGroup

To retrieve an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryGroupOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry_group = DataplexCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an EntryGroup

To update an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryGroupOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
    task_id="update_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)
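`update_mask` lists which fields of `entry_group_configuration` should be written; fields not named in the mask keep their current values. The sketch below illustrates that field-mask behaviour on plain dictionaries, limited to top-level fields. It is a simplified, hypothetical model of the server-side semantics, including the assumption that a field named in the mask but absent from the patch is cleared.

```python
from typing import Dict, Iterable


def apply_update_mask(current: Dict, patch: Dict, update_mask: Iterable[str]) -> Dict:
    """Return a copy of current where only the masked top-level fields change."""
    updated = dict(current)
    for field in update_mask:
        if field in patch:
            updated[field] = patch[field]
        else:
            # Assumption: a masked field missing from the patch is reset.
            updated.pop(field, None)
    return updated


before = {"display_name": "Display Name", "description": "Some description"}
after = apply_update_mask(before, {"display_name": "Updated Display Name"}, ["display_name"])
```

Here `description` survives the update because it is not in the mask, mirroring the `update_mask=["display_name"]` call above.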

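Create an EntryType

To create an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryTypeOperator. For more information about the available fields you can pass when creating an Entry Type, see Entry Type resource configuration.

A simple Entry Type configuration looks like this: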

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration, you can create the Entry Type resource with

DataplexCatalogCreateEntryTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry_type = DataplexCatalogCreateEntryTypeOperator(
    task_id="create_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration=ENTRY_TYPE_BODY,
    validate_request=False,
)

Delete an EntryType

To delete an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry_type = DataplexCatalogDeleteEntryTypeOperator(
    task_id="delete_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryTypes

To list all Entry Types in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntryTypesOperator. This operator also supports filtering and ordering the results of the operation.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry_type = DataplexCatalogListEntryTypesOperator(
    task_id="list_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an EntryType

To retrieve an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry_type = DataplexCatalogGetEntryTypeOperator(
    task_id="get_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
)

Update an EntryType

To update an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry_type = DataplexCatalogUpdateEntryTypeOperator(
    task_id="update_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

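Create an AspectType

To create an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateAspectTypeOperator. For more information about the available fields you can pass when creating an Aspect Type, see Aspect Type resource configuration.

A simple Aspect Type configuration looks like this: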

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ASPECT_TYPE_BODY = {
    "display_name": "Sample AspectType",
    "description": "A simple AspectType for demonstration purposes.",
    "metadata_template": {
        "name": "sample_field",
        "type": "record",
        "annotations": {
            "display_name": "Sample Field",
            "description": "A sample field within the AspectType.",
        },
    },
}

With this configuration, you can create the Aspect Type resource with

DataplexCatalogCreateAspectTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
    task_id="create_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration=ASPECT_TYPE_BODY,
    validate_request=False,
)

Delete an AspectType

To delete an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteAspectTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
    task_id="delete_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List AspectTypes

To list all Aspect Types in a specific location in Dataplex Catalog, you can use DataplexCatalogListAspectTypesOperator. This operator also supports filtering and ordering the results of the operation.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_aspect_type = DataplexCatalogListAspectTypesOperator(
    task_id="list_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an AspectType

To retrieve an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogGetAspectTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_aspect_type = DataplexCatalogGetAspectTypeOperator(
    task_id="get_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
)

Update an AspectType

To update an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateAspectTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
    task_id="update_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

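Create an Entry

To create an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryOperator. For more information about the available fields you can pass when creating an Entry, see Entry resource configuration.

A simple Entry configuration looks like this: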

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_BODY = {
    "name": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryGroups/{ENTRY_GROUP_NAME}/entries/{ENTRY_NAME}",
    "entry_type": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryTypes/{ENTRY_TYPE_NAME}",
}
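In ENTRY_BODY, `name` and `entry_type` are full resource names built from the same project and location. The hypothetical helper below assembles such a body from its IDs. It assumes, as the example does, that the Entry Type lives in the same project and location as the Entry Group; an Entry Type defined elsewhere would need its own prefix.

```python
def build_entry_body(
    project_id: str, location: str, entry_group_id: str, entry_id: str, entry_type_id: str
) -> dict:
    """Return an Entry body shaped like ENTRY_BODY."""
    prefix = f"projects/{project_id}/locations/{location}"
    return {
        "name": f"{prefix}/entryGroups/{entry_group_id}/entries/{entry_id}",
        # Assumption: the Entry Type is defined in the same project and location.
        "entry_type": f"{prefix}/entryTypes/{entry_type_id}",
    }


entry_body = build_entry_body("my-project", "us-central1", "my-group", "my-entry", "my-type")
```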

With this configuration, you can create the Entry resource with

DataplexCatalogCreateEntryOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry = DataplexCatalogCreateEntryOperator(
    task_id="create_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration=ENTRY_BODY,
)

Delete an Entry

To delete an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry = DataplexCatalogDeleteEntryOperator(
    task_id="delete_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

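List Entries

To list all Entries in an Entry Group at a specific location in Dataplex Catalog, you can use DataplexCatalogListEntriesOperator. This operator also supports filtering and ordering the results of the operation.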

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry = DataplexCatalogListEntriesOperator(
    task_id="list_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Get an Entry

To retrieve an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry = DataplexCatalogGetEntryOperator(
    task_id="get_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an Entry

To update an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry = DataplexCatalogUpdateEntryOperator(
    task_id="update_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration={
        "fully_qualified_name": f"dataplex:{PROJECT_ID}.{GCP_LOCATION}.{ENTRY_GROUP_NAME}.some-entry"
    },
    update_mask=["fully_qualified_name"],
)
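The fully qualified name in this example is composed as `dataplex:{project}.{location}.{entry_group}.{suffix}`. The small hypothetical helper below mirrors that composition so the value and its `update_mask` can be built from variables. Whether other naming conventions are accepted for `fully_qualified_name` is not covered here and should be checked in the Entry resource configuration.

```python
def dataplex_fqn(project_id: str, location: str, entry_group_id: str, suffix: str) -> str:
    """Mirror the fully_qualified_name format used in the update example."""
    return f"dataplex:{project_id}.{location}.{entry_group_id}.{suffix}"


fqn = dataplex_fqn("my-project", "us-central1", "my-group", "some-entry")
update_entry_configuration = {"fully_qualified_name": fqn}
update_mask = ["fully_qualified_name"]
```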

Look up a single Entry

To look up a single Entry by name in Dataplex Catalog, using the permissions on the source system, you can use DataplexCatalogLookupEntryOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

lookup_entry = DataplexCatalogLookupEntryOperator(
    task_id="lookup_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Search Entries

To search Dataplex Catalog for Entries that match a given query and scope, you can use DataplexCatalogSearchEntriesOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

search_entry = DataplexCatalogSearchEntriesOperator(
    task_id="search_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    query=f"name={ENTRY_NAME}",
)
