airflow.providers.apache.livy.hooks.livy

此模块包含 Apache Livy hook。

BatchState

批量会话状态。

LivyHook

通过 REST API 连接 Apache Livy 的 hook。

LivyAsyncHook

通过 REST API 异步连接 Apache Livy 的 hook。

函数

sanitize_endpoint_prefix(endpoint_prefix)

确保 endpoint 前缀以斜杠开头。

模块内容

class airflow.providers.apache.livy.hooks.livy.BatchState[source]

继承自: enum.Enum

批量会话状态。

NOT_STARTED = 'not_started'[source]
STARTING = 'starting'[source]
RUNNING = 'running'[source]
IDLE = 'idle'[source]
BUSY = 'busy'[source]
SHUTTING_DOWN = 'shutting_down'[source]
ERROR = 'error'[source]
DEAD = 'dead'[source]
KILLED = 'killed'[source]
SUCCESS = 'success'[source]
airflow.providers.apache.livy.hooks.livy.sanitize_endpoint_prefix(endpoint_prefix)[source]

确保 endpoint 前缀以斜杠开头。

class airflow.providers.apache.livy.hooks.livy.LivyHook(livy_conn_id=default_conn_name, extra_options=None, extra_headers=None, auth_type=None, endpoint_prefix=None)[source]

继承自: airflow.providers.http.hooks.http.HttpHook

通过 REST API 连接 Apache Livy 的 hook。

参数:
  • livy_conn_id (str) – 对预定义 Livy 连接的引用。

  • extra_options (dict[str, Any] | None) – 传递给 Livy 的选项字典。

  • extra_headers (dict[str, Any] | None) – 传递给 Livy 的 HTTP 请求头字典。

  • auth_type (Any | None) – 服务的认证类型。

另请参阅

有关更多详细信息,请参阅 Apache Livy API 参考:https://livy.apache.org/docs/latest/rest-api.html

TERMINAL_STATES[source]
conn_name_attr = 'livy_conn_id'[source]
default_conn_name = 'livy_default'[source]
conn_type = 'livy'[source]
hook_name = 'Apache Livy'[source]
default_headers[source]
method = 'POST'[source]
http_conn_id = 'livy_default'[source]
extra_headers[source]
extra_options[source]
endpoint_prefix = ''[source]
run_method(endpoint, method='GET', data=None, headers=None, retry_args=None)[source]

封装 HttpHook;允许在同一个 HttpHook 上更改方法。

参数:
  • method (str) – http 方法

  • endpoint (str) – 端点

  • data (Any | None) – 请求载荷

  • headers (dict[str, Any] | None) – 请求头

  • retry_args (dict[str, Any] | None) – 定义重试行为的参数。请参阅 Tenacity 文档:https://github.com/jd/tenacity

返回:

http 响应

返回类型:

Any

post_batch(*args, **kwargs)[source]

执行请求以提交批量作业。

返回:

批量会话 ID

返回类型:

int

get_batch(session_id)[source]

获取指定批量作业的信息。

参数:

session_id (int | str) – 批量会话的标识符

返回:

响应体

返回类型:

dict

get_batch_state(session_id, retry_args=None)[source]

获取指定批量作业的状态。

参数:
返回:

批量状态

返回类型:

BatchState

delete_batch(session_id)[source]

删除指定的批量作业。

参数:

session_id (int | str) – 批量会话的标识符

返回:

响应体

返回类型:

dict

get_batch_logs(session_id, log_start_position, log_batch_size)[source]

获取指定批量作业的会话日志。

参数:
  • session_id (int | str) – 批量会话的标识符

  • log_start_position – 从何处拉取日志的位置

  • log_batch_size – 每次批量拉取的行数

返回:

响应体

返回类型:

dict

dump_batch_logs(session_id)[source]

转储指定批量作业的会话日志。

参数:

session_id (int | str) – 批量会话的标识符

返回:

响应体

返回类型:

static build_post_batch_body(file, args=None, class_name=None, jars=None, py_files=None, files=None, archives=None, name=None, driver_memory=None, driver_cores=None, executor_memory=None, executor_cores=None, num_executors=None, queue=None, proxy_user=None, conf=None)[source]

构建 post batch 请求体。

另请参阅

有关格式的更多信息,请参阅 https://livy.apache.org/docs/latest/rest-api.html

参数:
  • file (str) – 包含要执行的应用程序的文件路径(必需)。

  • proxy_user (str | None) – 运行作业时要模拟的用户。

  • class_name (str | None) – 应用程序 Java/Spark 主类字符串。

  • args (collections.abc.Sequence[str | int | float] | None) – 应用程序的命令行参数。

  • jars (list[str] | None) – 此会话中要使用的 jar 包。

  • py_files (list[str] | None) – 此会话中要使用的 Python 文件。

  • files (list[str] | None) – 此会话中要使用的文件。

  • driver_memory (str | None) – 用于 driver 进程的内存量字符串。

  • driver_cores (int | str | None) – 用于 driver 进程的核数整数。

  • executor_memory (str | None) – 每个 executor 进程使用的内存量字符串。

  • executor_cores (int | None) – 每个 executor 使用的核数整数。

  • num_executors (int | str | None) – 为此会话启动的 executor 数量整数。

  • archives (list[str] | None) – 此会话中要使用的归档文件。

  • queue (str | None) – 提交到的 YARN 队列名称字符串。

  • name (str | None) – 此会话的名称字符串。

  • conf (dict[Any, Any] | None) – Spark 配置属性。

返回:

请求体

返回类型:

dict

class airflow.providers.apache.livy.hooks.livy.LivyAsyncHook(livy_conn_id=default_conn_name, extra_options=None, extra_headers=None, endpoint_prefix=None)[source]

继承自: airflow.providers.http.hooks.http.HttpAsyncHook

通过 REST API 异步连接 Apache Livy 的 hook。

参数:
  • livy_conn_id (str) – 对预定义 Livy 连接的引用。

  • extra_options (dict[str, Any] | None) – 传递给 Livy 的选项字典。

  • extra_headers (dict[str, Any] | None) – 传递给 Livy 的 HTTP 请求头字典。

另请参阅

有关更多详细信息,请参阅 Apache Livy API 参考:https://livy.apache.org/docs/latest/rest-api.html

TERMINAL_STATES[source]
conn_name_attr = 'livy_conn_id'[source]
default_conn_name = 'livy_default'[source]
conn_type = 'livy'[source]
hook_name = 'Apache Livy'[source]
method = 'POST'[source]
http_conn_id = 'livy_default'[source]
extra_headers[source]
extra_options[source]
endpoint_prefix = ''[source]
async run_method(endpoint, method='GET', data=None, headers=None)[source]

包装 HttpAsyncHook;允许在同一个 HttpAsyncHook 上更改方法。

参数:
  • method (str) – http 方法

  • endpoint (str) – 端点

  • data (Any | None) – 请求载荷

  • headers (dict[str, Any] | None) – 请求头

返回:

http 响应

返回类型:

Any

async get_batch_state(session_id)[source]

异步获取指定批处理的状态。

参数:

session_id (int | str) – 批量会话的标识符

返回:

批量状态

返回类型:

Any

async get_batch_logs(session_id, log_start_position, log_batch_size)[source]

异步获取指定批处理的会话日志。

参数:
  • session_id (int | str) – 批量会话的标识符

  • log_start_position (int) – 从何处拉取日志的位置

  • log_batch_size (int) – 在一个批处理中拉取的行数

返回:

响应体

返回类型:

Any

async dump_batch_logs(session_id)[source]

异步转储指定批处理的会话日志。

参数:

session_id (int | str) – 批量会话的标识符

返回:

响应体

返回类型:

Any

static build_post_batch_body(file, args=None, class_name=None, jars=None, py_files=None, files=None, archives=None, name=None, driver_memory=None, driver_cores=None, executor_memory=None, executor_cores=None, num_executors=None, queue=None, proxy_user=None, conf=None)[source]

构建 post batch 请求体。

参数:
  • file (str) – 包含要执行的应用程序的文件路径(必需)。

  • proxy_user (str | None) – 运行作业时要模拟的用户。

  • class_name (str | None) – 应用程序 Java/Spark 主类字符串。

  • args (collections.abc.Sequence[str | int | float] | None) – 应用程序的命令行参数。

  • jars (list[str] | None) – 此会话中要使用的 jar 包。

  • py_files (list[str] | None) – 此会话中要使用的 Python 文件。

  • files (list[str] | None) – 此会话中要使用的文件。

  • driver_memory (str | None) – 用于 driver 进程的内存量字符串。

  • driver_cores (int | str | None) – 用于 driver 进程的核数整数。

  • executor_memory (str | None) – 每个 executor 进程使用的内存量字符串。

  • executor_cores (int | None) – 每个 executor 使用的核数整数。

  • num_executors (int | str | None) – 为此会话启动的 executor 数量整数。

  • archives (list[str] | None) – 此会话中要使用的归档文件。

  • queue (str | None) – 提交到的 YARN 队列名称字符串。

  • name (str | None) – 此会话的名称字符串。

  • conf (dict[Any, Any] | None) – Spark 配置属性。

返回:

请求体

返回类型:

dict[str, Any]

此条目是否有帮助?