谱系

注意

谱系支持极具试验性,可能会发生变化。

Airflow 可以帮助跟踪数据的来源、数据发生的变化以及数据随时间推移的去向。这有助于进行审计跟踪和数据治理,还可以调试数据流。

Airflow 通过任务的入口和出口来跟踪数据。让我们从一个示例入手,了解其工作原理。

import datetime
import pendulum

from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

dag = DAG(
    dag_id="example_lineage",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="0 0 * * *",
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)

f_final = File(url="/tmp/final")
run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)

f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
    outlets.append(f_out)

run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
run_this.set_downstream(run_this_last)

入口可以是(上游任务 ID 列表)或静态定义为 attr 注释对象,例如 File 对象。出口只能是 attr 注释对象。两者都在运行时呈现。但是,如果任务的出口是另一个任务的入口,则不会为下游任务重新呈现出口。

注意

如果操作符支持,操作符可以自动添加入口和出口。

在示例 DAG 任务 run_this (task_id=run_me_first) 中,BashOperator 采用 3 个入口:CAT1CAT2CAT3,它们从列表中生成。请注意,data_interval_start 是模板化字段,将在任务运行时呈现。

注意

在后台,Airflow 将谱系元数据作为任务的 pre_execute 方法的一部分进行准备。当任务执行完成后,将调用 post_execute,并将谱系元数据推送到 XCOM。因此,如果您要创建自己的覆盖此方法的操作符,请确保使用 prepare_lineageapply_lineage 分别装饰您的方法。

简写符号

简写符号也可用,这几乎等同于 Unix 命令行管道、输入和输出。请注意,操作符 优先级 仍然适用。此外,| 操作符仅在左侧已定义出口(例如,通过使用 add_outlets(..))或开箱即用地支持谱系 operator.supports_lineage == True 时才有效。

f_in > run_this | (run_this_last > outlets)

谱系后端

可以通过在配置中提供 LineageBackend 实例将谱系指标推送到自定义后端

[lineage]
backend = my.lineage.CustomBackend

后端应继承自 airflow.lineage.LineageBackend

from airflow.lineage.backend import LineageBackend


class CustomBackend(LineageBackend):
    def send_lineage(self, operator, inlets=None, outlets=None, context=None):
        ...
        # Send the info to some external service

此条目是否有用?