langchain_core.runnables.base.RunnableLambda
- class langchain_core.runnables.base.RunnableLambda(func: Union[Union[Callable[[Input], Output], Callable[[Input, RunnableConfig], Output], Callable[[Input, CallbackManagerForChainRun], Output], Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]], Union[Callable[[Input], Awaitable[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]], afunc: Optional[Union[Callable[[Input], Awaitable[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]] = None)
- RunnableLambda converts a Python callable into a Runnable.
- Wrapping a callable in a RunnableLambda makes the callable usable within either a sync or async context.
- RunnableLambda can be composed like any other Runnable and provides seamless integration with LangChain tracing.
- Examples

```python
# This is a RunnableLambda
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one)

runnable.invoke(1)  # returns 2
runnable.batch([1, 2, 3])  # returns [2, 3, 4]

# Async is supported by default by delegating to the sync implementation
# (the await calls below must run inside an async function)
await runnable.ainvoke(1)  # returns 2
await runnable.abatch([1, 2, 3])  # returns [2, 3, 4]

# Alternatively, can provide both sync and async implementations
async def add_one_async(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one, afunc=add_one_async)
runnable.invoke(1)  # Uses add_one
await runnable.ainvoke(1)  # Uses add_one_async
```

- Create a RunnableLambda from a callable, an async callable, or both.
- Accepts both sync and async variants to allow providing efficient implementations for sync and async execution.
- Parameters
- func – Either sync or async callable 
- afunc – An async callable that takes an input and returns an output. 
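The pairing of func and afunc described above can be pictured with a small standalone sketch. It does not use LangChain: LambdaSketch is a hypothetical class, and running the sync callable in a worker thread via asyncio.to_thread is an assumption about one reasonable way to delegate, not a statement about how the library does it.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class LambdaSketch:
    """Hypothetical stand-in for the sync/async pairing; not the LangChain class."""

    def __init__(
        self,
        func: Callable[[int], int],
        afunc: Optional[Callable[[int], Awaitable[int]]] = None,
    ) -> None:
        self.func = func
        self.afunc = afunc

    def invoke(self, value: int) -> int:
        # The sync path always uses the sync callable.
        return self.func(value)

    async def ainvoke(self, value: int) -> int:
        if self.afunc is not None:
            # A dedicated async implementation takes precedence.
            return await self.afunc(value)
        # Assumed fallback: run the sync callable in a worker thread.
        return await asyncio.to_thread(self.func, value)


def add_one(x: int) -> int:
    return x + 1


async def add_ten_async(x: int) -> int:
    return x + 10


sync_only = LambdaSketch(add_one)
both = LambdaSketch(add_one, afunc=add_ten_async)

sync_result = sync_only.invoke(1)
delegated_result = asyncio.run(sync_only.ainvoke(1))
native_async_result = asyncio.run(both.ainvoke(1))
```

The two callables deliberately return different values so that it is visible which one served the async call.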
 
 - Attributes
 - InputType – The type of the input to this runnable.
 - OutputType – The type of the output of this runnable as a type annotation.
 - config_specs – List configurable fields for this runnable.
 - input_schema – The type of input this runnable accepts specified as a pydantic model.
 - output_schema – The type of output this runnable produces specified as a pydantic model.
 - Methods
 - __init__(func[, afunc]) – Create a RunnableLambda from a callable, an async callable, or both.
 - abatch(inputs[, config, return_exceptions]) – Default implementation runs ainvoke in parallel using asyncio.gather.
 - ainvoke(input[, config]) – Invoke this runnable asynchronously.
 - astream(input[, config]) – Default implementation of astream, which calls ainvoke.
 - astream_log(input[, config, diff, ...]) – Stream all output from a runnable, as reported to the callback system.
 - atransform(input[, config]) – Default implementation of atransform, which buffers input and calls astream.
 - batch(inputs[, config, return_exceptions]) – Default implementation runs invoke in parallel using a thread pool executor.
 - bind(**kwargs) – Bind arguments to a Runnable, returning a new Runnable.
 - config_schema(*[, include]) – The type of config this runnable accepts specified as a pydantic model.
 - get_input_schema([config]) – The pydantic schema for the input to this runnable.
 - get_output_schema([config]) – Get a pydantic model that can be used to validate output to the runnable.
 - invoke(input[, config]) – Invoke this runnable synchronously.
 - map() – Return a new Runnable that maps a list of inputs to a list of outputs, by calling invoke() with each input.
 - stream(input[, config]) – Default implementation of stream, which calls invoke.
 - transform(input[, config]) – Default implementation of transform, which buffers input and then calls stream.
 - with_config([config]) – Bind config to a Runnable, returning a new Runnable.
 - with_fallbacks(fallbacks, *[, ...]) – Add fallbacks to a runnable, returning a new Runnable.
 - with_listeners(*[, on_start, on_end, on_error]) – Bind lifecycle listeners to a Runnable, returning a new Runnable.
 - with_retry(*[, retry_if_exception_type, ...]) – Create a new Runnable that retries the original runnable on exceptions.
 - with_types(*[, input_type, output_type]) – Bind input and output types to a Runnable, returning a new Runnable.

 - __init__(func: Union[Union[Callable[[Input], Output], Callable[[Input, RunnableConfig], Output], Callable[[Input, CallbackManagerForChainRun], Output], Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]], Union[Callable[[Input], Awaitable[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]], afunc: Optional[Union[Callable[[Input], Awaitable[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]] = None) -> None
- Create a RunnableLambda from a callable, an async callable, or both.
- Accepts both sync and async variants to allow providing efficient implementations for sync and async execution.
- Parameters
- func – Either sync or async callable 
- afunc – An async callable that takes an input and returns an output. 
 
 
 - async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) -> List[Output]
- Default implementation runs ainvoke in parallel using asyncio.gather.
- The default implementation of batch works well for IO-bound runnables.
- Subclasses should override this method if they can batch more efficiently; e.g., if the underlying runnable uses an API which supports a batch mode.
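The fan-out pattern named above can be sketched with the standard library alone. The names ainvoke_sketch and abatch_sketch are hypothetical stand-ins, and the sketch assumes that return_exceptions behaves like the flag of the same name on asyncio.gather (failures are returned in place of outputs rather than raised).

```python
import asyncio
from typing import List, Union


async def ainvoke_sketch(x: int) -> int:
    """Hypothetical per-item coroutine standing in for ainvoke."""
    await asyncio.sleep(0)  # yield control, as an IO-bound call would
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x + 1


async def abatch_sketch(
    inputs: List[int], return_exceptions: bool = False
) -> List[Union[int, BaseException]]:
    # gather runs all coroutines concurrently and returns results
    # in the same order as the inputs.
    return await asyncio.gather(
        *(ainvoke_sketch(x) for x in inputs),
        return_exceptions=return_exceptions,
    )


ok_results = asyncio.run(abatch_sketch([1, 2, 3]))
mixed_results = asyncio.run(abatch_sketch([1, -1, 3], return_exceptions=True))
```

With return_exceptions left at False, the first failing input would raise out of abatch_sketch instead of appearing in the result list.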
 - async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) -> Output
- Invoke this runnable asynchronously. 
 - async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) -> AsyncIterator[Output]
- Default implementation of astream, which calls ainvoke. Subclasses should override this method if they support streaming output. 
 - async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: bool = True, with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Optional[Any]) -> Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]
- Stream all output from a runnable, as reported to the callback system. This includes all inner runs of LLMs, Retrievers, Tools, etc.
- Output is streamed as Log objects, which include a list of jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
- The jsonpatch ops can be applied in order to construct state.
- Parameters
- input – The input to the runnable. 
- config – The config to use for the runnable. 
- diff – Whether to yield diffs between each step, or the current state. 
- with_streamed_output_list – Whether to yield the streamed_output list. 
- include_names – Only include logs with these names. 
- include_types – Only include logs with these types. 
- include_tags – Only include logs with these tags. 
- exclude_names – Exclude logs with these names. 
- exclude_types – Exclude logs with these types. 
- exclude_tags – Exclude logs with these tags. 
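To make "the jsonpatch ops can be applied in order to construct state" concrete, here is a minimal sketch of applying such ops by hand. The function apply_ops is hypothetical and handles only the "add" and "replace" operations without JSON Pointer escaping; the patch contents and the key names inside them are invented for illustration and are not taken from real astream_log output.

```python
from typing import Any, Dict, List


def apply_ops(state: Any, ops: List[Dict[str, Any]]) -> Any:
    """Apply a small subset of JSON Patch ("add", "replace") and return the state."""
    for op in ops:
        # Split "/a/b/-" into ["a", "b", "-"]; an empty path targets the root.
        keys = [k for k in op["path"].split("/") if k]
        if not keys:
            state = op["value"]
            continue
        target: Any = state
        for key in keys[:-1]:
            target = target[int(key)] if isinstance(target, list) else target[key]
        last = keys[-1]
        if isinstance(target, list):
            if last == "-":
                target.append(op["value"])  # "-" means append to the array
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


# Hypothetical patches, one list of ops per streamed step.
patches = [
    [{"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}}],
    [{"op": "add", "path": "/streamed_output/-", "value": "Hel"}],
    [{"op": "add", "path": "/streamed_output/-", "value": "lo"}],
    [{"op": "replace", "path": "/final_output", "value": "Hello"}],
]

run_state: Any = {}
for ops in patches:
    run_state = apply_ops(run_state, ops)
```

For anything beyond a quick illustration, a full JSON Patch implementation is the safer choice, since this sketch ignores operations such as "remove" and "move".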
 
 
 - async atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) -> AsyncIterator[Output]
- Default implementation of atransform, which buffers input and calls astream. Subclasses should override this method if they can start producing output while input is still being generated. 
 - batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) -> List[Output]
- Default implementation runs invoke in parallel using a thread pool executor.
- The default implementation of batch works well for IO-bound runnables.
- Subclasses should override this method if they can batch more efficiently; e.g., if the underlying runnable uses an API which supports a batch mode.
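A standalone sketch of the thread-pool approach, using only concurrent.futures. The names invoke_sketch and batch_sketch and the max_workers parameter are hypothetical; the short sleep stands in for an IO-bound call such as a network request.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def invoke_sketch(x: int) -> int:
    """Hypothetical per-item call standing in for invoke."""
    time.sleep(0.01)  # simulate waiting on IO
    return x + 1


def batch_sketch(inputs: List[int], max_workers: int = 4) -> List[int]:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map runs the calls in worker threads and yields
        # results in input order, regardless of completion order.
        return list(executor.map(invoke_sketch, inputs))


batch_results = batch_sketch([1, 2, 3])
```

Threads help here because the work is dominated by waiting; for CPU-bound callables the benefit would be much smaller.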
 - bind(**kwargs: Any) -> Runnable[Input, Output]
- Bind arguments to a Runnable, returning a new Runnable. 
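The idea of binding arguments can be illustrated without LangChain. In this sketch, bind_sketch and generate are hypothetical, and letting call-time keyword arguments override the bound ones is an assumption about precedence made for the example.

```python
from typing import Any, Callable


def bind_sketch(invoke: Callable[..., str], **bound: Any) -> Callable[..., str]:
    """Return a new callable that always passes the bound keyword arguments."""

    def run(value: str, **kwargs: Any) -> str:
        # Assumed precedence: call-time kwargs override bound kwargs.
        return invoke(value, **{**bound, **kwargs})

    return run


def generate(prompt: str, stop: str = "") -> str:
    """Hypothetical text generator that truncates its output at a stop string."""
    text = prompt + " one two three"
    return text.split(stop)[0] if stop else text


bound_generate = bind_sketch(generate, stop="two")

bound_result = bound_generate("count:")
override_result = bound_generate("count:", stop="three")
```

The original generate function is left untouched; bind_sketch returns a new callable, which mirrors the "returning a new Runnable" wording above.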
 - config_schema(*, include: Optional[Sequence[str]] = None) -> Type[BaseModel]
- The type of config this runnable accepts specified as a pydantic model.
- To mark a field as configurable, see the configurable_fields and configurable_alternatives methods.
- Parameters
- include – A list of fields to include in the config schema. 
- Returns
- A pydantic model that can be used to validate config. 
 
 - get_input_schema(config: Optional[RunnableConfig] = None) -> Type[BaseModel]
- The pydantic schema for the input to this runnable. 
 - get_output_schema(config: Optional[RunnableConfig] = None) -> Type[BaseModel]
- Get a pydantic model that can be used to validate output to the runnable.
- Runnables that leverage the configurable_fields and configurable_alternatives methods will have a dynamic output schema that depends on which configuration the runnable is invoked with.
- This method returns the output schema for a specific configuration.
- Parameters
- config – A config to use when generating the schema. 
- Returns
- A pydantic model that can be used to validate output. 
 
 - invoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) -> Output
- Invoke this runnable synchronously. 
 - map() -> Runnable[List[Input], List[Output]]
- Return a new Runnable that maps a list of inputs to a list of outputs, by calling invoke() with each input. 
 - stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) -> Iterator[Output]
- Default implementation of stream, which calls invoke. Subclasses should override this method if they support streaming output. 
 - transform(input: Iterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) -> Iterator[Output]
- Default implementation of transform, which buffers input and then calls stream. Subclasses should override this method if they can start producing output while input is still being generated. 
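The buffering behaviour described above can be sketched with plain generators. The names stream_sketch and transform_sketch are hypothetical, and the sketch assumes input chunks are strings that combine with +.

```python
from typing import Iterator


def stream_sketch(text: str) -> Iterator[str]:
    """Default-style stream: compute the whole result, yield it as one chunk."""
    yield text.upper()


def transform_sketch(chunks: Iterator[str]) -> Iterator[str]:
    # Buffer: consume the entire input iterator before producing anything.
    buffered = ""
    for chunk in chunks:
        buffered += chunk
    # Only now is the combined input handed to stream.
    yield from stream_sketch(buffered)


outputs = list(transform_sketch(iter(["he", "llo"])))
```

Because nothing is yielded until the input is exhausted, a subclass that can process chunks incrementally has good reason to override this default.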
 - with_config(config: Optional[RunnableConfig] = None, **kwargs: Any) -> Runnable[Input, Output]
- Bind config to a Runnable, returning a new Runnable. 
 - with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,)) -> RunnableWithFallbacksT[Input, Output]
- Add fallbacks to a runnable, returning a new Runnable. - Parameters
- fallbacks – A sequence of runnables to try if the original runnable fails. 
- exceptions_to_handle – A tuple of exception types to handle. 
 
- Returns
- A new Runnable that will try the original runnable, and then each fallback in order, upon failures. 
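The try-in-order behaviour can be sketched as a plain higher-order function. Everything here is hypothetical (with_fallbacks_sketch, flaky, backup), and re-raising the first handled error once every candidate has failed is an assumption of the sketch.

```python
from typing import Callable, Optional, Sequence, Tuple, Type


def with_fallbacks_sketch(
    primary: Callable[[str], str],
    fallbacks: Sequence[Callable[[str], str]],
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> Callable[[str], str]:
    def run(value: str) -> str:
        first_error: Optional[BaseException] = None
        for candidate in [primary, *fallbacks]:
            try:
                return candidate(value)
            except exceptions_to_handle as err:
                # Remember the first handled failure, then try the next candidate.
                if first_error is None:
                    first_error = err
        # Every candidate failed; surfacing the first error is an assumption.
        assert first_error is not None
        raise first_error

    return run


def flaky(value: str) -> str:
    raise ConnectionError("primary unavailable")


def backup(value: str) -> str:
    return f"backup:{value}"


safe = with_fallbacks_sketch(flaky, [backup])
fallback_result = safe("q")
```

Exceptions outside exceptions_to_handle are not caught, so they propagate immediately without any fallback being tried.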
 
 - with_listeners(*, on_start: Optional[Listener] = None, on_end: Optional[Listener] = None, on_error: Optional[Listener] = None) -> Runnable[Input, Output]
- Bind lifecycle listeners to a Runnable, returning a new Runnable.
- on_start: Called before the runnable starts running, with the Run object.
- on_end: Called after the runnable finishes running, with the Run object.
- on_error: Called if the runnable throws an error, with the Run object.
- The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.
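The order in which the three listeners fire can be sketched as follows. RunSketch is a hypothetical, much-reduced stand-in for the Run object (only input, output, and error), and with_listeners_sketch is likewise invented for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class RunSketch:
    """Hypothetical, reduced stand-in for the Run object."""

    input: Any
    output: Any = None
    error: Optional[str] = None


Listener = Callable[[RunSketch], None]


def with_listeners_sketch(
    func: Callable[[int], int],
    on_start: Optional[Listener] = None,
    on_end: Optional[Listener] = None,
    on_error: Optional[Listener] = None,
) -> Callable[[int], int]:
    def run(value: int) -> int:
        record = RunSketch(input=value)
        if on_start:
            on_start(record)  # before the wrapped callable runs
        try:
            record.output = func(value)
        except Exception as err:
            record.error = repr(err)
            if on_error:
                on_error(record)  # only on failure
            raise
        if on_end:
            on_end(record)  # only after a successful run
        return record.output

    return run


events: List[str] = []
observed = with_listeners_sketch(
    lambda x: x + 1,
    on_start=lambda run: events.append(f"start:{run.input}"),
    on_end=lambda run: events.append(f"end:{run.output}"),
)
listener_result = observed(1)
```

In this sketch the error is re-raised after on_error runs, so listeners observe failures without swallowing them.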
 - with_retry(*, retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) -> Runnable[Input, Output]
- Create a new Runnable that retries the original runnable on exceptions.
- Parameters
- retry_if_exception_type – A tuple of exception types to retry on.
- wait_exponential_jitter – Whether to add jitter to the wait time between retries.
- stop_after_attempt – The maximum number of attempts to make before giving up.
 
- Returns
- A new Runnable that retries the original runnable on exceptions.
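The three parameters above can be illustrated with a standalone retry loop. This is a sketch under stated assumptions, not the library's implementation: with_retry_sketch and base_delay are hypothetical, the delay formula is invented for the example, and the sketch assumes that no waiting happens at all when wait_exponential_jitter is False.

```python
import random
import time
from typing import Callable, Dict, Tuple, Type


def with_retry_sketch(
    func: Callable[[int], int],
    retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    stop_after_attempt: int = 3,
    base_delay: float = 0.01,  # hypothetical knob, kept small for the example
) -> Callable[[int], int]:
    def run(value: int) -> int:
        for attempt in range(1, stop_after_attempt + 1):
            try:
                return func(value)
            except retry_if_exception_type:
                if attempt == stop_after_attempt:
                    raise  # attempts exhausted: surface the last error
                if wait_exponential_jitter:
                    # Exponential backoff plus a random jitter term.
                    delay = base_delay * 2 ** (attempt - 1)
                    time.sleep(delay + random.uniform(0, base_delay))
        raise RuntimeError("stop_after_attempt must be at least 1")

    return run


calls: Dict[str, int] = {"count": 0}


def flaky_add_one(x: int) -> int:
    """Fails twice with a transient error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("transient failure")
    return x + 1


retrying = with_retry_sketch(flaky_add_one, retry_if_exception_type=(TimeoutError,))
retry_result = retrying(1)
```

Restricting retry_if_exception_type to transient errors, as in the usage above, keeps programming errors from being retried pointlessly.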