Papermill

Apache Airflow 支持与 Papermill 集成。 Papermill 是一个用于参数化和执行 Jupyter Notebook 的工具。 也许您有一个财务报告,希望在每月的月初或月末,或者在年初或年末以不同的值运行。在您的笔记本中使用参数,并使用 PapermillOperator 让这一切变得轻而易举。

用法

创建笔记本

要参数化您的笔记本,请指定一个带有标签 parameters 的单元格。 Papermill 会查找 parameters 单元格,并将此单元格视为在执行时传入的参数的默认值。 Papermill 将添加一个带有 injected-parameters 标签的新单元格,其中包含输入参数,以便覆盖 parameters 中的值。 如果没有单元格带有 parameters 标签,则注入的单元格将插入到笔记本的顶部。

请确保将您的笔记本保存到 Airflow 可以访问的位置。 Papermill 支持 S3、GCS、Azure 和本地。 不支持 HDFS。

示例 DAG

使用 PapermillOperator 来执行 jupyter notebook

tests/system/papermill/example_papermill.py[源码]

run_this = PapermillOperator(
    task_id="run_example_notebook",
    input_nb="/tmp/hello_world.ipynb",
    output_nb="/tmp/out-{{ execution_date }}.ipynb",
    parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
)

示例 DAG,用于验证笔记本中的消息

tests/system/papermill/example_papermill_verify.py[源码]

@task
def check_notebook(inlets, logical_date):
    """
    Verify the message in the notebook
    """
    notebook = sb.read_notebook(inlets[0].url)
    message = notebook.scraps["message"]
    print(f"Message in notebook {message} for {logical_date}")

    if message.data != f"Ran from Airflow at {logical_date}!":
        return False

    return True


with DAG(
    dag_id="example_papermill_operator_verify",
    schedule=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    dagrun_timeout=DAGRUN_TIMEOUT,
    catchup=False,
) as dag:
    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
        output_nb="/tmp/out-{{ logical_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ logical_date }}!"},
    )

    run_this >> check_notebook(inlets=AUTO, logical_date="{{ logical_date }}")

示例 DAG,用于使用远程 jupyter 内核验证笔记本中的消息

tests/system/papermill/example_papermill_remote_verify.py[源码]

@task
def check_notebook(output_notebook, execution_date):
    """
    Verify the message in the notebook
    """
    notebook = sb.read_notebook(output_notebook)
    message = notebook.scraps["message"]
    print(f"Message in notebook {message} for {execution_date}")

    if message.data != f"Ran from Airflow at {execution_date}!":
        return False

    return True


with DAG(
    dag_id="example_papermill_operator_remote_verify",
    schedule="@once",
    start_date=START_DATE,
    dagrun_timeout=DAGRUN_TIMEOUT,
    catchup=False,
) as dag:
    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
        output_nb="/tmp/out-{{ execution_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
        kernel_conn_id="jupyter_kernel_default",
    )

    run_this >> check_notebook(
        output_notebook="/tmp/out-{{ execution_date }}.ipynb", execution_date="{{ execution_date }}"
    )

此条目是否有帮助?