langchain_core.utils.aiter.Tee
class langchain_core.utils.aiter.Tee(iterable: AsyncIterator[T], n: int = 2, *, lock: Optional[AsyncContextManager[Any]] = None)
Create n separate asynchronous iterators over iterable.

This splits a single iterable into multiple iterators, each providing the same items in the same order. All child iterators may advance separately but share the same items from iterable – when the most advanced iterator retrieves an item, it is buffered until the least advanced iterator has yielded it as well. A tee works lazily and can handle an infinite iterable, provided that all iterators advance.

```python
import operator
import asyncstdlib as a  # assumption: `a` in this example refers to the asyncstdlib package

async def derivative(sensor_data):
    previous, current = a.tee(sensor_data, n=2)
    await a.anext(previous)  # advance one iterator
    return a.map(operator.sub, previous, current)
```
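The buffering behaviour described above can be illustrated with a stripped-down sketch that uses only the standard library. It is not the langchain_core implementation: the names simple_tee, _peer, numbers and demo are hypothetical, and locking as well as cleanup on close are deliberately left out.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Deque, List, TypeVar

T = TypeVar("T")


async def _peer(
    source: AsyncIterator[T], buffer: Deque[T], peers: List[Deque[T]]
) -> AsyncIterator[T]:
    """One child iterator: drain its own buffer, else pull from the source."""
    while True:
        if not buffer:
            try:
                item = await source.__anext__()
            except StopAsyncIteration:
                return
            # This child is the most advanced one: share the new item
            # with every child's buffer, including its own.
            for peer_buffer in peers:
                peer_buffer.append(item)
        yield buffer.popleft()


def simple_tee(source: AsyncIterator[T], n: int = 2) -> List[AsyncIterator[T]]:
    """Hypothetical helper: return n child iterators sharing one source."""
    buffers: List[Deque[T]] = [deque() for _ in range(n)]
    return [_peer(source, buffer, buffers) for buffer in buffers]


async def numbers() -> AsyncIterator[int]:
    """Hypothetical finite source of sensor readings."""
    for value in (1, 4, 9, 16):
        yield value


async def demo() -> List[int]:
    previous, current = simple_tee(numbers(), n=2)
    await previous.__anext__()  # advance one child by a single item
    differences = []
    # Pair each item with its predecessor, as in the derivative example.
    async for later in previous:
        earlier = await current.__anext__()
        differences.append(later - earlier)
    return differences


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [3, 5, 7]
```

Each child keeps its own deque, so an item stays in memory only until the slowest child has popped it. This is why the buffer grows with the distance between the most and least advanced iterators.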
Unlike itertools.tee(), tee() returns a custom type instead of a tuple. Like a tuple, it can be indexed, iterated and unpacked to get the child iterators. In addition, its aclose() method immediately closes all children, and it can be used in an async with context for the same effect.

If iterable is an iterator and read elsewhere, tee will not provide these items. Also, tee must internally buffer each item until the last iterator has yielded it; if the most and least advanced iterators differ by most of the data, using a list is more efficient (but not lazy).

If the underlying iterable is concurrency safe (anext may be awaited concurrently), the resulting iterators are concurrency safe as well. Otherwise, the iterators are safe if there is only ever one single “most advanced” iterator. To enforce sequential use of anext, provide a lock, e.g. an asyncio.Lock instance in an asyncio application, and access is automatically synchronised.

Methods
__init__(iterable[, n, lock])
aclose()
Parameters
iterable (AsyncIterator[T])
n (int)
lock (Optional[AsyncContextManager[Any]])