airflow.providers.google.cloud.openlineage.utils

属性

log

BIGQUERY_NAMESPACE

BIGQUERY_URI

WILDCARD

函数

merge_column_lineage_facets(facets)

将多个列血缘关系 Facet 合并为一个统一的 Facet。

extract_ds_name_from_gcs_path(path)

从给定路径中提取和处理数据集名称。

get_facets_from_bq_table(table)

从 BigQuery 表对象获取 Facet。

get_namespace_name_from_source_uris(source_uris)

get_identity_column_lineage_facet(dest_field_names, ...)

获取身份转换的列血缘关系 Facet。

get_from_nullable_chain(source, chain)

从嵌套对象结构中获取对象,其中不保证嵌套结构中的所有键都存在。

inject_openlineage_properties_into_dataproc_job(job, ...)

将 OpenLineage 属性注入到 Spark 作业定义中。

inject_openlineage_properties_into_dataproc_batch(...)

将 OpenLineage 属性注入到 Dataproc 批处理定义中。

inject_openlineage_properties_into_dataproc_workflow_template(...)

将 OpenLineage 属性注入到工作流模板中的所有 Spark 作业中。

模块内容

airflow.providers.google.cloud.openlineage.utils.log[源码]
airflow.providers.google.cloud.openlineage.utils.BIGQUERY_NAMESPACE = 'bigquery'[源码]
airflow.providers.google.cloud.openlineage.utils.BIGQUERY_URI = 'bigquery'[源码]
airflow.providers.google.cloud.openlineage.utils.WILDCARD = '*'[源码]
airflow.providers.google.cloud.openlineage.utils.merge_column_lineage_facets(facets)[源码]

将多个列血缘关系 Facet 合并为一个统一的 Facet。

具体来说,它会汇总所有提供的 Facet 中每个字段的输入字段和转换。

参数

facets: 要合并的列血缘关系 Facet。

返回

一个新的列血缘关系 Facet,包含所有字段、它们各自的输入字段和转换。

注意
  • 输入字段通过其 (namespace, name, field) 元组唯一标识。

  • 如果多个 Facet 包含具有相同输入字段的同一字段,则这些输入字段将被合并而不会重复。

  • 与输入字段关联的转换也会被合并。如果 InputField 类版本不支持转换,则会省略它们。

  • 转换合并依赖于字段名和输入字段元组的复合键来跟踪和整合转换。

示例

案例 1: 具有相同输入字段的两个 Facet ` >>> facet1 = ColumnLineageDatasetFacet( ...     fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} ... ) >>> facet2 = ColumnLineageDatasetFacet( ...     fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} ... ) >>> merged = merge_column_lineage_facets([facet1, facet2]) >>> merged.fields["columnA"].inputFields [InputField("namespace1", "dataset1", "field1")] `

案例 2: 具有相同输入字段但转换不同的两个 Facet ` >>> facet1 = ColumnLineageDatasetFacet( ...     fields={ ...         "columnA": Fields( ...             inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t1"])] ...         ) ...     } ... ) >>> facet2 = ColumnLineageDatasetFacet( ...     fields={ ...         "columnA": Fields( ...             inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t2"])] ...         ) ...     } ... ) >>> merged = merge_column_lineage_facets([facet1, facet2]) >>> merged.fields["columnA"].inputFields[0].transformations ["t1", "t2"] `

airflow.providers.google.cloud.openlineage.utils.extract_ds_name_from_gcs_path(path)[源码]

从给定路径中提取和处理数据集名称。

参数

path: 要处理的路径,例如 GCS 文件路径。

返回

处理后的数据集名称。

示例
>>> extract_ds_name_from_gcs_path("/dir/file.*")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/pre_")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/file.txt")
'dir/file.txt'
>>> extract_ds_name_from_gcs_path("/dir/file.")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/")
'dir'
>>> extract_ds_name_from_gcs_path("")
'/'
>>> extract_ds_name_from_gcs_path("/")
'/'
>>> extract_ds_name_from_gcs_path(".")
'/'
airflow.providers.google.cloud.openlineage.utils.get_facets_from_bq_table(table)[源码]

从 BigQuery 表对象获取 Facet。

airflow.providers.google.cloud.openlineage.utils.get_namespace_name_from_source_uris(source_uris)[源码]
airflow.providers.google.cloud.openlineage.utils.get_identity_column_lineage_facet(dest_field_names, input_datasets)[源码]

获取身份转换的列血缘关系 Facet。

此函数生成一个简单的列血缘关系 Facet,其中每个目标列包含来自所有具有该列的输入数据集的同名源列。该血缘关系假设没有应用任何转换,这意味着列在源数据集和目标数据集之间保留其身份。

参数

dest_field_names: 一个目标列名列表,需要确定其血缘关系。 input_datasets: 具有 schema Facet 的输入数据集列表。

返回
一个字典,包含一个键 columnLineage,映射到一个 ColumnLineageDatasetFacet

如果无法确定列血缘关系,则返回一个空字典 - 详见下方“注意”部分。

注意
  • 如果任何输入数据集缺少 schema Facet,函数会立即返回一个空字典。

  • 如果源数据集 schema 中的任何字段不在目标表中,函数将返回一个空字典。目标表可以包含额外的字段,但所有源列都应存在于目标表中。

  • 如果没有任何目标列可以与输入数据集列匹配,则返回一个空字典。

  • 目标表中不存在于输入数据集中的额外列在血缘关系 Facet 中会被忽略和跳过,因为它们无法追溯到源列。

  • 此函数假设没有应用任何转换,这意味着列在源数据集和目标数据集之间保留其身份。

airflow.providers.google.cloud.openlineage.utils.get_from_nullable_chain(source, chain)[源码]

从嵌套对象结构中获取对象,其中不保证嵌套结构中的所有键都存在。

旨在取代一系列 dict.get() 语句。

示例用法

if (
    not job._properties.get("statistics")
    or not job._properties.get("statistics").get("query")
    or not job._properties.get("statistics").get("query").get("referencedTables")
):
    return None
result = job._properties.get("statistics").get("query").get("referencedTables")

变为

result = get_from_nullable_chain(properties, ["statistics", "query", "queryPlan"])
if not result:
    return None
airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_job(job, context, inject_parent_job_info, inject_transport_info)[源码]

将 OpenLineage 属性注入到 Spark 作业定义中。

此函数不会删除现有配置或以任何方式修改作业定义,

只会添加必需的 OpenLineage 属性,如果它们尚未存在。

如果满足以下任一条件,整个属性注入过程将被跳过
  • OpenLineage provider 不可访问。

  • 作业类型不受支持。

  • inject_parent_job_infoinject_transport_info 都设置为 False。

此外,如果相关的 OpenLineage 属性已经存在,则不会注入特定信息。

如果满足以下条件,则不会注入父作业信息
  • 存在任何以 spark.openlineage.parent 为前缀的属性。

  • inject_parent_job_info 为 False。

如果满足以下条件,则不会注入传输信息
  • 存在任何以 spark.openlineage.transport 为前缀的属性。

  • inject_transport_info 为 False。

参数

job: 原始 Dataproc 作业定义。 context: 作业运行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作业信息的标志。 inject_transport_info: 指示是否注入传输信息的标志。

返回

注入 OpenLineage 属性后的修改版作业定义(如适用)。

airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_batch(batch, context, inject_parent_job_info, inject_transport_info)[源码]

将 OpenLineage 属性注入到 Dataproc 批处理定义中。

此函数不会删除现有配置或以任何方式修改批处理定义,

只会添加必需的 OpenLineage 属性,如果它们尚未存在。

如果满足以下任一条件,整个属性注入过程将被跳过
  • OpenLineage provider 不可访问。

  • 批处理类型不受支持。

  • inject_parent_job_infoinject_transport_info 都设置为 False。

此外,如果相关的 OpenLineage 属性已经存在,则不会注入特定信息。

如果满足以下条件,则不会注入父作业信息
  • 存在任何以 spark.openlineage.parent 为前缀的属性。

  • inject_parent_job_info 为 False。

如果满足以下条件,则不会注入传输信息
  • 存在任何以 spark.openlineage.transport 为前缀的属性。

  • inject_transport_info 为 False。

参数

batch: 原始 Dataproc 批处理定义。 context: 作业运行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作业信息的标志。 inject_transport_info: 指示是否注入传输信息的标志。

返回

注入 OpenLineage 属性后的修改版批处理定义(如适用)。

airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_workflow_template(template, context, inject_parent_job_info, inject_transport_info)[源码]

将 OpenLineage 属性注入到工作流模板中的所有 Spark 作业中。

此函数不会删除现有配置或以任何方式修改作业定义,

只会添加必需的 OpenLineage 属性,如果它们尚未存在。

如果满足以下任一条件,将跳过每个作业的整个属性注入过程
  • OpenLineage provider 不可访问。

  • 作业类型不受支持。

  • inject_parent_job_infoinject_transport_info 都设置为 False。

此外,如果相关的 OpenLineage 属性已经存在,则不会注入特定信息。

如果满足以下条件,则不会注入父作业信息
  • 存在任何以 spark.openlineage.parent 为前缀的属性。

  • inject_parent_job_info 为 False。

如果满足以下条件,则不会注入传输信息
  • 存在任何以 spark.openlineage.transport 为前缀的属性。

  • inject_transport_info 为 False。

参数

template: 原始 Dataproc 工作流模板定义。 context: 作业运行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作业信息的标志。 inject_transport_info: 指示是否注入传输信息的标志。

返回

注入 OpenLineage 属性后的修改版工作流模板定义(如适用)。

此条目是否有帮助?