血缘

注意

血缘支持仍在实验阶段,可能会发生变化。

Airflow 提供了一项强大的功能,用于跟踪数据血缘,不仅在任务之间,还包括任务中使用的钩子。此功能帮助您了解数据在 Airflow 流水线中的流动方式。

全局实例 HookLineageCollector 充当收集血缘信息的中心枢纽。钩子可以将它们交互的资产详情发送给该收集器。收集器随后使用这些数据构建符合 AIP-60 的资产,这是一种用于描述资产的标准格式。钩子还可以向该收集器发送任意非资产相关的数据,如下面示例所示。

from airflow.sdk.lineage import get_hook_lineage_collector


class CustomHook(BaseHook):
    def run(self):
        # run actual code
        collector = get_hook_lineage_collector()
        collector.add_input_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/in"})
        collector.add_output_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/out"})
        collector.add_extra(self, key="external_system_job_id", value="some_id_123")

通过 HookLineageCollector 收集的血缘数据可以使用 HookLineageReader 实例访问,该实例在 Airflow 插件中注册。

from airflow.sdk.lineage import HookLineageReader
from airflow.plugins_manager import AirflowPlugin


class CustomHookLineageReader(HookLineageReader):
    def get_inputs(self):
        return self.lineage_collector.collected_assets.inputs


class HookLineageCollectionPlugin(AirflowPlugin):
    name = "HookLineageCollectionPlugin"
    hook_lineage_readers = [CustomHookLineageReader]

如果在 Airflow 中未注册 HookLineageReader,则会使用默认的 NoOpCollector。此收集器不会创建符合 AIP-60 标准的资产,也不会收集血缘信息。

此条目是否有帮助?