airflow.providers.apache.pinot.hooks.pinot

PinotAdminHook

此 Hook 是对 pinot-admin.sh 脚本的封装。

PinotDbApiHook

与 Pinot Broker 查询 API 交互。

模块内容

class airflow.providers.apache.pinot.hooks.pinot.PinotAdminHook(conn_id='pinot_admin_default', cmd_path='pinot-admin.sh', pinot_admin_system_exit=False)[源代码]

基类: airflow.hooks.base.BaseHook

此 Hook 是对 pinot-admin.sh 脚本的封装。

目前,仅实现了其子命令的一小部分,这是将离线数据摄取到 Apache Pinot 所必需的(即 AddSchema、AddTable、CreateSegment 和 UploadSegment)。它们的命令选项基于 Pinot v0.1.0。

不幸的是,截至 v0.1.0,pinot-admin.sh 始终以状态码 0 退出。为了解决此问题,用户可以使用 pinot_admin_system_exit 标志。如果其值设置为 false,则此 hook 根据输出消息而不是状态码来评估结果。Pinot 的此行为预计将在下一版本中得到改进,其中包括以下 PR:https://github.com/apache/incubator-pinot/pull/4110

参数:
  • conn_id (str) – 要使用的连接名称。

  • cmd_path (str) – 请勿修改此参数。它曾是 pinot-admin.sh 可执行文件的文件路径,但在 apache-pinot provider 4.0.0 版本中,此参数的值必须保持默认值:pinot-admin.sh。此处保留是为了避免在使用位置参数初始化 hook 时意外覆盖 pinot_admin_system_exit

  • pinot_admin_system_exit (bool) – 如果为 true,则根据状态码评估结果。否则,如果输出消息中包含“Error”或“Exception”,则评估结果为失败。

conn_name_attr = 'conn_id'[源代码]
default_conn_name = 'pinot_admin_default'[源代码]
conn_type = 'pinot_admin'[源代码]
hook_name = 'Pinot Admin'[源代码]
host[源代码]
port = ''[源代码]
username[源代码]
password[源代码]
cmd_path = 'pinot-admin.sh'[源代码]
pinot_admin_system_exit[源代码]
conn[源代码]
get_conn()[源代码]

返回 hook 的连接。

add_schema(schema_file, with_exec=True)[源代码]

通过运行 AddSchema 命令添加 Pinot Schema。

参数:
  • schema_file (str) – Pinot Schema 文件

  • with_exec (bool) – 布尔值

add_table(file_path, with_exec=True)[源代码]

通过运行 AddTable 命令添加 Pinot Table。

参数:
  • file_path (str) – Pinot Table 配置文件

  • with_exec (bool) – 布尔值

create_segment(generator_config_file=None, data_dir=None, segment_format=None, out_dir=None, overwrite=None, table_name=None, segment_name=None, time_column_name=None, schema_file=None, reader_config_file=None, enable_star_tree_index=None, star_tree_index_spec_file=None, hll_size=None, hll_columns=None, hll_suffix=None, num_threads=None, post_creation_verification=None, retry=None)[源代码]

通过运行 CreateSegment 命令创建 Pinot Segment。

upload_segment(segment_dir, table_name=None)[源代码]

通过运行 UploadSegment 命令上传 Segment。

参数:
  • segment_dir (str)

  • table_name (str | None)

返回:

返回类型:

Any

run_cli(cmd, verbose=True)[源代码]

使用 pinot-admin.sh 运行命令。

参数:
  • cmd (list[str]) – 将由 pinot-admin.sh 脚本运行的命令列表

  • verbose (bool)

class airflow.providers.apache.pinot.hooks.pinot.PinotDbApiHook(*args, schema=None, log_sql=True, **kwargs)[源代码]

基类: airflow.providers.common.sql.hooks.sql.DbApiHook

与 Pinot Broker 查询 API 交互。

此 hook 使用标准 SQL 端点,因为 PQL 端点即将废弃。 https://docs.pinot.apache.org/users/api/querying-pinot-using-standard-sql

conn_name_attr = 'pinot_broker_conn_id'[源代码]
default_conn_name = 'pinot_broker_default'[源代码]
conn_type = 'pinot'[源代码]
hook_name = 'Pinot Broker'[源代码]
supports_autocommit = False[源代码]
get_conn()[源代码]

通过 Pinot dbapi 建立与 Pinot Broker 的连接。

get_uri()[源代码]

获取 Pinot Broker 的连接 URI。

例如: http://localhost:9000/query/sql

get_records(sql, parameters=None, **kwargs)[源代码]

执行 SQL 并返回一组记录。

参数:
get_first(sql, parameters=None)[源代码]

执行 SQL 并返回第一个结果行。

参数:
abstract set_autocommit(conn, autocommit)[源代码]

设置连接上的 autocommit 标志。

abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[源代码]

将一组元组插入到表中。

行按块插入,每个块(大小为 commit_every)在新事务中完成。

参数:
  • table (str) – 目标表的名称

  • rows (str) – 要插入到表中的行

  • target_fields (str | None) – 要填充表中的列名称

  • commit_every (int) – 单个事务中插入的最大行数。设置为 0 以在单个事务中插入所有行。

  • replace (bool) – 是替换而不是插入

  • executemany – 如果为 True,则所有行将按照 commit_every 参数定义的块一次性插入。这仅适用于所有行具有相同数量的列名的情况,但能带来更好的性能。

  • fast_executemany – 如果为 True,则用于 executemany 的游标将设置 fast_executemany 参数,这可以带来更好的性能,如果驱动程序支持的话。

  • autocommit – 在执行查询之前,连接的 autocommit 设置为何。

此条目是否有帮助?