airflow.providers.google.cloud.openlineage.utils

模块内容

BigQueryJobRunFacet

表示 BigQuery 运行相关统计信息的 Facet。

函数

extract_ds_name_from_gcs_path(path)

从给定的路径中提取并处理数据集名称。

get_facets_from_bq_table(table)

从 BigQuery 表对象获取 Facet。

get_identity_column_lineage_facet(dest_field_names, ...)

获取用于标识转换的列血缘 Facet。

get_from_nullable_chain(source, chain)

从对象的嵌套结构中获取对象,其中不保证嵌套结构中的所有键都存在。

inject_openlineage_properties_into_dataproc_job(job, ...)

将 OpenLineage 属性注入到 Spark 作业定义中。

属性

log

BIGQUERY_NAMESPACE

BIGQUERY_URI

WILDCARD

airflow.providers.google.cloud.openlineage.utils.log[源代码]
airflow.providers.google.cloud.openlineage.utils.BIGQUERY_NAMESPACE = 'bigquery'[源代码]
airflow.providers.google.cloud.openlineage.utils.BIGQUERY_URI = 'bigquery'[源代码]
airflow.providers.google.cloud.openlineage.utils.WILDCARD = '*'[源代码]
airflow.providers.google.cloud.openlineage.utils.extract_ds_name_from_gcs_path(path)[源代码]

从给定的路径中提取并处理数据集名称。

参数

path: 要处理的路径,例如 gcs 文件的路径。

返回

处理后的数据集名称。

示例
>>> extract_ds_name_from_gcs_path("/dir/file.*")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/pre_")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/file.txt")
'dir/file.txt'
>>> extract_ds_name_from_gcs_path("/dir/file.")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/")
'dir'
>>> extract_ds_name_from_gcs_path("")
'/'
>>> extract_ds_name_from_gcs_path("/")
'/'
>>> extract_ds_name_from_gcs_path(".")
'/'
airflow.providers.google.cloud.openlineage.utils.get_facets_from_bq_table(table)[源代码]

从 BigQuery 表对象获取 Facet。

airflow.providers.google.cloud.openlineage.utils.get_identity_column_lineage_facet(dest_field_names, input_datasets)[源代码]

获取用于标识转换的列血缘 Facet。

此函数生成一个简单的列血缘 Facet,其中每个目标列都由来自所有具有该列的输入数据集的同名源列组成。血缘假设没有应用转换,这意味着列在源数据集和目标数据集之间保留其身份。

参数

dest_field_names:应确定血缘的目标列名称列表。input_datasets:带有模式 Facet 的输入数据集列表。

返回
包含单个键 columnLineage 的字典,映射到 ColumnLineageDatasetFacet

如果无法确定列血缘,则返回一个空字典 - 请参阅下面的注释。

注释
  • 如果任何输入数据集缺少模式 Facet,则该函数会立即返回一个空字典。

  • 如果源数据集模式中的任何字段未出现在目标表中,则该函数返回一个空字典。目标表可以包含额外的字段,但所有源列都应出现在目标表中。

  • 如果任何目标列都无法与输入数据集列匹配,则返回一个空字典。

  • 目标表中不存在于输入数据集中的额外列将被忽略并在血缘 Facet 中跳过,因为它们无法追溯到源列。

  • 该函数假设没有应用任何转换,这意味着列在源数据集和目标数据集之间保留其标识。

class airflow.providers.google.cloud.openlineage.utils.BigQueryJobRunFacet[源代码]

基类:airflow.providers.common.compat.openlineage.facet.RunFacet

表示 BigQuery 运行相关统计信息的 Facet。

此 Facet 用于提供有关 BigQuery 运行的统计信息。

参数
  • cached – BigQuery 缓存查询结果。对于缓存的查询,将不会提供其余的统计信息。

  • billedBytes – BigQuery 收费的字节数。

  • properties – BigQUery 运行的完整属性树。

cached: bool[源代码]
billedBytes: int | None[源代码]
properties: str | None[源代码]
airflow.providers.google.cloud.openlineage.utils.get_from_nullable_chain(source, chain)[源代码]

从对象的嵌套结构中获取对象,其中不保证嵌套结构中的所有键都存在。

旨在替换 dict.get() 语句链。

用法示例

if (
    not job._properties.get("statistics")
    or not job._properties.get("statistics").get("query")
    or not job._properties.get("statistics").get("query").get("referencedTables")
):
    return None
result = job._properties.get("statistics").get("query").get("referencedTables")

变为

result = get_from_nullable_chain(properties, ["statistics", "query", "queryPlan"])
if not result:
    return None
airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_job(job, context, inject_parent_job_info)[源代码]

将 OpenLineage 属性注入到 Spark 作业定义中。

该函数不会删除任何配置或以任何其他方式修改作业,只是在 Dataproc 作业定义中添加所需的 OpenLineage 属性(如果尚未存在)。

注意
如果发生以下情况,将跳过对作业的任何修改
  • 无法访问 OpenLineage 提供程序。

  • 不支持该作业类型。

  • 禁用了自动父作业信息注入。

  • Spark 作业定义中已存在任何包含父作业信息的 OpenLineage 属性。

参数

job: 原始 Dataproc 作业定义。 context: 作业运行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作业信息的标志。

返回

修改后的作业定义,如果适用,会注入 OpenLineage 属性。

此条目是否有帮助?