Source code for langchain_community.chat_models.mlflow_ai_gateway

import asyncio
import logging
import warnings
from functools import partial
from typing import Any, Dict, List, Mapping, Optional

from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    ChatMessage,
    FunctionMessage,
    HumanMessage,
    SystemMessage,
)
from langchain_core.outputs import (
    ChatGeneration,
    ChatResult,
)
from langchain_core.pydantic_v1 import BaseModel, Extra

logger = logging.getLogger(__name__)


# Ignoring type because below is valid pydantic code
# Unexpected keyword argument "extra" for "__init_subclass__" of "object"  [call-arg]
[docs]class ChatParams(BaseModel, extra=Extra.allow): # type: ignore[call-arg] """Parameters for the `MLflow AI Gateway` LLM.""" temperature: float = 0.0 candidate_count: int = 1 """The number of candidates to return.""" stop: Optional[List[str]] = None max_tokens: Optional[int] = None
[docs]class ChatMLflowAIGateway(BaseChatModel): """`MLflow AI Gateway` chat models API. To use, you should have the ``mlflow[gateway]`` python package installed. For more information, see https://mlflow.org/docs/latest/gateway/index.html. Example: .. code-block:: python from langchain_community.chat_models import ChatMLflowAIGateway chat = ChatMLflowAIGateway( gateway_uri="<your-mlflow-ai-gateway-uri>", route="<your-mlflow-ai-gateway-chat-route>", params={ "temperature": 0.1 } ) """ def __init__(self, **kwargs: Any): warnings.warn( "`ChatMLflowAIGateway` is deprecated. Use `ChatMlflow` or " "`ChatDatabricks` instead.", DeprecationWarning, ) try: import mlflow.gateway except ImportError as e: raise ImportError( "Could not import `mlflow.gateway` module. " "Please install it with `pip install mlflow[gateway]`." ) from e super().__init__(**kwargs) if self.gateway_uri: mlflow.gateway.set_gateway_uri(self.gateway_uri) route: str gateway_uri: Optional[str] = None params: Optional[ChatParams] = None @property def _default_params(self) -> Dict[str, Any]: params: Dict[str, Any] = { "gateway_uri": self.gateway_uri, "route": self.route, **(self.params.dict() if self.params else {}), } return params def _generate( self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> ChatResult: try: import mlflow.gateway except ImportError as e: raise ImportError( "Could not import `mlflow.gateway` module. " "Please install it with `pip install mlflow[gateway]`." ) from e message_dicts = [ ChatMLflowAIGateway._convert_message_to_dict(message) for message in messages ] data: Dict[str, Any] = { "messages": message_dicts, **(self.params.dict() if self.params else {}), } resp = mlflow.gateway.query(self.route, data=data) return ChatMLflowAIGateway._create_chat_result(resp) async def _agenerate( self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, **kwargs: Any, ) -> ChatResult: func = partial( self._generate, messages, stop=stop, run_manager=run_manager, **kwargs ) return await asyncio.get_event_loop().run_in_executor(None, func) @property def _identifying_params(self) -> Dict[str, Any]: return self._default_params def _get_invocation_params( self, stop: Optional[List[str]] = None, **kwargs: Any ) -> Dict[str, Any]: """Get the parameters used to invoke the model FOR THE CALLBACKS.""" return { **self._default_params, **super()._get_invocation_params(stop=stop, **kwargs), } @property def _llm_type(self) -> str: """Return type of chat model.""" return "mlflow-ai-gateway-chat" @staticmethod def _convert_dict_to_message(_dict: Mapping[str, Any]) -> BaseMessage: role = _dict["role"] content = _dict["content"] if role == "user": return HumanMessage(content=content) elif role == "assistant": return AIMessage(content=content) elif role == "system": return SystemMessage(content=content) else: return ChatMessage(content=content, role=role) @staticmethod def _raise_functions_not_supported() -> None: raise ValueError( "Function messages are not supported by the MLflow AI Gateway. Please" " create a feature request at https://github.com/mlflow/mlflow/issues." ) @staticmethod def _convert_message_to_dict(message: BaseMessage) -> dict: if isinstance(message, ChatMessage): message_dict = {"role": message.role, "content": message.content} elif isinstance(message, HumanMessage): message_dict = {"role": "user", "content": message.content} elif isinstance(message, AIMessage): message_dict = {"role": "assistant", "content": message.content} elif isinstance(message, SystemMessage): message_dict = {"role": "system", "content": message.content} elif isinstance(message, FunctionMessage): raise ValueError( "Function messages are not supported by the MLflow AI Gateway. Please" " create a feature request at https://github.com/mlflow/mlflow/issues." ) else: raise ValueError(f"Got unknown message type: {message}") if "function_call" in message.additional_kwargs: ChatMLflowAIGateway._raise_functions_not_supported() if message.additional_kwargs: logger.warning( "Additional message arguments are unsupported by MLflow AI Gateway " " and will be ignored: %s", message.additional_kwargs, ) return message_dict @staticmethod def _create_chat_result(response: Mapping[str, Any]) -> ChatResult: generations = [] for candidate in response["candidates"]: message = ChatMLflowAIGateway._convert_dict_to_message(candidate["message"]) message_metadata = candidate.get("metadata", {}) gen = ChatGeneration( message=message, generation_info=dict(message_metadata), ) generations.append(gen) response_metadata = response.get("metadata", {}) return ChatResult(generations=generations, llm_output=response_metadata)