血缘¶
注意
血缘支持目前处于实验阶段,可能会发生变化。
Airflow 可以帮助跟踪数据的来源、发生的情况以及随时间推移的移动。这有助于进行审计跟踪和数据治理,还可以进行数据流的调试。
Airflow 通过任务的入口和出口来跟踪数据。让我们从一个例子开始,看看它是如何工作的。
import datetime
import pendulum
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
dag = DAG(
dag_id="example_lineage",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="0 0 * * *",
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
f_final = File(url="/tmp/final")
run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)
f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
outlets.append(f_out)
run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
run_this.set_downstream(run_this_last)
入口可以是(上游)任务 ID 的列表,也可以静态定义为带有属性注释的对象,例如 File
对象。出口只能是带有属性注释的对象。两者都在运行时呈现。但是,如果一个任务的出口是另一个任务的入口,则不会为下游任务重新呈现。
注意
如果操作符支持,它可以自动添加入口和出口。
在示例 DAG 任务 run_this
( task_id=run_me_first
) 中,它是一个 BashOperator,它有 3 个入口:CAT1
、CAT2
、CAT3
,这些入口是从一个列表生成的。请注意,data_interval_start
是一个模板字段,将在任务运行时呈现。
注意
在幕后,Airflow 将血缘元数据作为任务的 pre_execute
方法的一部分进行准备。当任务执行完成后,调用 post_execute
,血缘元数据被推送到 XCOM。因此,如果您正在创建自己的操作符来覆盖此方法,请确保使用 prepare_lineage
和 apply_lineage
分别修饰您的方法。
简写符号¶
简写符号也可用,其工作方式几乎与 unix 命令行管道、输入和输出相同。请注意,操作符的 优先级仍然适用。此外,只有当左侧定义了出口(例如,通过使用 add_outlets(..)
或具有开箱即用的血缘支持 operator.supports_lineage == True
时,|
操作符才会起作用。
f_in > run_this | (run_this_last > outlets)
Hook 血缘¶
Airflow 提供了一个强大的功能,用于跟踪数据血缘,不仅在任务之间,而且在这些任务中使用的钩子中也可以进行跟踪。此功能可帮助您了解数据如何在 Airflow 管道中流动。
一个全局的 HookLineageCollector
实例充当收集血缘信息的中心枢纽。钩子可以将它们与之交互的数据集的详细信息发送到此收集器。然后,收集器使用此数据来构建符合 AIP-60 标准的数据集,这是描述数据集的标准格式。
from airflow.lineage.hook_lineage import get_hook_lineage_collector
class CustomHook(BaseHook):
def run(self):
# run actual code
collector = get_hook_lineage_collector()
collector.add_input_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/in"})
collector.add_output_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/out"})
可以使用在 Airflow 插件中注册的 HookLineageReader
实例来访问 HookLineageCollector
收集的血缘数据。
from airflow.lineage.hook_lineage import HookLineageReader
from airflow.plugins_manager import AirflowPlugin
class CustomHookLineageReader(HookLineageReader):
def get_inputs(self):
return self.lineage_collector.collected_datasets.inputs
class HookLineageCollectionPlugin(AirflowPlugin):
name = "HookLineageCollectionPlugin"
hook_lineage_readers = [CustomHookLineageReader]
如果在 Airflow 中没有注册 HookLineageReader
,则会改为使用默认的 NoOpCollector
。此收集器不创建符合 AIP-60 标准的数据集或收集血缘信息。
血缘后端¶
可以通过在配置中提供 LineageBackend 的实例来将血缘指标推送到自定义后端
[lineage]
backend = my.lineage.CustomBackend
后端应继承自 airflow.lineage.LineageBackend
。
from airflow.lineage.backend import LineageBackend
class CustomBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
...
# Send the info to some external service