使用 OpenLineage 集成

OpenLineage 是一个用于数据血缘收集和分析的开放框架。其核心是一个可扩展的规范,系统可以使用它来实现与血缘元数据的互操作性。查看 OpenLineage 文档

使用 OpenLineage 不需要更改用户 DAG 文件。需要进行基本配置,以便 OpenLineage 知道将事件发送到何处。

快速入门

注意

OpenLineage Provider 提供多种数据传输选项(http、kafka、file 等),包括创建自定义解决方案的灵活性。配置可以通过多种方式管理,并提供大量设置供用户微调和增强 OpenLineage 的使用。有关这些功能的全面说明,请参阅本文档的后续部分。

此示例是 OpenLineage 设置的基本演示。

  1. 安装 provider 包或将其添加到 requirements.txt 文件中。

    pip install apache-airflow-providers-openlineage
    
  2. 提供 Transport 配置,以便 OpenLineage 知道将事件发送到何处。在 airflow.cfg 文件中

    [openlineage]
    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
    

    或使用 AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量

    AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
    
  3. 就是这样!运行 DAG 时,OpenLineage 事件应发送到配置的后端。

使用

启用并配置后,该集成不需要用户进一步操作。它将自动

  • 收集任务输入/输出元数据(来源、模式等)。

  • 收集任务运行级元数据(执行时间、状态、参数等)

  • 收集任务作业级元数据(所有者、类型、描述等)

  • 收集任务特定元数据(bigquery 作业 ID、python 源代码等)- 取决于 Operator

所有这些数据将作为 OpenLineage 事件发送到配置的后端,如 作业层次结构 中所述。

传输设置

配置 OpenLineage Airflow Provider 的主要和推荐方法是 Airflow 配置(airflow.cfg 文件)。所有可能的配置选项以及示例值都可以在配置部分中找到。

至少,在每种情况下都需要设置的一件事是 Transport - 您希望事件最终发送到哪里 - 例如 Marquez

以 JSON 字符串形式传输

Airflow 配置中的 transport 选项用于此目的。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}

AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量是等效的。

AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'

如果您想查看 OpenLineage 事件而不将其发送到任何地方,您可以设置 ConsoleTransport - 事件将出现在任务日志中。

[openlineage]
transport = {"type": "console"}

注意

有关内置传输类型的完整列表、特定传输的选项或如何实现自定义传输的说明,请参阅 Python 客户端文档

以配置文件形式传输

您也可以使用 YAML 文件(例如 openlineage.yml)配置 OpenLineage Transport。在 Airflow 配置中将 YAML 文件的路径作为 config_path 选项提供。

[openlineage]
config_path = '/path/to/openlineage.yml'

AIRFLOW__OPENLINEAGE__CONFIG_PATH 环境变量是等效的。

AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'

配置 YAML 文件的示例内容

transport:
  type: http
  url: https://backend:5000
  endpoint: events/receive
  auth:
    type: api_key
    apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e

注意

该配置方法的详细说明以及示例配置文件可以在 Python 客户端文档中找到。

配置优先级

由于配置 OpenLineage 有多种可能的方法,因此务必牢记不同配置的优先级。OpenLineage Airflow Provider 按以下顺序查找配置

  1. airflow.cfgopenlineage 部分下检查 config_path(或 AIRFLOW__OPENLINEAGE__CONFIG_PATH 环境变量)

  2. airflow.cfgopenlineage 部分下检查 transport(或 AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量)

  3. 如果以上所有选项均缺失,则底层使用的 OpenLineage Python 客户端将按照此处文档中描述的顺序查找配置。请注意,**鼓励使用 Airflow 配置**,这是唯一面向未来的解决方案。

向后兼容性

警告

以下变量**不应**使用,将来可能会被移除。请考虑使用 Airflow 配置(如上所述)以获得面向未来的解决方案。

为了与 openlineage-airflow 包向后兼容,一些环境变量仍然可用

  • OPENLINEAGE_DISABLED 等效于 AIRFLOW__OPENLINEAGE__DISABLED

  • OPENLINEAGE_CONFIG 等效于 AIRFLOW__OPENLINEAGE__CONFIG_PATH

  • OPENLINEAGE_NAMESPACE 等效于 AIRFLOW__OPENLINEAGE__NAMESPACE

  • OPENLINEAGE_EXTRACTORS 等效于设置 AIRFLOW__OPENLINEAGE__EXTRACTORS

  • OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE 等效于 AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE

  • OPENLINEAGE_URL 可用于设置简单的 http 传输。此方法有一些限制,可能需要使用其他环境变量来达到期望的输出。请参阅文档

附加选项

命名空间

为这个特定实例设置 OpenLineage 命名空间非常有用。这样,如果您使用多个 OpenLineage 生产者,来自它们的事件将在逻辑上分开。如果未设置,则使用 default 命名空间。在 Airflow 配置中将命名空间的名称作为 namespace 选项提供。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'

AIRFLOW__OPENLINEAGE__NAMESPACE 环境变量是等效的。

AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'

超时

为了在任务执行和 OpenLineage 之间增加一层隔离,确保 OpenLineage 执行不会以占用时间以外的方式干扰任务执行,OpenLineage 方法在单独的进程中运行。代码以默认的 10 秒超时运行。您可以通过设置 execution_timeout 值来增加此时间。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
execution_timeout = 60

AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT 环境变量是等效的。

AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60

禁用

您可以通过在 Airflow 配置中将 disabled 选项设置为 true 来禁用发送 OpenLineage 事件,而无需卸载 OpenLineage provider。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true

AIRFLOW__OPENLINEAGE__DISABLED 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLED=true

禁用源代码

默认情况下,一些 Operators(例如 Python、Bash)会在其 OpenLineage 事件中包含其源代码。为防止这种情况,请在 Airflow 配置中将 disable_source_code 选项设置为 true

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true

对 Operator 禁用

您可以通过在 Airflow 配置中将要禁用的 Airflow Operators 的完整导入路径字符串(用分号分隔)作为 disabled_for_operators 字段传递,轻松排除某些 Operator 发送 OpenLineage 事件。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'

完整任务信息

默认情况下,OpenLineage 集成的 AirflowRunFacet(在每个任务实例事件的 START 事件时附加)不包含完整的序列化任务信息(给定 operator 的参数),而只包含选定的参数。

但是,我们允许用户设置 OpenLineage 集成以包含完整的任务信息。通过这样做,我们不是仅序列化一些已知属性,而是排除某些不可序列化的元素并发送其余所有内容。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
include_full_task_info = true

AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO 环境变量是等效的。

AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true

警告

通过将此变量设置为 true,OpenLineage 集成不控制您发送的事件大小。它可能包含大小为兆字节或更大的元素,具体取决于您传递给任务的数据大小。

自定义 Extractors

要使用 自定义 Extractors 功能,请在 Airflow 配置中将 Airflow Operators 的完整导入路径字符串(用分号分隔)作为 extractors 选项传递来注册 extractors。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass

AIRFLOW__OPENLINEAGE__EXTRACTORS 环境变量是等效的。

AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'

自定义 Run Facets

要注入 自定义 run facets,请在 Airflow 配置中将自定义 run facet 函数的完整导入路径字符串(用分号分隔)作为 custom_run_facets 选项传递来注册函数。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS 环境变量是等效的。

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'

调试模式

您可以通过在 Airflow 配置中将 debug_mode 选项设置为 true 来启用在 OpenLineage 事件中发送附加信息,这对于调试和重现您的环境设置非常有用。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
debug_mode = true

AIRFLOW__OPENLINEAGE__DEBUG_MODE 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DEBUG_MODE=true

警告

通过将此变量设置为 true,OpenLineage 集成可能会记录和发送大量详细信息。仅应临时启用此功能用于调试目的。

在 DAG/任务级别启用 OpenLineage

可以使用 selective_enable 策略选择性地为特定的 DAG 和任务启用 OpenLineage。要启用此策略,请在 Airflow 配置文件的 [openlineage] 部分中将 selective_enable 选项设置为 True

[openlineage]
selective_enable = True

AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE 环境变量是等效的。

AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true

虽然 selective_enable 支持选择性控制,但 disabled 选项 仍然具有优先级。如果您在配置中将 disabled 设置为 True,无论 selective_enable 设置如何,OpenLineage 都将对所有 DAG 和任务禁用。

启用 selective_enable 策略后,您可以使用 enable_lineagedisable_lineage 函数选择为单个 DAG 和任务启用 OpenLineage。

  1. 在 DAG 上启用 Lineage

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with enable_lineage(DAG(...)):
    # Tasks within this DAG will have lineage tracking enabled
    MyOperator(...)

    AnotherOperator(...)
  1. 在 Task 上启用 Lineage

虽然在 DAG 上启用 lineage 会隐式地为该 DAG 内的所有任务启用 lineage,但您仍然可以为特定任务选择性地禁用它

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with DAG(...) as dag:
    t1 = MyOperator(...)
    t2 = AnotherOperator(...)

# Enable lineage for the entire DAG
enable_lineage(dag)

# Disable lineage for task t1
disable_lineage(t1)

在 DAG 级别启用 lineage 会自动为该 DAG 内的所有任务启用 lineage,除非针对每个任务明确禁用。

在任务级别启用 lineage 会隐式地在其 DAG 上启用 lineage。这是因为每个发送事件的任务都会发送一个 ParentRunFacet,这要求在某些 OpenLineage 后端系统中启用 DAG 级别的 lineage。在启用任务级别 lineage 的同时禁用 DAG 级别 lineage 可能会导致错误或不一致。

将父作业信息传递给 Spark 作业

OpenLineage 集成可以自动将 Airflow 的信息(namespace、job name、run id)作为父作业信息(spark.openlineage.parentJobNamespacespark.openlineage.parentJobNamespark.openlineage.parentRunId)注入到 Spark 应用程序属性中,针对支持的 Operator。这使得 Spark 集成能够自动在应用程序级别的 OpenLineage 事件中包含 parentRunFacet,从而在不同集成的任务之间创建父子关系。请参阅 从 Airflow 调度

此配置作为支持自动注入 Spark 属性的所有 Operator 的默认行为,除非在 Operator 级别明确覆盖。为了防止某个特定 Operator 注入父作业信息,同时允许所有其他受支持的 Operator 默认执行此操作,可以明确为该特定 Operator 提供 openlineage_inject_parent_job_info=False

注意

如果在 Spark 作业配置中手动指定了任何 spark.openlineage.parent* 属性,集成将避免注入父作业属性,以确保保留手动提供的值。

您可以通过在 Airflow 配置中将 spark_inject_parent_job_info 选项设置为 true 来启用此自动化。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_parent_job_info = true

AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO 环境变量是等效的。

AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true

将传输信息传递给 Spark 作业

OpenLineage 集成可以自动将 Airflow 的传输信息注入到 Spark 应用程序属性中,针对支持的 Operator。这使得 Spark 集成无需手动配置即可将事件发送到与 Airflow 集成相同的后端。请参阅 从 Airflow 调度

注意

如果在 Spark 作业配置中手动指定了任何 spark.openlineage.transport* 属性,集成将避免注入传输属性,以确保保留手动提供的值。

您可以通过在 Airflow 配置中将 spark_inject_transport_info 选项设置为 true 来启用此自动化。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_transport_info = true

AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO 环境变量是等效的。

AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true

故障排除

有关如何排除 OpenLineage 故障的详细信息,请参阅故障排除

添加对自定义 Operator 的支持

如果您想为特定 Operator 添加 OpenLineage 支持,请查看 在 Operator 中实现 OpenLineage

在哪里可以了解更多?

反馈

您可以在 slack 上联系我们并留下反馈!

如何贡献

我们欢迎您的贡献!OpenLineage 是一个活跃开发的开源项目,我们非常欢迎您的帮助!

听起来很有趣?查看我们的新贡献者指南以开始。

此条目有帮助吗?