airflow.providers.databricks.hooks.databricks
Databricks hook。
这个 Hook 能够将作业提交并运行到 Databricks 平台。在内部,Operator 与 api/2.1/jobs/run-now
端点 <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow>_ 或 api/2.1/jobs/runs/submit
端点 进行通信。
模块内容
-
airflow.providers.databricks.hooks.databricks.GET_CLUSTER_ENDPOINT = ('GET', 'api/2.0/clusters/get')[source]
-
airflow.providers.databricks.hooks.databricks.RESTART_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/restart')[source]
-
airflow.providers.databricks.hooks.databricks.START_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/start')[source]
-
airflow.providers.databricks.hooks.databricks.TERMINATE_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/delete')[source]
-
airflow.providers.databricks.hooks.databricks.CREATE_ENDPOINT = ('POST', 'api/2.1/jobs/create')[source]
-
airflow.providers.databricks.hooks.databricks.RESET_ENDPOINT = ('POST', 'api/2.1/jobs/reset')[source]
-
airflow.providers.databricks.hooks.databricks.UPDATE_ENDPOINT = ('POST', 'api/2.1/jobs/update')[source]
-
airflow.providers.databricks.hooks.databricks.RUN_NOW_ENDPOINT = ('POST', 'api/2.1/jobs/run-now')[source]
-
airflow.providers.databricks.hooks.databricks.SUBMIT_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/submit')[source]
-
airflow.providers.databricks.hooks.databricks.GET_RUN_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get')[source]
-
airflow.providers.databricks.hooks.databricks.CANCEL_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel')[source]
-
airflow.providers.databricks.hooks.databricks.DELETE_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/delete')[source]
-
airflow.providers.databricks.hooks.databricks.REPAIR_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/repair')[source]
-
airflow.providers.databricks.hooks.databricks.OUTPUT_RUNS_JOB_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get-output')[source]
-
airflow.providers.databricks.hooks.databricks.CANCEL_ALL_RUNS_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel-all')[source]
-
airflow.providers.databricks.hooks.databricks.INSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/install')[source]
-
airflow.providers.databricks.hooks.databricks.UNINSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/uninstall')[source]
-
airflow.providers.databricks.hooks.databricks.LIST_JOBS_ENDPOINT = ('GET', 'api/2.1/jobs/list')[source]
-
airflow.providers.databricks.hooks.databricks.LIST_PIPELINES_ENDPOINT = ('GET', 'api/2.0/pipelines')[source]
-
airflow.providers.databricks.hooks.databricks.WORKSPACE_GET_STATUS_ENDPOINT = ('GET', 'api/2.0/workspace/get-status')[source]
-
airflow.providers.databricks.hooks.databricks.SPARK_VERSIONS_ENDPOINT = ('GET', 'api/2.0/clusters/spark-versions')[source]
-
airflow.providers.databricks.hooks.databricks.SQL_STATEMENTS_ENDPOINT = 'api/2.0/sql/statements'[source]
-
class airflow.providers.databricks.hooks.databricks.RunLifeCycleState[source]
基类:enum.Enum
Databricks 运行生命周期状态概念的枚举。
更多信息请参阅:https://docs.databricks.com/api/azure/workspace/jobs/listruns#runs-state-life_cycle_state
-
BLOCKED = 'BLOCKED'[source]
-
INTERNAL_ERROR = 'INTERNAL_ERROR'[source]
-
PENDING = 'PENDING'[source]
-
QUEUED = 'QUEUED'[source]
-
RUNNING = 'RUNNING'[source]
-
SKIPPED = 'SKIPPED'[source]
-
TERMINATED = 'TERMINATED'[source]
-
TERMINATING = 'TERMINATING'[source]
-
WAITING_FOR_RETRY = 'WAITING_FOR_RETRY'[source]
-
class airflow.providers.databricks.hooks.databricks.RunState(life_cycle_state, result_state='', state_message='', *args, **kwargs)[source]
Databricks 运行状态概念的工具类。
-
RUN_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'TERMINATING', 'TERMINATED', 'SKIPPED', 'INTERNAL_ERROR', 'QUEUED'][source]
-
life_cycle_state[source]
-
result_state = ''[source]
-
state_message = ''[source]
-
属性 is_terminal: bool[source]
如果当前状态是终端状态,则为 True。
-
属性 is_successful: bool[source]
如果结果状态是 SUCCESS,则为 True。
-
__eq__(other)[source]
-
__repr__()[source]
-
to_json()[source]
-
类方法 from_json(data)[source]
-
class airflow.providers.databricks.hooks.databricks.ClusterState(state='', state_message='', *args, **kwargs)[source]
Databricks 集群状态概念的工具类。
-
CLUSTER_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'RESTARTING', 'RESIZING', 'TERMINATING', 'TERMINATED', 'ERROR', 'UNKNOWN'][source]
-
state = ''[source]
-
state_message = ''[source]
-
属性 is_terminal: bool[source]
如果当前状态是终端状态,则为 True。
-
属性 is_running: bool[source]
如果当前状态是 running,则为 True。
-
__eq__(other)[source]
-
__repr__()[source]
-
to_json()[source]
-
类方法 from_json(data)[source]
-
class airflow.providers.databricks.hooks.databricks.SQLStatementState(state='', error_code='', error_message='', *args, **kwargs)[source]
Databricks 语句的 SQL 语句状态概念的工具类。
-
SQL_STATEMENT_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELED', 'CLOSED'][source]
-
state = ''[source]
-
error_code = ''[source]
-
error_message = ''[source]
-
property is_terminal: bool[source]
如果当前状态是终端状态,则为 True。
-
property is_running: bool[source]
如果当前状态是 running,则为 True。
-
property is_successful: bool[source]
如果状态为 SUCCEEDED,则为 True。
-
__eq__(other)[source]
-
__repr__()[source]
-
to_json()[source]
-
classmethod from_json(data)[source]
-
class airflow.providers.databricks.hooks.databricks.DatabricksHook(databricks_conn_id=BaseDatabricksHook.default_conn_name, timeout_seconds=180, retry_limit=3, retry_delay=1.0, retry_args=None, caller='DatabricksHook')[source]
继承自: airflow.providers.databricks.hooks.databricks_base.BaseDatabricksHook
与 Databricks 交互。
- 参数:
databricks_conn_id (str) – 指向 Databricks 连接 的引用。
timeout_seconds (int) – requests 库在超时前等待的秒数。
retry_limit (int) – 在服务中断时重试连接的最大次数。
retry_delay (float) – 每次重试之间等待的秒数(可以是浮点数)。
retry_args (dict[Any, Any] | None) – 一个可选字典,包含传递给 tenacity.Retrying
类的参数。
-
hook_name = 'Databricks'[source]
-
create_job(json)[source]
调用 api/2.1/jobs/create
端点。
- 参数:
json (dict) – 用于向 create
端点发送请求正文的数据。
- 返回:
作业 ID (整数类型)
- 返回类型:
int
-
reset_job(job_id, json)[source]
调用 api/2.1/jobs/reset
端点。
- 参数:
json (dict) – 用于向 reset
端点发送请求的新设置数据。
-
update_job(job_id, json)[source]
调用 api/2.1/jobs/update
端点。
- 参数:
-
-
run_now(json)[source]
调用 api/2.1/jobs/run-now
端点。
- 参数:
json (dict) – 用于向 run-now
端点发送请求正文的数据。
- 返回:
运行 ID (整数类型)
- 返回类型:
int
-
submit_run(json)[source]
调用 api/2.1/jobs/runs/submit
端点。
- 参数:
json (dict) – 用于向 submit
端点发送请求正文的数据。
- 返回:
运行 ID (整数类型)
- 返回类型:
int
-
list_jobs(limit=25, expand_tasks=False, job_name=None, page_token=None, include_user_names=False)[source]
列出 Databricks 作业服务中的作业。
- 参数:
limit (int) – 用于检索作业的限制/批次大小。
expand_tasks (bool) – 响应中是否包含任务和集群详细信息。
job_name (str | None) – 要搜索的可选作业名称。
page_token (str | None) – 指向要返回的第一个作业的可选页面令牌。
- 返回:
作业列表。
- 返回类型:
list[dict[str, Any]]
-
find_job_id_by_name(job_name)[source]
按名称查找作业 ID;如果存在多个同名作业,则引发 AirflowException 异常。
- 参数:
job_name (str) – 要查找的作业名称。
- 返回:
作业 ID (整数类型),如果未找到作业则为 None。
- 返回类型:
int | None
-
list_pipelines(batch_size=25, pipeline_name=None, notebook_path=None)[source]
列出 Databricks Delta Live Tables 中的流水线。
- 参数:
batch_size (int) – 用于检索流水线的限制/批次大小。
pipeline_name (str | None) – 要搜索的可选流水线名称。不可与路径参数同时使用。
notebook_path (str | None) – 要搜索的可选流水线 notebook 路径。不可与名称参数同时使用。
- 返回:
流水线列表。
- 返回类型:
list[dict[str, Any]]
-
find_pipeline_id_by_name(pipeline_name)[source]
按名称查找流水线 ID;如果存在多个同名流水线,则引发 AirflowException 异常。
- 参数:
pipeline_name (str) – 要查找的流水线名称。
- 返回:
流水线 ID (GUID 字符串类型),如果未找到流水线则为 None。
- 返回类型:
str | None
-
get_run_page_url(run_id)[source]
检索 run_page_url。
- 参数:
run_id (int) – 运行 ID
- 返回:
运行页面 URL
- 返回类型:
str
-
async a_get_run_page_url(run_id)[source]
get_run_page_url() 的异步版本。
- 参数:
run_id (int) – 运行 ID
- 返回:
运行页面 URL
- 返回类型:
str
-
get_job_id(run_id)[source]
从运行 ID 检索作业 ID。
- 参数:
run_id (int) – 运行 ID
- 返回:
给定 Databricks 运行的作业 ID
- 返回类型:
int
-
get_run_state(run_id)[source]
检索运行的状态。
请注意,任何调用 get_run_state
方法的 Airflow 任务都将失败,除非您已启用 xcom pickling。这可以通过设置以下环境变量来完成:AIRFLOW__CORE__ENABLE_XCOM_PICKLING
如果您不想启用 xcom pickling,请使用 get_run_state_str
方法获取描述状态的字符串,或使用 get_run_state_lifecycle
、get_run_state_result
或 get_run_state_message
来获取运行状态的各个组成部分。
- 参数:
run_id (int) – 运行 ID
- 返回:
运行状态
- 返回类型:
RunState
-
async a_get_run_state(run_id)[source]
get_run_state() 的异步版本。
- 参数:
run_id (int) – 运行 ID
- 返回:
运行状态
- 返回类型:
RunState
-
get_run(run_id)[source]
检索运行信息。
- 参数:
run_id (int) – 运行 ID
- 返回:
运行状态
- 返回类型:
dict[str, Any]
-
async a_get_run(run_id)[source]
get_run 的异步版本。
- 参数:
run_id (int) – 运行 ID
- 返回:
运行状态
- 返回类型:
dict[str, Any]
-
get_run_state_str(run_id)[source]
返回 RunState 的字符串表示形式。
- 参数:
run_id (int) – 运行 ID
- 返回:
描述运行状态的字符串
- 返回类型:
str
-
get_run_state_lifecycle(run_id)[source]
返回运行的生命周期状态。
- 参数:
run_id (int) – 运行 ID
- 返回:
包含生命周期状态的字符串
- 返回类型:
str
-
get_run_state_result(run_id)[source]
返回运行的最终状态。
- 参数:
run_id (int) – 运行 ID
- 返回:
包含最终状态的字符串
- 返回类型:
str
-
get_run_state_message(run_id)[source]
返回运行的状态消息。
- 参数:
run_id (int) – 运行 ID
- 返回:
包含状态消息的字符串
- 返回类型:
str
-
get_run_output(run_id)[source]
检索运行的输出。
- 参数:
run_id (int) – 运行 ID
- 返回:
运行输出
- 返回类型:
dict
-
async a_get_run_output(run_id)[source]
get_run_output() 的异步版本。
- 参数:
run_id (int) – 运行 ID
- 返回:
运行输出
- 返回类型:
dict
-
cancel_run(run_id)[source]
取消运行。
- 参数:
run_id (int) – 运行 ID
-
cancel_all_runs(job_id)[source]
异步取消作业的所有活动运行。
- 参数:
job_id (int) – 要取消所有运行的作业的规范标识符
-
delete_run(run_id)[source]
删除非活动运行。
- 参数:
run_id (int) – 运行 ID
-
repair_run(json)[source]
重新运行一个或多个任务。
- 参数:
json (dict) – 修复作业运行。
-
get_latest_repair_id(run_id)[source]
获取运行 ID 的最新修复 ID(如果存在),否则为 None。
-
get_cluster_state(cluster_id)[source]
检索集群的运行状态。
- 参数:
cluster_id (str) – 集群 ID
- 返回:
集群状态
- 返回类型:
ClusterState
-
async a_get_cluster_state(cluster_id)[source]
get_cluster_state 的异步版本。
- 参数:
cluster_id (str) – 集群 ID
- 返回:
集群状态
- 返回类型:
ClusterState
-
restart_cluster(json)[source]
重新启动集群。
- 参数:
json (dict) – 包含集群规范的 JSON 字典。
-
start_cluster(json)[source]
启动集群。
- 参数:
json (dict) – 包含集群规范的 JSON 字典。
-
terminate_cluster(json)[source]
终止集群。
- 参数:
json (dict) – 包含集群规范的 JSON 字典。
-
install(json)[source]
在集群上安装库。
调用 2.0/libraries/install
端点的实用函数。
- 参数:
json (dict) – 包含 cluster_id 和一个库数组的 JSON 字典
-
uninstall(json)[source]
在集群上卸载库。
调用 2.0/libraries/uninstall
端点的实用函数。
- 参数:
json (dict) – 包含 cluster_id 和一个库数组的 JSON 字典
-
update_repo(repo_id, json)[source]
更新给定的 Databricks Repos。
- 参数:
-
- 返回:
更新后的元数据
- 返回类型:
dict
-
delete_repo(repo_id)[source]
删除给定的 Databricks Repos。
- 参数:
repo_id (str) – Databricks Repos 的 ID
- 返回:
-
-
create_repo(json)[source]
创建一个 Databricks Repos。
- 参数:
json (dict[str, Any]) – 载荷
- 返回:
-
- 返回类型:
dict
-
get_repo_by_path(path)[source]
按路径获取 Repos ID。
- 参数:
path (str) – 仓库路径
- 返回:
如果存在则返回 Repos ID,否则返回 None。
- 返回类型:
str | None
-
update_job_permission(job_id, json)[source]
更新 databricks 作业权限。
- 参数:
-
- 返回:
包含权限规范的 JSON
- 返回类型:
dict
-
post_sql_statement(json)[source]
提交 SQL 语句到 Databricks SQL Statements 端点。
- 参数:
json (dict[str, Any]) – 用于请求 SQL Statements 端点时放在请求体中的数据。
- 返回:
statement_id 的字符串形式。
- 返回类型:
str
-
get_sql_statement_state(statement_id)[source]
获取 SQL 语句的运行状态。
- 参数:
statement_id (str) – SQL 语句的 ID。
- 返回:
SQL 语句的状态。
- 返回类型:
SQLStatementState
-
async a_get_sql_statement_state(statement_id)[source]
get_sql_statement_state 的异步版本。
- 参数:
statement_id (str) – SQL 语句的 ID
- 返回:
SQL 语句的状态
- 返回类型:
SQLStatementState
-
cancel_sql_statement(statement_id)[source]
取消 SQL 语句。
- 参数:
statement_id (str) – SQL 语句的 ID
-
test_connection()[source]
从 UI 测试 Databricks 连接。