DatabricksSubmitRunOperator¶
使用 DatabricksSubmitRunOperator
通过 Databricks api/2.1/jobs/runs/submit API 端点提交新的 Databricks 作业。
使用 Operator¶
有三种实例化此 Operator 的方式。第一种方式是,您可以获取通常用于调用 api/2.1/jobs/runs/submit
端点的 JSON 有效载荷,并通过 json
参数将其直接传递给我们的 DatabricksSubmitRunOperator
。使用此方法,您可以完全控制 Jobs REST API 的底层有效载荷,包括执行包含多个任务的 Databricks 作业,但由于缺乏类型检查,更难检测错误。
json = {
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {
"notebook_path": "/Users/airflow@example.com/PrepareData",
},
}
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", json=json)
实现相同目标的第二种方法是直接使用 DatabricksSubmitRunOperator
的命名参数。请注意,runs/submit
端点中的每个顶级参数恰好对应一个命名参数。使用命名参数时,必须指定以下内容
任务规范 - 应为以下之一:
spark_jar_task
- JAR 任务的主类和参数notebook_task
- 任务的 notebook 路径和参数spark_python_task
- 用于运行 Python 文件的 Python 文件路径和参数spark_submit_task
- 运行spark-submit
命令所需的参数pipeline_task
- 运行 Delta Live Tables 流水线所需的参数dbt_task
- 运行 dbt 项目所需的参数
集群规范 - 应为以下之一: *
new_cluster
- 运行此任务的新集群配置 *existing_cluster_id
- 运行此任务的现有集群 IDpipeline_task
- 可能指pipeline_id
或pipeline_name
在同时提供了 json 参数和命名参数的情况下,它们将被合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶层 json
键。
- 目前
DatabricksSubmitRunOperator
支持的命名参数有 spark_jar_task
notebook_task
spark_python_task
spark_submit_task
pipeline_task
dbt_task
git_source
new_cluster
existing_cluster_id
libraries
run_name
timeout_seconds
new_cluster = {"spark_version": "10.1.x-scala2.12", "num_workers": 2}
notebook_task = {
"notebook_path": "/Users/airflow@example.com/PrepareData",
}
notebook_run = DatabricksSubmitRunOperator(
task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
)
另一种方法是使用 tasks 参数传递对象数组来实例化此 Operator。在此处,用于调用 api/2.1/jobs/runs/submit
端点的 tasks 参数的值通过 DatabricksSubmitRunOperator
中的 tasks
参数传递。您可以传递任务数组而不是调用单个任务,并提交一次性运行。
tasks = [
{
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {"notebook_path": "/Users/airflow@example.com/PrepareData"},
}
]
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", tasks=tasks)
示例¶
将参数指定为 JSON¶
DatabricksSubmitRunOperator 的示例用法如下
tests/system/databricks/example_databricks.py
# Example of using the JSON parameter to initialize the operator.
new_cluster = {
"spark_version": "9.1.x-scala2.12",
"node_type_id": "r3.xlarge",
"aws_attributes": {"availability": "ON_DEMAND"},
"num_workers": 8,
}
notebook_task_params = {
"new_cluster": new_cluster,
"notebook_task": {
"notebook_path": "/Users/airflow@example.com/PrepareData",
},
}
notebook_task = DatabricksSubmitRunOperator(task_id="notebook_task", json=notebook_task_params)
使用命名参数¶
您也可以使用命名参数初始化 Operator 并运行作业。
tests/system/databricks/example_databricks.py
# Example of using the named parameters of DatabricksSubmitRunOperator
# to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
task_id="spark_jar_task",
new_cluster=new_cluster,
spark_jar_task={"main_class_name": "com.example.ProcessData"},
libraries=[{"jar": "dbfs:/lib/etl-0.1.jar"}],
)
DatabricksSubmitRunDeferrableOperator¶
DatabricksSubmitRunOperator
Operator 的可延迟版本。
它允许使用 Airflow 2.2.0 中引入的新功能更有效地利用 Airflow Worker。