airflow.providers.apache.hive.hooks.hive

属性

HIVE_QUEUE_PRIORITIES

HiveCliHook

Hive CLI 的简单包装器。

HiveMetastoreHook

与 Hive Metastore 交互的包装器。

HiveServer2Hook

pyhive 库的包装器。

函数

get_context_from_env_var()

从环境变量中提取上下文(如 dag_id, task_id 等),用于 BashOperator 和 PythonOperator。

模块内容

airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'][源代码]
airflow.providers.apache.hive.hooks.hive.get_context_from_env_var()[源代码]

从环境变量中提取上下文(如 dag_id, task_id 等),用于 BashOperator 和 PythonOperator。

返回:

感兴趣的上下文。

返回类型:

dict[Any, Any]

airflow.providers.apache.hive.hooks.hive.HiveCliHook(hive_cli_conn_id=default_conn_name, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None, hive_cli_params='', auth=None, proxy_user=None)[源代码]

基类: airflow.hooks.base.BaseHook

Hive CLI 的简单包装器。

它也支持 beeline,这是一个更轻量级的 CLI,运行 JDBC 并正在取代更重量级的传统 CLI。要启用 beeline,请在连接的 extra 字段中设置 use_beeline 参数,例如 { "use_beeline": true }

请注意,您还可以通过传递 hive_cli_params(空格分隔的参数列表)来设置默认的 hive CLI 参数,这些参数将添加到 hive 命令中。

extra 连接参数 auth 会按原样传递给 jdbc 连接字符串。

参数:
  • hive_cli_conn_id (str) – 引用 Hive CLI 连接 ID

  • mapred_queue (str | None) – Hadoop Scheduler (Capacity 或 Fair) 使用的队列

  • mapred_queue_priority (str | None) – 作业队列中的优先级。可能的设置包括: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

  • mapred_job_name (str | None) – 此名称将出现在 jobtracker 中。这可以使监控更容易。

  • hive_cli_params (str) – 空格分隔的 hive 命令参数列表,将添加到 hive 命令中。

  • proxy_user (str | None) – 以此用户身份运行 HQL 代码。

conn_name_attr = 'hive_cli_conn_id'[源代码]
default_conn_name = 'hive_cli_default'[源代码]
conn_type = 'hive_cli'[源代码]
hook_name = 'Hive 客户端包装器'[源代码]
hive_cli_params: str = ''[源代码]
use_beeline: bool[源代码]
auth = None[源代码]
conn[源代码]
sub_process: Any = None[源代码]
mapred_queue[源代码]
mapred_queue_priority = None[源代码]
mapred_job_name = None[源代码]
proxy_user = None[源代码]
high_availability[源代码]
类方法 get_connection_form_widgets()[源代码]

返回添加到 Hive 客户端包装器连接表单的连接小部件。

类方法 get_ui_field_behaviour()[源代码]

返回 Hive 客户端包装器连接的自定义 UI 字段行为。

run_cli(hql, schema=None, verbose=True, hive_conf=None)[源代码]

使用 hive cli 运行 HQL 语句。

如果指定了 hive_conf,它应该是一个字典,并且其中的条目将作为键/值对在 HiveConf 中设置。

参数:
  • hql (str) – 要使用 hive cli 运行的 HQL (Hive 查询语言) 语句

  • schema (str | None) – 要使用的 Hive schema (数据库) 名称

  • verbose (bool) – 提供额外日志记录。默认为 True。

  • hive_conf (dict[Any, Any] | None) – 如果指定,这些键值对将作为 -hiveconf "key"="value" 传递给 hive。请注意,它们将在 hive_cli_params 之后传递,因此会覆盖数据库中指定的值。

>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
True
test_hql(hql)[源代码]

使用 hive cli 和 EXPLAIN 测试 HQL 语句。

load_df(df, table, field_dict=None, delimiter=',', encoding='utf8', pandas_kwargs=None, **kwargs)[源代码]

将 pandas DataFrame 加载到 Hive 中。

如果未传递,将推断 Hive 数据类型,但列名不会被清理。

参数:
  • df (pandas.DataFrame) – 要加载到 Hive 表中的 DataFrame

  • table (str) – 目标 Hive 表,使用点符号指定特定数据库

  • field_dict (dict[Any, Any] | None) – 从列名到 Hive 数据类型的映射。请注意,Python 字典是有序的,因此它保持了列的顺序。

  • delimiter (str) – 文件中的字段分隔符

  • encoding (str) – 将 DataFrame 写入文件时使用的字符串编码

  • pandas_kwargs (Any) – 传递给 DataFrame.to_csv

  • kwargs (Any) – 传递给 self.load_file

load_file(filepath, table, delimiter=',', field_dict=None, create=True, overwrite=True, partition=None, recreate=False, tblproperties=None)[源代码]

将本地文件加载到 Hive 中。

请注意,在 Hive 中生成的表使用 STORED AS textfile,这不是最高效的序列化格式。如果加载了大量数据和/或表被频繁查询,您可能只想使用此 operator 将数据暂存到临时表中,然后再使用 HiveOperator 将其加载到最终目标。

参数:
  • filepath (str) – 要加载文件的本地文件路径

  • table (str) – 目标 Hive 表,使用点符号指定特定数据库

  • delimiter (str) – 文件中的字段分隔符

  • field_dict (dict[Any, Any] | None) – 文件中字段名称作为键、对应 Hive 类型作为值的字典。请注意,Python 字典是有序的,因此它保持了列的顺序。

  • create (bool) – 如果表不存在是否创建

  • overwrite (bool) – 是否覆盖表或分区中的数据

  • partition (dict[str, Any] | None) – 目标分区,表示为分区列和值的字典

  • recreate (bool) – 是否在每次执行时删除并重新创建表

  • tblproperties (dict[str, Any] | None) – 正在创建的 Hive 表的 TBLPROPERTIES

kill()[源代码]

终止 Hive cli 命令。

airflow.providers.apache.hive.hooks.hive.HiveMetastoreHook(metastore_conn_id=default_conn_name)[源代码]

基类: airflow.hooks.base.BaseHook

与 Hive Metastore 交互的包装器。

参数:

metastore_conn_id (str) – 引用 metastore thrift 服务连接 ID

MAX_PART_COUNT = 32767[源代码]
conn_name_attr = 'metastore_conn_id'[源代码]
default_conn_name = 'metastore_default'[源代码]
conn_type = 'hive_metastore'[源代码]
hook_name = 'Hive Metastore Thrift'[源代码]
conn[源代码]
metastore[源代码]
__getstate__()[源代码]

序列化对象并省略不可序列化的属性。

__setstate__(d)[源代码]

反序列化对象并恢复不可序列化的属性。

get_metastore_client()[源代码]

返回一个 Hive thrift 客户端。

get_conn()[源代码]

返回 hook 的连接。

check_for_partition(schema, table, partition)[源代码]

检查分区是否存在。

参数:
  • schema (str) – @table 所属的 Hive schema (数据库) 名称

  • table (str) – @partition 所属的 Hive 表名称

  • partition (str) – 匹配要检查的分区的表达式 (例如 a = ‘b’ AND c = ‘d’)

>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> hh.check_for_partition("airflow", t, "ds='2015-01-01'")
True
check_for_named_partition(schema, table, partition_name)[源代码]

检查具有给定名称的分区是否存在。

参数:
  • schema (str) – @table 所属的 Hive schema (数据库) 名称

  • table (str) – @partition 所属的 Hive 表名称

  • partition_name (str) – 要检查的分区名称 (例如 a=b/c=d)

>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> hh.check_for_named_partition("airflow", t, "ds=2015-01-01")
True
>>> hh.check_for_named_partition("airflow", t, "ds=xxx")
False
get_table(table_name, db='default')[源代码]

获取一个 metastore 表对象。

>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db="airflow", table_name="static_babynames")
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
['state', 'year', 'name', 'gender', 'num']
get_tables(db, pattern='*')[源代码]

获取一个 metastore 表对象。

get_databases(pattern='*')[源代码]

获取一个 metastore 表对象。

get_partitions(schema, table_name, partition_filter=None)[源代码]

返回表中所有分区的列表。

仅适用于分区少于 32767 (java short 最大值) 的表。对于子分区表,数量可能很容易超过此值。

>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> parts = hh.get_partitions(schema="airflow", table_name=t)
>>> len(parts)
1
>>> parts
[{'ds': '2015-01-01'}]
max_partition(schema, table_name, field=None, filter_map=None)[源代码]

返回表中具有给定字段的所有分区的最大值。

如果表中只存在一个分区键,则该键将用作字段。filter_map 应为 partition_key:partition_value 映射,并将用于过滤分区。

参数:
  • schema (str) – schema 名称。

  • table_name (str) – 表名称。

  • field (str | None) – 要从中获取最大分区的分区键。

  • filter_map (dict[Any, Any] | None) – 用于分区过滤的分区键:分区值映射。

>>> hh = HiveMetastoreHook()
>>> filter_map = {'ds': '2015-01-01'}
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow',        ... table_name=t, field='ds', filter_map=filter_map)
'2015-01-01'
table_exists(table_name, db='default')[source]

检查表是否存在。

>>> hh = HiveMetastoreHook()
>>> hh.table_exists(db="airflow", table_name="static_babynames")
True
>>> hh.table_exists(db="airflow", table_name="does_not_exist")
False
drop_partitions(table_name, part_vals, delete_data=False, db='default')[source]

从给定表中删除与 part_vals 输入匹配的分区。

参数:
  • table_name – 表名。

  • part_vals – 分区规范列表。

  • delete_data – 设置此选项以控制除了删除分区外是否还要删除底层数据。

  • db – @table 所属的 Hive Schema (数据库) 名称

>>> hh = HiveMetastoreHook()
>>> hh.drop_partitions(db='airflow', table_name='static_babynames',
part_vals="['2020-05-01']")
True
class airflow.providers.apache.hive.hooks.hive.HiveServer2Hook(*args, schema=None, log_sql=True, **kwargs)[source]

基类:airflow.providers.common.sql.hooks.sql.DbApiHook

pyhive 库的包装器。

注意:* 默认的 auth_mechanism 是 PLAIN,要覆盖它,可以在 UI 中连接的 extra 中指定 * run_set_variable_statements 的默认值为 true,如果你使用 impala,可能需要在 UI 中连接的 extra 中将其设置为 false

参数:
  • hiveserver2_conn_id – 对 :ref: Hive Server2 thrift 服务连接 ID <howto/connection:hiveserver2> 的引用。

  • schema (str | None) – Hive 数据库名称。

conn_name_attr = 'hiveserver2_conn_id'[source]
default_conn_name = 'hiveserver2_default'[source]
conn_type = 'hiveserver2'[source]
hook_name = 'Hive Server 2 Thrift'[source]
supports_autocommit = False[source]
get_conn(schema=None)[source]

返回一个 Hive 连接对象。

get_results(sql, schema='default', fetch_size=None, hive_conf=None)[source]

获取目标 schema 中提供的 hql 的结果。

参数:
返回:

hql 执行结果,字典格式,包含数据(结果列表)和头部信息

返回类型:

dict[str, Any]

to_csv(sql, csv_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)[source]

在目标 schema 中执行 hql 并将结果写入 csv 文件。

参数:
  • sql (str) – 要执行的 hql。

  • csv_filepath (str) – 要写入结果的 csv 文件路径。

  • schema (str) – 目标 schema,默认为 'default'。

  • delimiter (str) – csv 文件的分隔符,默认为 ','。

  • lineterminator (str) – csv 文件的行终止符。

  • output_header (bool) – csv 文件是否包含头部,默认为 True。

  • fetch_size (int) – 写入 csv 文件的结果行数,默认为 1000。

  • hive_conf (dict[Any, Any] | None) – 与 hql 一起执行的 hive_conf。

get_records(sql, parameters=None, **kwargs)[source]

从 Hive 查询中获取一组记录;可选择传递 'schema' kwarg 以指定目标 schema。

参数:
返回:

hive 执行结果

返回类型:

Any

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
100
get_pandas_df(sql, schema='default', hive_conf=None, **kwargs)[source]

从 Hive 查询中获取一个 pandas 数据框。

参数:
  • sql (str) – 要执行的 hql。

  • schema (str) – 目标 schema,默认为 'default'。

  • hive_conf (dict[Any, Any] | None) – 与 hql 一起执行的 hive_conf。

  • kwargs – (可选) 传递给 pandas.DataFrame 构造函数

返回:

hive 执行结果

返回类型:

pandas.DataFrame

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
100
返回:

pandas.DateFrame

返回类型:

pandas.DataFrame

此条目有帮助吗?