from __future__ import annotations
import json
import logging
import os
from hashlib import md5
from typing import Any, Iterable, List, Optional, Tuple, Type
import requests
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import Field
from langchain_core.vectorstores import VectorStore, VectorStoreRetriever
logger = logging.getLogger(__name__)
class Vectara(VectorStore):
"""`Vectara API` vector store.
    See https://vectara.com for more information.
Example:
.. code-block:: python
from langchain_community.vectorstores import Vectara
vectorstore = Vectara(
vectara_customer_id=vectara_customer_id,
vectara_corpus_id=vectara_corpus_id,
vectara_api_key=vectara_api_key
)
"""
    def __init__(
self,
vectara_customer_id: Optional[str] = None,
vectara_corpus_id: Optional[str] = None,
vectara_api_key: Optional[str] = None,
vectara_api_timeout: int = 120,
source: str = "langchain",
):
"""Initialize with Vectara API."""
self._vectara_customer_id = vectara_customer_id or os.environ.get(
"VECTARA_CUSTOMER_ID"
)
self._vectara_corpus_id = vectara_corpus_id or os.environ.get(
"VECTARA_CORPUS_ID"
)
self._vectara_api_key = vectara_api_key or os.environ.get("VECTARA_API_KEY")
if (
self._vectara_customer_id is None
or self._vectara_corpus_id is None
or self._vectara_api_key is None
):
            logger.warning(
                "Can't find Vectara customer_id, corpus_id, or api_key in "
                "arguments or environment."
            )
else:
logger.debug(f"Using corpus id {self._vectara_corpus_id}")
self._source = source
        self._session = requests.Session()  # to reuse connections
        adapter = requests.adapters.HTTPAdapter(max_retries=3)
        # Mount the retry adapter on https: all Vectara endpoints are served
        # over https, so mounting on "http://" would never apply.
        self._session.mount("https://", adapter)
self.vectara_api_timeout = vectara_api_timeout
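    # A minimal construction sketch (assumes the three VECTARA_* environment
    # variables are set; the values below are placeholders):
    #
    #     os.environ["VECTARA_CUSTOMER_ID"] = "<customer-id>"
    #     os.environ["VECTARA_CORPUS_ID"] = "<corpus-id>"
    #     os.environ["VECTARA_API_KEY"] = "<api-key>"
    #     vectara = Vectara()  # credentials are picked up from the environment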
@property
    def embeddings(self) -> Optional[Embeddings]:
        # Vectara computes embeddings server-side, so there is no client-side
        # Embeddings object to expose.
        return None
def _get_post_headers(self) -> dict:
"""Returns headers that should be attached to each post request."""
return {
"x-api-key": self._vectara_api_key,
"customer-id": self._vectara_customer_id,
"Content-Type": "application/json",
"X-Source": self._source,
}
def _delete_doc(self, doc_id: str) -> bool:
"""
Delete a document from the Vectara corpus.
Args:
url (str): URL of the page to delete.
doc_id (str): ID of the document to delete.
Returns:
bool: True if deletion was successful, False otherwise.
"""
body = {
"customer_id": self._vectara_customer_id,
"corpus_id": self._vectara_corpus_id,
"document_id": doc_id,
}
response = self._session.post(
"https://api.vectara.io/v1/delete-doc",
data=json.dumps(body),
verify=True,
headers=self._get_post_headers(),
timeout=self.vectara_api_timeout,
)
if response.status_code != 200:
logger.error(
f"Delete request failed for doc_id = {doc_id} with status code "
f"{response.status_code}, reason {response.reason}, text "
f"{response.text}"
)
return False
return True
def _index_doc(self, doc: dict) -> str:
request: dict[str, Any] = {}
request["customer_id"] = self._vectara_customer_id
request["corpus_id"] = self._vectara_corpus_id
request["document"] = doc
response = self._session.post(
headers=self._get_post_headers(),
url="https://api.vectara.io/v1/index",
data=json.dumps(request),
timeout=self.vectara_api_timeout,
verify=True,
)
status_code = response.status_code
result = response.json()
        status_str = result["status"]["code"] if "status" in result else None
        if status_code == 409 or status_str == "ALREADY_EXISTS":
            return "E_ALREADY_EXISTS"
        elif status_str == "FORBIDDEN":
            return "E_NO_PERMISSIONS"
        else:
            return "E_SUCCEEDED"
    def add_files(
self,
files_list: Iterable[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""
Vectara provides a way to add documents directly via our API where
pre-processing and chunking occurs internally in an optimal way
This method provides a way to use that API in LangChain
Args:
files_list: Iterable of strings, each representing a local file path.
Files could be text, HTML, PDF, markdown, doc/docx, ppt/pptx, etc.
see API docs for full list
metadatas: Optional list of metadatas associated with each file
Returns:
List of ids associated with each of the files indexed
"""
        doc_ids = []
        for inx, file in enumerate(files_list):
            if not os.path.exists(file):
                logger.error(f"File {file} does not exist, skipping")
                continue
            md = metadatas[inx] if metadatas else {}
            headers = self._get_post_headers()
            # Drop the JSON Content-Type so requests can set the proper
            # multipart/form-data header (with boundary) for the upload.
            headers.pop("Content-Type")
            # Use a context manager so the file handle is closed after the
            # upload completes.
            with open(file, "rb") as f:
                files: dict = {
                    "file": (file, f),
                    "doc_metadata": json.dumps(md),
                }
                response = self._session.post(
                    f"https://api.vectara.io/upload?c={self._vectara_customer_id}&o={self._vectara_corpus_id}&d=True",
                    files=files,
                    verify=True,
                    headers=headers,
                    timeout=self.vectara_api_timeout,
                )
if response.status_code == 409:
doc_id = response.json()["document"]["documentId"]
logger.info(
f"File {file} already exists on Vectara (doc_id={doc_id}), skipping"
)
elif response.status_code == 200:
doc_id = response.json()["document"]["documentId"]
doc_ids.append(doc_id)
            else:
                logger.error(f"Error indexing file {file}: {response.json()}")
return doc_ids
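    # A hedged usage sketch for add_files; the file names and metadata are
    # illustrative only:
    #
    #     doc_ids = vectara.add_files(
    #         ["manual.pdf", "notes.md"],
    #         metadatas=[{"team": "docs"}, {"team": "eng"}],
    #     )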
    def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
doc_metadata: Optional[dict] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
doc_metadata: optional metadata for the document
This function indexes all the input text strings in the Vectara corpus as a
single Vectara document, where each input text is considered a "section" and the
metadata are associated with each section.
if 'doc_metadata' is provided, it is associated with the Vectara document.
Returns:
document ID of the document added
"""
doc_hash = md5()
for t in texts:
doc_hash.update(t.encode())
doc_id = doc_hash.hexdigest()
if metadatas is None:
metadatas = [{} for _ in texts]
        if doc_metadata:
            doc_metadata["source"] = self._source
        else:
            doc_metadata = {"source": self._source}
doc = {
"document_id": doc_id,
"metadataJson": json.dumps(doc_metadata),
"section": [
{"text": text, "metadataJson": json.dumps(md)}
for text, md in zip(texts, metadatas)
],
}
success_str = self._index_doc(doc)
if success_str == "E_ALREADY_EXISTS":
self._delete_doc(doc_id)
self._index_doc(doc)
        elif success_str == "E_NO_PERMISSIONS":
            logger.error(
                "No permissions to add document to Vectara. "
                "Check your corpus ID, customer ID and API key."
            )
return [doc_id]
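    # A usage sketch for add_texts; the texts and metadata are illustrative:
    #
    #     doc_ids = vectara.add_texts(
    #         ["first section", "second section"],
    #         metadatas=[{"lang": "eng"}, {"lang": "eng"}],
    #         doc_metadata={"title": "My document"},
    #     )
    #
    # All texts in one call land in a single Vectara document, so doc_ids
    # contains exactly one ID.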
    def similarity_search_with_score(
self,
query: str,
k: int = 5,
lambda_val: float = 0.025,
filter: Optional[str] = None,
score_threshold: Optional[float] = None,
n_sentence_context: int = 2,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return Vectara documents most similar to query, along with scores.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 5.
lambda_val: lexical match parameter for hybrid search.
            filter: Filter expression (a string) to apply to the metadata, e.g.
                "doc.rating > 3.0 and part.lang = 'deu'". See
                https://docs.vectara.com/docs/search-apis/sql/filter-overview
                for more details.
score_threshold: minimal score threshold for the result.
If defined, results with score less than this value will be
filtered out.
n_sentence_context: number of sentences before/after the matching segment
to add, defaults to 2
Returns:
List of Documents most similar to the query and score for each.
"""
data = json.dumps(
{
"query": [
{
"query": query,
"start": 0,
"num_results": k,
"context_config": {
"sentences_before": n_sentence_context,
"sentences_after": n_sentence_context,
},
"corpus_key": [
{
"customer_id": self._vectara_customer_id,
"corpus_id": self._vectara_corpus_id,
"metadataFilter": filter,
"lexical_interpolation_config": {"lambda": lambda_val},
}
],
}
]
}
)
response = self._session.post(
headers=self._get_post_headers(),
url="https://api.vectara.io/v1/query",
data=data,
timeout=self.vectara_api_timeout,
)
if response.status_code != 200:
logger.error(
"Query failed %s",
f"(code {response.status_code}, reason {response.reason}, details "
f"{response.text})",
)
return []
result = response.json()
        # Explicit None check so a threshold of 0.0 is still applied.
        if score_threshold is not None:
responses = [
r
for r in result["responseSet"][0]["response"]
if r["score"] > score_threshold
]
else:
responses = result["responseSet"][0]["response"]
documents = result["responseSet"][0]["document"]
metadatas = []
for x in responses:
md = {m["name"]: m["value"] for m in x["metadata"]}
doc_num = x["documentIndex"]
doc_md = {m["name"]: m["value"] for m in documents[doc_num]["metadata"]}
md.update(doc_md)
metadatas.append(md)
docs_with_score = [
(
Document(
page_content=x["text"],
metadata=md,
),
x["score"],
)
for x, md in zip(responses, metadatas)
]
return docs_with_score
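    # A usage sketch; the query, filter expression, and threshold values are
    # illustrative:
    #
    #     results = vectara.similarity_search_with_score(
    #         "What does Vectara do?",
    #         k=3,
    #         filter="doc.rating > 3.0",
    #         score_threshold=0.3,
    #     )
    #     for doc, score in results:
    #         print(f"{score:.3f} {doc.page_content[:80]}")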
    def similarity_search(
self,
query: str,
k: int = 5,
lambda_val: float = 0.025,
filter: Optional[str] = None,
n_sentence_context: int = 2,
**kwargs: Any,
) -> List[Document]:
"""Return Vectara documents most similar to query, along with scores.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 5.
            lambda_val: lexical match parameter for hybrid search.
            filter: Filter expression (a string) to apply to the metadata, e.g.
                "doc.rating > 3.0 and part.lang = 'deu'". See
                https://docs.vectara.com/docs/search-apis/sql/filter-overview
                for more details.
n_sentence_context: number of sentences before/after the matching segment
to add, defaults to 2
Returns:
List of Documents most similar to the query
"""
docs_and_scores = self.similarity_search_with_score(
query,
k=k,
lambda_val=lambda_val,
filter=filter,
score_threshold=None,
n_sentence_context=n_sentence_context,
**kwargs,
)
return [doc for doc, _ in docs_and_scores]
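    # A usage sketch; a lambda_val above 0 blends lexical matching into the
    # neural search:
    #
    #     docs = vectara.similarity_search("query text", k=5, lambda_val=0.1)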
    @classmethod
def from_texts(
cls: Type[Vectara],
texts: List[str],
embedding: Optional[Embeddings] = None,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> Vectara:
"""Construct Vectara wrapper from raw documents.
This is intended to be a quick way to get started.
Example:
.. code-block:: python
from langchain_community.vectorstores import Vectara
vectara = Vectara.from_texts(
texts,
vectara_customer_id=customer_id,
vectara_corpus_id=corpus_id,
vectara_api_key=api_key,
)
"""
# Notes:
# * Vectara generates its own embeddings, so we ignore the provided
# embeddings (required by interface)
# * when metadatas[] are provided they are associated with each "part"
# in Vectara. doc_metadata can be used to provide additional metadata
# for the document itself (applies to all "texts" in this call)
doc_metadata = kwargs.pop("doc_metadata", {})
vectara = cls(**kwargs)
vectara.add_texts(texts, metadatas, doc_metadata=doc_metadata, **kwargs)
return vectara
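    # A usage sketch for from_texts; doc_metadata is passed through kwargs
    # and applies to the whole document:
    #
    #     vectara = Vectara.from_texts(
    #         ["text one", "text two"],
    #         vectara_customer_id="<customer-id>",
    #         vectara_corpus_id="<corpus-id>",
    #         vectara_api_key="<api-key>",
    #         doc_metadata={"title": "Example"},
    #     )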
    @classmethod
def from_files(
cls: Type[Vectara],
files: List[str],
embedding: Optional[Embeddings] = None,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> Vectara:
"""Construct Vectara wrapper from raw documents.
This is intended to be a quick way to get started.
Example:
.. code-block:: python
from langchain_community.vectorstores import Vectara
vectara = Vectara.from_files(
files_list,
vectara_customer_id=customer_id,
vectara_corpus_id=corpus_id,
vectara_api_key=api_key,
)
"""
# Note: Vectara generates its own embeddings, so we ignore the provided
# embeddings (required by interface)
vectara = cls(**kwargs)
vectara.add_files(files, metadatas)
return vectara
    def as_retriever(self, **kwargs: Any) -> VectaraRetriever:
tags = kwargs.pop("tags", None) or []
tags.extend(self._get_retriever_tags())
return VectaraRetriever(vectorstore=self, search_kwargs=kwargs, tags=tags)
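    # A usage sketch; keyword arguments become search_kwargs for the
    # underlying similarity_search:
    #
    #     retriever = vectara.as_retriever(k=3, lambda_val=0.025)
    #     docs = retriever.get_relevant_documents("query text")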
class VectaraRetriever(VectorStoreRetriever):
"""Retriever class for `Vectara`."""
vectorstore: Vectara
"""Vectara vectorstore."""
search_kwargs: dict = Field(
default_factory=lambda: {
"lambda_val": 0.0,
"k": 5,
"filter": "",
"n_sentence_context": "2",
}
)
"""Search params.
k: Number of Documents to return. Defaults to 5.
lambda_val: lexical match parameter for hybrid search.
filter: Dictionary of argument(s) to filter on metadata. For example a
filter can be "doc.rating > 3.0 and part.lang = 'deu'"} see
https://docs.vectara.com/docs/search-apis/sql/filter-overview
for more details.
n_sentence_context: number of sentences before/after the matching segment to add
"""
    def add_texts(
self,
texts: List[str],
metadatas: Optional[List[dict]] = None,
doc_metadata: Optional[dict] = None,
) -> None:
"""Add text to the Vectara vectorstore.
Args:
texts (List[str]): The text
metadatas (List[dict]): Metadata dicts, must line up with existing store
"""
self.vectorstore.add_texts(texts, metadatas, doc_metadata or {})
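# A usage sketch: texts can also be added through the retriever itself
# (the arguments shown are illustrative):
#
#     retriever.add_texts(
#         ["new section text"],
#         metadatas=[{"lang": "eng"}],
#         doc_metadata={"title": "Appendix"},
#     )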