airflow.providers.apache.hive.hooks.hive¶
属性¶
类¶
Hive CLI 的简单包装器。 |
|
与 Hive Metastore 交互的包装器。 |
|
pyhive 库的包装器。 |
函数¶
从环境变量中提取上下文(如 dag_id, task_id 等),用于 BashOperator 和 PythonOperator。 |
模块内容¶
- airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'][源代码]¶
- airflow.providers.apache.hive.hooks.hive.get_context_from_env_var()[源代码]¶
从环境变量中提取上下文(如 dag_id, task_id 等),用于 BashOperator 和 PythonOperator。
- 返回:
感兴趣的上下文。
- 返回类型:
dict[Any, Any]
- 类 airflow.providers.apache.hive.hooks.hive.HiveCliHook(hive_cli_conn_id=default_conn_name, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None, hive_cli_params='', auth=None, proxy_user=None)[源代码]¶
基类:
airflow.hooks.base.BaseHook
Hive CLI 的简单包装器。
它也支持
beeline
,这是一个更轻量级的 CLI,运行 JDBC 并正在取代更重量级的传统 CLI。要启用beeline
,请在连接的 extra 字段中设置 use_beeline 参数,例如{ "use_beeline": true }
请注意,您还可以通过传递
hive_cli_params
(空格分隔的参数列表)来设置默认的 hive CLI 参数,这些参数将添加到 hive 命令中。extra 连接参数
auth
会按原样传递给jdbc
连接字符串。- 参数:
hive_cli_conn_id (str) – 引用 Hive CLI 连接 ID。
mapred_queue (str | None) – Hadoop Scheduler (Capacity 或 Fair) 使用的队列
mapred_queue_priority (str | None) – 作业队列中的优先级。可能的设置包括: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
mapred_job_name (str | None) – 此名称将出现在 jobtracker 中。这可以使监控更容易。
hive_cli_params (str) – 空格分隔的 hive 命令参数列表,将添加到 hive 命令中。
proxy_user (str | None) – 以此用户身份运行 HQL 代码。
- run_cli(hql, schema=None, verbose=True, hive_conf=None)[源代码]¶
使用 hive cli 运行 HQL 语句。
如果指定了 hive_conf,它应该是一个字典,并且其中的条目将作为键/值对在 HiveConf 中设置。
- 参数:
>>> hh = HiveCliHook() >>> result = hh.run_cli("USE airflow;") >>> ("OK" in result) True
- load_df(df, table, field_dict=None, delimiter=',', encoding='utf8', pandas_kwargs=None, **kwargs)[源代码]¶
将 pandas DataFrame 加载到 Hive 中。
如果未传递,将推断 Hive 数据类型,但列名不会被清理。
- 参数:
df (pandas.DataFrame) – 要加载到 Hive 表中的 DataFrame
table (str) – 目标 Hive 表,使用点符号指定特定数据库
field_dict (dict[Any, Any] | None) – 从列名到 Hive 数据类型的映射。请注意,Python 字典是有序的,因此它保持了列的顺序。
delimiter (str) – 文件中的字段分隔符
encoding (str) – 将 DataFrame 写入文件时使用的字符串编码
pandas_kwargs (Any) – 传递给 DataFrame.to_csv
kwargs (Any) – 传递给 self.load_file
- load_file(filepath, table, delimiter=',', field_dict=None, create=True, overwrite=True, partition=None, recreate=False, tblproperties=None)[源代码]¶
将本地文件加载到 Hive 中。
请注意,在 Hive 中生成的表使用
STORED AS textfile
,这不是最高效的序列化格式。如果加载了大量数据和/或表被频繁查询,您可能只想使用此 operator 将数据暂存到临时表中,然后再使用HiveOperator
将其加载到最终目标。- 参数:
filepath (str) – 要加载文件的本地文件路径
table (str) – 目标 Hive 表,使用点符号指定特定数据库
delimiter (str) – 文件中的字段分隔符
field_dict (dict[Any, Any] | None) – 文件中字段名称作为键、对应 Hive 类型作为值的字典。请注意,Python 字典是有序的,因此它保持了列的顺序。
create (bool) – 如果表不存在是否创建
overwrite (bool) – 是否覆盖表或分区中的数据
recreate (bool) – 是否在每次执行时删除并重新创建表
tblproperties (dict[str, Any] | None) – 正在创建的 Hive 表的 TBLPROPERTIES
- 类 airflow.providers.apache.hive.hooks.hive.HiveMetastoreHook(metastore_conn_id=default_conn_name)[源代码]¶
基类:
airflow.hooks.base.BaseHook
与 Hive Metastore 交互的包装器。
- 参数:
metastore_conn_id (str) – 引用 metastore thrift 服务连接 ID。
- check_for_partition(schema, table, partition)[源代码]¶
检查分区是否存在。
- 参数:
>>> hh = HiveMetastoreHook() >>> t = "static_babynames_partitioned" >>> hh.check_for_partition("airflow", t, "ds='2015-01-01'") True
- check_for_named_partition(schema, table, partition_name)[源代码]¶
检查具有给定名称的分区是否存在。
- 参数:
>>> hh = HiveMetastoreHook() >>> t = "static_babynames_partitioned" >>> hh.check_for_named_partition("airflow", t, "ds=2015-01-01") True >>> hh.check_for_named_partition("airflow", t, "ds=xxx") False
- get_table(table_name, db='default')[源代码]¶
获取一个 metastore 表对象。
>>> hh = HiveMetastoreHook() >>> t = hh.get_table(db="airflow", table_name="static_babynames") >>> t.tableName 'static_babynames' >>> [col.name for col in t.sd.cols] ['state', 'year', 'name', 'gender', 'num']
- get_partitions(schema, table_name, partition_filter=None)[源代码]¶
返回表中所有分区的列表。
仅适用于分区少于 32767 (java short 最大值) 的表。对于子分区表,数量可能很容易超过此值。
>>> hh = HiveMetastoreHook() >>> t = "static_babynames_partitioned" >>> parts = hh.get_partitions(schema="airflow", table_name=t) >>> len(parts) 1 >>> parts [{'ds': '2015-01-01'}]
- max_partition(schema, table_name, field=None, filter_map=None)[源代码]¶
返回表中具有给定字段的所有分区的最大值。
如果表中只存在一个分区键,则该键将用作字段。filter_map 应为 partition_key:partition_value 映射,并将用于过滤分区。
- 参数:
>>> hh = HiveMetastoreHook() >>> filter_map = {'ds': '2015-01-01'} >>> t = 'static_babynames_partitioned' >>> hh.max_partition(schema='airflow', ... table_name=t, field='ds', filter_map=filter_map) '2015-01-01'
- table_exists(table_name, db='default')[source]¶
检查表是否存在。
>>> hh = HiveMetastoreHook() >>> hh.table_exists(db="airflow", table_name="static_babynames") True >>> hh.table_exists(db="airflow", table_name="does_not_exist") False
- drop_partitions(table_name, part_vals, delete_data=False, db='default')[source]¶
从给定表中删除与 part_vals 输入匹配的分区。
- 参数:
table_name – 表名。
part_vals – 分区规范列表。
delete_data – 设置此选项以控制除了删除分区外是否还要删除底层数据。
db – @table 所属的 Hive Schema (数据库) 名称
>>> hh = HiveMetastoreHook() >>> hh.drop_partitions(db='airflow', table_name='static_babynames', part_vals="['2020-05-01']") True
- class airflow.providers.apache.hive.hooks.hive.HiveServer2Hook(*args, schema=None, log_sql=True, **kwargs)[source]¶
基类:
airflow.providers.common.sql.hooks.sql.DbApiHook
pyhive 库的包装器。
注意:* 默认的 auth_mechanism 是 PLAIN,要覆盖它,可以在 UI 中连接的
extra
中指定 * run_set_variable_statements 的默认值为 true,如果你使用 impala,可能需要在 UI 中连接的extra
中将其设置为 false- 参数:
hiveserver2_conn_id – 对 :ref: Hive Server2 thrift 服务连接 ID <howto/connection:hiveserver2> 的引用。
schema (str | None) – Hive 数据库名称。
- get_results(sql, schema='default', fetch_size=None, hive_conf=None)[source]¶
获取目标 schema 中提供的 hql 的结果。
- 参数:
schema (str) – 目标 schema,默认为 'default'。
fetch_size (int | None) – 要获取的结果的最大大小。
hive_conf (collections.abc.Iterable | collections.abc.Mapping | None) – 与 hql 一起执行的 hive_conf。
- 返回:
hql 执行结果,字典格式,包含数据(结果列表)和头部信息
- 返回类型:
- to_csv(sql, csv_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)[source]¶
在目标 schema 中执行 hql 并将结果写入 csv 文件。
- 参数:
sql (str) – 要执行的 hql。
csv_filepath (str) – 要写入结果的 csv 文件路径。
schema (str) – 目标 schema,默认为 'default'。
delimiter (str) – csv 文件的分隔符,默认为 ','。
lineterminator (str) – csv 文件的行终止符。
output_header (bool) – csv 文件是否包含头部,默认为 True。
fetch_size (int) – 写入 csv 文件的结果行数,默认为 1000。
hive_conf (dict[Any, Any] | None) – 与 hql 一起执行的 hive_conf。
- get_records(sql, parameters=None, **kwargs)[source]¶
从 Hive 查询中获取一组记录;可选择传递 'schema' kwarg 以指定目标 schema。
- 参数:
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 传递给 get_results 的可选配置
- 返回:
hive 执行结果
- 返回类型:
Any
>>> hh = HiveServer2Hook() >>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100" >>> len(hh.get_records(sql)) 100