Source code for langchain.retrievers.document_compressors.base

from abc import ABC, abstractmethod
from inspect import signature
from typing import List, Optional, Sequence, Union

from langchain_core.documents import BaseDocumentTransformer, Document
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables.config import run_in_executor

from langchain.callbacks.manager import Callbacks


[docs]class BaseDocumentCompressor(BaseModel, ABC): """Base class for document compressors."""
[docs] @abstractmethod def compress_documents( self, documents: Sequence[Document], query: str, callbacks: Optional[Callbacks] = None, ) -> Sequence[Document]: """Compress retrieved documents given the query context."""
[docs] async def acompress_documents( self, documents: Sequence[Document], query: str, callbacks: Optional[Callbacks] = None, ) -> Sequence[Document]: """Compress retrieved documents given the query context.""" return await run_in_executor( None, self.compress_documents, documents, query, callbacks )
[docs]class DocumentCompressorPipeline(BaseDocumentCompressor): """Document compressor that uses a pipeline of Transformers.""" transformers: List[Union[BaseDocumentTransformer, BaseDocumentCompressor]] """List of document filters that are chained together and run in sequence.""" class Config: """Configuration for this pydantic object.""" arbitrary_types_allowed = True
[docs] def compress_documents( self, documents: Sequence[Document], query: str, callbacks: Optional[Callbacks] = None, ) -> Sequence[Document]: """Transform a list of documents.""" for _transformer in self.transformers: if isinstance(_transformer, BaseDocumentCompressor): accepts_callbacks = ( signature(_transformer.compress_documents).parameters.get( "callbacks" ) is not None ) if accepts_callbacks: documents = _transformer.compress_documents( documents, query, callbacks=callbacks ) else: documents = _transformer.compress_documents(documents, query) elif isinstance(_transformer, BaseDocumentTransformer): documents = _transformer.transform_documents(documents) else: raise ValueError(f"Got unexpected transformer type: {_transformer}") return documents
[docs] async def acompress_documents( self, documents: Sequence[Document], query: str, callbacks: Optional[Callbacks] = None, ) -> Sequence[Document]: """Compress retrieved documents given the query context.""" for _transformer in self.transformers: if isinstance(_transformer, BaseDocumentCompressor): accepts_callbacks = ( signature(_transformer.acompress_documents).parameters.get( "callbacks" ) is not None ) if accepts_callbacks: documents = await _transformer.acompress_documents( documents, query, callbacks=callbacks ) else: documents = await _transformer.acompress_documents(documents, query) elif isinstance(_transformer, BaseDocumentTransformer): documents = await _transformer.atransform_documents(documents) else: raise ValueError(f"Got unexpected transformer type: {_transformer}") return documents