"""Chain that takes in an input and produces an action and action input."""
from __future__ import annotations
import asyncio
import json
import logging
import time
from abc import abstractmethod
from pathlib import Path
from typing import (
Any,
AsyncIterator,
Callable,
Dict,
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
)
import yaml
from langchain_core._api import deprecated
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.callbacks import (
AsyncCallbackManagerForChainRun,
AsyncCallbackManagerForToolRun,
BaseCallbackManager,
CallbackManagerForChainRun,
CallbackManagerForToolRun,
Callbacks,
)
from langchain_core.exceptions import OutputParserException
from langchain_core.language_models import BaseLanguageModel
from langchain_core.messages import BaseMessage
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.prompts import BasePromptTemplate
from langchain_core.prompts.few_shot import FewShotPromptTemplate
from langchain_core.prompts.prompt import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.runnables import Runnable, RunnableConfig, ensure_config
from langchain_core.runnables.utils import AddableDict
from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping
from langchain.agents.agent_iterator import AgentExecutorIterator
from langchain.agents.agent_types import AgentType
from langchain.agents.tools import InvalidTool
from langchain.chains.base import Chain
from langchain.chains.llm import LLMChain
from langchain.utilities.asyncio import asyncio_timeout
logger = logging.getLogger(__name__)
[docs]class BaseSingleActionAgent(BaseModel):
"""Base Single Action Agent class."""
@property
def return_values(self) -> List[str]:
"""Return values of the agent."""
return ["output"]
[docs] @abstractmethod
def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""Given input, decided what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with observations
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
[docs] @abstractmethod
async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""Given input, decided what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with observations
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
@property
@abstractmethod
def input_keys(self) -> List[str]:
"""Return the input keys.
:meta private:
"""
[docs] def return_stopped_response(
self,
early_stopping_method: str,
intermediate_steps: List[Tuple[AgentAction, str]],
**kwargs: Any,
) -> AgentFinish:
"""Return response when agent has been stopped due to max iterations."""
if early_stopping_method == "force":
# `force` just returns a constant string
return AgentFinish(
{"output": "Agent stopped due to iteration limit or time limit."}, ""
)
else:
raise ValueError(
f"Got unsupported early_stopping_method `{early_stopping_method}`"
)
@property
def _agent_type(self) -> str:
"""Return Identifier of agent type."""
raise NotImplementedError
[docs] def dict(self, **kwargs: Any) -> Dict:
"""Return dictionary representation of agent."""
_dict = super().dict()
try:
_type = self._agent_type
except NotImplementedError:
_type = None
if isinstance(_type, AgentType):
_dict["_type"] = str(_type.value)
elif _type is not None:
_dict["_type"] = _type
return _dict
[docs] def save(self, file_path: Union[Path, str]) -> None:
"""Save the agent.
Args:
file_path: Path to file to save the agent to.
Example:
.. code-block:: python
# If working with agent executor
agent.agent.save(file_path="path/agent.yaml")
"""
# Convert file to Path object.
if isinstance(file_path, str):
save_path = Path(file_path)
else:
save_path = file_path
directory_path = save_path.parent
directory_path.mkdir(parents=True, exist_ok=True)
# Fetch dictionary to save
agent_dict = self.dict()
if "_type" not in agent_dict:
raise NotImplementedError(f"Agent {self} does not support saving")
if save_path.suffix == ".json":
with open(file_path, "w") as f:
json.dump(agent_dict, f, indent=4)
elif save_path.suffix == ".yaml":
with open(file_path, "w") as f:
yaml.dump(agent_dict, f, default_flow_style=False)
else:
raise ValueError(f"{save_path} must be json or yaml")
[docs]class BaseMultiActionAgent(BaseModel):
"""Base Multi Action Agent class."""
@property
def return_values(self) -> List[str]:
"""Return values of the agent."""
return ["output"]
[docs] @abstractmethod
def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[List[AgentAction], AgentFinish]:
"""Given input, decided what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with the observations.
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Actions specifying what tool to use.
"""
[docs] @abstractmethod
async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[List[AgentAction], AgentFinish]:
"""Given input, decided what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with the observations.
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Actions specifying what tool to use.
"""
@property
@abstractmethod
def input_keys(self) -> List[str]:
"""Return the input keys.
:meta private:
"""
[docs] def return_stopped_response(
self,
early_stopping_method: str,
intermediate_steps: List[Tuple[AgentAction, str]],
**kwargs: Any,
) -> AgentFinish:
"""Return response when agent has been stopped due to max iterations."""
if early_stopping_method == "force":
# `force` just returns a constant string
return AgentFinish({"output": "Agent stopped due to max iterations."}, "")
else:
raise ValueError(
f"Got unsupported early_stopping_method `{early_stopping_method}`"
)
@property
def _agent_type(self) -> str:
"""Return Identifier of agent type."""
raise NotImplementedError
[docs] def dict(self, **kwargs: Any) -> Dict:
"""Return dictionary representation of agent."""
_dict = super().dict()
try:
_dict["_type"] = str(self._agent_type)
except NotImplementedError:
pass
return _dict
[docs] def save(self, file_path: Union[Path, str]) -> None:
"""Save the agent.
Args:
file_path: Path to file to save the agent to.
Example:
.. code-block:: python
# If working with agent executor
agent.agent.save(file_path="path/agent.yaml")
"""
# Convert file to Path object.
if isinstance(file_path, str):
save_path = Path(file_path)
else:
save_path = file_path
# Fetch dictionary to save
agent_dict = self.dict()
if "_type" not in agent_dict:
raise NotImplementedError(f"Agent {self} does not support saving.")
directory_path = save_path.parent
directory_path.mkdir(parents=True, exist_ok=True)
if save_path.suffix == ".json":
with open(file_path, "w") as f:
json.dump(agent_dict, f, indent=4)
elif save_path.suffix == ".yaml":
with open(file_path, "w") as f:
yaml.dump(agent_dict, f, default_flow_style=False)
else:
raise ValueError(f"{save_path} must be json or yaml")
[docs]class AgentOutputParser(BaseOutputParser[Union[AgentAction, AgentFinish]]):
"""Base class for parsing agent output into agent action/finish."""
[docs] @abstractmethod
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
"""Parse text into agent action/finish."""
[docs]class MultiActionAgentOutputParser(
BaseOutputParser[Union[List[AgentAction], AgentFinish]]
):
"""Base class for parsing agent output into agent actions/finish."""
[docs] @abstractmethod
def parse(self, text: str) -> Union[List[AgentAction], AgentFinish]:
"""Parse text into agent actions/finish."""
[docs]class RunnableAgent(BaseSingleActionAgent):
"""Agent powered by runnables."""
runnable: Runnable[dict, Union[AgentAction, AgentFinish]]
"""Runnable to call to get agent action."""
input_keys_arg: List[str] = []
return_keys_arg: List[str] = []
class Config:
"""Configuration for this pydantic object."""
arbitrary_types_allowed = True
@property
def return_values(self) -> List[str]:
"""Return values of the agent."""
return self.return_keys_arg
@property
def input_keys(self) -> List[str]:
return self.input_keys_arg
[docs] def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""Based on past history and current inputs, decide what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with the observations.
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
# Use streaming to make sure that the underlying LLM is invoked in a streaming
# fashion to make it possible to get access to the individual LLM tokens
# when using stream_log with the Agent Executor.
# Because the response from the plan is not a generator, we need to
# accumulate the output into final output and return that.
final_output: Any = None
for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
if final_output is None:
final_output = chunk
else:
final_output += chunk
return final_output
[docs] async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[
AgentAction,
AgentFinish,
]:
"""Based on past history and current inputs, decide what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with observations
callbacks: Callbacks to run.
**kwargs: User inputs
Returns:
Action specifying what tool to use.
"""
inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
final_output: Any = None
# Use streaming to make sure that the underlying LLM is invoked in a streaming
# fashion to make it possible to get access to the individual LLM tokens
# when using stream_log with the Agent Executor.
# Because the response from the plan is not a generator, we need to
# accumulate the output into final output and return that.
async for chunk in self.runnable.astream(
inputs, config={"callbacks": callbacks}
):
if final_output is None:
final_output = chunk
else:
final_output += chunk
return final_output
[docs]class RunnableMultiActionAgent(BaseMultiActionAgent):
"""Agent powered by runnables."""
runnable: Runnable[dict, Union[List[AgentAction], AgentFinish]]
"""Runnable to call to get agent actions."""
input_keys_arg: List[str] = []
return_keys_arg: List[str] = []
class Config:
"""Configuration for this pydantic object."""
arbitrary_types_allowed = True
@property
def return_values(self) -> List[str]:
"""Return values of the agent."""
return self.return_keys_arg
@property
def input_keys(self) -> List[str]:
"""Return the input keys.
Returns:
List of input keys.
"""
return self.input_keys_arg
[docs] def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[
List[AgentAction],
AgentFinish,
]:
"""Based on past history and current inputs, decide what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with the observations.
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
# Use streaming to make sure that the underlying LLM is invoked in a streaming
# fashion to make it possible to get access to the individual LLM tokens
# when using stream_log with the Agent Executor.
# Because the response from the plan is not a generator, we need to
# accumulate the output into final output and return that.
final_output: Any = None
for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
if final_output is None:
final_output = chunk
else:
final_output += chunk
return final_output
[docs] async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[
List[AgentAction],
AgentFinish,
]:
"""Based on past history and current inputs, decide what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with observations
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
# Use streaming to make sure that the underlying LLM is invoked in a streaming
# fashion to make it possible to get access to the individual LLM tokens
# when using stream_log with the Agent Executor.
# Because the response from the plan is not a generator, we need to
# accumulate the output into final output and return that.
final_output: Any = None
async for chunk in self.runnable.astream(
inputs, config={"callbacks": callbacks}
):
if final_output is None:
final_output = chunk
else:
final_output += chunk
return final_output
[docs]@deprecated(
"0.1.0",
alternative=(
"Use new agent constructor methods like create_react_agent, create_json_agent, "
"create_structured_chat_agent, etc."
),
removal="0.2.0",
)
class LLMSingleActionAgent(BaseSingleActionAgent):
"""Base class for single action agents."""
llm_chain: LLMChain
"""LLMChain to use for agent."""
output_parser: AgentOutputParser
"""Output parser to use for agent."""
stop: List[str]
"""List of strings to stop on."""
@property
def input_keys(self) -> List[str]:
"""Return the input keys.
Returns:
List of input keys.
"""
return list(set(self.llm_chain.input_keys) - {"intermediate_steps"})
[docs] def dict(self, **kwargs: Any) -> Dict:
"""Return dictionary representation of agent."""
_dict = super().dict()
del _dict["output_parser"]
return _dict
[docs] def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""Given input, decided what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with the observations.
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
output = self.llm_chain.run(
intermediate_steps=intermediate_steps,
stop=self.stop,
callbacks=callbacks,
**kwargs,
)
return self.output_parser.parse(output)
[docs] async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""Given input, decided what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with observations
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
output = await self.llm_chain.arun(
intermediate_steps=intermediate_steps,
stop=self.stop,
callbacks=callbacks,
**kwargs,
)
return self.output_parser.parse(output)
[docs]@deprecated(
"0.1.0",
alternative=(
"Use new agent constructor methods like create_react_agent, create_json_agent, "
"create_structured_chat_agent, etc."
),
removal="0.2.0",
)
class Agent(BaseSingleActionAgent):
"""Agent that calls the language model and deciding the action.
This is driven by an LLMChain. The prompt in the LLMChain MUST include
a variable called "agent_scratchpad" where the agent can put its
intermediary work.
"""
llm_chain: LLMChain
output_parser: AgentOutputParser
allowed_tools: Optional[List[str]] = None
[docs] def dict(self, **kwargs: Any) -> Dict:
"""Return dictionary representation of agent."""
_dict = super().dict()
del _dict["output_parser"]
return _dict
@property
def return_values(self) -> List[str]:
return ["output"]
def _fix_text(self, text: str) -> str:
"""Fix the text."""
raise ValueError("fix_text not implemented for this agent.")
@property
def _stop(self) -> List[str]:
return [
f"\n{self.observation_prefix.rstrip()}",
f"\n\t{self.observation_prefix.rstrip()}",
]
def _construct_scratchpad(
self, intermediate_steps: List[Tuple[AgentAction, str]]
) -> Union[str, List[BaseMessage]]:
"""Construct the scratchpad that lets the agent continue its thought process."""
thoughts = ""
for action, observation in intermediate_steps:
thoughts += action.log
thoughts += f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
return thoughts
[docs] def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""Given input, decided what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with observations
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
full_output = self.llm_chain.predict(callbacks=callbacks, **full_inputs)
return self.output_parser.parse(full_output)
[docs] async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""Given input, decided what to do.
Args:
intermediate_steps: Steps the LLM has taken to date,
along with observations
callbacks: Callbacks to run.
**kwargs: User inputs.
Returns:
Action specifying what tool to use.
"""
full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
full_output = await self.llm_chain.apredict(callbacks=callbacks, **full_inputs)
agent_output = await self.output_parser.aparse(full_output)
return agent_output
@property
def input_keys(self) -> List[str]:
"""Return the input keys.
:meta private:
"""
return list(set(self.llm_chain.input_keys) - {"agent_scratchpad"})
@root_validator()
def validate_prompt(cls, values: Dict) -> Dict:
"""Validate that prompt matches format."""
prompt = values["llm_chain"].prompt
if "agent_scratchpad" not in prompt.input_variables:
logger.warning(
"`agent_scratchpad` should be a variable in prompt.input_variables."
" Did not find it, so adding it at the end."
)
prompt.input_variables.append("agent_scratchpad")
if isinstance(prompt, PromptTemplate):
prompt.template += "\n{agent_scratchpad}"
elif isinstance(prompt, FewShotPromptTemplate):
prompt.suffix += "\n{agent_scratchpad}"
else:
raise ValueError(f"Got unexpected prompt type {type(prompt)}")
return values
@property
@abstractmethod
def observation_prefix(self) -> str:
"""Prefix to append the observation with."""
@property
@abstractmethod
def llm_prefix(self) -> str:
"""Prefix to append the LLM call with."""
[docs] @classmethod
@abstractmethod
def create_prompt(cls, tools: Sequence[BaseTool]) -> BasePromptTemplate:
"""Create a prompt for this class."""
@classmethod
def _validate_tools(cls, tools: Sequence[BaseTool]) -> None:
"""Validate that appropriate tools are passed in."""
pass
@classmethod
@abstractmethod
def _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser:
"""Get default output parser for this class."""
[docs] def return_stopped_response(
self,
early_stopping_method: str,
intermediate_steps: List[Tuple[AgentAction, str]],
**kwargs: Any,
) -> AgentFinish:
"""Return response when agent has been stopped due to max iterations."""
if early_stopping_method == "force":
# `force` just returns a constant string
return AgentFinish(
{"output": "Agent stopped due to iteration limit or time limit."}, ""
)
elif early_stopping_method == "generate":
# Generate does one final forward pass
thoughts = ""
for action, observation in intermediate_steps:
thoughts += action.log
thoughts += (
f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
)
# Adding to the previous steps, we now tell the LLM to make a final pred
thoughts += (
"\n\nI now need to return a final answer based on the previous steps:"
)
new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
full_inputs = {**kwargs, **new_inputs}
full_output = self.llm_chain.predict(**full_inputs)
# We try to extract a final answer
parsed_output = self.output_parser.parse(full_output)
if isinstance(parsed_output, AgentFinish):
# If we can extract, we send the correct stuff
return parsed_output
else:
# If we can extract, but the tool is not the final tool,
# we just return the full output
return AgentFinish({"output": full_output}, full_output)
else:
raise ValueError(
"early_stopping_method should be one of `force` or `generate`, "
f"got {early_stopping_method}"
)
NextStepOutput = List[Union[AgentFinish, AgentAction, AgentStep]]
[docs]class AgentExecutor(Chain):
"""Agent that is using tools."""
agent: Union[BaseSingleActionAgent, BaseMultiActionAgent]
"""The agent to run for creating a plan and determining actions
to take at each step of the execution loop."""
tools: Sequence[BaseTool]
"""The valid tools the agent can call."""
return_intermediate_steps: bool = False
"""Whether to return the agent's trajectory of intermediate steps
at the end in addition to the final output."""
max_iterations: Optional[int] = 15
"""The maximum number of steps to take before ending the execution
loop.
Setting to 'None' could lead to an infinite loop."""
max_execution_time: Optional[float] = None
"""The maximum amount of wall clock time to spend in the execution
loop.
"""
early_stopping_method: str = "force"
"""The method to use for early stopping if the agent never
returns `AgentFinish`. Either 'force' or 'generate'.
`"force"` returns a string saying that it stopped because it met a
time or iteration limit.
`"generate"` calls the agent's LLM Chain one final time to generate
a final answer based on the previous steps.
"""
handle_parsing_errors: Union[
bool, str, Callable[[OutputParserException], str]
] = False
"""How to handle errors raised by the agent's output parser.
Defaults to `False`, which raises the error.
If `true`, the error will be sent back to the LLM as an observation.
If a string, the string itself will be sent to the LLM as an observation.
If a callable function, the function will be called with the exception
as an argument, and the result of that function will be passed to the agent
as an observation.
"""
trim_intermediate_steps: Union[
int, Callable[[List[Tuple[AgentAction, str]]], List[Tuple[AgentAction, str]]]
] = -1
@root_validator()
def validate_tools(cls, values: Dict) -> Dict:
"""Validate that tools are compatible with agent."""
agent = values["agent"]
tools = values["tools"]
allowed_tools = agent.get_allowed_tools()
if allowed_tools is not None:
if set(allowed_tools) != set([tool.name for tool in tools]):
raise ValueError(
f"Allowed tools ({allowed_tools}) different than "
f"provided tools ({[tool.name for tool in tools]})"
)
return values
@root_validator()
def validate_return_direct_tool(cls, values: Dict) -> Dict:
"""Validate that tools are compatible with agent."""
agent = values["agent"]
tools = values["tools"]
if isinstance(agent, BaseMultiActionAgent):
for tool in tools:
if tool.return_direct:
raise ValueError(
"Tools that have `return_direct=True` are not allowed "
"in multi-action agents"
)
return values
@root_validator(pre=True)
def validate_runnable_agent(cls, values: Dict) -> Dict:
"""Convert runnable to agent if passed in."""
agent = values["agent"]
if isinstance(agent, Runnable):
try:
output_type = agent.OutputType
except Exception as _:
multi_action = False
else:
multi_action = output_type == Union[List[AgentAction], AgentFinish]
if multi_action:
values["agent"] = RunnableMultiActionAgent(runnable=agent)
else:
values["agent"] = RunnableAgent(runnable=agent)
return values
[docs] def save(self, file_path: Union[Path, str]) -> None:
"""Raise error - saving not supported for Agent Executors."""
raise ValueError(
"Saving not supported for agent executors. "
"If you are trying to save the agent, please use the "
"`.save_agent(...)`"
)
[docs] def save_agent(self, file_path: Union[Path, str]) -> None:
"""Save the underlying agent."""
return self.agent.save(file_path)
[docs] def iter(
self,
inputs: Any,
callbacks: Callbacks = None,
*,
include_run_info: bool = False,
async_: bool = False, # arg kept for backwards compat, but ignored
) -> AgentExecutorIterator:
"""Enables iteration over steps taken to reach final output."""
return AgentExecutorIterator(
self,
inputs,
callbacks,
tags=self.tags,
include_run_info=include_run_info,
)
@property
def input_keys(self) -> List[str]:
"""Return the input keys.
:meta private:
"""
return self.agent.input_keys
@property
def output_keys(self) -> List[str]:
"""Return the singular output key.
:meta private:
"""
if self.return_intermediate_steps:
return self.agent.return_values + ["intermediate_steps"]
else:
return self.agent.return_values
def _should_continue(self, iterations: int, time_elapsed: float) -> bool:
if self.max_iterations is not None and iterations >= self.max_iterations:
return False
if (
self.max_execution_time is not None
and time_elapsed >= self.max_execution_time
):
return False
return True
def _return(
self,
output: AgentFinish,
intermediate_steps: list,
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
if run_manager:
run_manager.on_agent_finish(output, color="green", verbose=self.verbose)
final_output = output.return_values
if self.return_intermediate_steps:
final_output["intermediate_steps"] = intermediate_steps
return final_output
async def _areturn(
self,
output: AgentFinish,
intermediate_steps: list,
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
if run_manager:
await run_manager.on_agent_finish(
output, color="green", verbose=self.verbose
)
final_output = output.return_values
if self.return_intermediate_steps:
final_output["intermediate_steps"] = intermediate_steps
return final_output
def _consume_next_step(
self, values: NextStepOutput
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
if isinstance(values[-1], AgentFinish):
assert len(values) == 1
return values[-1]
else:
return [
(a.action, a.observation) for a in values if isinstance(a, AgentStep)
]
def _take_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
return self._consume_next_step(
[
a
for a in self._iter_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager,
)
]
)
def _iter_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
"""Take a single step in the thought-action-observation loop.
Override this to take control of how the agent makes and acts on choices.
"""
try:
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# Call the LLM to see what to do.
output = self.agent.plan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"An output parsing error occurred. "
"In order to pass this error back to the agent and have it try "
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
f"This is the error: {str(e)}"
)
text = str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = str(e.observation)
text = str(e.llm_output)
else:
observation = "Invalid or incomplete response"
elif isinstance(self.handle_parsing_errors, str):
observation = self.handle_parsing_errors
elif callable(self.handle_parsing_errors):
observation = self.handle_parsing_errors(e)
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, text)
if run_manager:
run_manager.on_agent_action(output, color="green")
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = ExceptionTool().run(
output.tool_input,
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
yield AgentStep(action=output, observation=observation)
return
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
yield output
return
actions: List[AgentAction]
if isinstance(output, AgentAction):
actions = [output]
else:
actions = output
for agent_action in actions:
yield agent_action
for agent_action in actions:
yield self._perform_agent_action(
name_to_tool_map, color_mapping, agent_action, run_manager
)
def _perform_agent_action(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
agent_action: AgentAction,
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> AgentStep:
if run_manager:
run_manager.on_agent_action(agent_action, color="green")
# Otherwise we lookup the tool
if agent_action.tool in name_to_tool_map:
tool = name_to_tool_map[agent_action.tool]
return_direct = tool.return_direct
color = color_mapping[agent_action.tool]
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
if return_direct:
tool_run_kwargs["llm_prefix"] = ""
# We then call the tool on the tool input to get an observation
observation = tool.run(
agent_action.tool_input,
verbose=self.verbose,
color=color,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
else:
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = InvalidTool().run(
{
"requested_tool_name": agent_action.tool,
"available_tool_names": list(name_to_tool_map.keys()),
},
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
return AgentStep(action=agent_action, observation=observation)
async def _atake_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
return self._consume_next_step(
[
a
async for a in self._aiter_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager,
)
]
)
async def _aiter_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> AsyncIterator[Union[AgentFinish, AgentAction, AgentStep]]:
"""Take a single step in the thought-action-observation loop.
Override this to take control of how the agent makes and acts on choices.
"""
try:
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# Call the LLM to see what to do.
output = await self.agent.aplan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"An output parsing error occurred. "
"In order to pass this error back to the agent and have it try "
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
f"This is the error: {str(e)}"
)
text = str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = str(e.observation)
text = str(e.llm_output)
else:
observation = "Invalid or incomplete response"
elif isinstance(self.handle_parsing_errors, str):
observation = self.handle_parsing_errors
elif callable(self.handle_parsing_errors):
observation = self.handle_parsing_errors(e)
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, text)
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = await ExceptionTool().arun(
output.tool_input,
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
yield AgentStep(action=output, observation=observation)
return
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
yield output
return
actions: List[AgentAction]
if isinstance(output, AgentAction):
actions = [output]
else:
actions = output
for agent_action in actions:
yield agent_action
# Use asyncio.gather to run multiple tool.arun() calls concurrently
result = await asyncio.gather(
*[
self._aperform_agent_action(
name_to_tool_map, color_mapping, agent_action, run_manager
)
for agent_action in actions
],
)
# TODO This could yield each result as it becomes available
for chunk in result:
yield chunk
async def _aperform_agent_action(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
agent_action: AgentAction,
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> AgentStep:
if run_manager:
await run_manager.on_agent_action(
agent_action, verbose=self.verbose, color="green"
)
# Otherwise we lookup the tool
if agent_action.tool in name_to_tool_map:
tool = name_to_tool_map[agent_action.tool]
return_direct = tool.return_direct
color = color_mapping[agent_action.tool]
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
if return_direct:
tool_run_kwargs["llm_prefix"] = ""
# We then call the tool on the tool input to get an observation
observation = await tool.arun(
agent_action.tool_input,
verbose=self.verbose,
color=color,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
else:
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = await InvalidTool().arun(
{
"requested_tool_name": agent_action.tool,
"available_tool_names": list(name_to_tool_map.keys()),
},
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
return AgentStep(action=agent_action, observation=observation)
def _call(
self,
inputs: Dict[str, str],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""Run text through and get agent response."""
# Construct a mapping of tool name to tool for easy lookup
name_to_tool_map = {tool.name: tool for tool in self.tools}
# We construct a mapping from each tool to a color, used for logging.
color_mapping = get_color_mapping(
[tool.name for tool in self.tools], excluded_colors=["green", "red"]
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# Let's start tracking the number of iterations and time elapsed
iterations = 0
time_elapsed = 0.0
start_time = time.time()
# We now enter the agent loop (until it returns something).
while self._should_continue(iterations, time_elapsed):
next_step_output = self._take_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager=run_manager,
)
if isinstance(next_step_output, AgentFinish):
return self._return(
next_step_output, intermediate_steps, run_manager=run_manager
)
intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
# See if tool should return directly
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
return self._return(
tool_return, intermediate_steps, run_manager=run_manager
)
iterations += 1
time_elapsed = time.time() - start_time
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
return self._return(output, intermediate_steps, run_manager=run_manager)
async def _acall(
self,
inputs: Dict[str, str],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, str]:
"""Run text through and get agent response."""
# Construct a mapping of tool name to tool for easy lookup
name_to_tool_map = {tool.name: tool for tool in self.tools}
# We construct a mapping from each tool to a color, used for logging.
color_mapping = get_color_mapping(
[tool.name for tool in self.tools], excluded_colors=["green"]
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# Let's start tracking the number of iterations and time elapsed
iterations = 0
time_elapsed = 0.0
start_time = time.time()
# We now enter the agent loop (until it returns something).
try:
async with asyncio_timeout(self.max_execution_time):
while self._should_continue(iterations, time_elapsed):
next_step_output = await self._atake_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager=run_manager,
)
if isinstance(next_step_output, AgentFinish):
return await self._areturn(
next_step_output,
intermediate_steps,
run_manager=run_manager,
)
intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
# See if tool should return directly
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
return await self._areturn(
tool_return, intermediate_steps, run_manager=run_manager
)
iterations += 1
time_elapsed = time.time() - start_time
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
return await self._areturn(
output, intermediate_steps, run_manager=run_manager
)
except (TimeoutError, asyncio.TimeoutError):
# stop early when interrupted by the async timeout
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
return await self._areturn(
output, intermediate_steps, run_manager=run_manager
)
def _get_tool_return(
self, next_step_output: Tuple[AgentAction, str]
) -> Optional[AgentFinish]:
"""Check if the tool is a returning tool."""
agent_action, observation = next_step_output
name_to_tool_map = {tool.name: tool for tool in self.tools}
return_value_key = "output"
if len(self.agent.return_values) > 0:
return_value_key = self.agent.return_values[0]
# Invalid tools won't be in the map, so we return False.
if agent_action.tool in name_to_tool_map:
if name_to_tool_map[agent_action.tool].return_direct:
return AgentFinish(
{return_value_key: observation},
"",
)
return None
def _prepare_intermediate_steps(
self, intermediate_steps: List[Tuple[AgentAction, str]]
) -> List[Tuple[AgentAction, str]]:
if (
isinstance(self.trim_intermediate_steps, int)
and self.trim_intermediate_steps > 0
):
return intermediate_steps[-self.trim_intermediate_steps :]
elif callable(self.trim_intermediate_steps):
return self.trim_intermediate_steps(intermediate_steps)
else:
return intermediate_steps
[docs] def stream(
self,
input: Union[Dict[str, Any], Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Iterator[AddableDict]:
"""Enables streaming over steps taken to reach final output."""
config = ensure_config(config)
iterator = AgentExecutorIterator(
self,
input,
config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
yield_actions=True,
**kwargs,
)
for step in iterator:
yield step
[docs] async def astream(
self,
input: Union[Dict[str, Any], Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> AsyncIterator[AddableDict]:
"""Enables streaming over steps taken to reach final output."""
config = ensure_config(config)
iterator = AgentExecutorIterator(
self,
input,
config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
yield_actions=True,
**kwargs,
)
async for step in iterator:
yield step