谱系¶
注意
谱系支持极具试验性,可能会发生变化。
Airflow 可以帮助跟踪数据的来源、数据发生的变化以及数据随时间推移的去向。这有助于进行审计跟踪和数据治理,还可以调试数据流。
Airflow 通过任务的入口和出口来跟踪数据。让我们从一个示例入手,了解其工作原理。
import datetime
import pendulum
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
dag = DAG(
dag_id="example_lineage",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="0 0 * * *",
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
f_final = File(url="/tmp/final")
run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)
f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
outlets.append(f_out)
run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
run_this.set_downstream(run_this_last)
入口可以是(上游任务 ID 列表)或静态定义为 attr 注释对象,例如 File
对象。出口只能是 attr 注释对象。两者都在运行时呈现。但是,如果任务的出口是另一个任务的入口,则不会为下游任务重新呈现出口。
注意
如果操作符支持,操作符可以自动添加入口和出口。
在示例 DAG 任务 run_this
(task_id=run_me_first
) 中,BashOperator 采用 3 个入口:CAT1
、CAT2
、CAT3
,它们从列表中生成。请注意,data_interval_start
是模板化字段,将在任务运行时呈现。
注意
在后台,Airflow 将谱系元数据作为任务的 pre_execute
方法的一部分进行准备。当任务执行完成后,将调用 post_execute
,并将谱系元数据推送到 XCOM。因此,如果您要创建自己的覆盖此方法的操作符,请确保使用 prepare_lineage
和 apply_lineage
分别装饰您的方法。
简写符号¶
简写符号也可用,这几乎等同于 Unix 命令行管道、输入和输出。请注意,操作符 优先级 仍然适用。此外,|
操作符仅在左侧已定义出口(例如,通过使用 add_outlets(..)
)或开箱即用地支持谱系 operator.supports_lineage == True
时才有效。
f_in > run_this | (run_this_last > outlets)
谱系后端¶
可以通过在配置中提供 LineageBackend 实例将谱系指标推送到自定义后端
[lineage]
backend = my.lineage.CustomBackend
后端应继承自 airflow.lineage.LineageBackend
。
from airflow.lineage.backend import LineageBackend
class CustomBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
...
# Send the info to some external service