from __future__ import annotations
import logging
from typing import Any, Callable, Dict, List, Optional
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import Generation, LLMResult
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.utils import get_from_dict_or_env
from requests.exceptions import HTTPError
from tenacity import (
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)
logger = logging.getLogger(__name__)
def _create_retry_decorator(llm: Tongyi) -> Callable[[Any], Any]:
    min_seconds = 1
    max_seconds = 4
    # Wait 2^x * 1 second between each retry starting with
    # 4 seconds, then up to 10 seconds, then 10 seconds afterwards
    return retry(
        reraise=True,
        stop=stop_after_attempt(llm.max_retries),
        wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds),
        retry=(retry_if_exception_type(HTTPError)),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )
[docs]def generate_with_retry(llm: Tongyi, **kwargs: Any) -> Any:
    """Use tenacity to retry the completion call."""
    retry_decorator = _create_retry_decorator(llm)
    @retry_decorator
    def _generate_with_retry(**_kwargs: Any) -> Any:
        resp = llm.client.call(**_kwargs)
        if resp.status_code == 200:
            return resp
        elif resp.status_code in [400, 401]:
            raise ValueError(
                f"status_code: {resp.status_code} \n "
                f"code: {resp.code} \n message: {resp.message}"
            )
        else:
            raise HTTPError(
                f"HTTP error occurred: status_code: {resp.status_code} \n "
                f"code: {resp.code} \n message: {resp.message}",
                response=resp,
            )
    return _generate_with_retry(**kwargs) 
[docs]def stream_generate_with_retry(llm: Tongyi, **kwargs: Any) -> Any:
    """Use tenacity to retry the completion call."""
    retry_decorator = _create_retry_decorator(llm)
    @retry_decorator
    def _stream_generate_with_retry(**_kwargs: Any) -> Any:
        stream_resps = []
        resps = llm.client.call(**_kwargs)
        for resp in resps:
            if resp.status_code == 200:
                stream_resps.append(resp)
            elif resp.status_code in [400, 401]:
                raise ValueError(
                    f"status_code: {resp.status_code} \n "
                    f"code: {resp.code} \n message: {resp.message}"
                )
            else:
                raise HTTPError(
                    f"HTTP error occurred: status_code: {resp.status_code} \n "
                    f"code: {resp.code} \n message: {resp.message}",
                    response=resp,
                )
        return stream_resps
    return _stream_generate_with_retry(**kwargs) 
[docs]class Tongyi(LLM):
    """Tongyi Qwen large language models.
    To use, you should have the ``dashscope`` python package installed, and the
    environment variable ``DASHSCOPE_API_KEY`` set with your API key, or pass
    it as a named parameter to the constructor.
    Example:
        .. code-block:: python
            from langchain_community.llms import Tongyi
            Tongyi = tongyi()
    """
    @property
    def lc_secrets(self) -> Dict[str, str]:
        return {"dashscope_api_key": "DASHSCOPE_API_KEY"}
[docs]    @classmethod
    def is_lc_serializable(cls) -> bool:
        return False 
    client: Any  #: :meta private:
    model_name: str = "qwen-plus-v1"
    """Model name to use."""
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    top_p: float = 0.8
    """Total probability mass of tokens to consider at each step."""
    dashscope_api_key: Optional[str] = None
    """Dashscope api key provide by alicloud."""
    n: int = 1
    """How many completions to generate for each prompt."""
    streaming: bool = False
    """Whether to stream the results or not."""
    max_retries: int = 10
    """Maximum number of retries to make when generating."""
    prefix_messages: List = Field(default_factory=list)
    """Series of messages for Chat input."""
    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "tongyi"
    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exists in environment."""
        get_from_dict_or_env(values, "dashscope_api_key", "DASHSCOPE_API_KEY")
        try:
            import dashscope
        except ImportError:
            raise ImportError(
                "Could not import dashscope python package. "
                "Please install it with `pip install dashscope`."
            )
        try:
            values["client"] = dashscope.Generation
        except AttributeError:
            raise ValueError(
                "`dashscope` has no `Generation` attribute, this is likely "
                "due to an old version of the dashscope package. Try upgrading it "
                "with `pip install --upgrade dashscope`."
            )
        return values
    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling OpenAI API."""
        normal_params = {
            "top_p": self.top_p,
        }
        return {**normal_params, **self.model_kwargs}
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Tongyi's generate endpoint.
        Args:
            prompt: The prompt to pass into the model.
        Returns:
            The string generated by the model.
        Example:
            .. code-block:: python
                response = tongyi("Tell me a joke.")
        """
        params: Dict[str, Any] = {
            **{"model": self.model_name},
            **self._default_params,
            **kwargs,
        }
        completion = generate_with_retry(
            self,
            prompt=prompt,
            **params,
        )
        return completion["output"]["text"]
    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        generations = []
        params: Dict[str, Any] = {
            **{"model": self.model_name},
            **self._default_params,
            **kwargs,
        }
        if self.streaming:
            if len(prompts) > 1:
                raise ValueError("Cannot stream results with multiple prompts.")
            params["stream"] = True
            temp = ""
            for stream_resp in stream_generate_with_retry(
                self, prompt=prompts[0], **params
            ):
                if run_manager:
                    stream_resp_text = stream_resp["output"]["text"]
                    stream_resp_text = stream_resp_text.replace(temp, "")
                    # Ali Cloud's streaming transmission interface, each return content
                    # will contain the output
                    # of the previous round(as of September 20, 2023, future updates to
                    # the Alibaba Cloud API may vary)
                    run_manager.on_llm_new_token(stream_resp_text)
                    # The implementation of streaming transmission primarily relies on
                    # the "on_llm_new_token" method
                    # of the streaming callback.
                temp = stream_resp["output"]["text"]
                generations.append(
                    [
                        Generation(
                            text=stream_resp["output"]["text"],
                            generation_info=dict(
                                finish_reason=stream_resp["output"]["finish_reason"],
                            ),
                        )
                    ]
                )
            generations.reverse()
            # In the official implementation of the OpenAI API,
            # the "generations" parameter passed to LLMResult seems to be a 1*1*1
            # two-dimensional list
            # (including in non-streaming mode).
            # Considering that Alibaba Cloud's streaming transmission
            # (as of September 20, 2023, future updates to the Alibaba Cloud API may
            # vary)
            # includes the output of the previous round in each return,
            # reversing this "generations" list should suffice
            # (This is the solution with the least amount of changes to the source code,
            # while still allowing for convenient modifications in the future,
            # although it may result in slightly more memory consumption).
        else:
            for prompt in prompts:
                completion = generate_with_retry(
                    self,
                    prompt=prompt,
                    **params,
                )
                generations.append(
                    [
                        Generation(
                            text=completion["output"]["text"],
                            generation_info=dict(
                                finish_reason=completion["output"]["finish_reason"],
                            ),
                        )
                    ]
                )
        return LLMResult(generations=generations)