OpenLineage Macros

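Macros included in the OpenLineage plugin are integrated into Airflow's main macro collection and are available for use.

They can be invoked in Jinja templates, as the examples in the following sections show.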

Lineage job & run macros

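The macros
  • lineage_job_namespace()

  • lineage_job_name(task_instance)

  • lineage_run_id(task_instance)

allow information about an Airflow task's run to be injected into the arguments passed to a remote processing job. For example, SparkSubmitOperator can be configured like this: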

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

SparkSubmitOperator(
    task_id="my_task",
    application="/script.py",
    conf={
        # separated components
        "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
        "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
        "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
    },
)
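
When several Spark tasks need the same three settings, the template strings can be built in one place. The following is a minimal sketch: the helper name openlineage_parent_conf and the MACRO_PREFIX constant are hypothetical and not part of Airflow or the OpenLineage provider. It only builds the unrendered template strings; Airflow renders them when the task runs, as in the example above.

```python
# Hypothetical helper (not part of Airflow or the OpenLineage provider).
MACRO_PREFIX = "macros.OpenLineageProviderPlugin"


def openlineage_parent_conf() -> dict[str, str]:
    """Return Spark conf entries whose values are unrendered Jinja templates."""
    calls = {
        "spark.openlineage.parentJobNamespace": "lineage_job_namespace()",
        "spark.openlineage.parentJobName": "lineage_job_name(task_instance)",
        "spark.openlineage.parentRunId": "lineage_run_id(task_instance)",
    }
    # Plain concatenation keeps the literal "{{ ... }}" delimiters intact.
    return {
        key: "{{ " + MACRO_PREFIX + "." + call + " }}"
        for key, call in calls.items()
    }
```

The result could then be merged into an operator's conf argument, for example conf={**openlineage_parent_conf(), "spark.executor.memory": "2g"}, where the extra Spark setting is only an illustration.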

Lineage parent id

The same information, condensed into a single string, can be passed using the lineage_parent_id(task_instance) macro:

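from airflow.operators.python import PythonOperator


def my_task_function(templates_dict, **kwargs):
    # The rendered value has the form `<namespace>/<name>/<run_id>`
    parent_job_namespace, parent_job_name, parent_run_id = templates_dict["parentRun"].split("/")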
    ...


PythonOperator(
    task_id="render_template",
    python_callable=my_task_function,
    templates_dict={
        # joined components as one string `<namespace>/<name>/<run_id>`
        "parentRun": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
    },
    provide_context=False,
    dag=dag,
)
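
The split("/") call in the example above raises a ValueError whenever the rendered string does not contain exactly two slashes, which would happen if the configured namespace itself contained a slash. The following is a minimal sketch of a more tolerant parser. The names ParentRun and parse_parent_id are hypothetical, and the sketch assumes that the job name and the run ID never contain a slash.

```python
from typing import NamedTuple


class ParentRun(NamedTuple):
    """Components of a `<namespace>/<name>/<run_id>` string (hypothetical type)."""

    namespace: str
    job_name: str
    run_id: str


def parse_parent_id(parent_id: str) -> ParentRun:
    """Split a parent ID, treating only the last two slashes as separators."""
    # Assumption: job name and run ID contain no "/", so any extra
    # slashes must belong to the namespace.
    parts = parent_id.rsplit("/", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected <namespace>/<name>/<run_id>, got {parent_id!r}")
    return ParentRun(*parts)
```

Inside my_task_function, this could replace the tuple unpacking with parent = parse_parent_id(templates_dict["parentRun"]).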
