airflow.providers.databricks.hooks.databricks

Databricks hook。

这个 Hook 能够将作业提交并运行到 Databricks 平台。在内部,Operator 与 api/2.1/jobs/run-now 端点 <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow>_api/2.1/jobs/runs/submit 端点 进行通信。

属性

GET_CLUSTER_ENDPOINT

RESTART_CLUSTER_ENDPOINT

START_CLUSTER_ENDPOINT

TERMINATE_CLUSTER_ENDPOINT

CREATE_ENDPOINT

RESET_ENDPOINT

UPDATE_ENDPOINT

RUN_NOW_ENDPOINT

SUBMIT_RUN_ENDPOINT

GET_RUN_ENDPOINT

CANCEL_RUN_ENDPOINT

DELETE_RUN_ENDPOINT

REPAIR_RUN_ENDPOINT

OUTPUT_RUNS_JOB_ENDPOINT

CANCEL_ALL_RUNS_ENDPOINT

INSTALL_LIBS_ENDPOINT

UNINSTALL_LIBS_ENDPOINT

LIST_JOBS_ENDPOINT

LIST_PIPELINES_ENDPOINT

WORKSPACE_GET_STATUS_ENDPOINT

SPARK_VERSIONS_ENDPOINT

SQL_STATEMENTS_ENDPOINT

RunLifeCycleState

Databricks 运行生命周期状态概念的枚举。

RunState

Databricks 运行状态概念的工具类。

ClusterState

Databricks 集群状态概念的工具类。

SQLStatementState

Databricks 语句的 SQL 语句状态概念的工具类。

DatabricksHook

与 Databricks 交互。

模块内容

airflow.providers.databricks.hooks.databricks.GET_CLUSTER_ENDPOINT = ('GET', 'api/2.0/clusters/get')[source]
airflow.providers.databricks.hooks.databricks.RESTART_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/restart')[source]
airflow.providers.databricks.hooks.databricks.START_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/start')[source]
airflow.providers.databricks.hooks.databricks.TERMINATE_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/delete')[source]
airflow.providers.databricks.hooks.databricks.CREATE_ENDPOINT = ('POST', 'api/2.1/jobs/create')[source]
airflow.providers.databricks.hooks.databricks.RESET_ENDPOINT = ('POST', 'api/2.1/jobs/reset')[source]
airflow.providers.databricks.hooks.databricks.UPDATE_ENDPOINT = ('POST', 'api/2.1/jobs/update')[source]
airflow.providers.databricks.hooks.databricks.RUN_NOW_ENDPOINT = ('POST', 'api/2.1/jobs/run-now')[source]
airflow.providers.databricks.hooks.databricks.SUBMIT_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/submit')[source]
airflow.providers.databricks.hooks.databricks.GET_RUN_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get')[source]
airflow.providers.databricks.hooks.databricks.CANCEL_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel')[source]
airflow.providers.databricks.hooks.databricks.DELETE_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/delete')[source]
airflow.providers.databricks.hooks.databricks.REPAIR_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/repair')[source]
airflow.providers.databricks.hooks.databricks.OUTPUT_RUNS_JOB_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get-output')[source]
airflow.providers.databricks.hooks.databricks.CANCEL_ALL_RUNS_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel-all')[source]
airflow.providers.databricks.hooks.databricks.INSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/install')[source]
airflow.providers.databricks.hooks.databricks.UNINSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/uninstall')[source]
airflow.providers.databricks.hooks.databricks.LIST_JOBS_ENDPOINT = ('GET', 'api/2.1/jobs/list')[source]
airflow.providers.databricks.hooks.databricks.LIST_PIPELINES_ENDPOINT = ('GET', 'api/2.0/pipelines')[source]
airflow.providers.databricks.hooks.databricks.WORKSPACE_GET_STATUS_ENDPOINT = ('GET', 'api/2.0/workspace/get-status')[source]
airflow.providers.databricks.hooks.databricks.SPARK_VERSIONS_ENDPOINT = ('GET', 'api/2.0/clusters/spark-versions')[source]
airflow.providers.databricks.hooks.databricks.SQL_STATEMENTS_ENDPOINT = 'api/2.0/sql/statements'[source]
class airflow.providers.databricks.hooks.databricks.RunLifeCycleState[source]

基类:enum.Enum

Databricks 运行生命周期状态概念的枚举。

更多信息请参阅:https://docs.databricks.com/api/azure/workspace/jobs/listruns#runs-state-life_cycle_state

BLOCKED = 'BLOCKED'[source]
INTERNAL_ERROR = 'INTERNAL_ERROR'[source]
PENDING = 'PENDING'[source]
QUEUED = 'QUEUED'[source]
RUNNING = 'RUNNING'[source]
SKIPPED = 'SKIPPED'[source]
TERMINATED = 'TERMINATED'[source]
TERMINATING = 'TERMINATING'[source]
WAITING_FOR_RETRY = 'WAITING_FOR_RETRY'[source]
class airflow.providers.databricks.hooks.databricks.RunState(life_cycle_state, result_state='', state_message='', *args, **kwargs)[source]

Databricks 运行状态概念的工具类。

RUN_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'TERMINATING', 'TERMINATED', 'SKIPPED', 'INTERNAL_ERROR', 'QUEUED'][source]
life_cycle_state[source]
result_state = ''[source]
state_message = ''[source]
属性 is_terminal: bool[source]

如果当前状态是终端状态,则为 True。

属性 is_successful: bool[source]

如果结果状态是 SUCCESS,则为 True。

__eq__(other)[source]
__repr__()[source]
to_json()[source]
类方法 from_json(data)[source]
class airflow.providers.databricks.hooks.databricks.ClusterState(state='', state_message='', *args, **kwargs)[source]

Databricks 集群状态概念的工具类。

CLUSTER_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'RESTARTING', 'RESIZING', 'TERMINATING', 'TERMINATED', 'ERROR', 'UNKNOWN'][source]
state = ''[source]
state_message = ''[source]
属性 is_terminal: bool[source]

如果当前状态是终端状态,则为 True。

属性 is_running: bool[source]

如果当前状态是 running,则为 True。

__eq__(other)[source]
__repr__()[source]
to_json()[source]
类方法 from_json(data)[source]
class airflow.providers.databricks.hooks.databricks.SQLStatementState(state='', error_code='', error_message='', *args, **kwargs)[source]

Databricks 语句的 SQL 语句状态概念的工具类。

SQL_STATEMENT_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELED', 'CLOSED'][source]
state = ''[source]
error_code = ''[source]
error_message = ''[source]
property is_terminal: bool[source]

如果当前状态是终端状态,则为 True。

property is_running: bool[source]

如果当前状态是 running,则为 True。

property is_successful: bool[source]

如果状态为 SUCCEEDED,则为 True。

__eq__(other)[source]
__repr__()[source]
to_json()[source]
classmethod from_json(data)[source]
class airflow.providers.databricks.hooks.databricks.DatabricksHook(databricks_conn_id=BaseDatabricksHook.default_conn_name, timeout_seconds=180, retry_limit=3, retry_delay=1.0, retry_args=None, caller='DatabricksHook')[source]

继承自: airflow.providers.databricks.hooks.databricks_base.BaseDatabricksHook

与 Databricks 交互。

参数:
  • databricks_conn_id (str) – 指向 Databricks 连接 的引用。

  • timeout_seconds (int) – requests 库在超时前等待的秒数。

  • retry_limit (int) – 在服务中断时重试连接的最大次数。

  • retry_delay (float) – 每次重试之间等待的秒数(可以是浮点数)。

  • retry_args (dict[Any, Any] | None) – 一个可选字典,包含传递给 tenacity.Retrying 类的参数。

hook_name = 'Databricks'[source]
create_job(json)[source]

调用 api/2.1/jobs/create 端点。

参数:

json (dict) – 用于向 create 端点发送请求正文的数据。

返回:

作业 ID (整数类型)

返回类型:

int

reset_job(job_id, json)[source]

调用 api/2.1/jobs/reset 端点。

参数:

json (dict) – 用于向 reset 端点发送请求的新设置数据。

update_job(job_id, json)[source]

调用 api/2.1/jobs/update 端点。

参数:
  • job_id (str) – 要更新的作业 ID。

  • json (dict) – 用于向 update 端点发送请求的新设置数据。

run_now(json)[source]

调用 api/2.1/jobs/run-now 端点。

参数:

json (dict) – 用于向 run-now 端点发送请求正文的数据。

返回:

运行 ID (整数类型)

返回类型:

int

submit_run(json)[source]

调用 api/2.1/jobs/runs/submit 端点。

参数:

json (dict) – 用于向 submit 端点发送请求正文的数据。

返回:

运行 ID (整数类型)

返回类型:

int

list_jobs(limit=25, expand_tasks=False, job_name=None, page_token=None, include_user_names=False)[source]

列出 Databricks 作业服务中的作业。

参数:
  • limit (int) – 用于检索作业的限制/批次大小。

  • expand_tasks (bool) – 响应中是否包含任务和集群详细信息。

  • job_name (str | None) – 要搜索的可选作业名称。

  • page_token (str | None) – 指向要返回的第一个作业的可选页面令牌。

返回:

作业列表。

返回类型:

list[dict[str, Any]]

find_job_id_by_name(job_name)[source]

按名称查找作业 ID;如果存在多个同名作业,则引发 AirflowException 异常。

参数:

job_name (str) – 要查找的作业名称。

返回:

作业 ID (整数类型),如果未找到作业则为 None。

返回类型:

int | None

list_pipelines(batch_size=25, pipeline_name=None, notebook_path=None)[source]

列出 Databricks Delta Live Tables 中的流水线。

参数:
  • batch_size (int) – 用于检索流水线的限制/批次大小。

  • pipeline_name (str | None) – 要搜索的可选流水线名称。不可与路径参数同时使用。

  • notebook_path (str | None) – 要搜索的可选流水线 notebook 路径。不可与名称参数同时使用。

返回:

流水线列表。

返回类型:

list[dict[str, Any]]

find_pipeline_id_by_name(pipeline_name)[source]

按名称查找流水线 ID;如果存在多个同名流水线,则引发 AirflowException 异常。

参数:

pipeline_name (str) – 要查找的流水线名称。

返回:

流水线 ID (GUID 字符串类型),如果未找到流水线则为 None。

返回类型:

str | None

get_run_page_url(run_id)[source]

检索 run_page_url。

参数:

run_id (int) – 运行 ID

返回:

运行页面 URL

返回类型:

str

async a_get_run_page_url(run_id)[source]

get_run_page_url() 的异步版本。

参数:

run_id (int) – 运行 ID

返回:

运行页面 URL

返回类型:

str

get_job_id(run_id)[source]

从运行 ID 检索作业 ID。

参数:

run_id (int) – 运行 ID

返回:

给定 Databricks 运行的作业 ID

返回类型:

int

get_run_state(run_id)[source]

检索运行的状态。

请注意,任何调用 get_run_state 方法的 Airflow 任务都将失败,除非您已启用 xcom pickling。这可以通过设置以下环境变量来完成:AIRFLOW__CORE__ENABLE_XCOM_PICKLING

如果您不想启用 xcom pickling,请使用 get_run_state_str 方法获取描述状态的字符串,或使用 get_run_state_lifecycleget_run_state_resultget_run_state_message 来获取运行状态的各个组成部分。

参数:

run_id (int) – 运行 ID

返回:

运行状态

返回类型:

RunState

async a_get_run_state(run_id)[source]

get_run_state() 的异步版本。

参数:

run_id (int) – 运行 ID

返回:

运行状态

返回类型:

RunState

get_run(run_id)[source]

检索运行信息。

参数:

run_id (int) – 运行 ID

返回:

运行状态

返回类型:

dict[str, Any]

async a_get_run(run_id)[source]

get_run 的异步版本。

参数:

run_id (int) – 运行 ID

返回:

运行状态

返回类型:

dict[str, Any]

get_run_state_str(run_id)[source]

返回 RunState 的字符串表示形式。

参数:

run_id (int) – 运行 ID

返回:

描述运行状态的字符串

返回类型:

str

get_run_state_lifecycle(run_id)[source]

返回运行的生命周期状态。

参数:

run_id (int) – 运行 ID

返回:

包含生命周期状态的字符串

返回类型:

str

get_run_state_result(run_id)[source]

返回运行的最终状态。

参数:

run_id (int) – 运行 ID

返回:

包含最终状态的字符串

返回类型:

str

get_run_state_message(run_id)[source]

返回运行的状态消息。

参数:

run_id (int) – 运行 ID

返回:

包含状态消息的字符串

返回类型:

str

get_run_output(run_id)[source]

检索运行的输出。

参数:

run_id (int) – 运行 ID

返回:

运行输出

返回类型:

dict

async a_get_run_output(run_id)[source]

get_run_output() 的异步版本。

参数:

run_id (int) – 运行 ID

返回:

运行输出

返回类型:

dict

cancel_run(run_id)[source]

取消运行。

参数:

run_id (int) – 运行 ID

cancel_all_runs(job_id)[source]

异步取消作业的所有活动运行。

参数:

job_id (int) – 要取消所有运行的作业的规范标识符

delete_run(run_id)[source]

删除非活动运行。

参数:

run_id (int) – 运行 ID

repair_run(json)[source]

重新运行一个或多个任务。

参数:

json (dict) – 修复作业运行。

get_latest_repair_id(run_id)[source]

获取运行 ID 的最新修复 ID(如果存在),否则为 None。

get_cluster_state(cluster_id)[source]

检索集群的运行状态。

参数:

cluster_id (str) – 集群 ID

返回:

集群状态

返回类型:

ClusterState

async a_get_cluster_state(cluster_id)[source]

get_cluster_state 的异步版本。

参数:

cluster_id (str) – 集群 ID

返回:

集群状态

返回类型:

ClusterState

restart_cluster(json)[source]

重新启动集群。

参数:

json (dict) – 包含集群规范的 JSON 字典。

start_cluster(json)[source]

启动集群。

参数:

json (dict) – 包含集群规范的 JSON 字典。

terminate_cluster(json)[source]

终止集群。

参数:

json (dict) – 包含集群规范的 JSON 字典。

install(json)[source]

在集群上安装库。

调用 2.0/libraries/install 端点的实用函数。

参数:

json (dict) – 包含 cluster_id 和一个库数组的 JSON 字典

uninstall(json)[source]

在集群上卸载库。

调用 2.0/libraries/uninstall 端点的实用函数。

参数:

json (dict) – 包含 cluster_id 和一个库数组的 JSON 字典

update_repo(repo_id, json)[source]

更新给定的 Databricks Repos。

参数:
  • repo_id (str) – Databricks Repos 的 ID

  • json (dict[str, Any]) – 载荷

返回:

更新后的元数据

返回类型:

dict

delete_repo(repo_id)[source]

删除给定的 Databricks Repos。

参数:

repo_id (str) – Databricks Repos 的 ID

返回:

create_repo(json)[source]

创建一个 Databricks Repos。

参数:

json (dict[str, Any]) – 载荷

返回:

返回类型:

dict

get_repo_by_path(path)[source]

按路径获取 Repos ID。

参数:

path (str) – 仓库路径

返回:

如果存在则返回 Repos ID,否则返回 None。

返回类型:

str | None

update_job_permission(job_id, json)[source]

更新 databricks 作业权限。

参数:
  • job_id (int) – 作业 ID

  • json (dict[str, Any]) – 载荷

返回:

包含权限规范的 JSON

返回类型:

dict

post_sql_statement(json)[source]

提交 SQL 语句到 Databricks SQL Statements 端点。

参数:

json (dict[str, Any]) – 用于请求 SQL Statements 端点时放在请求体中的数据。

返回:

statement_id 的字符串形式。

返回类型:

str

get_sql_statement_state(statement_id)[source]

获取 SQL 语句的运行状态。

参数:

statement_id (str) – SQL 语句的 ID。

返回:

SQL 语句的状态。

返回类型:

SQLStatementState

async a_get_sql_statement_state(statement_id)[source]

get_sql_statement_state 的异步版本。

参数:

statement_id (str) – SQL 语句的 ID

返回:

SQL 语句的状态

返回类型:

SQLStatementState

cancel_sql_statement(statement_id)[source]

取消 SQL 语句。

参数:

statement_id (str) – SQL 语句的 ID

test_connection()[source]

从 UI 测试 Databricks 连接。

这个条目有帮助吗?