Google Dataplex 运算符¶
Dataplex 是一种智能数据架构,可为您的数据湖、数据仓库和数据市场提供统一的分析和数据管理。
有关任务的更多信息,请访问 Dataplex production documentation <Product documentation
创建任务¶
在创建 Dataplex 任务之前,您需要定义其主体。有关创建任务时可传递的可用字段的更多信息,请访问 Dataplex create task API.
一个简单的任务配置如下所示
tests/system/google/cloud/dataplex/example_dataplex.py
EXAMPLE_TASK_BODY = {
"trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
"execution_spec": {"service_account": SERVICE_ACC},
"spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
使用此配置,我们可以同步和异步地创建任务: DataplexCreateTaskOperator
tests/system/google/cloud/dataplex/example_dataplex.py
create_dataplex_task = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="create_dataplex_task",
)
tests/system/google/cloud/dataplex/example_dataplex.py
create_dataplex_task_async = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
asynchronous=True,
task_id="create_dataplex_task_async",
)
删除任务¶
要删除任务,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
delete_dataplex_task_async = DataplexDeleteTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
task_id="delete_dataplex_task_async",
)
列出任务¶
要列出任务,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
list_dataplex_task = DataplexListTasksOperator(
project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)
获取任务¶
要获取任务,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
get_dataplex_task = DataplexGetTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="get_dataplex_task",
)
等待任务¶
要等待异步创建的任务,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
dataplex_task_state = DataplexTaskStateSensor(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="dataplex_task_state",
)
创建数据湖¶
在创建 Dataplex 数据湖之前,您需要定义其主体。
有关创建数据湖时可传递的可用字段的更多信息,请访问 Dataplex create lake API.
一个简单的任务配置如下所示
tests/system/google/cloud/dataplex/example_dataplex.py
EXAMPLE_LAKE_BODY = {
"display_name": "test_display_name",
"labels": [],
"description": "test_description",
"metastore": {"service": ""},
}
使用此配置,我们可以创建数据湖
tests/system/google/cloud/dataplex/example_dataplex.py
create_lake = DataplexCreateLakeOperator(
project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)
删除数据湖¶
要删除数据湖,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
delete_lake = DataplexDeleteLakeOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
task_id="delete_lake",
trigger_rule=TriggerRule.ALL_DONE,
)
创建或更新数据质量扫描¶
在创建 Dataplex 数据质量扫描之前,您需要定义其主体。有关创建数据质量扫描时可传递的可用字段的更多信息,请访问 Dataplex create data quality API.
一个简单的数据质量扫描配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_dq.py
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
{
"rules": [
{
"range_expectation": {
"min_value": "0",
"max_value": "10000",
},
"column": "value",
"dimension": "VALIDITY",
}
],
}
)
使用此配置,我们可以创建或更新数据质量扫描
DataplexCreateOrUpdateDataQualityScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
task_id="create_data_scan",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_DATA_SCAN,
data_scan_id=DATA_SCAN_ID,
)
获取数据质量扫描¶
要获取数据质量扫描,您可以使用
DataplexGetDataQualityScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
get_data_scan = DataplexGetDataQualityScanOperator(
task_id="get_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
删除数据质量扫描¶
要删除数据质量扫描,您可以使用
DataplexDeleteDataQualityScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
delete_data_scan = DataplexDeleteDataQualityScanOperator(
task_id="delete_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
运行数据质量扫描¶
您可以在异步模式下运行 Dataplex 数据质量扫描,然后使用传感器检查其状态
DataplexRunDataQualityScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
run_data_scan_async = DataplexRunDataQualityScanOperator(
task_id="run_data_scan_async",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
asynchronous=True,
)
要检查运行 Dataplex 数据质量扫描是否成功,您可以使用
DataplexDataQualityJobStatusSensor
.
tests/system/google/cloud/dataplex/example_dataplex_dq.py
get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
task_id="get_data_scan_job_status",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
您也可以在此操作中使用可延迟模式下的运算符
tests/system/google/cloud/dataplex/example_dataplex_dq.py
run_data_scan_def = DataplexRunDataQualityScanOperator(
task_id="run_data_scan_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
获取数据质量扫描作业¶
要获取数据质量扫描作业,您可以使用
DataplexGetDataQualityScanResultOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
task_id="get_data_scan_job_result_2",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
您也可以在此操作中使用可延迟模式下的运算符
tests/system/google/cloud/dataplex/example_dataplex_dq.py
get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
task_id="get_data_scan_job_result_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
创建区域¶
在创建 Dataplex 区域之前,您需要定义其主体。
有关创建区域时可传递的可用字段的更多信息,请访问 Dataplex create zone API.
一个简单的区域配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_dq.py
EXAMPLE_ZONE = {
"type_": "RAW",
"resource_spec": {"location_type": "SINGLE_REGION"},
}
使用此配置,我们可以创建区域
tests/system/google/cloud/dataplex/example_dataplex_dq.py
create_zone = DataplexCreateZoneOperator(
task_id="create_zone",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_ZONE,
zone_id=ZONE_ID,
)
删除区域¶
要删除区域,您可以使用
tests/system/google/cloud/dataplex/example_dataplex_dq.py
delete_zone = DataplexDeleteZoneOperator(
task_id="delete_zone",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
创建资产¶
在创建 Dataplex 资产之前,您需要定义其主体。
有关创建资产时可传递的可用字段的更多信息,请访问 Dataplex create asset API.
一个简单的资产配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_dq.py
EXAMPLE_ASSET = {
"resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
"discovery_spec": {"enabled": True},
}
使用此配置,我们可以创建资产
tests/system/google/cloud/dataplex/example_dataplex_dq.py
create_asset = DataplexCreateAssetOperator(
task_id="create_asset",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_ASSET,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
asset_id=ASSET_ID,
)
删除资产¶
要删除资产,您可以使用
tests/system/google/cloud/dataplex/example_dataplex_dq.py
delete_asset = DataplexDeleteAssetOperator(
task_id="delete_asset",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
asset_id=ASSET_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
创建或更新数据画像扫描¶
在创建 Dataplex 数据画像扫描之前,您需要定义其主体。有关创建数据画像扫描时可传递的可用字段的更多信息,请访问 Dataplex create data profile API.
一个简单的数据画像扫描配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_dp.py
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})
使用此配置,我们可以创建或更新数据画像扫描
DataplexCreateOrUpdateDataProfileScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
task_id="create_data_scan",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_DATA_SCAN,
data_scan_id=DATA_SCAN_ID,
)
获取数据画像扫描¶
要获取数据画像扫描,您可以使用
DataplexGetDataProfileScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
get_data_scan = DataplexGetDataProfileScanOperator(
task_id="get_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
删除数据画像扫描¶
要删除数据画像扫描,您可以使用
DataplexDeleteDataProfileScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
delete_data_scan = DataplexDeleteDataProfileScanOperator(
task_id="delete_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
运行数据画像扫描¶
您可以在异步模式下运行 Dataplex 数据画像扫描,然后使用传感器检查其状态
DataplexRunDataProfileScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
run_data_scan_async = DataplexRunDataProfileScanOperator(
task_id="run_data_scan_async",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
asynchronous=True,
)
要检查运行 Dataplex 数据画像扫描是否成功,您可以使用
DataplexDataProfileJobStatusSensor
.
tests/system/google/cloud/dataplex/example_dataplex_dp.py
get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
task_id="get_data_scan_job_status",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
您也可以在此操作中使用可延迟模式下的运算符
tests/system/google/cloud/dataplex/example_dataplex_dp.py
run_data_scan_def = DataplexRunDataProfileScanOperator(
task_id="run_data_scan_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
获取数据画像扫描作业¶
要获取数据画像扫描作业,您可以使用
DataplexGetDataProfileScanResultOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
task_id="get_data_scan_job_result_2",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
Google Dataplex Catalog 运算符¶
Dataplex Catalog 提供 Google Cloud 资源(如 BigQuery)以及其他资源(如本地资源)的统一清单。Dataplex Catalog 自动检索 Google Cloud 资源的元数据,您可以将第三方资源的元数据引入 Dataplex Catalog。
有关 Dataplex Catalog 的更多信息,请访问 Dataplex Catalog production documentation <Product documentation
创建 EntryGroup¶
要在 Dataplex Catalog 中的特定位置创建 Entry Group,您可以使用 DataplexCatalogCreateEntryGroupOperator
有关创建 Entry Group 时可传递的可用字段的更多信息,请访问 Entry Group resource configuration.
一个简单的 Entry Group 配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"}
使用此配置,您可以创建 Entry Group 资源
DataplexCatalogCreateEntryGroupOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
create_entry_group = DataplexCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
entry_group_configuration=ENTRY_GROUP_BODY,
validate_request=False,
)
删除 EntryGroup¶
要删除 Dataplex Catalog 中的特定位置的 Entry Group,您可以使用 DataplexCatalogDeleteEntryGroupOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
列出 EntryGroups¶
要列出 Dataplex Catalog 中特定位置的所有 Entry Groups,您可以使用 DataplexCatalogListEntryGroupsOperator
. 此运算符还支持对操作结果进行过滤和排序。
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
list_entry_group = DataplexCatalogListEntryGroupsOperator(
task_id="list_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
order_by="name",
filter_by='display_name = "Display Name"',
)
获取 EntryGroup¶
要在 Dataplex Catalog 中的特定位置检索 Entry Group,您可以使用 DataplexCatalogGetEntryGroupOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
get_entry_group = DataplexCatalogGetEntryGroupOperator(
task_id="get_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
)
更新 EntryGroup¶
要在 Dataplex Catalog 中的特定位置更新 Entry Group,您可以使用 DataplexCatalogUpdateEntryGroupOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
task_id="update_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
entry_group_configuration={"display_name": "Updated Display Name"},
update_mask=["display_name"],
)
创建 EntryType¶
要在 Dataplex Catalog 中的特定位置创建 Entry Type,您可以使用 DataplexCatalogCreateEntryTypeOperator
有关创建 Entry Type 时可传递的可用字段的更多信息,请访问 Entry Type resource configuration.
一个简单的 Entry Group 配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"}
使用此配置,您可以创建 Entry Type 资源
DataplexCatalogCreateEntryTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
create_entry_type = DataplexCatalogCreateEntryTypeOperator(
task_id="create_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_type_id=ENTRY_TYPE_NAME,
entry_type_configuration=ENTRY_TYPE_BODY,
validate_request=False,
)
删除 EntryType¶
要删除 Dataplex Catalog 中的特定位置的 Entry Type,您可以使用 DataplexCatalogDeleteEntryTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
delete_entry_type = DataplexCatalogDeleteEntryTypeOperator(
task_id="delete_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_type_id=ENTRY_TYPE_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
列出 EntryTypes¶
要列出 Dataplex Catalog 中特定位置的所有 Entry Types,您可以使用 DataplexCatalogListEntryTypesOperator
. 此运算符还支持对操作结果进行过滤和排序。
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
list_entry_type = DataplexCatalogListEntryTypesOperator(
task_id="list_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
order_by="name",
filter_by='display_name = "Display Name"',
)
获取 EntryType¶
要在 Dataplex Catalog 中的特定位置检索 Entry Type,您可以使用 DataplexCatalogGetEntryTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
get_entry_type = DataplexCatalogGetEntryTypeOperator(
task_id="get_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_type_id=ENTRY_TYPE_NAME,
)
更新 EntryType¶
要在 Dataplex Catalog 中的特定位置更新 Entry Type,您可以使用 DataplexCatalogUpdateEntryTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
update_entry_type = DataplexCatalogUpdateEntryTypeOperator(
task_id="update_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_type_id=ENTRY_TYPE_NAME,
entry_type_configuration={"display_name": "Updated Display Name"},
update_mask=["display_name"],
)
创建 AspectType¶
要在 Dataplex Catalog 中的特定位置创建 Aspect Type,您可以使用 DataplexCatalogCreateAspectTypeOperator
有关创建 Aspect Type 时可传递的可用字段的更多信息,请访问 Aspect Type resource configuration.
一个简单的 Aspect Type 配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
ASPECT_TYPE_BODY = {
"display_name": "Sample AspectType",
"description": "A simple AspectType for demonstration purposes.",
"metadata_template": {
"name": "sample_field",
"type": "record",
"annotations": {
"display_name": "Sample Field",
"description": "A sample field within the AspectType.",
},
},
}
使用此配置,您可以创建 Aspect Type 资源
DataplexCatalogCreateAspectTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
task_id="create_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
aspect_type_id=ASPECT_TYPE_NAME,
aspect_type_configuration=ASPECT_TYPE_BODY,
validate_request=False,
)
删除 AspectType¶
要删除 Dataplex Catalog 中的特定位置的 Aspect Type,您可以使用 DataplexCatalogDeleteAspectTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
task_id="delete_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
aspect_type_id=ASPECT_TYPE_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
列出 AspectTypes¶
要列出 Dataplex Catalog 中特定位置的所有 Aspect Types,您可以使用 DataplexCatalogListAspectTypesOperator
. 此运算符还支持对操作结果进行过滤和排序。
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
list_aspect_type = DataplexCatalogListAspectTypesOperator(
task_id="list_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
order_by="name",
filter_by='display_name = "Display Name"',
)
获取 AspectType¶
要在 Dataplex Catalog 中的特定位置检索 Aspect Type,您可以使用 DataplexCatalogGetAspectTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
get_aspect_type = DataplexCatalogGetAspectTypeOperator(
task_id="get_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
aspect_type_id=ASPECT_TYPE_NAME,
)
更新 AspectType¶
要在 Dataplex Catalog 中的特定位置更新 Aspect Type,您可以使用 DataplexCatalogUpdateAspectTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
task_id="update_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
aspect_type_id=ASPECT_TYPE_NAME,
aspect_type_configuration={"display_name": "Updated Display Name"},
update_mask=["display_name"],
)
创建 Entry¶
要在 Dataplex Catalog 中的特定位置创建 Entry,您可以使用 DataplexCatalogCreateEntryOperator
有关创建 Entry 时可传递的可用字段的更多信息,请访问 Entry resource configuration.
一个简单的 Entry 配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
ENTRY_BODY = {
"name": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryGroups/{ENTRY_GROUP_NAME}/entries/{ENTRY_NAME}",
"entry_type": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryTypes/{ENTRY_TYPE_NAME}",
}
使用此配置,您可以创建 Entry 资源
DataplexCatalogCreateEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
create_entry = DataplexCatalogCreateEntryOperator(
task_id="create_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
entry_configuration=ENTRY_BODY,
)
删除 Entry¶
要删除 Dataplex Catalog 中的特定位置的 Entry,您可以使用 DataplexCatalogDeleteEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
delete_entry = DataplexCatalogDeleteEntryOperator(
task_id="delete_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
列出 Entries¶
要列出 Dataplex Catalog 中特定位置的所有 Entries,您可以使用 DataplexCatalogListEntriesOperator
. 此运算符还支持对操作结果进行过滤和排序。
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
list_entry = DataplexCatalogListEntriesOperator(
task_id="list_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
)
获取 Entry¶
要在 Dataplex Catalog 中的特定位置检索 Entry,您可以使用 DataplexCatalogGetEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
get_entry = DataplexCatalogGetEntryOperator(
task_id="get_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
)
更新 Entry¶
要在 Dataplex Catalog 中的特定位置更新 Entry,您可以使用 DataplexCatalogUpdateEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
update_entry = DataplexCatalogUpdateEntryOperator(
task_id="update_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
entry_configuration={
"fully_qualified_name": f"dataplex:{PROJECT_ID}.{GCP_LOCATION}.{ENTRY_GROUP_NAME}.some-entry"
},
update_mask=["fully_qualified_name"],
)
查找单个 Entry¶
要在 Dataplex Catalog 中使用源系统上的权限按名称查找单个 Entry,您可以使用 DataplexCatalogLookupEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
lookup_entry = DataplexCatalogLookupEntryOperator(
task_id="lookup_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
)
搜索 Entries¶
要在 Dataplex Catalog 中搜索与给定查询和范围匹配的 Entries,您可以使用 DataplexCatalogSearchEntriesOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
search_entry = DataplexCatalogSearchEntriesOperator(
task_id="search_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
query=f"name={ENTRY_NAME}",
)