DatabricksCreateJobsOperator¶
使用 DatabricksCreateJobsOperator
来创建(或重置)一个 Databricks 作业。此 Operator 依赖于过去的 XComs 来记住已创建的 job_id
,以便通过此 Operator 进行重复调用时更新现有作业,而不是创建新作业。当与 DatabricksRunNowOperator 配对使用时,所有运行都将属于 Databricks UI 中的同一作业。
使用此 Operator¶
有三种实例化此 Operator 的方法。第一种方法是,您可以获取通常用于调用 api/2.1/jobs/create
端点的 JSON 有效负载,并通过 json
参数将其直接传递给我们的 DatabricksCreateJobsOperator
。使用此方法,您可以完全控制底层有效负载到 Jobs REST API,包括执行包含多个任务的 Databricks 作业,但由于缺乏类型检查,更难检测错误。
完成相同事情的第二种方法是直接使用 DatabricksCreateJobsOperator
的命名参数。请注意,api/2.1/jobs/create
端点中的每个顶级参数都恰好对应一个命名参数。
第三种方法是同时使用 json 参数 和 命名参数。它们将被合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶级 json
键。
- 目前
DatabricksCreateJobsOperator
支持的命名参数有 name
description
tags
tasks
job_clusters
email_notifications
webhook_notifications
notification_settings
timeout_seconds
schedule
max_concurrent_runs
git_source
access_control_list
示例¶
将参数指定为 JSON¶
以下是 DatabricksCreateJobsOperator 的一个使用示例
tests/system/databricks/example_databricks.py
# Example of using the JSON parameter to initialize the operator.
job = {
"tasks": [
{
"task_key": "test",
"job_cluster_key": "job_cluster",
"notebook_task": {
"notebook_path": "/Shared/test",
},
},
],
"job_clusters": [
{
"job_cluster_key": "job_cluster",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
],
}
jobs_create_json = DatabricksCreateJobsOperator(task_id="jobs_create_json", json=job)
使用命名参数¶
您也可以使用命名参数来初始化此 Operator 并运行作业。
tests/system/databricks/example_databricks.py
# Example of using the named parameters to initialize the operator.
tasks = [
{
"task_key": "test",
"job_cluster_key": "job_cluster",
"notebook_task": {
"notebook_path": "/Shared/test",
},
},
]
job_clusters = [
{
"job_cluster_key": "job_cluster",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
]
jobs_create_named = DatabricksCreateJobsOperator(
task_id="jobs_create_named", tasks=tasks, job_clusters=job_clusters
)
与 DatabricksRunNowOperator 配对¶
您可以将 DatabricksCreateJobsOperator 在 return_value XCom 中返回的 job_id
作为参数传递给 DatabricksRunNowOperator 来运行作业。
tests/system/databricks/example_databricks.py
# Example of using the DatabricksRunNowOperator after creating a job with DatabricksCreateJobsOperator.
run_now = DatabricksRunNowOperator(
task_id="run_now", job_id="{{ ti.xcom_pull(task_ids='jobs_create_named') }}"
)
jobs_create_named >> run_now