Google Cloud Data Catalog Operator¶
Data Catalog 是一项完全托管且可扩展的元数据管理服务,使组织能够在 Google Cloud 中快速发现、管理和理解其所有数据。它提供
一个简单易用的数据发现搜索界面,由支持 Gmail 和 Drive 的相同 Google 搜索技术提供支持
一个灵活且强大的目录系统,用于捕获技术和业务元数据
一个用于标记敏感数据的自动标记机制,集成了 DLP API
前提任务¶
要使用这些 operator,您必须执行以下几项操作
使用Cloud 控制台选择或创建 Cloud Platform 项目。
为您的项目启用结算功能,详见Google Cloud 文档。
启用 API,详见Cloud 控制台文档。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关安装的详细信息。
管理条目(entries)¶
Operator 使用 Entry
表示条目。
获取条目¶
获取条目使用 CloudDataCatalogGetEntryOperator
和 CloudDataCatalogLookupEntryOperator
operator。
CloudDataCatalogGetEntryOperator
使用项目 ID、条目组 ID、条目 ID 来获取条目。
tests/system/google/datacatalog/example_datacatalog_entries.py
get_entry = CloudDataCatalogGetEntryOperator(
task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以将 Jinja 模板 与 location
、entry_group
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_entries.py
get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
CloudDataCatalogLookupEntryOperator
使用资源名称来获取条目。
tests/system/google/datacatalog/example_datacatalog_entries.py
current_entry_template = (
"//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
"entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
task_id="lookup_entry",
linked_resource=current_entry_template.format(
project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
),
)
您可以将 Jinja 模板 与 linked_resource
、sql_resource
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_entries.py
lookup_entry_result = BashOperator(
task_id="lookup_entry_result",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)
创建条目¶
CloudDataCatalogCreateEntryOperator
operator 创建条目。
tests/system/google/datacatalog/example_datacatalog_entries.py
create_entry_gcs = CloudDataCatalogCreateEntryOperator(
task_id="create_entry_gcs",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
entry={
"display_name": ENTRY_NAME,
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
},
)
您可以将 Jinja 模板 与 location
、entry_group
、entry_id
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
可以使用 entry_id
键读取新创建的条目 ID。
tests/system/google/datacatalog/example_datacatalog_entries.py
create_entry_gcs_result = BashOperator(
task_id="create_entry_gcs_result",
bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)
更新条目¶
CloudDataCatalogUpdateEntryOperator
operator 更新条目。
tests/system/google/datacatalog/example_datacatalog_entries.py
update_entry = CloudDataCatalogUpdateEntryOperator(
task_id="update_entry",
entry={"display_name": f"{ENTRY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
)
您可以将 Jinja 模板 与 entry
、update_mask
、location
、entry_group
、entry_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
删除条目¶
CloudDataCatalogDeleteEntryOperator
operator 删除条目。
tests/system/google/datacatalog/example_datacatalog_entries.py
delete_entry = CloudDataCatalogDeleteEntryOperator(
task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以将 Jinja 模板 与 location
、entry_group
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
管理条目组(entry groups)¶
Operator 使用 Entry
表示条目组。
创建条目组¶
CloudDataCatalogCreateEntryGroupOperator
operator 创建条目组。
tests/system/google/datacatalog/example_datacatalog_entries.py
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
location=LOCATION,
entry_group_id=ENTRY_GROUP_ID,
entry_group={"display_name": ENTRY_GROUP_NAME},
)
您可以将 Jinja 模板 与 location
、entry_group_id
、entry_group
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
可以使用 entry_group_id
键读取新创建的条目组 ID。
tests/system/google/datacatalog/example_datacatalog_entries.py
create_entry_group_result = BashOperator(
task_id="create_entry_group_result",
bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)
获取条目组¶
CloudDataCatalogGetEntryGroupOperator
operator 获取条目组。
tests/system/google/datacatalog/example_datacatalog_entries.py
get_entry_group = CloudDataCatalogGetEntryGroupOperator(
task_id="get_entry_group",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
read_mask=FieldMask(paths=["name", "display_name"]),
)
您可以将 Jinja 模板 与 location
、entry_group
、read_mask
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_entries.py
get_entry_group_result = BashOperator(
task_id="get_entry_group_result",
bash_command=f"echo {get_entry_group.output}",
)
删除条目组¶
CloudDataCatalogDeleteEntryGroupOperator
operator 删除条目组。
tests/system/google/datacatalog/example_datacatalog_entries.py
delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)
您可以将 Jinja 模板 与 location
、entry_group
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
管理标签模板(tag templates)¶
Operator 使用 TagTemplate
表示标签模板。
创建标签模板¶
CloudDataCatalogCreateTagTemplateOperator
operator 获取标签模板。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
task_id="create_tag_template",
location=LOCATION,
tag_template_id=TEMPLATE_ID,
tag_template={
"display_name": TAG_TEMPLATE_DISPLAY_NAME,
"fields": {
FIELD_NAME_1: TagTemplateField(
display_name="first-field", type_=dict(primitive_type="STRING")
)
},
},
)
您可以将 Jinja 模板 与 location
、tag_template_id
、tag_template
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
可以使用 tag_template_id
键读取新创建的标签模板 ID。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
create_tag_template_result = BashOperator(
task_id="create_tag_template_result",
bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)
删除标签模板¶
CloudDataCatalogDeleteTagTemplateOperator
operator 删除标签模板。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)
您可以将 Jinja 模板 与 location
、tag_template
、force
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
获取标签模板¶
CloudDataCatalogGetTagTemplateOperator
operator 获取标签模板。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
get_tag_template = CloudDataCatalogGetTagTemplateOperator(
task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)
您可以将 Jinja 模板 与 location
、tag_template
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
get_tag_template_result = BashOperator(
task_id="get_tag_template_result",
bash_command=f"echo {get_tag_template.output}",
)
更新标签模板¶
CloudDataCatalogUpdateTagTemplateOperator
operator 更新标签模板。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
task_id="update_tag_template",
tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template_id=TEMPLATE_ID,
)
您可以将 Jinja 模板 与 tag_template
、update_mask
、location
、tag_template_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
管理标签模板字段(tag template fields)¶
Operator 使用 TagTemplateField
表示标签模板字段。
创建字段¶
CloudDataCatalogCreateTagTemplateFieldOperator
operator 获取标签模板字段。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
task_id="create_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_2,
tag_template_field=TagTemplateField(
display_name="second-field", type_=FieldType(primitive_type="STRING")
),
)
您可以将 Jinja 模板 与 location
、tag_template
、tag_template_field_id
、tag_template_field
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
可以使用 tag_template_field_id
键读取新创建的字段 ID。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
create_tag_template_field_result = BashOperator(
task_id="create_tag_template_field_result",
bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)
重命名字段¶
CloudDataCatalogRenameTagTemplateFieldOperator
operator 重命名标签模板字段。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
task_id="rename_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_1,
new_tag_template_field_id=FIELD_NAME_3,
)
您可以将 Jinja 模板 与 location
、tag_template
、field
、new_tag_template_field_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
更新字段¶
CloudDataCatalogUpdateTagTemplateFieldOperator
operator 获取标签模板字段。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
task_id="update_tag_template_field",
tag_template_field={"display_name": "Updated template field"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_1,
)
您可以将 Jinja 模板 与 tag_template_field
、update_mask
、tag_template_field_name
、location
、tag_template
、tag_template_field_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
删除字段¶
CloudDataCatalogDeleteTagTemplateFieldOperator
operator 删除标签模板字段。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
task_id="delete_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_2,
force=True,
)
您可以将 Jinja 模板 与 location
、tag_template
、field
、force
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,从而动态确定值。
搜索资源¶
CloudDataCatalogSearchCatalogOperator
operator 在 Data Catalog 中搜索与查询匹配的多个资源,例如条目、标签。
query
参数应使用搜索语法定义。
tests/system/google/datacatalog/example_datacatalog_search_catalog.py
search_catalog = CloudDataCatalogSearchCatalogOperator(
task_id="search_catalog",
scope={"include_project_ids": [PROJECT_ID]},
query=f"name:{ENTRY_GROUP_NAME}",
)
您可以使用 Jinja 模板 配合 scope
、query
、page_size
、order_by
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数,从而动态确定值。
结果会保存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_search_catalog.py
search_catalog_result = BashOperator(
task_id="search_catalog_result",
bash_command=f"echo {search_catalog.output}",
)
参考¶
如需了解更多信息,请参阅