Source code for langchain_community.chat_models.fireworks

from typing import (
    Any,
    AsyncIterator,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Type,
    Union,
)

from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.language_models.llms import create_base_retry_decorator
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    BaseMessageChunk,
    ChatMessage,
    ChatMessageChunk,
    FunctionMessage,
    FunctionMessageChunk,
    HumanMessage,
    HumanMessageChunk,
    SystemMessage,
    SystemMessageChunk,
)
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.pydantic_v1 import Field, SecretStr, root_validator
from langchain_core.utils import convert_to_secret_str
from langchain_core.utils.env import get_from_dict_or_env

from langchain_community.adapters.openai import convert_message_to_dict


def _convert_delta_to_message_chunk(
    _dict: Any, default_class: Type[BaseMessageChunk]
) -> BaseMessageChunk:
    """Convert a delta response to a message chunk."""
    role = _dict.role
    content = _dict.content or ""
    additional_kwargs: Dict = {}

    if role == "user" or default_class == HumanMessageChunk:
        return HumanMessageChunk(content=content)
    elif role == "assistant" or default_class == AIMessageChunk:
        return AIMessageChunk(content=content, additional_kwargs=additional_kwargs)
    elif role == "system" or default_class == SystemMessageChunk:
        return SystemMessageChunk(content=content)
    elif role == "function" or default_class == FunctionMessageChunk:
        return FunctionMessageChunk(content=content, name=_dict.name)
    elif role or default_class == ChatMessageChunk:
        return ChatMessageChunk(content=content, role=role)
    else:
        return default_class(content=content)


[docs]def convert_dict_to_message(_dict: Any) -> BaseMessage: """Convert a dict response to a message.""" role = _dict.role content = _dict.content or "" if role == "user": return HumanMessage(content=content) elif role == "assistant": content = _dict.content additional_kwargs: Dict = {} return AIMessage(content=content, additional_kwargs=additional_kwargs) elif role == "system": return SystemMessage(content=content) elif role == "function": return FunctionMessage(content=content, name=_dict.name) else: return ChatMessage(content=content, role=role)
[docs]class ChatFireworks(BaseChatModel): """Fireworks Chat models.""" model: str = "accounts/fireworks/models/llama-v2-7b-chat" model_kwargs: dict = Field( default_factory=lambda: { "temperature": 0.7, "max_tokens": 512, "top_p": 1, }.copy() ) fireworks_api_key: Optional[SecretStr] = None max_retries: int = 20 use_retry: bool = True @property def lc_secrets(self) -> Dict[str, str]: return {"fireworks_api_key": "FIREWORKS_API_KEY"}
[docs] @classmethod def is_lc_serializable(cls) -> bool: return True
[docs] @classmethod def get_lc_namespace(cls) -> List[str]: """Get the namespace of the langchain object.""" return ["langchain", "chat_models", "fireworks"]
@root_validator() def validate_environment(cls, values: Dict) -> Dict: """Validate that api key in environment.""" try: import fireworks.client except ImportError as e: raise ImportError( "Could not import fireworks-ai python package. " "Please install it with `pip install fireworks-ai`." ) from e fireworks_api_key = convert_to_secret_str( get_from_dict_or_env(values, "fireworks_api_key", "FIREWORKS_API_KEY") ) fireworks.client.api_key = fireworks_api_key.get_secret_value() return values @property def _llm_type(self) -> str: """Return type of llm.""" return "fireworks-chat" def _generate( self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> ChatResult: message_dicts = self._create_message_dicts(messages) params = { "model": self.model, "messages": message_dicts, **self.model_kwargs, **kwargs, } response = completion_with_retry( self, self.use_retry, run_manager=run_manager, stop=stop, **params, ) return self._create_chat_result(response) async def _agenerate( self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, **kwargs: Any, ) -> ChatResult: message_dicts = self._create_message_dicts(messages) params = { "model": self.model, "messages": message_dicts, **self.model_kwargs, **kwargs, } response = await acompletion_with_retry( self, self.use_retry, run_manager=run_manager, stop=stop, **params ) return self._create_chat_result(response) def _combine_llm_outputs(self, llm_outputs: List[Optional[dict]]) -> dict: if llm_outputs[0] is None: return {} return llm_outputs[0] def _create_chat_result(self, response: Any) -> ChatResult: generations = [] for res in response.choices: message = convert_dict_to_message(res.message) gen = ChatGeneration( message=message, generation_info=dict(finish_reason=res.finish_reason), ) generations.append(gen) llm_output = {"model": self.model} return ChatResult(generations=generations, llm_output=llm_output) def _create_message_dicts( self, messages: List[BaseMessage] ) -> List[Dict[str, Any]]: message_dicts = [convert_message_to_dict(m) for m in messages] return message_dicts def _stream( self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Iterator[ChatGenerationChunk]: message_dicts = self._create_message_dicts(messages) default_chunk_class = AIMessageChunk params = { "model": self.model, "messages": message_dicts, "stream": True, **self.model_kwargs, **kwargs, } for chunk in completion_with_retry( self, self.use_retry, run_manager=run_manager, stop=stop, **params ): choice = chunk.choices[0] chunk = _convert_delta_to_message_chunk(choice.delta, default_chunk_class) finish_reason = choice.finish_reason generation_info = ( dict(finish_reason=finish_reason) if finish_reason is not None else None ) default_chunk_class = chunk.__class__ chunk = ChatGenerationChunk(message=chunk, generation_info=generation_info) yield chunk if run_manager: run_manager.on_llm_new_token(chunk.text, chunk=chunk) async def _astream( self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, **kwargs: Any, ) -> AsyncIterator[ChatGenerationChunk]: message_dicts = self._create_message_dicts(messages) default_chunk_class = AIMessageChunk params = { "model": self.model, "messages": message_dicts, "stream": True, **self.model_kwargs, **kwargs, } async for chunk in await acompletion_with_retry_streaming( self, self.use_retry, run_manager=run_manager, stop=stop, **params ): choice = chunk.choices[0] chunk = _convert_delta_to_message_chunk(choice.delta, default_chunk_class) finish_reason = choice.finish_reason generation_info = ( dict(finish_reason=finish_reason) if finish_reason is not None else None ) default_chunk_class = chunk.__class__ chunk = ChatGenerationChunk(message=chunk, generation_info=generation_info) yield chunk if run_manager: await run_manager.on_llm_new_token(token=chunk.text, chunk=chunk)
[docs]def conditional_decorator( condition: bool, decorator: Callable[[Any], Any] ) -> Callable[[Any], Any]: def actual_decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]: if condition: return decorator(func) return func return actual_decorator
[docs]def completion_with_retry( llm: ChatFireworks, use_retry: bool, *, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Any: """Use tenacity to retry the completion call.""" import fireworks.client retry_decorator = _create_retry_decorator(llm, run_manager=run_manager) @conditional_decorator(use_retry, retry_decorator) def _completion_with_retry(**kwargs: Any) -> Any: return fireworks.client.ChatCompletion.create( **kwargs, ) return _completion_with_retry(**kwargs)
[docs]async def acompletion_with_retry( llm: ChatFireworks, use_retry: bool, *, run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Any: """Use tenacity to retry the async completion call.""" import fireworks.client retry_decorator = _create_retry_decorator(llm, run_manager=run_manager) @conditional_decorator(use_retry, retry_decorator) async def _completion_with_retry(**kwargs: Any) -> Any: return await fireworks.client.ChatCompletion.acreate( **kwargs, ) return await _completion_with_retry(**kwargs)
[docs]async def acompletion_with_retry_streaming( llm: ChatFireworks, use_retry: bool, *, run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Any: """Use tenacity to retry the completion call for streaming.""" import fireworks.client retry_decorator = _create_retry_decorator(llm, run_manager=run_manager) @conditional_decorator(use_retry, retry_decorator) async def _completion_with_retry(**kwargs: Any) -> Any: return fireworks.client.ChatCompletion.acreate( **kwargs, ) return await _completion_with_retry(**kwargs)
def _create_retry_decorator( llm: ChatFireworks, run_manager: Optional[ Union[AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun] ] = None, ) -> Callable[[Any], Any]: """Define retry mechanism.""" import fireworks.client errors = [ fireworks.client.error.RateLimitError, fireworks.client.error.InternalServerError, fireworks.client.error.BadGatewayError, fireworks.client.error.ServiceUnavailableError, ] return create_base_retry_decorator( error_types=errors, max_retries=llm.max_retries, run_manager=run_manager )