airflow.providers.google.cloud.openlineage.utils¶
属性¶
函数¶
|
将多个列血缘关系 Facet 合并为一个统一的 Facet。 |
从给定路径中提取和处理数据集名称。 |
|
|
从 BigQuery 表对象获取 Facet。 |
|
|
|
获取身份转换的列血缘关系 Facet。 |
|
从嵌套对象结构中获取对象,其中不保证嵌套结构中的所有键都存在。 |
将 OpenLineage 属性注入到 Spark 作业定义中。 |
|
将 OpenLineage 属性注入到 Dataproc 批处理定义中。 |
|
|
将 OpenLineage 属性注入到工作流模板中的所有 Spark 作业中。 |
模块内容¶
- airflow.providers.google.cloud.openlineage.utils.merge_column_lineage_facets(facets)[源码]¶
将多个列血缘关系 Facet 合并为一个统一的 Facet。
具体来说,它会汇总所有提供的 Facet 中每个字段的输入字段和转换。
- 参数
facets: 要合并的列血缘关系 Facet。
- 返回
一个新的列血缘关系 Facet,包含所有字段、它们各自的输入字段和转换。
- 注意
输入字段通过其 (namespace, name, field) 元组唯一标识。
如果多个 Facet 包含具有相同输入字段的同一字段,则这些输入字段将被合并而不会重复。
与输入字段关联的转换也会被合并。如果 InputField 类版本不支持转换,则会省略它们。
转换合并依赖于字段名和输入字段元组的复合键来跟踪和整合转换。
- 示例
案例 1: 具有相同输入字段的两个 Facet
` >>> facet1 = ColumnLineageDatasetFacet( ... fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} ... ) >>> facet2 = ColumnLineageDatasetFacet( ... fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} ... ) >>> merged = merge_column_lineage_facets([facet1, facet2]) >>> merged.fields["columnA"].inputFields [InputField("namespace1", "dataset1", "field1")] `
案例 2: 具有相同输入字段但转换不同的两个 Facet
` >>> facet1 = ColumnLineageDatasetFacet( ... fields={ ... "columnA": Fields( ... inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t1"])] ... ) ... } ... ) >>> facet2 = ColumnLineageDatasetFacet( ... fields={ ... "columnA": Fields( ... inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t2"])] ... ) ... } ... ) >>> merged = merge_column_lineage_facets([facet1, facet2]) >>> merged.fields["columnA"].inputFields[0].transformations ["t1", "t2"] `
- airflow.providers.google.cloud.openlineage.utils.extract_ds_name_from_gcs_path(path)[源码]¶
从给定路径中提取和处理数据集名称。
- 参数
path: 要处理的路径,例如 GCS 文件路径。
- 返回
处理后的数据集名称。
- 示例
>>> extract_ds_name_from_gcs_path("/dir/file.*") 'dir' >>> extract_ds_name_from_gcs_path("/dir/pre_") 'dir' >>> extract_ds_name_from_gcs_path("/dir/file.txt") 'dir/file.txt' >>> extract_ds_name_from_gcs_path("/dir/file.") 'dir' >>> extract_ds_name_from_gcs_path("/dir/") 'dir' >>> extract_ds_name_from_gcs_path("") '/' >>> extract_ds_name_from_gcs_path("/") '/' >>> extract_ds_name_from_gcs_path(".") '/'
- airflow.providers.google.cloud.openlineage.utils.get_facets_from_bq_table(table)[源码]¶
从 BigQuery 表对象获取 Facet。
- airflow.providers.google.cloud.openlineage.utils.get_namespace_name_from_source_uris(source_uris)[源码]¶
- airflow.providers.google.cloud.openlineage.utils.get_identity_column_lineage_facet(dest_field_names, input_datasets)[源码]¶
获取身份转换的列血缘关系 Facet。
此函数生成一个简单的列血缘关系 Facet,其中每个目标列包含来自所有具有该列的输入数据集的同名源列。该血缘关系假设没有应用任何转换,这意味着列在源数据集和目标数据集之间保留其身份。
- 参数
dest_field_names: 一个目标列名列表,需要确定其血缘关系。 input_datasets: 具有 schema Facet 的输入数据集列表。
- 返回
- 一个字典,包含一个键 columnLineage,映射到一个 ColumnLineageDatasetFacet。
如果无法确定列血缘关系,则返回一个空字典 - 详见下方“注意”部分。
- 注意
如果任何输入数据集缺少 schema Facet,函数会立即返回一个空字典。
如果源数据集 schema 中的任何字段不在目标表中,函数将返回一个空字典。目标表可以包含额外的字段,但所有源列都应存在于目标表中。
如果没有任何目标列可以与输入数据集列匹配,则返回一个空字典。
目标表中不存在于输入数据集中的额外列在血缘关系 Facet 中会被忽略和跳过,因为它们无法追溯到源列。
此函数假设没有应用任何转换,这意味着列在源数据集和目标数据集之间保留其身份。
- airflow.providers.google.cloud.openlineage.utils.get_from_nullable_chain(source, chain)[源码]¶
从嵌套对象结构中获取对象,其中不保证嵌套结构中的所有键都存在。
旨在取代一系列 dict.get() 语句。
示例用法
if ( not job._properties.get("statistics") or not job._properties.get("statistics").get("query") or not job._properties.get("statistics").get("query").get("referencedTables") ): return None result = job._properties.get("statistics").get("query").get("referencedTables")
变为
result = get_from_nullable_chain(properties, ["statistics", "query", "queryPlan"]) if not result: return None
- airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_job(job, context, inject_parent_job_info, inject_transport_info)[源码]¶
将 OpenLineage 属性注入到 Spark 作业定义中。
- 此函数不会删除现有配置或以任何方式修改作业定义,
只会添加必需的 OpenLineage 属性,如果它们尚未存在。
- 如果满足以下任一条件,整个属性注入过程将被跳过
OpenLineage provider 不可访问。
作业类型不受支持。
inject_parent_job_info 和 inject_transport_info 都设置为 False。
此外,如果相关的 OpenLineage 属性已经存在,则不会注入特定信息。
- 如果满足以下条件,则不会注入父作业信息
存在任何以 spark.openlineage.parent 为前缀的属性。
inject_parent_job_info 为 False。
- 如果满足以下条件,则不会注入传输信息
存在任何以 spark.openlineage.transport 为前缀的属性。
inject_transport_info 为 False。
- 参数
job: 原始 Dataproc 作业定义。 context: 作业运行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作业信息的标志。 inject_transport_info: 指示是否注入传输信息的标志。
- 返回
注入 OpenLineage 属性后的修改版作业定义(如适用)。
- airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_batch(batch, context, inject_parent_job_info, inject_transport_info)[源码]¶
将 OpenLineage 属性注入到 Dataproc 批处理定义中。
- 此函数不会删除现有配置或以任何方式修改批处理定义,
只会添加必需的 OpenLineage 属性,如果它们尚未存在。
- 如果满足以下任一条件,整个属性注入过程将被跳过
OpenLineage provider 不可访问。
批处理类型不受支持。
inject_parent_job_info 和 inject_transport_info 都设置为 False。
此外,如果相关的 OpenLineage 属性已经存在,则不会注入特定信息。
- 如果满足以下条件,则不会注入父作业信息
存在任何以 spark.openlineage.parent 为前缀的属性。
inject_parent_job_info 为 False。
- 如果满足以下条件,则不会注入传输信息
存在任何以 spark.openlineage.transport 为前缀的属性。
inject_transport_info 为 False。
- 参数
batch: 原始 Dataproc 批处理定义。 context: 作业运行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作业信息的标志。 inject_transport_info: 指示是否注入传输信息的标志。
- 返回
注入 OpenLineage 属性后的修改版批处理定义(如适用)。
- airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_workflow_template(template, context, inject_parent_job_info, inject_transport_info)[源码]¶
将 OpenLineage 属性注入到工作流模板中的所有 Spark 作业中。
- 此函数不会删除现有配置或以任何方式修改作业定义,
只会添加必需的 OpenLineage 属性,如果它们尚未存在。
- 如果满足以下任一条件,将跳过每个作业的整个属性注入过程
OpenLineage provider 不可访问。
作业类型不受支持。
inject_parent_job_info 和 inject_transport_info 都设置为 False。
此外,如果相关的 OpenLineage 属性已经存在,则不会注入特定信息。
- 如果满足以下条件,则不会注入父作业信息
存在任何以 spark.openlineage.parent 为前缀的属性。
inject_parent_job_info 为 False。
- 如果满足以下条件,则不会注入传输信息
存在任何以 spark.openlineage.transport 为前缀的属性。
inject_transport_info 为 False。
- 参数
template: 原始 Dataproc 工作流模板定义。 context: 作业运行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作业信息的标志。 inject_transport_info: 指示是否注入传输信息的标志。
- 返回
注入 OpenLineage 属性后的修改版工作流模板定义(如适用)。