Source code for langchain_community.vectorstores.hanavector

"""SAP HANA Cloud Vector Engine"""
from __future__ import annotations

import importlib.util
import json
import re
from typing import (
    TYPE_CHECKING,
    Callable,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)

if TYPE_CHECKING:
    from hdbcli import dbapi

# Maps every supported DistanceStrategy to a pair of
# (name of the HANA SQL function, direction used in the ORDER BY clause).
# The similarity query reads index 0 as the function name and index 1 as the
# ordering keyword.
HANA_DISTANCE_FUNCTION: dict = {
    DistanceStrategy.COSINE: ("COSINE_SIMILARITY", "DESC"),
    DistanceStrategy.EUCLIDEAN_DISTANCE: ("L2DISTANCE", "ASC"),
}

# Default values for the HanaDB constructor / HanaDB.from_texts arguments.
default_distance_strategy = DistanceStrategy.COSINE
default_table_name: str = "EMBEDDINGS"
default_content_column: str = "VEC_TEXT"
default_metadata_column: str = "VEC_META"
default_vector_column: str = "VEC_VECTOR"
default_vector_column_length: int = -1  # -1 means dynamic length


class HanaDB(VectorStore):
    """SAP HANA Cloud Vector Engine.

    The prerequisite for using this class is the installation of the ``hdbcli``
    Python package.

    The HanaDB vectorstore can be created by providing an embedding function and
    an existing database connection. Optionally, the names of the table and the
    columns to use.
    """

    def __init__(
        self,
        connection: dbapi.Connection,
        embedding: Embeddings,
        distance_strategy: DistanceStrategy = default_distance_strategy,
        table_name: str = default_table_name,
        content_column: str = default_content_column,
        metadata_column: str = default_metadata_column,
        vector_column: str = default_vector_column,
        vector_column_length: int = default_vector_column_length,
    ):
        """Bind the store to a table, creating the table if it is missing.

        Args:
            connection: An open ``hdbcli`` database connection.
            embedding: Embedding function used for texts and queries.
            distance_strategy: One of the keys of ``HANA_DISTANCE_FUNCTION``.
            table_name: Name of the table holding the embeddings.
            content_column: Name of the column holding the text.
            metadata_column: Name of the column holding the JSON metadata.
            vector_column: Name of the column holding the vector.
            vector_column_length: Fixed vector length, or -1 for dynamic length.

        Raises:
            ImportError: If the ``hdbcli`` package is not installed.
            ValueError: If ``distance_strategy`` is not supported or
                ``vector_column_length`` is smaller than -1.
            AttributeError: If an existing table lacks one of the columns or a
                column has the wrong type / length.
        """
        # Check if the hdbcli package is installed
        if importlib.util.find_spec("hdbcli") is None:
            raise ImportError(
                "Could not import hdbcli python package. "
                "Please install it with `pip install hdbcli`."
            )

        # Identity comparison on purpose: only the enum members registered in
        # HANA_DISTANCE_FUNCTION are accepted.
        if not any(key is distance_strategy for key in HANA_DISTANCE_FUNCTION):
            raise ValueError(
                "Unsupported distance_strategy: {}".format(distance_strategy)
            )

        self.connection = connection
        self.embedding = embedding
        self.distance_strategy = distance_strategy
        # Names are interpolated into SQL statements below, so they are
        # reduced to alphanumerics/underscores first.
        self.table_name = HanaDB._sanitize_name(table_name)
        self.content_column = HanaDB._sanitize_name(content_column)
        self.metadata_column = HanaDB._sanitize_name(metadata_column)
        self.vector_column = HanaDB._sanitize_name(vector_column)
        self.vector_column_length = HanaDB._sanitize_int(vector_column_length)

        # Check if the table exists, and eventually create it
        if not self._table_exists(self.table_name):
            sql_str = (
                f"CREATE TABLE {self.table_name}("
                f"{self.content_column} NCLOB, "
                f"{self.metadata_column} NCLOB, "
                f"{self.vector_column} REAL_VECTOR "
            )
            if self.vector_column_length == -1:
                sql_str += ");"
            else:
                sql_str += f"({self.vector_column_length}));"

            # Acquire the cursor outside of ``try`` so that a failing
            # ``cursor()`` call is not masked by an unbound ``cur`` in
            # ``finally``.
            cur = self.connection.cursor()
            try:
                cur.execute(sql_str)
            finally:
                cur.close()

        # Check if the needed columns exist and have the correct type
        self._check_column(self.table_name, self.content_column, ["NCLOB", "NVARCHAR"])
        self._check_column(self.table_name, self.metadata_column, ["NCLOB", "NVARCHAR"])
        self._check_column(
            self.table_name,
            self.vector_column,
            ["REAL_VECTOR"],
            self.vector_column_length,
        )

    def _table_exists(self, table_name: str) -> bool:
        """Return True if ``table_name`` exists in the current schema."""
        sql_str = (
            "SELECT COUNT(*) FROM SYS.TABLES WHERE SCHEMA_NAME = CURRENT_SCHEMA"
            " AND TABLE_NAME = ?"
        )
        cur = self.connection.cursor()
        try:
            # Parameters must be a sequence: note the one-element tuple.
            cur.execute(sql_str, (table_name,))
            if cur.has_result_set():
                rows = cur.fetchall()
                if rows[0][0] == 1:
                    return True
        finally:
            cur.close()
        return False

    def _check_column(
        self,
        table_name: str,
        column_name: str,
        column_type: List[str],
        column_length: Optional[int] = None,
    ) -> None:
        """Verify that a column exists with an accepted type (and length).

        Args:
            table_name: Table to inspect (in the current schema).
            column_name: Column to inspect.
            column_type: Accepted values of ``DATA_TYPE_NAME``.
            column_length: Expected ``LENGTH``; not checked when None.

        Raises:
            AttributeError: If the column is missing or has the wrong
                type / length.
        """
        sql_str = (
            "SELECT DATA_TYPE_NAME, LENGTH FROM SYS.TABLE_COLUMNS WHERE "
            "SCHEMA_NAME = CURRENT_SCHEMA "
            "AND TABLE_NAME = ? AND COLUMN_NAME = ?"
        )
        cur = self.connection.cursor()
        try:
            cur.execute(sql_str, (table_name, column_name))
            if not cur.has_result_set():
                raise AttributeError(f"Column {column_name} does not exist")

            rows = cur.fetchall()
            if not rows:
                raise AttributeError(f"Column {column_name} does not exist")

            data_type, length = rows[0][0], rows[0][1]
            # Check data type
            if data_type not in column_type:
                raise AttributeError(
                    f"Column {column_name} has the wrong type: {data_type}"
                )
            # Check length, if parameter was provided
            if column_length is not None and length != column_length:
                raise AttributeError(
                    f"Column {column_name} has the wrong length: {length}"
                )
        finally:
            cur.close()

    @property
    def embeddings(self) -> Embeddings:
        """The embedding function this store was created with."""
        return self.embedding

    @staticmethod
    def _sanitize_name(input_str: str) -> str:
        """Strip every character that is not alphanumeric or an underscore."""
        return re.sub(r"[^a-zA-Z0-9_]", "", input_str)

    @staticmethod
    def _sanitize_int(input_int: object) -> int:
        """Convert ``input_int`` to an int and reject values below -1.

        Raises:
            ValueError: If the value cannot be parsed or is smaller than -1.
        """
        value = int(str(input_int))
        if value < -1:
            raise ValueError(f"Value ({value}) must not be smaller than -1")
        return value

    @staticmethod
    def _sanitize_list_float(embedding: List[float]) -> List[float]:
        """Return ``embedding`` unchanged after checking every item is a float.

        The values end up inside an SQL statement, hence the strict check.

        Raises:
            ValueError: If any element is not an instance of ``float``.
        """
        for value in embedding:
            if not isinstance(value, float):
                raise ValueError(f"Value ({value}) does not have type float")
        return embedding

    # Compile pattern only once, for better performance
    _compiled_pattern = re.compile("^[_a-zA-Z][_a-zA-Z0-9]*$")

    @staticmethod
    def _sanitize_metadata_keys(metadata: dict) -> dict:
        """Return ``metadata`` unchanged after validating all of its keys.

        Raises:
            ValueError: If a key does not match ``_compiled_pattern``.
        """
        for key in metadata:
            if not HanaDB._compiled_pattern.match(key):
                raise ValueError(f"Invalid metadata key {key}")
        return metadata

    def add_texts(  # type: ignore[override]
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        embeddings: Optional[List[List[float]]] = None,
    ) -> List[str]:
        """Add more texts to the vectorstore.

        Args:
            texts (Iterable[str]): Iterable of strings/text to add to the
                vectorstore.
            metadatas (Optional[List[dict]], optional): Optional list of
                metadatas. Defaults to None.
            embeddings (Optional[List[List[float]]], optional): Optional
                pre-generated embeddings. Defaults to None.

        Returns:
            List[str]: empty list

        Raises:
            ValueError: If a metadata key is invalid.
        """
        # ``texts`` may be a one-shot iterator; materialise it once so it can
        # be used both for embedding and for the insert loop.
        texts = list(texts)

        # Create all embeddings of the texts beforehand to improve performance
        if embeddings is None:
            embeddings = self.embedding.embed_documents(texts)

        # The statement does not depend on the row, build it only once.
        sql_str = (
            f"INSERT INTO {self.table_name} ({self.content_column}, "
            f"{self.metadata_column}, {self.vector_column}) "
            f"VALUES (?, ?, TO_REAL_VECTOR (?));"
        )

        cur = self.connection.cursor()
        try:
            # Insert data into the table
            for i, text in enumerate(texts):
                # Use provided values by default or fallback
                metadata = metadatas[i] if metadatas else {}
                embedding = (
                    embeddings[i]
                    if embeddings
                    else self.embedding.embed_documents([text])[0]
                )
                cur.execute(
                    sql_str,
                    (
                        text,
                        json.dumps(HanaDB._sanitize_metadata_keys(metadata)),
                        f"[{','.join(map(str, embedding))}]",
                    ),
                )
        finally:
            cur.close()
        return []

    @classmethod
    def from_texts(  # type: ignore[no-untyped-def, override]
        cls: Type[HanaDB],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        connection: dbapi.Connection = None,
        distance_strategy: DistanceStrategy = default_distance_strategy,
        table_name: str = default_table_name,
        content_column: str = default_content_column,
        metadata_column: str = default_metadata_column,
        vector_column: str = default_vector_column,
        vector_column_length: int = default_vector_column_length,
    ):
        """Create a HanaDB instance from raw documents.

        This is a user-friendly interface that:
            1. Embeds documents.
            2. Creates a table if it does not yet exist.
            3. Adds the documents to the table.

        This is intended to be a quick way to get started.
        """
        instance = cls(
            connection=connection,
            embedding=embedding,
            distance_strategy=distance_strategy,
            table_name=table_name,
            content_column=content_column,
            metadata_column=metadata_column,
            vector_column=vector_column,
            vector_column_length=vector_column_length,  # -1 means dynamic length
        )
        instance.add_texts(texts, metadatas)
        return instance

    def similarity_search_with_score(
        self, query: str, k: int = 4, filter: Optional[dict] = None
    ) -> List[Tuple[Document, float]]:
        """Return documents and score values most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of tuples (containing a Document and a score) that are
            most similar to the query
        """
        embedding = self.embedding.embed_query(query)
        return self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )

    def similarity_search_with_score_and_vector_by_vector(
        self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
    ) -> List[Tuple[Document, float, List[float]]]:
        """Return docs most similar to the given embedding.

        Args:
            embedding: Embedding to look up documents similar to. Every
                element must be a ``float``.
            k: Number of Documents to return. Defaults to 4.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of Documents most similar to the query and
            score and the document's embedding vector for each

        Raises:
            ValueError: If ``k`` or ``embedding`` fail sanitization or a
                filter value has an unsupported type.
        """
        result = []
        # ``k`` and the embedding are interpolated into the statement, so they
        # are validated first.
        k = HanaDB._sanitize_int(k)
        embedding = HanaDB._sanitize_list_float(embedding)
        distance_func_name, sort_order = HANA_DISTANCE_FUNCTION[
            self.distance_strategy
        ]
        embedding_as_str = ",".join(map(str, embedding))
        sql_str = (
            f"SELECT TOP {k}"
            f" {self.content_column}, "  # row[0]
            f" {self.metadata_column}, "  # row[1]
            f" TO_NVARCHAR({self.vector_column}), "  # row[2]
            f" {distance_func_name}({self.vector_column}, TO_REAL_VECTOR "
            f" (ARRAY({embedding_as_str}))) AS CS "  # row[3]
            f"FROM {self.table_name}"
        )
        order_str = f" order by CS {sort_order}"
        where_str, query_tuple = self._create_where_by_filter(filter)
        sql_str = sql_str + where_str
        sql_str = sql_str + order_str

        cur = self.connection.cursor()
        try:
            cur.execute(sql_str, query_tuple)
            if cur.has_result_set():
                for row in cur.fetchall():
                    js = json.loads(row[1])
                    doc = Document(page_content=row[0], metadata=js)
                    result_vector = HanaDB._parse_float_array_from_string(row[2])
                    result.append((doc, row[3], result_vector))
        finally:
            cur.close()
        return result

    def similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the given embedding.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of Documents most similar to the query and score for each
        """
        whole_result = self.similarity_search_with_score_and_vector_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return [(result_item[0], result_item[1]) for result_item in whole_result]

    def similarity_search_by_vector(  # type: ignore[override]
        self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return [doc for doc, _ in docs_and_scores]

    def _create_where_by_filter(self, filter: Optional[dict]) -> Tuple[str, list]:
        """Build the WHERE clause and bind values for a metadata filter.

        Args:
            filter: Mapping of metadata key to the value it must equal.
                Supported value types are bool, int and str.

        Returns:
            A pair of (WHERE clause, list of bind values). The clause is an
            empty string when ``filter`` is empty or None.

        Raises:
            ValueError: If a filter value has an unsupported type.
        """
        query_tuple = []
        where_str = ""
        if filter:
            for i, key in enumerate(filter.keys()):
                where_str += " WHERE " if i == 0 else " AND "

                # The key is embedded in a single-quoted SQL literal; double
                # any single quote so the key cannot terminate the literal.
                safe_key = str(key).replace("'", "''")
                where_str += (
                    f" JSON_VALUE({self.metadata_column}, '$.{safe_key}') = ?"
                )

                value = filter[key]
                # bool must be tested before int (bool is a subclass of int).
                if isinstance(value, bool):
                    query_tuple.append("true" if value else "false")
                elif isinstance(value, (int, str)):
                    query_tuple.append(value)
                else:
                    raise ValueError(
                        f"Unsupported filter data-type: {type(value)}"
                    )

        return where_str, query_tuple

    def delete(  # type: ignore[override]
        self, ids: Optional[List[str]] = None, filter: Optional[dict] = None
    ) -> Optional[bool]:
        """Delete entries by filter with metadata values

        Args:
            ids: Deletion with ids is not supported! A ValueError will be raised.
            filter: A dictionary of metadata fields and values to filter by.
                An empty filter ({}) will delete all entries in the table.

        Returns:
            Optional[bool]: True, if deletion is technically successful.
            Deletion of zero entries, due to non-matching filters is a success.

        Raises:
            ValueError: If ``ids`` is given or ``filter`` is None.
        """
        if ids is not None:
            raise ValueError("Deletion via ids is not supported")

        if filter is None:
            raise ValueError("Parameter 'filter' is required when calling 'delete'")

        where_str, query_tuple = self._create_where_by_filter(filter)
        sql_str = f"DELETE FROM {self.table_name} {where_str}"

        cur = self.connection.cursor()
        try:
            cur.execute(sql_str, query_tuple)
        finally:
            cur.close()

        return True

    async def adelete(  # type: ignore[override]
        self, ids: Optional[List[str]] = None, filter: Optional[dict] = None
    ) -> Optional[bool]:
        """Asynchronously delete entries by filter with metadata values.

        Runs ``delete`` via ``run_in_executor``.

        Args:
            ids: Deletion with ids is not supported! A ValueError will be raised.
            filter: A dictionary of metadata fields and values to filter by.

        Returns:
            Optional[bool]: The return value of ``delete``.
        """
        return await run_in_executor(None, self.delete, ids=ids, filter=filter)

    @staticmethod
    def _parse_float_array_from_string(array_as_string: str) -> List[float]:
        """Parse a bracketed, comma-separated string such as ``"[1.0,2.5]"``.

        The first and last character are dropped unconditionally; an empty
        remainder (e.g. ``"[]"``) yields an empty list.
        """
        array_wo_brackets = array_as_string[1:-1].strip()
        if not array_wo_brackets:
            return []
        return [float(x) for x in array_wo_brackets.split(",")]

    def max_marginal_relevance_search_by_vector(  # type: ignore[override]
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[dict] = None,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents fetched from the table and handed to
                ``maximal_marginal_relevance``. Defaults to 20.
            lambda_mult: Passed through to ``maximal_marginal_relevance``.
                Defaults to 0.5.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        whole_result = self.similarity_search_with_score_and_vector_by_vector(
            embedding=embedding, k=fetch_k, filter=filter
        )
        embeddings = [result_item[2] for result_item in whole_result]
        mmr_doc_indexes = maximal_marginal_relevance(
            np.array(embedding), embeddings, lambda_mult=lambda_mult, k=k
        )

        return [whole_result[i][0] for i in mmr_doc_indexes]

    async def amax_marginal_relevance_search_by_vector(  # type: ignore[override]
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[dict] = None,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Asynchronous variant: runs ``max_marginal_relevance_search_by_vector``
        via ``run_in_executor`` with the same arguments.
        """
        return await run_in_executor(
            None,
            self.max_marginal_relevance_search_by_vector,
            embedding=embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
        )

    @staticmethod
    def _cosine_relevance_score_fn(distance: float) -> float:
        """Return the value unchanged as the relevance score."""
        return distance

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """
        The 'correct' relevance function
        may differ depending on a few things, including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
        - embedding dimensionality
        - etc.

        Vectorstores should define their own selection based method of relevance.
        """
        if self.distance_strategy == DistanceStrategy.COSINE:
            return HanaDB._cosine_relevance_score_fn
        elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            return HanaDB._euclidean_relevance_score_fn
        else:
            raise ValueError(
                "Unsupported distance_strategy: {}".format(self.distance_strategy)
            )