DatabricksNotebookOperator
Use the DatabricksNotebookOperator to launch and monitor notebook job runs on Databricks as Airflow tasks.
Examples
Running a notebook in Databricks on a new cluster
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

# Specification of the cluster that is created for this run.
new_cluster_spec = {
    "cluster_name": "",
    "spark_version": "11.3.x-scala2.12",
    "aws_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT_WITH_FALLBACK",
        "zone_id": "us-east-2b",
        "spot_bid_price_percent": 100,
        "ebs_volume_count": 0,
    },
    "node_type_id": "i3.xlarge",
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "enable_elastic_disk": False,
    "data_security_mode": "LEGACY_SINGLE_USER_STANDARD",
    "runtime_engine": "STANDARD",
    "num_workers": 8,
}

notebook_1 = DatabricksNotebookOperator(
    task_id="notebook_1",
    notebook_path="/Shared/Notebook_1",
    # Libraries to install for the notebook; "repo" is optional.
    notebook_packages=[
        {
            "pypi": {
                "package": "simplejson==3.18.0",
                "repo": "https://pypi.ac.cn/simple",
            }
        },
        {"pypi": {"package": "Faker"}},
    ],
    source="WORKSPACE",
    new_cluster=new_cluster_spec,
)
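The `notebook_packages` entries above all share one shape: a dict with a `"pypi"` key holding a `"package"` string and an optional `"repo"` URL. As a minimal sketch, a small helper can build these entries so that the nesting is written only once. The helper name `pypi_library` is hypothetical and not part of the Databricks provider; it is plain Python and mirrors only the fields shown in the example.

```python
from typing import Optional


def pypi_library(package: str, repo: Optional[str] = None) -> dict:
    """Build one notebook_packages entry (hypothetical helper, not a provider API)."""
    spec = {"package": package}
    # Include "repo" only when a custom index is requested,
    # matching the second entry in the example, which omits it.
    if repo is not None:
        spec["repo"] = repo
    return {"pypi": spec}


# Rebuilds the same list that the example passes to notebook_packages.
notebook_packages = [
    pypi_library("simplejson==3.18.0", repo="https://pypi.ac.cn/simple"),
    pypi_library("Faker"),
]
```

The resulting list can be passed unchanged as `notebook_packages=notebook_packages`.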
Running a notebook in Databricks on an existing cluster
notebook_2 = DatabricksNotebookOperator(
    task_id="notebook_2",
    notebook_path="/Shared/Notebook_2",
    notebook_packages=[
        {
            "pypi": {
                "package": "simplejson==3.18.0",
                "repo": "https://pypi.ac.cn/simple",
            }
        },
    ],
    source="WORKSPACE",
    existing_cluster_id="existing_cluster_id",
)
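The two examples differ only in how the cluster is selected: one passes `new_cluster`, the other passes `existing_cluster_id`. The sketch below shows one way to keep that choice in a single place when several notebook tasks are defined. The helper `cluster_kwargs` is hypothetical, and the "exactly one of the two" check is a convention of this sketch, not a statement about how the operator validates its arguments.

```python
from typing import Any, Dict, Optional


def cluster_kwargs(
    new_cluster: Optional[Dict[str, Any]] = None,
    existing_cluster_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Return the cluster-related keyword arguments for a notebook task.

    Hypothetical helper: it insists on exactly one cluster option,
    which is an assumption of this sketch.
    """
    if (new_cluster is None) == (existing_cluster_id is None):
        raise ValueError("Pass exactly one of new_cluster or existing_cluster_id")
    if new_cluster is not None:
        return {"new_cluster": new_cluster}
    return {"existing_cluster_id": existing_cluster_id}


# Keyword arguments equivalent to the existing-cluster example.
existing = cluster_kwargs(existing_cluster_id="existing_cluster_id")
```

The returned dict could then be unpacked into the operator call, for example `DatabricksNotebookOperator(task_id="notebook_2", notebook_path="/Shared/Notebook_2", source="WORKSPACE", **existing)`.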