使用 OpenLineage 集成

OpenLineage 是一个用于数据血缘收集和分析的开放框架。其核心是一个可扩展的规范,系统可以使用该规范与血缘元数据进行互操作。 查看 OpenLineage 文档

使用 OpenLineage 不需要更改用户 DAG 文件。需要进行基本配置,以便 OpenLineage 知道将事件发送到哪里。

快速入门

注意

OpenLineage 提供程序提供多种数据传输选项(http、kafka、文件等),包括创建自定义解决方案的灵活性。可以通过多种方法管理配置,并且用户可以使用大量的设置来微调和增强 OpenLineage 的使用。有关这些功能的全面解释,请参阅本文档的后续章节。

此示例是 OpenLineage 设置的基本演示。

  1. 安装提供程序包或将其添加到 requirements.txt 文件中。

    pip install apache-airflow-providers-openlineage
    
  2. 提供 Transport 配置,以便 OpenLineage 知道将事件发送到哪里。在 airflow.cfg 文件中

    [openlineage]
    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
    

    或使用 AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量

    AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
    
  3. 就这么简单! 当 DAG 运行时,OpenLineage 事件应该被发送到配置的后端。

用法

启用并配置后,集成无需用户进一步操作。它将自动

  • 收集任务输入/输出元数据(来源、模式等)。

  • 收集任务运行级元数据(执行时间、状态、参数等)

  • 收集任务作业级元数据(所有者、类型、描述等)

  • 收集特定于任务的元数据(bigquery 作业 ID、python 源代码等) - 取决于 Operator

所有这些数据将作为 OpenLineage 事件发送到配置的后端,如 作业层次结构 中所述。

传输设置

配置 OpenLineage Airflow 提供程序的主要和推荐方法是 Airflow 配置(airflow.cfg 文件)。所有可能的配置选项,以及示例值,都可以在 配置部分 中找到。

至少,在每种情况下都需要设置一件事是 Transport - 您希望事件最终到达哪里 - 例如 Marquez

将传输作为 JSON 字符串

Airflow 配置中的 transport 选项用于此目的。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}

AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量是等效的。

AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'

如果您想查看 OpenLineage 事件而不将其发送到任何地方,您可以设置 ConsoleTransport - 事件将最终出现在任务日志中。

[openlineage]
transport = {"type": "console"}

注意

有关内置传输类型的完整列表、特定传输的选项或有关如何实现自定义传输的说明,请参阅 Python 客户端文档

将传输作为配置文件

您还可以使用 YAML 文件(例如 openlineage.yml)配置 OpenLineage Transport。在 Airflow 配置中提供 YAML 文件的路径作为 config_path 选项。

[openlineage]
config_path = '/path/to/openlineage.yml'

AIRFLOW__OPENLINEAGE__CONFIG_PATH 环境变量是等效的。

AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'

配置 YAML 文件的示例内容

transport:
  type: http
  url: https://backend:5000
  endpoint: events/receive
  auth:
    type: api_key
    apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e

注意

有关此配置方法的详细描述,以及示例配置文件,可以在 Python 客户端文档 中找到。

配置优先级

由于有多种配置 OpenLineage 的方法,因此请务必记住不同配置的优先级。OpenLineage Airflow 提供程序按以下顺序查找配置

  1. airflow.cfg 中的 openlineage 部分下检查 config_path (或 AIRFLOW__OPENLINEAGE__CONFIG_PATH 环境变量)

  2. airflow.cfg 中的 openlineage 部分下检查 transport (或 AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量)

  3. 如果所有上述选项都缺失,则在底层使用的 OpenLineage Python 客户端会按照 文档中描述的顺序查找配置。请注意,鼓励使用 Airflow 配置,并且它是唯一面向未来的解决方案。

向后兼容性

警告

以下变量不应使用,并且将来可能会删除。考虑使用 Airflow 配置(如上所述)来实现面向未来的解决方案。

为了与 openlineage-airflow 包向后兼容,仍然可以使用一些环境变量

  • OPENLINEAGE_DISABLED 等效于 AIRFLOW__OPENLINEAGE__DISABLED

  • OPENLINEAGE_CONFIG 等效于 AIRFLOW__OPENLINEAGE__CONFIG_PATH

  • OPENLINEAGE_NAMESPACE 等效于 AIRFLOW__OPENLINEAGE__NAMESPACE

  • OPENLINEAGE_EXTRACTORS 等效于设置 AIRFLOW__OPENLINEAGE__EXTRACTORS

  • OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE 等效于 AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE

  • OPENLINEAGE_URL 可用于设置简单的 http 传输。此方法有一些限制,可能需要使用其他环境变量才能实现所需的输出。请参阅 文档

其他选项

命名空间

为这个特定的实例设置 OpenLineage 命名空间非常有用。这样,如果您使用多个 OpenLineage 生产者,来自它们的事件将在逻辑上分离。如果未设置,则使用 default 命名空间。在 Airflow 配置中提供命名空间的名称作为 namespace 选项。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'

AIRFLOW__OPENLINEAGE__NAMESPACE 环境变量是等效的。

AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'

超时

为了在任务执行和 OpenLineage 之间添加一层隔离,增加 OpenLineage 执行不会以占用时间以外的方式干扰任务执行的保证,OpenLineage 方法在单独的进程中运行。代码以 10 秒的默认超时时间运行。您可以通过设置 execution_timeout 值来增加此时间。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
execution_timeout = 60

AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT 环境变量是等效的。

AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60

禁用

您可以通过在 Airflow 配置中将 disabled 选项设置为 true 来禁用发送 OpenLineage 事件,而无需卸载 OpenLineage 提供程序。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true

AIRFLOW__OPENLINEAGE__DISABLED 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLED=true

禁用源代码

一些 Operator(例如 Python、Bash)默认会将其源代码包含在其 OpenLineage 事件中。要阻止这种情况,请在 Airflow 配置中将 disable_source_code 选项设置为 true

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true

为 Operator 禁用

您可以通过将以分号分隔的 Airflow Operator 的完整导入路径字符串传递到 Airflow 配置中的 disabled_for_operators 字段,轻松地排除某些 Operator 发出 OpenLineage 事件。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'

完整任务信息

默认情况下,OpenLineage 集成的 AirflowRunFacet(在每个任务实例事件的 START 事件上附加)不包含完整的序列化任务信息(给定操作符的参数),而仅包含选定的参数。

但是,我们允许用户设置 OpenLineage 集成以包含完整的任务信息。通过这样做,我们不会仅序列化一些已知的属性,而是排除某些不可序列化的元素并发送其他所有内容。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
include_full_task_info = true

AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO 环境变量等效于此。

AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true

警告

将此变量设置为 true 后,OpenLineage 集成不会控制您发送的事件的大小。它可能会包含大小为 MB 甚至更大的元素,具体取决于您传递给任务的数据大小。

自定义提取器

要使用自定义提取器功能,请通过将以分号分隔的 Airflow 操作符完整导入路径字符串传递给 Airflow 配置中的 extractors 选项来注册提取器。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass

AIRFLOW__OPENLINEAGE__EXTRACTORS 环境变量等效于此。

AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'

自定义运行方面

要注入自定义运行方面,请通过将以分号分隔的完整导入路径字符串传递给 Airflow 配置中的 custom_run_facets 选项来注册自定义运行方面函数。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS 环境变量等效于此。

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'

调试模式

您可以通过在 Airflow 配置中将 debug_mode 选项设置为 true 来启用在 OpenLineage 事件中发送额外信息,这些信息对于调试和重现您的环境设置非常有用。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
debug_mode = true

AIRFLOW__OPENLINEAGE__DEBUG_MODE 环境变量等效于此。

AIRFLOW__OPENLINEAGE__DEBUG_MODE=true

警告

将此变量设置为 true 后,OpenLineage 集成可能会记录和发出大量详细信息。它应该仅在调试目的时临时启用。

在 DAG/任务级别启用 OpenLineage

可以使用 selective_enable 策略为特定的 DAG 和任务选择性地启用 OpenLineage。要启用此策略,请在 Airflow 配置文件的 [openlineage] 部分中将 selective_enable 选项设置为 True。

[openlineage]
selective_enable = True

AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE 环境变量等效于此。

AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true

尽管 selective_enable 启用了选择性控制,但 disabled 选项 仍然具有优先权。如果在配置中将 disabled 设置为 True,则无论 selective_enable 设置如何,OpenLineage 都将对所有 DAG 和任务禁用。

一旦启用了 selective_enable 策略,您可以使用 enable_lineagedisable_lineage 函数选择为单独的 DAG 和任务启用 OpenLineage。

  1. 在 DAG 上启用 Lineage

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with enable_lineage(DAG(...)):
    # Tasks within this DAG will have lineage tracking enabled
    MyOperator(...)

    AnotherOperator(...)
  1. 在任务上启用 Lineage

虽然在 DAG 上启用 lineage 会隐式地为该 DAG 内的所有任务启用它,但您仍然可以选择性地为特定任务禁用它。

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with DAG(...) as dag:
    t1 = MyOperator(...)
    t2 = AnotherOperator(...)

# Enable lineage for the entire DAG
enable_lineage(dag)

# Disable lineage for task t1
disable_lineage(t1)

在 DAG 级别启用 lineage 会自动为该 DAG 内的所有任务启用它,除非每个任务明确禁用。

在任务级别启用 lineage 会隐式地在其 DAG 上启用 lineage。这是因为每个发出任务都会发送一个 ParentRunFacet,这需要在某些 OpenLineage 后端系统中启用 DAG 级别的 lineage。禁用 DAG 级别的 lineage 同时启用任务级别的 lineage 可能会导致错误或不一致。

将父作业信息传递给 Spark 作业

OpenLineage 集成可以将 Airflow 的信息(命名空间、作业名称、运行 ID)作为父作业信息(spark.openlineage.parentJobNamespacespark.openlineage.parentJobNamespark.openlineage.parentRunId)自动注入到 Spark 应用程序属性中,适用于受支持的操作符。它允许 Spark 集成自动在应用程序级别的 OpenLineage 事件中包含 parentRunFacet,从而在来自不同集成的任务之间创建父子关系。请参阅从 Airflow 调度

警告

如果在 Spark 作业配置中手动指定了上述任何属性,则集成将避免注入父作业属性,以确保保留手动提供的值。

您可以通过在 Airflow 配置中将 spark_inject_parent_job_info 选项设置为 true 来启用此自动化。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_parent_job_info = true

AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO 环境变量等效于此。

AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true

问题排查

有关如何对 OpenLineage 进行故障排除的详细信息,请参阅问题排查

为自定义操作符添加支持

如果您想为特定操作符添加 OpenLineage 覆盖,请查看在操作符中实现 OpenLineage

我在哪里可以了解更多?

反馈

您可以在slack上联系我们并给我们留下反馈!

如何贡献

我们欢迎您的贡献!OpenLineage 是一个正在积极开发的开源项目,我们很乐意得到您的帮助!

听起来很有趣?请查看我们的新贡献者指南以开始使用。

这个条目有帮助吗?