Papermill¶
Apache Airflow 支持与 Papermill 集成。Papermill 是一个用于参数化和执行 Jupyter Notebook 的工具。也许你有一个财务报告,你想在每月的月初或月末,或每年的年初或年末使用不同的数值来运行它。在你的 notebook 中使用 参数 并使用 PapermillOperator
让这一切变得轻而易举。
用法¶
创建 notebook¶
要参数化你的 notebook,请指定一个带有 parameters 标签的单元格。Papermill 会寻找 parameters 单元格,并将该单元格视为执行时传入参数的默认值。Papermill 会添加一个带有 injected-parameters 标签的新单元格,其中包含输入参数,以覆盖 parameters 中的值。如果没有单元格被标记为 parameters,注入的单元格将被插入到 notebook 的顶部。
请确保你将 notebook 保存到 Airflow 可以访问的地方。Papermill 支持 S3、GCS、Azure 和本地文件系统。不支持 HDFS。
示例 DAG¶
使用 PapermillOperator
执行 jupyter notebook
tests/system/papermill/example_papermill.py
run_this = PapermillOperator(
task_id="run_example_notebook",
input_nb="/tmp/hello_world.ipynb",
output_nb="/tmp/out-{{ execution_date }}.ipynb",
parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
)
验证 notebook 中消息的示例 DAG
tests/system/papermill/example_papermill_verify.py
@task
def check_notebook(output, logical_date):
"""
Verify the message in the notebook
"""
notebook = sb.read_notebook(output.url)
message = notebook.scraps["message"]
print(f"Message in notebook {message} for {logical_date}")
if message.data != f"Ran from Airflow at {logical_date}!":
return False
return True
with DAG(
dag_id="example_papermill_operator_verify",
schedule=SCHEDULE_INTERVAL,
start_date=START_DATE,
dagrun_timeout=DAGRUN_TIMEOUT,
catchup=False,
) as dag:
run_this = PapermillOperator(
task_id="run_example_notebook",
input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
output_nb="/tmp/out-{{ logical_date }}.ipynb",
parameters={"msgs": "Ran from Airflow at {{ logical_date }}!"},
)
check_notebook(output=run_this.output, logical_date="{{ logical_date }}")
使用远程 jupyter kernel 验证 notebook 中消息的示例 DAG
tests/system/papermill/example_papermill_remote_verify.py
@task
def check_notebook(output_notebook, execution_date):
"""
Verify the message in the notebook
"""
notebook = sb.read_notebook(output_notebook)
message = notebook.scraps["message"]
print(f"Message in notebook {message} for {execution_date}")
if message.data != f"Ran from Airflow at {execution_date}!":
return False
return True
with DAG(
dag_id="example_papermill_operator_remote_verify",
schedule="@once",
start_date=START_DATE,
dagrun_timeout=DAGRUN_TIMEOUT,
catchup=False,
) as dag:
run_this = PapermillOperator(
task_id="run_example_notebook",
input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
output_nb="/tmp/out-{{ execution_date }}.ipynb",
parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
kernel_conn_id="jupyter_kernel_default",
)
run_this >> check_notebook(
output_notebook="/tmp/out-{{ execution_date }}.ipynb", execution_date="{{ execution_date }}"
)