在 Operator 中实现 OpenLineage¶
OpenLineage 通过支持直接修改 Airflow Operator,使得向数据管道添加血缘信息变得容易。当可以修改 Operator 时,添加血缘信息提取就像向其添加一个方法一样简单。有关更多详细信息,请参阅 OpenLineage 方法。
可能存在一些您无法修改的 Operator(例如第三方提供商),但您仍然希望从中提取血缘信息。为了处理这种情况,OpenLineage 允许您为任何 Operator 提供自定义 Extractor。有关更多详细信息,请参阅 自定义 Extractor。
提取优先级¶
由于有多种实现 OpenLineage 对 Operator 支持的方式,因此请务必牢记 OpenLineage 查找血缘数据的顺序。
Extractor - 检查是否为 Operator 类名指定了自定义 Extractor。用户注册的任何自定义 Extractor 都将优先于 Airflow Provider 源代码中定义的默认 Extractor(例如 BashExtractor)。
OpenLineage 方法 - 如果未为 Operator 类名显式指定 Extractor,则使用 DefaultExtractor,它会在 Operator 中查找 OpenLineage 方法。
输入和输出 - 如果 Operator 中没有定义 OpenLineage 方法,则检查输入和输出。
如果以上所有选项都缺失,则不会从 Operator 中提取任何血缘数据。您仍然会收到 OpenLineage 事件,其中包含通用的 Airflow facets、正确的事件时间/类型等信息,但输入/输出将为空,并且 Operator 特定的 facets 将会缺失。
OpenLineage 方法¶
当处理您自己的 Operator 并可以直接实现 OpenLineage 方法时,推荐使用此方法。当处理您无法修改(例如第三方提供商)但仍然希望从中提取血缘信息的 Operator 时,请参阅 自定义 Extractor。
OpenLineage 定义了一些用于在 Operator 中实现的方法。这些方法被称为 OpenLineage 方法。
def get_openlineage_facets_on_start() -> OperatorLineage: ...
def get_openlineage_facets_on_complete(ti: TaskInstance) -> OperatorLineage: ...
def get_openlineage_facets_on_failure(ti: TaskInstance) -> OperatorLineage: ...
当任务实例状态变为以下时,OpenLineage 方法会被相应地调用:
RUNNING ->
get_openlineage_facets_on_start()
SUCCESS ->
get_openlineage_facets_on_complete()
FAILED ->
get_openlineage_facets_on_failure()
以下方法中至少必须实现一个:get_openlineage_facets_on_start()
或 get_openlineage_facets_on_complete()
。有关在其他方法缺失时会调用哪些方法的更多详细信息,请参阅 如何正确实现 OpenLineage 方法?。
Provider 定义了 OperatorLineage
结构,供 Operator 返回,而不是返回完整的 OpenLineage 事件
@define
class OperatorLineage:
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, RunFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
OpenLineage 集成本身负责用通用的 Airflow facets、正确的事件时间/类型等信息对其进行丰富,从而创建正确的 OpenLineage RunEvent。
如何正确实现 OpenLineage 方法?¶
在 Operator 中实现 OpenLineage 时,有几点值得注意。
首先,不要在顶层导入 OpenLineage 相关的对象,而是在 OL 方法本身内部导入。这使得用户即使没有安装 OpenLineage provider 也能使用您的 provider。
第二点重要之处是确保您的 provider 返回符合 OpenLineage 规范的数据集名称。这使得 OpenLineage 消费者能够正确匹配从不同来源收集的数据集信息。命名约定在 OpenLineage 命名文档 中描述。
第三,OpenLineage 实现不应该浪费不使用它的用户的时间。这意味着不要在 execute
方法中进行不需要其结果的繁重处理或网络调用。更好的选择是将相关信息保存在 Operator 属性中 - 然后在 OpenLineage 方法中使用它。一个很好的例子是 BigQueryExecuteQueryOperator
。它保存了已执行查询的 job_ids
。get_openlineage_facets_on_complete
然后可以调用 BigQuery API,查询这些表的血缘信息,并将其转换为 OpenLineage 格式。
第四,并非必须实现所有方法。如果在调用 execute
之前已知所有数据集,并且没有相关的运行时数据,则可能没有必要实现 get_openlineage_facets_on_complete
- get_openlineage_facets_on_start
方法可以提供所有数据。反之,如果在执行之前一切都未知,则可能没有必要编写 _on_start
方法。类似地,如果没有相关的失败数据 - 或失败条件未知,则实现 get_openlineage_facets_on_failure
可能不值得。通常:如果不存在 on_failure
方法,则会调用 on_complete
方法。如果不存在 on_failure
和 on_complete
方法,则会调用 on_start
方法(在任务开始和任务完成时都会调用)。如果不存在 on_start
方法,则血缘信息将不包含在 START 事件中,并且在任务完成时会调用 on_complete
方法。
如何测试 OpenLineage 方法?¶
在 Operator 中对 OpenLineage 集成进行单元测试与测试 Operator 本身非常相似。这些测试的目的是确保 get_openlineage_*
方法返回正确的 OperatorLineage
数据结构并填充相关字段。建议模拟任何外部调用。测试作者需要记住调用不同 OL 方法的条件是不同的。get_openlineage_facets_on_start
在 execute
调用之前被调用,因此不能依赖于在那里设置的值。
有关如何在本地排除 OpenLineage 故障的详细信息,请参阅 故障排除。
目前没有用于系统测试 OpenLineage 集成的现有框架,但最简单的方法是将发出的事件(例如使用 FileTransport
)与预期的事件进行比较。OpenLineage 系统测试作者的目标是提供预期事件键的字典。事件键标识从特定 Operator 和方法发送的事件:它们的结构为 <dag_id>.<task_id>.event.<event_type>
;通过这种方式总是可以标识从特定任务发送的特定事件。提供的事件结构不必包含最终事件中的所有字段。只能比较测试作者提供的字段;这允许只检查特定测试关注的字段。它还允许跳过(半)随机生成的字段,如 runId
或 eventTime
,或者在 Airflow 中的 OpenLineage 上下文中始终相同的字段,如 producer
。
示例¶
以下是为 GcsToGcsOperator 正确实现的 get_openlineage_facets_on_complete
方法的示例。由于在 execute
方法中进行了一些处理,并且没有相关的失败数据,因此只实现这一个方法就足够了。
def get_openlineage_facets_on_complete(self, task_instance):
"""
Implementing _on_complete because execute method does preprocessing on internals.
This means we won't have to normalize self.source_object and self.source_objects,
destination bucket and so on.
"""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[
Dataset(namespace=f"gs://{self.source_bucket}", name=source)
for source in sorted(self.resolved_source_objects)
],
outputs=[
Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
for target in sorted(self.resolved_target_objects)
],
)
有关已实现的 OpenLineage 方法的更多示例,请查看 支持的类 的源代码。
自定义 Extractor¶
当处理您无法修改(例如第三方提供商)但仍希望从中提取血缘信息的 Operator 时,推荐使用此方法。如果您想从您自己的 Operator 中提取血缘信息,您可能更倾向于直接实现 OpenLineage 方法 中描述的 OpenLineage 方法。
此方法的工作原理是检测您的 DAG 使用了哪些 Airflow Operator,并使用相应的 Extractor 类从它们中提取血缘数据。
接口¶
自定义 Extractor 必须派生自 BaseExtractor
并至少实现两个方法:_execute_extraction
和 get_operator_classnames
。
BaseExtractor 还定义了三个方法:extract
、extract_on_complete
和 extract_on_failure
,这些方法被调用并用于提供实际的血缘数据。区别在于 extract
在 Operator 的 execute
方法之前被调用,而 extract_on_complete
和 extract_on_failure
在任务成功或失败后分别被调用。默认情况下,extract
调用自定义 Extractor 中实现的 _execute_extraction
方法。当任务成功时,extract_on_complete
被调用,如果未被覆盖,默认情况下会委托给 extract
。当任务失败时,extract_on_failure
被调用,如果未被覆盖,默认情况下会委托给 extract_on_complete
。如果您想提供任务执行后的一些附加信息,可以覆盖 extract_on_complete
和 extract_on_failure
方法。这对于提取 Operator 在执行期间或之后设置为自身属性的数据非常有用。一个很好的例子是 SQL Operator,它在执行后设置 query_ids
。
的 get_operator_classnames
是一个 classmethod,用于提供您的 Extractor 可以从中获取血缘信息的 Operator 列表。
例如
@classmethod
def get_operator_classnames(cls) -> List[str]:
return ['CustomPostgresOperator']
如果 Operator 的名称与列表中的名称之一匹配,Extractor 将被实例化 - 并在 Extractor 的 self.operator
属性中提供 Operator - 并且会调用 extract
以及 extract_on_complete
/extract_on_failure
方法。
这两个方法都返回 OperatorLineage
结构
@define
class OperatorLineage:
"""Structure returned from lineage extraction."""
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, RunFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
输入和输出是普通 OpenLineage 数据集的列表 (openlineage.client.event_v2.Dataset)。
run_facets
和 job_facets
是可选的 RunFacets 和 JobFacets 的字典,这些 facet 将附加到作业 - 例如,如果您的 Operator 正在执行 SQL,您可能希望附加 SqlJobFacet
。
要了解有关 OpenLineage 中 facets 的更多信息,请参阅 自定义 Facets。
注册自定义 Extractor¶
除非您注册 Extractor,否则 OpenLineage 集成不会知道您已提供了一个 Extractor。
可以通过在 Airflow 配置中使用 extractors
选项来完成。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
AIRFLOW__OPENLINEAGE__EXTRACTORS
环境变量是等效的。
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
您可以选择使用空格分隔它们。如果您将它们作为某些 YAML 文件的一部分提供,这将非常有用。
AIRFLOW__OPENLINEAGE__EXTRACTORS: >-
full.path.to.FirstExtractor;
full.path.to.SecondExtractor
请记住确保该路径对于 scheduler 和 worker 来说是可导入的。
调试自定义 Extractor¶
自定义 Extractor 通常存在两个问题。
首先,是提供给 Airflow 配置中 extractors
选项的路径错误。该路径需要与您在代码中使用的路径完全相同。如果路径错误或无法从 worker 导入,插件将无法加载 Extractor,并且不会为该 Operator 发出正确的 OpenLineage 事件。
第二个问题,可能更隐蔽,是来自 Airflow 的导入。由于 OpenLineage 代码在 Airflow worker 本身启动时被实例化,任何来自 Airflow 的导入都可能悄无声息地形成循环引用。这会导致 OpenLineage 提取失败。
为了避免此问题,请仅在本地导入 Airflow - 在 _execute_extraction
或 extract_on_complete
/extract_on_failure
方法中。如果您需要用于类型检查的导入,请将其放在 typing.TYPE_CHECKING 之后。
测试自定义 Extractor¶
与所有代码一样,自定义 Extractor 也应该进行测试。本节将提供有关编写测试的最重要数据结构的某些信息以及有关故障排除的一些注意事项。我们假设您对编写自定义 Extractor 有先前的了解。要详细了解 Operator 和 Extractor 在底层如何协同工作,请查看 自定义 Extractor。
测试 Extractor 时,我们首先要验证是否正在创建 OperatorLineage
对象,特别是验证对象是否使用正确的输入和输出数据集以及相关 facets 构建。这在 OpenLineage 中通过 pytest 完成,并对连接和对象进行了适当的模拟和补丁。查看 示例测试。
测试每个 facet 也非常重要,因为如果 facet 不正确,UI 中的数据或图表可能会错误地渲染。例如,如果在 Extractor 中 facet 名称创建不正确,则 Operator 的任务将不会显示在血缘图谱中,从而在管道可观测性方面产生空白。
即使进行了单元测试,Extractor 可能仍未按预期运行。判断数据是否未正确传输的最简单方法是检查 UI 元素是否未在 Lineage 选项卡中正确显示。
有关如何在本地排除 OpenLineage 故障的详细信息,请参阅 故障排除。
示例¶
这是一个简单的 Extractor 示例,用于一个 Operator,该 Operator 在 BigQuery 中执行导出查询并将结果保存到 S3 文件。在调用 Operator 的 execute
方法之前已知一些信息,我们可以在 _execute_extraction
方法中提取一些血缘信息。在调用 Operator 的 execute
方法之后,在 extract_on_complete
中,我们可以简单地将一些额外的 Facets(例如包含 Bigquery 作业 ID)附加到我们之前准备的信息中。如果仅在任务失败时需要包含某些信息,我们还可以实现 extract_on_failure
方法。这样,我们可以从 Operator 获取所有可能的信息。
请注意,这只是一个示例。OpenLineage 提供了一些内置功能,可以方便地处理不同的过程,例如使用 SQL 解析器从 SQL 查询中提取列级血缘信息和输入/输出。
from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.common.compat.openlineage.facet import (
Dataset,
ExternalQueryRunFacet,
ErrorMessageRunFacet,
SQLJobFacet,
)
class ExampleOperator(BaseOperator):
def __init__(self, query, bq_table_reference, s3_path) -> None:
self.bq_table_reference = bq_table_reference
self.s3_path = s3_path
self.s3_file_name = s3_file_name
self._job_id = None
def execute(self, context) -> Any:
self._job_id, self._error_message = run_query(query=self.query)
class ExampleExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls):
return ["ExampleOperator"]
def _execute_extraction(self) -> OperatorLineage:
"""Define what we know before Operator's extract is called."""
return OperatorLineage(
inputs=[Dataset(namespace="bigquery", name=self.operator.bq_table_reference)],
outputs=[Dataset(namespace=self.operator.s3_path, name=self.operator.s3_file_name)],
job_facets={
"sql": SQLJobFacet(
query="EXPORT INTO ... OPTIONS(FORMAT=csv, SEP=';' ...) AS SELECT * FROM ... "
)
},
)
def extract_on_complete(self, task_instance) -> OperatorLineage:
"""Add what we received after Operator's extract call."""
lineage_metadata = self.extract()
lineage_metadata.run_facets = {
"parent": ExternalQueryRunFacet(externalQueryId=task_instance.task._job_id, source="bigquery")
}
return lineage_metadata
def extract_on_failure(self, task_instance) -> OperatorLineage:
"""Add any failure-specific information."""
lineage_metadata = self.extract_on_complete(task_instance)
lineage_metadata.run_facets = {
"error": ErrorMessageRunFacet(
message=task_instance.task._error_message, programmingLanguage="python"
)
}
return lineage_metadata
有关 OpenLineage Extractor 的更多示例,请查看 BashExtractor 或 PythonExtractor 的源代码。
自定义 Facets¶
要了解有关 OpenLineage 中 facets 的更多信息,请参阅 facet 文档。另请查看 可用 facets 以及关于 使用 facets 进行扩展 的博客文章。
OpenLineage 规范可能不包含编写 Extractor 所需的所有 facets,在这种情况下,您将不得不创建自己的 自定义 facets。
您还可以使用 custom_run_facets
Airflow 配置将自己的自定义 facets 注入到血缘事件的 run facet 中。
需要采取的步骤,
编写一个返回自定义 facets 的函数。您可以根据需要编写任意数量的自定义 facet 函数。
使用
custom_run_facets
Airflow 配置注册函数。
Airflow OpenLineage 监听器将在生成血缘事件期间自动执行这些函数,并将其返回值附加到血缘事件中的 run facet。
编写自定义 facet 函数¶
输入参数: 函数应接受两个输入参数:
TaskInstance
和TaskInstanceState
。函数体: 执行生成自定义 facets 所需的逻辑。自定义 facets 必须继承自
RunFacet
,以便自动为 facet 添加_producer
和_schemaURL
。返回值: 要添加到血缘事件的自定义 facets。返回类型应为
dict[str, RunFacet]
或None
。如果您不想为特定条件添加自定义 facets,可以选择返回None
。
自定义 facet 函数示例
import attrs
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.common.compat.openlineage.facet import RunFacet
@attrs.define
class MyCustomRunFacet(RunFacet):
"""Define a custom facet."""
name: str
jobState: str
uniqueName: str
displayName: str
dagId: str
taskId: str
cluster: str
custom_metadata: dict
def get_my_custom_facet(
task_instance: TaskInstance, ti_state: TaskInstanceState
) -> dict[str, RunFacet] | None:
operator_name = task_instance.task.operator_name
custom_metadata = {}
if operator_name == "BashOperator":
return None
if ti_state == TaskInstanceState.FAILED:
custom_metadata["custom_key_failed"] = "custom_value"
job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
return {
"additional_run_facet": MyCustomRunFacet(
name="test-lineage-namespace",
jobState=task_instance.state,
uniqueName=job_unique_name,
displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
dagId=task_instance.dag_id,
taskId=task_instance.task_id,
cluster="TEST",
custom_metadata=custom_metadata,
)
}
注册自定义 facet 函数¶
使用 custom_run_facets
Airflow 配置来注册自定义 run facet 函数,方法是传递一个以分号分隔的函数完整导入路径字符串。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS
环境变量是等效的。
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
注意
自定义 facet 函数在 TaskInstance 的 START 和 COMPLETE/FAIL 时都会执行,并添加到相应的 OpenLineage 事件中。
在创建 TaskInstance 状态的条件时,您应该使用提供的第二个参数 (
TaskInstanceState
),该参数将包含任务应处于的状态。这可能与 ti.current_state() 不同,因为 OpenLineage 监听器可能在 TaskInstance 的状态在 Airflow 数据库中更新之前被调用。当单个函数的路径注册多次时,它仍然只会执行一次。
当多个注册的函数返回重复的自定义 facet 键时,血缘事件中将添加随机函数的结果。请避免使用重复的 facet 键,因为这可能导致意外行为。
作业层级¶
Apache Airflow 具有固有的作业层级结构:DAGs 是大型且可独立调度的单元,它们包含更小、可执行的任务。
OpenLineage 在其 Job Hierarchy 模型中反映了这种结构。
在 DAG 调度时,会发出一个 START 事件。
随后,按照 Airflow 的任务顺序,每个任务触发
在 TaskInstance 启动时的 START 事件。
在 TaskInstance 完成时的 COMPLETE/FAILED 事件。
最后,在 DAG 终止时,会发出一个完成事件(COMPLETE 或 FAILED)。
TaskInstance 事件的 ParentRunFacet 引用了原始的 DAG 运行。
故障排除¶
在本地测试代码时,可以使用 Marquez 来检查正在发出或未发出的数据。使用 Marquez 可以帮助您确定错误是由 Extractor 还是 API 引起的。如果数据按预期从 Extractor 发出但未到达 UI,则 Extractor 是正常的,应在 OpenLineage 中提一个 issue。但是,如果数据未正确发出,则可能需要更多单元测试来覆盖 Extractor 的行为。Marquez 可以帮助您查明哪些 facets 未正确形成,以便您知道在哪里添加测试覆盖。
调试设置¶
为了调试目的,请确保 Airflow 日志级别 和 OpenLineage 客户端日志级别 都设置为 DEBUG
。最新的 provider 会自动将 Airflow 的日志级别与 OpenLineage 客户端同步,无需手动配置。
要将包含附加信息(例如,所有已安装包的列表)的 DebugFacet 附加到所有 OL 事件,请为 OpenLineage 集成启用 调试模式。
请记住,启用这些设置会增加 Airflow 日志的详细程度(这将增加日志大小)并向 OpenLineage 事件添加额外信息。建议暂时使用它们,主要用于调试目的。
在寻求调试帮助时,请始终尝试提供以下信息
将日志级别设置为 DEBUG 的 Airflow scheduler 日志
将日志级别设置为 DEBUG 的 Airflow worker 日志(任务日志)
启用 debug_mode 的 OpenLineage 事件
有关 Airflow 版本和 OpenLineage provider 版本的信息
有关 Airflow 运行的部署环境所做的任何自定义修改的信息
我可以在哪里了解更多?¶
查看 OpenLineage 网站。
访问我们的 GitHub 仓库。
观看多个关于 OpenLineage 的 讲座。
反馈¶
您可以在 slack 上联系我们并给我们留下反馈!
如何贡献¶
我们欢迎您的贡献!OpenLineage 是一个正在积极开发的开源项目,我们非常欢迎您的帮助!
听起来很有趣?查看我们的 新贡献者指南 开始吧。