langchain_core.utils.aiter.Tee
class langchain_core.utils.aiter.Tee(iterable: AsyncIterator[T], n: int = 2, *, lock: Optional[AsyncContextManager[Any]] = None)
Create n separate asynchronous iterators over iterable.
This splits a single iterable into multiple iterators, each providing the same items in the same order. All child iterators may advance separately but share the same items from iterable: when the most advanced iterator retrieves an item, it is buffered until the least advanced iterator has yielded it as well. A tee works lazily and can handle an infinite iterable, provided that all iterators advance.

    import operator

    import asyncstdlib as a  # assumed: the `a` namespace is not defined on this page

    async def derivative(sensor_data):
        previous, current = a.tee(sensor_data, n=2)
        await a.anext(previous)  # advance one iterator
        return a.map(operator.sub, previous, current)
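The buffering behaviour described above can be sketched with the standard library alone. This is an illustration of the idea, not the langchain_core implementation: simple_tee and count_to are hypothetical names, and the sketch omits locking and closing of children.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Deque, List, Tuple, TypeVar

T = TypeVar("T")


async def count_to(limit: int) -> AsyncIterator[int]:
    """Hypothetical source: yields 0 .. limit - 1."""
    for i in range(limit):
        yield i


def simple_tee(source: AsyncIterator[T], n: int = 2) -> List[AsyncIterator[T]]:
    """Illustrative only: split `source` into n children, one buffer per child."""
    buffers: List[Deque[T]] = [deque() for _ in range(n)]

    async def child(own: Deque[T]) -> AsyncIterator[T]:
        while True:
            if not own:
                # This child has caught up with everything fetched so far,
                # so it pulls one new item and appends it to every buffer.
                try:
                    item = await source.__anext__()
                except StopAsyncIteration:
                    return
                for buffer in buffers:
                    buffer.append(item)
            yield own.popleft()

    return [child(buffer) for buffer in buffers]


async def demo() -> Tuple[List[int], List[int], List[int]]:
    first, second = simple_tee(count_to(4))
    # `first` runs ahead by two items; both stay buffered for `second`.
    ahead = [await first.__anext__(), await first.__anext__()]
    behind = [item async for item in second]  # sees every item from the start
    rest = [item async for item in first]     # continues where it left off
    return ahead, behind, rest


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([0, 1], [0, 1, 2, 3], [2, 3])
```

Each child only fetches from the source when its own buffer is empty, which is why a child that lags far behind forces the buffers to grow.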
Unlike itertools.tee(), tee() returns a custom type instead of a tuple. Like a tuple, it can be indexed, iterated and unpacked to get the child iterators. In addition, its aclose() method immediately closes all children, and it can be used in an async with context for the same effect.

If iterable is an iterator and read elsewhere, tee will not provide these items. Also, tee must internally buffer each item until the last iterator has yielded it; if the most and least advanced iterators differ by most of the data, using a list is more efficient (but not lazy).

If the underlying iterable is concurrency safe (anext may be awaited concurrently), the resulting iterators are concurrency safe as well. Otherwise, the iterators are safe if there is only ever one single “most advanced” iterator. To enforce sequential use of anext, provide a lock - e.g. an asyncio.Lock instance in an asyncio application - and access is automatically synchronised.

Methods
__init__(iterable[, n, lock])
aclose()

Parameters
- iterable (AsyncIterator[T])
- n (int)
- lock (Optional[AsyncContextManager[Any]])
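To illustrate what the lock parameter guards against, the sketch below serialises anext calls on a source that is not concurrency safe, using only the standard library. It does not call Tee itself; slow_numbers and drain are hypothetical helpers, and the pattern shown is only an assumption about the kind of synchronisation a lock provides.

```python
import asyncio
from typing import AsyncIterator, List


async def slow_numbers(limit: int) -> AsyncIterator[int]:
    """Hypothetical source; an async generator must not be advanced concurrently."""
    for i in range(limit):
        await asyncio.sleep(0)  # suspension point: another task may run here
        yield i


async def drain(source: AsyncIterator[int], lock: asyncio.Lock) -> List[int]:
    """Pull items until exhaustion, holding `lock` around every anext call."""
    seen: List[int] = []
    while True:
        async with lock:  # only one task awaits __anext__ at a time
            try:
                item = await source.__anext__()
            except StopAsyncIteration:
                return seen
        seen.append(item)


async def demo() -> List[int]:
    source = slow_numbers(6)
    lock = asyncio.Lock()
    # Two consumers share one source; the lock keeps their anext calls sequential.
    left, right = await asyncio.gather(drain(source, lock), drain(source, lock))
    return sorted(left + right)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [0, 1, 2, 3, 4, 5]
```

Going by the signature above, the equivalent with Tee would be to pass the lock at construction, for example Tee(iterable, n=2, lock=asyncio.Lock()), and to wrap the use of the children in async with so that they are closed afterwards.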