Source code for langchain_community.chat_message_histories.upstash_redis

import json
import logging
from typing import List, Optional

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
    BaseMessage,
    message_to_dict,
    messages_from_dict,
)

logger = logging.getLogger(__name__)


[docs]class UpstashRedisChatMessageHistory(BaseChatMessageHistory): """Chat message history stored in an Upstash Redis database."""
[docs] def __init__( self, session_id: str, url: str = "", token: str = "", key_prefix: str = "message_store:", ttl: Optional[int] = None, ): try: from upstash_redis import Redis except ImportError: raise ImportError( "Could not import upstash redis python package. " "Please install it with `pip install upstash_redis`." ) if url == "" or token == "": raise ValueError( "UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN are needed." ) try: self.redis_client = Redis(url=url, token=token) except Exception: logger.error("Upstash Redis instance could not be initiated.") self.session_id = session_id self.key_prefix = key_prefix self.ttl = ttl
@property def key(self) -> str: """Construct the record key to use""" return self.key_prefix + self.session_id @property def messages(self) -> List[BaseMessage]: # type: ignore """Retrieve the messages from Upstash Redis""" _items = self.redis_client.lrange(self.key, 0, -1) items = [json.loads(m) for m in _items[::-1]] messages = messages_from_dict(items) return messages
[docs] def add_message(self, message: BaseMessage) -> None: """Append the message to the record in Upstash Redis""" self.redis_client.lpush(self.key, json.dumps(message_to_dict(message))) if self.ttl: self.redis_client.expire(self.key, self.ttl)
[docs] def clear(self) -> None: """Clear session memory from Upstash Redis""" self.redis_client.delete(self.key)