Source code for langchain_core.vectorstores

from __future__ import annotations

import logging
import math
import warnings
from abc import ABC, abstractmethod
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Collection,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
)

from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables.config import run_in_executor

if TYPE_CHECKING:
    from langchain_core.callbacks.manager import (
        AsyncCallbackManagerForRetrieverRun,
        CallbackManagerForRetrieverRun,
    )
    from langchain_core.documents import Document

logger = logging.getLogger(__name__)

VST = TypeVar("VST", bound="VectorStore")


[docs]class VectorStore(ABC): """Interface for vector store."""
[docs] @abstractmethod def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> List[str]: """Run more texts through the embeddings and add to the vectorstore. Args: texts: Iterable of strings to add to the vectorstore. metadatas: Optional list of metadatas associated with the texts. kwargs: vectorstore specific parameters Returns: List of ids from adding the texts into the vectorstore. """
@property def embeddings(self) -> Optional[Embeddings]: """Access the query embedding object if available.""" logger.debug( f"{Embeddings.__name__} is not implemented for {self.__class__.__name__}" ) return None
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: """Delete by vector ID or other criteria. Args: ids: List of ids to delete. **kwargs: Other keyword arguments that subclasses might use. Returns: Optional[bool]: True if deletion is successful, False otherwise, None if not implemented. """ raise NotImplementedError("delete method must be implemented by subclass.")
[docs] async def adelete( self, ids: Optional[List[str]] = None, **kwargs: Any ) -> Optional[bool]: """Delete by vector ID or other criteria. Args: ids: List of ids to delete. **kwargs: Other keyword arguments that subclasses might use. Returns: Optional[bool]: True if deletion is successful, False otherwise, None if not implemented. """ raise NotImplementedError("delete method must be implemented by subclass.")
[docs] async def aadd_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> List[str]: """Run more texts through the embeddings and add to the vectorstore.""" return await run_in_executor(None, self.add_texts, texts, metadatas, **kwargs)
[docs] def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]: """Run more documents through the embeddings and add to the vectorstore. Args: documents (List[Document]: Documents to add to the vectorstore. Returns: List[str]: List of IDs of the added texts. """ # TODO: Handle the case where the user doesn't provide ids on the Collection texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return self.add_texts(texts, metadatas, **kwargs)
[docs] async def aadd_documents( self, documents: List[Document], **kwargs: Any ) -> List[str]: """Run more documents through the embeddings and add to the vectorstore. Args: documents (List[Document]: Documents to add to the vectorstore. Returns: List[str]: List of IDs of the added texts. """ texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return await self.aadd_texts(texts, metadatas, **kwargs)
[docs] def search(self, query: str, search_type: str, **kwargs: Any) -> List[Document]: """Return docs most similar to query using specified search type.""" if search_type == "similarity": return self.similarity_search(query, **kwargs) elif search_type == "mmr": return self.max_marginal_relevance_search(query, **kwargs) else: raise ValueError( f"search_type of {search_type} not allowed. Expected " "search_type to be 'similarity' or 'mmr'." )
[docs] async def asearch( self, query: str, search_type: str, **kwargs: Any ) -> List[Document]: """Return docs most similar to query using specified search type.""" if search_type == "similarity": return await self.asimilarity_search(query, **kwargs) elif search_type == "mmr": return await self.amax_marginal_relevance_search(query, **kwargs) else: raise ValueError( f"search_type of {search_type} not allowed. Expected " "search_type to be 'similarity' or 'mmr'." )
@staticmethod def _euclidean_relevance_score_fn(distance: float) -> float: """Return a similarity score on a scale [0, 1].""" # The 'correct' relevance function # may differ depending on a few things, including: # - the distance / similarity metric used by the VectorStore # - the scale of your embeddings (OpenAI's are unit normed. Many # others are not!) # - embedding dimensionality # - etc. # This function converts the euclidean norm of normalized embeddings # (0 is most similar, sqrt(2) most dissimilar) # to a similarity function (0 to 1) return 1.0 - distance / math.sqrt(2) @staticmethod def _cosine_relevance_score_fn(distance: float) -> float: """Normalize the distance to a score on a scale [0, 1].""" return 1.0 - distance @staticmethod def _max_inner_product_relevance_score_fn(distance: float) -> float: """Normalize the distance to a score on a scale [0, 1].""" if distance > 0: return 1.0 - distance return -1.0 * distance def _select_relevance_score_fn(self) -> Callable[[float], float]: """ The 'correct' relevance function may differ depending on a few things, including: - the distance / similarity metric used by the VectorStore - the scale of your embeddings (OpenAI's are unit normed. Many others are not!) - embedding dimensionality - etc. Vectorstores should define their own selection based method of relevance. """ raise NotImplementedError
[docs] def similarity_search_with_score( self, *args: Any, **kwargs: Any ) -> List[Tuple[Document, float]]: """Run similarity search with distance.""" raise NotImplementedError
[docs] async def asimilarity_search_with_score( self, *args: Any, **kwargs: Any ) -> List[Tuple[Document, float]]: """Run similarity search with distance asynchronously.""" # This is a temporary workaround to make the similarity search # asynchronous. The proper solution is to make the similarity search # asynchronous in the vector store implementations. return await run_in_executor( None, self.similarity_search_with_score, *args, **kwargs )
def _similarity_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any, ) -> List[Tuple[Document, float]]: """ Default similarity search with relevance scores. Modify if necessary in subclass. Return docs and relevance scores in the range [0, 1]. 0 is dissimilar, 1 is most similar. Args: query: input text k: Number of Documents to return. Defaults to 4. **kwargs: kwargs to be passed to similarity search. Should include: score_threshold: Optional, a floating point value between 0 to 1 to filter the resulting set of retrieved docs Returns: List of Tuples of (doc, similarity_score) """ relevance_score_fn = self._select_relevance_score_fn() docs_and_scores = self.similarity_search_with_score(query, k, **kwargs) return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores] async def _asimilarity_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any, ) -> List[Tuple[Document, float]]: """ Default async similarity search with relevance scores. Modify if necessary in subclass. Return docs and relevance scores in the range [0, 1]. 0 is dissimilar, 1 is most similar. Args: query: input text k: Number of Documents to return. Defaults to 4. **kwargs: kwargs to be passed to similarity search. Should include: score_threshold: Optional, a floating point value between 0 to 1 to filter the resulting set of retrieved docs Returns: List of Tuples of (doc, similarity_score) """ relevance_score_fn = self._select_relevance_score_fn() docs_and_scores = await self.asimilarity_search_with_score(query, k, **kwargs) return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores]
[docs] def similarity_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs and relevance scores in the range [0, 1]. 0 is dissimilar, 1 is most similar. Args: query: input text k: Number of Documents to return. Defaults to 4. **kwargs: kwargs to be passed to similarity search. Should include: score_threshold: Optional, a floating point value between 0 to 1 to filter the resulting set of retrieved docs Returns: List of Tuples of (doc, similarity_score) """ score_threshold = kwargs.pop("score_threshold", None) docs_and_similarities = self._similarity_search_with_relevance_scores( query, k=k, **kwargs ) if any( similarity < 0.0 or similarity > 1.0 for _, similarity in docs_and_similarities ): warnings.warn( "Relevance scores must be between" f" 0 and 1, got {docs_and_similarities}" ) if score_threshold is not None: docs_and_similarities = [ (doc, similarity) for doc, similarity in docs_and_similarities if similarity >= score_threshold ] if len(docs_and_similarities) == 0: warnings.warn( "No relevant docs were retrieved using the relevance score" f" threshold {score_threshold}" ) return docs_and_similarities
[docs] async def asimilarity_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs and relevance scores in the range [0, 1], asynchronously. 0 is dissimilar, 1 is most similar. Args: query: input text k: Number of Documents to return. Defaults to 4. **kwargs: kwargs to be passed to similarity search. Should include: score_threshold: Optional, a floating point value between 0 to 1 to filter the resulting set of retrieved docs Returns: List of Tuples of (doc, similarity_score) """ score_threshold = kwargs.pop("score_threshold", None) docs_and_similarities = await self._asimilarity_search_with_relevance_scores( query, k=k, **kwargs ) if any( similarity < 0.0 or similarity > 1.0 for _, similarity in docs_and_similarities ): warnings.warn( "Relevance scores must be between" f" 0 and 1, got {docs_and_similarities}" ) if score_threshold is not None: docs_and_similarities = [ (doc, similarity) for doc, similarity in docs_and_similarities if similarity >= score_threshold ] if len(docs_and_similarities) == 0: warnings.warn( "No relevant docs were retrieved using the relevance score" f" threshold {score_threshold}" ) return docs_and_similarities
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, **kwargs: Any ) -> List[Document]: """Return docs most similar to embedding vector. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. Returns: List of Documents most similar to the query vector. """ raise NotImplementedError
[docs] async def asimilarity_search_by_vector( self, embedding: List[float], k: int = 4, **kwargs: Any ) -> List[Document]: """Return docs most similar to embedding vector.""" # This is a temporary workaround to make the similarity search # asynchronous. The proper solution is to make the similarity search # asynchronous in the vector store implementations. return await run_in_executor( None, self.similarity_search_by_vector, embedding, k=k, **kwargs )
[docs] def max_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any, ) -> List[Document]: """Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. Returns: List of Documents selected by maximal marginal relevance. """ raise NotImplementedError
[docs] async def amax_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any, ) -> List[Document]: """Return docs selected using the maximal marginal relevance.""" raise NotImplementedError
[docs] @classmethod def from_documents( cls: Type[VST], documents: List[Document], embedding: Embeddings, **kwargs: Any, ) -> VST: """Return VectorStore initialized from documents and embeddings.""" texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)
[docs] @classmethod async def afrom_documents( cls: Type[VST], documents: List[Document], embedding: Embeddings, **kwargs: Any, ) -> VST: """Return VectorStore initialized from documents and embeddings.""" texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] return await cls.afrom_texts(texts, embedding, metadatas=metadatas, **kwargs)
[docs] @classmethod @abstractmethod def from_texts( cls: Type[VST], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> VST: """Return VectorStore initialized from texts and embeddings."""
[docs] @classmethod async def afrom_texts( cls: Type[VST], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> VST: """Return VectorStore initialized from texts and embeddings.""" return await run_in_executor( None, cls.from_texts, texts, embedding, metadatas, **kwargs )
def _get_retriever_tags(self) -> List[str]: """Get tags for retriever.""" tags = [self.__class__.__name__] if self.embeddings: tags.append(self.embeddings.__class__.__name__) return tags
[docs] def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever: """Return VectorStoreRetriever initialized from this VectorStore. Args: search_type (Optional[str]): Defines the type of search that the Retriever should perform. Can be "similarity" (default), "mmr", or "similarity_score_threshold". search_kwargs (Optional[Dict]): Keyword arguments to pass to the search function. Can include things like: k: Amount of documents to return (Default: 4) score_threshold: Minimum relevance threshold for similarity_score_threshold fetch_k: Amount of documents to pass to MMR algorithm (Default: 20) lambda_mult: Diversity of results returned by MMR; 1 for minimum diversity and 0 for maximum. (Default: 0.5) filter: Filter by document metadata Returns: VectorStoreRetriever: Retriever class for VectorStore. Examples: .. code-block:: python # Retrieve more documents with higher diversity # Useful if your dataset has many similar documents docsearch.as_retriever( search_type="mmr", search_kwargs={'k': 6, 'lambda_mult': 0.25} ) # Fetch more documents for the MMR algorithm to consider # But only return the top 5 docsearch.as_retriever( search_type="mmr", search_kwargs={'k': 5, 'fetch_k': 50} ) # Only retrieve documents that have a relevance score # Above a certain threshold docsearch.as_retriever( search_type="similarity_score_threshold", search_kwargs={'score_threshold': 0.8} ) # Only get the single most similar document from the dataset docsearch.as_retriever(search_kwargs={'k': 1}) # Use a filter to only retrieve documents from a specific paper docsearch.as_retriever( search_kwargs={'filter': {'paper_title':'GPT-4 Technical Report'}} ) """ tags = kwargs.pop("tags", None) or [] tags.extend(self._get_retriever_tags()) return VectorStoreRetriever(vectorstore=self, **kwargs, tags=tags)
[docs]class VectorStoreRetriever(BaseRetriever): """Base Retriever class for VectorStore.""" vectorstore: VectorStore """VectorStore to use for retrieval.""" search_type: str = "similarity" """Type of search to perform. Defaults to "similarity".""" search_kwargs: dict = Field(default_factory=dict) """Keyword arguments to pass to the search function.""" allowed_search_types: ClassVar[Collection[str]] = ( "similarity", "similarity_score_threshold", "mmr", ) class Config: """Configuration for this pydantic object.""" arbitrary_types_allowed = True @root_validator() def validate_search_type(cls, values: Dict) -> Dict: """Validate search type.""" search_type = values["search_type"] if search_type not in cls.allowed_search_types: raise ValueError( f"search_type of {search_type} not allowed. Valid values are: " f"{cls.allowed_search_types}" ) if search_type == "similarity_score_threshold": score_threshold = values["search_kwargs"].get("score_threshold") if (score_threshold is None) or (not isinstance(score_threshold, float)): raise ValueError( "`score_threshold` is not specified with a float value(0~1) " "in `search_kwargs`." ) return values def _get_relevant_documents( self, query: str, *, run_manager: CallbackManagerForRetrieverRun ) -> List[Document]: if self.search_type == "similarity": docs = self.vectorstore.similarity_search(query, **self.search_kwargs) elif self.search_type == "similarity_score_threshold": docs_and_similarities = ( self.vectorstore.similarity_search_with_relevance_scores( query, **self.search_kwargs ) ) docs = [doc for doc, _ in docs_and_similarities] elif self.search_type == "mmr": docs = self.vectorstore.max_marginal_relevance_search( query, **self.search_kwargs ) else: raise ValueError(f"search_type of {self.search_type} not allowed.") return docs async def _aget_relevant_documents( self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun ) -> List[Document]: if self.search_type == "similarity": docs = await self.vectorstore.asimilarity_search( query, **self.search_kwargs ) elif self.search_type == "similarity_score_threshold": docs_and_similarities = ( await self.vectorstore.asimilarity_search_with_relevance_scores( query, **self.search_kwargs ) ) docs = [doc for doc, _ in docs_and_similarities] elif self.search_type == "mmr": docs = await self.vectorstore.amax_marginal_relevance_search( query, **self.search_kwargs ) else: raise ValueError(f"search_type of {self.search_type} not allowed.") return docs
[docs] def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]: """Add documents to vectorstore.""" return self.vectorstore.add_documents(documents, **kwargs)
[docs] async def aadd_documents( self, documents: List[Document], **kwargs: Any ) -> List[str]: """Add documents to vectorstore.""" return await self.vectorstore.aadd_documents(documents, **kwargs)