airflow.providers.weaviate.hooks.weaviate

属性

ExitingSchemaOptions

HTTP_RETRY_STATUS_CODE

REQUESTS_EXCEPTIONS_TYPES

WeaviateHook

与 Weaviate 数据库交互以存储向量。此 hook 使用 'conn_id'。

函数

check_http_error_is_retryable(exc)

模块内容

airflow.providers.weaviate.hooks.weaviate.ExitingSchemaOptions[source]
airflow.providers.weaviate.hooks.weaviate.HTTP_RETRY_STATUS_CODE = [429, 500, 503, 504][source]
airflow.providers.weaviate.hooks.weaviate.REQUESTS_EXCEPTIONS_TYPES[source]
airflow.providers.weaviate.hooks.weaviate.check_http_error_is_retryable(exc)[source]
class airflow.providers.weaviate.hooks.weaviate.WeaviateHook(conn_id=default_conn_name, *args, **kwargs)[source]

基类: airflow.hooks.base.BaseHook

与 Weaviate 数据库交互以存储向量。此 hook 使用 'conn_id'。

参数:

conn_id (str) – 连接到 Weaviate 时要使用的连接 ID。 <howto/connection:weaviate>

conn_name_attr = 'conn_id'[source]
default_conn_name = 'weaviate_default'[source]
conn_type = 'weaviate'[source]
hook_name = 'Weaviate'[source]
conn_id = 'weaviate_default'[source]
classmethod get_connection_form_widgets()[source]

返回要添加到连接表单的连接小部件。

classmethod get_ui_field_behaviour()[source]

返回自定义字段行为。

get_conn()[source]

返回 hook 的连接。

property conn: weaviate.WeaviateClient[source]

返回一个 Weaviate 客户端。

test_connection()[source]
测试连接。

create_collection(name, **kwargs)[source]

创建一个新集合。

get_collection(name)[source]

参数:

按名称获取集合。

name (str) – 要获取的集合名称。

delete_collections(collection_names, if_error='stop')[source]

参数:
  • 如果提供了 collection_names,则删除所有或特定的集合。

  • collection_names (list[str] | str) – 要删除的集合名称列表。

if_error (str) – 定义删除集合时出错要采取的操作,可能的选项是 stopcontinue

返回:

如果 if_error=continue,则返回未能删除的集合列表。 如果 if_error=stop,则返回 None。

返回类型:

list[str] | None

get_collection_configuration(collection_name)[source]

参数:

从 Weaviate 获取集合配置。

collection_name (str) – 要返回集合配置的集合。

update_collection_configuration(collection_name, **kwargs)[source]

更新集合配置。

batch_data(collection_name, data, vector_col='Vector', uuid_col='id', retry_attempts_per_object=5, references=None)[source]

参数:
  • 一次性将多个对象或对象引用添加到 weaviate。

  • collection_name (str) – 对象所属集合的名称。

  • data (list[dict[str, Any]] | pandas.DataFrame | None) – 要添加的对象列表或 dataframe。

  • vector_col (str) – 包含向量的列名称。

  • uuid_col (str) – 包含 UUID 的列名称。

  • retry_attempts_per_object (int) – 失败时放弃前重试的次数。

references (weaviate.collections.classes.internal.ReferenceInputs | None) – 要添加的对象的引用,作为字典。使用 wvc.Reference.to 在字典中创建正确的值。

query_with_vector(embeddings, collection_name, properties, certainty=0.7, limit=1, **kwargs)[source]

使用近似向量查询 weaviate 数据库。

此方法使用 Get 查询进行向量搜索。我们使用 with_near_vector 向 weaviate 提供带有向量本身的查询。这对于使用自定义的外部向量化器查询 Weaviate 类是必需的。Weaviate 然后通过推理 API(在此特定示例中为 OpenAI)将其转换为向量,并使用该向量作为向量搜索的基础。

query_with_text(search_text, collection_name, properties, limit=1, **kwargs)[source]

使用近似文本查询。

此方法使用 Get 查询进行向量搜索。我们使用 nearText 操作符向 weaviate 提供查询搜索文本。Weaviate 然后通过推理 API(在此特定示例中为 OpenAI)将其转换为向量,并使用该向量作为向量搜索的基础。

create_object(data_object, collection_name, **kwargs)[source]

参数:
  • 创建一个新对象。

  • data_object (dict) – 要添加的对象。如果类型是 str,则应为 URL 或文件。

  • collection_name (str) – 与给定对象关联的集合名称。

kwargs – 要传递给 weaviate_client.data_object.create() 的附加参数

get_or_create_object(collection_name, data_object, vector=None, **kwargs)[source]

获取或创建一个新对象。

参数:
  • 如果对象已存在则返回对象,否则返回 UUID。

  • collection_name – 与给定对象关联的集合名称。

  • data_object (dict) – 要添加的对象。

  • vector (collections.abc.Sequence | None) – 与给定对象关联的向量。此参数仅在创建对象时使用。

kwargs – 要传递给 collection.data.fetch_object_by_id() 或 collection.data.fetch_objects() 的参数

get_object(collection_name, **kwargs)[source]

参数:

从 weaviate 获取对象或一个对象。

kwargs – 要传递给 collection.query.fetch_objects() 的参数

get_all_objects(collection_name, after=None, as_dataframe=False, **kwargs)[source]

从 weaviate 获取所有对象。

参数:
  • 如果提供了 after,它将用作列表的起点。

  • after (str | weaviate.types.UUID | None) – 开始列表的对象的 uuid

  • as_dataframe (bool) – 如果为 True,则返回一个 pandas dataframe

kwargs – 要传递给 weaviate_client.data_object.get() 的参数

delete_object(collection_name, uuid)[source]

参数:
  • data_object (dict) – 要添加的对象。如果类型是 str,则应为 URL 或文件。

  • 从 weaviate 删除一个对象。

uuid (weaviate.types.UUID | str) – 要删除的对象的 uuid

update_object(collection_name, uuid, properties=None, **kwargs)[source]

参数:
  • data_object (dict) – 要添加的对象。如果类型是 str,则应为 URL 或文件。

  • 在 weaviate 中更新一个对象。

  • uuid (weaviate.types.UUID | str) – 要更新的对象的 uuid

  • properties (weaviate.collections.classes.types.Properties | None) – 对象的属性。

kwargs – 要传递给 collection.data.update() 的可选参数

replace_object(collection_name, uuid, properties, references=None, **kwargs)[source]

参数:
  • data_object (dict) – 要添加的对象。如果类型是 str,则应为 URL 或文件。

  • 在 weaviate 中更新一个对象。

  • 在 weaviate 中替换一个对象。

  • properties (weaviate.collections.classes.types.Properties) – 对象的属性。

  • references (weaviate.collections.classes.internal.ReferenceInputs | None) – 对 Weaviate 中其他对象的任何引用。

kwargs – 要传递给 collection.data.replace() 的可选参数

object_exists(collection_name, uuid)[source]

参数:
  • data_object (dict) – 要添加的对象。如果类型是 str,则应为 URL 或文件。

  • 检查对象是否存在于 weaviate 中。

uuid (str | weaviate.types.UUID) – 可能存在或不存在于 Weaviate 中的对象的 UUID。

create_or_replace_document_objects(data, collection_name, document_column, existing='skip', uuid_column=None, vector_column='Vector', verbose=False)[source]

创建或替换属于文档的对象。

在实际场景中,信息源(如 Airflow 文档、Stack Overflow 或其他问题)在此被视为“文档”。保持数据库对象与这些信息源同步至关重要。如果这些文档发生任何变化,此函数旨在在数据库中反映这些变化。

注意

此函数负责识别文档中的更改、删除相关的数据库对象并根据更新的信息重新创建它们。务必谨慎处理此过程,确保备份和验证到位,以防止数据丢失或不一致。

参数:
  • 为用户提供了多种处理现有值的方式。replace:用新对象替换现有对象。此选项需要识别属于文档的对象,默认情况下通过使用 document_column 字段完成。skip:跳过现有对象,仅添加文档中缺失的对象。error:如果尝试创建属于现有文档的对象,则引发错误。

  • data (pandas.DataFrame | list[dict[str, Any]] | list[pandas.DataFrame]) – 要导入的单个 pandas DataFrame 或 dict 列表。

  • colleciton_name – 要将数据导入到的 Weaviate 架构中的集合名称。

  • existing (str) – 处理现有数据的策略:“skip”或“replace”。默认为“skip”。

  • document_column (str) – DataFrame 中用于识别源文档的列。

  • uuid_column (str | None) – 包含预生成 UUID 的列。如果未提供,将生成 UUID。

  • vector_column (str) – 包含预嵌入数据的嵌入向量的列。

if_error (str) – 定义删除集合时出错要采取的操作,可能的选项是 stopcontinue

verbose (bool) – 在导入过程中启用详细输出的标志。

如果 if_error=continue,则返回未能删除的集合列表。 如果 if_error=stop,则返回 None。

未能创建的 UUID 列表