Skip to content

graph_retriever.utils

Utilities used in graph_retriever and related packages.

math

Math utility functions for vector operations.

cosine_similarity

cosine_similarity(X: Matrix, Y: Matrix) -> ndarray

Compute row-wise cosine similarity between two equal-width matrices.

PARAMETER DESCRIPTION
X

A matrix of shape (m, n), where m is the number of rows and n is the number of columns (features).

TYPE: Matrix

Y

A matrix of shape (p, n), where p is the number of rows and n is the number of columns (features).

TYPE: Matrix

RETURNS DESCRIPTION
ndarray

A matrix of shape (m, p) containing the cosine similarity scores between each row of X and each row of Y.

RAISES DESCRIPTION
ValueError

If the number of columns in X and Y are not equal.

Notes
  • If the simsimd library is available, it will be used for performance optimization. Otherwise, the function falls back to a NumPy implementation.
  • Divide-by-zero and invalid values in similarity calculations are replaced with 0.0 in the output.
Source code in packages/graph-retriever/src/graph_retriever/utils/math.py
def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
    """
    Compute row-wise cosine similarity between two equal-width matrices.

    Parameters
    ----------
    X :
        A matrix of shape (m, n), where `m` is the number of rows and `n` is the
        number of columns (features).
    Y :
        A matrix of shape (p, n), where `p` is the number of rows and `n` is the
        number of columns (features).

    Returns
    -------
    :
        A matrix of shape (m, p) containing the cosine similarity scores
        between each row of `X` and each row of `Y`. If either input is empty,
        an empty 1-D array is returned instead.

    Raises
    ------
    ValueError
        If the number of columns in `X` and `Y` are not equal.

    Notes
    -----
    - If the `simsimd` library is available, it will be used for performance
      optimization. Otherwise, the function falls back to a NumPy implementation.
    - Divide-by-zero and invalid values in similarity calculations are replaced
      with 0.0 in the output, whichever implementation is used.
    """
    if len(X) == 0 or len(Y) == 0:
        return np.array([])

    # `asarray` avoids a copy when an ndarray is passed; the inputs are never
    # mutated below, so sharing the caller's buffer is safe.
    X = np.asarray(X)
    Y = np.asarray(Y)
    if X.shape[1] != Y.shape[1]:
        raise ValueError(
            f"Number of columns in X and Y must be the same. X has shape {X.shape} "
            f"and Y has shape {Y.shape}."
        )

    # Only the import is guarded: an ImportError is the signal to fall back,
    # and nothing else should be mistaken for it.
    try:
        import simsimd as simd
    except ImportError:
        logger.debug(
            "Unable to import simsimd, defaulting to NumPy implementation. If you want "
            "to use simsimd please install with `pip install simsimd`."
        )
        X_norm = np.linalg.norm(X, axis=1)
        Y_norm = np.linalg.norm(Y, axis=1)
        # Ignore divide by zero errors run time warnings as those are handled below.
        with np.errstate(divide="ignore", invalid="ignore"):
            similarity = np.dot(X, Y.T) / np.outer(X_norm, Y_norm)
    else:
        # simsimd's `cdist` with metric="cosine" yields cosine *distances*;
        # convert them to similarities.
        similarity = 1 - np.array(
            simd.cdist(
                np.asarray(X, dtype=np.float32),
                np.asarray(Y, dtype=np.float32),
                metric="cosine",
            )
        )

    # Rows with zero norm make the ratio undefined (0/0 -> nan, x/0 -> inf).
    # Report those as 0.0, as documented, regardless of which backend ran.
    similarity[~np.isfinite(similarity)] = 0.0
    return similarity

cosine_similarity_top_k

cosine_similarity_top_k(
    X: Matrix,
    Y: Matrix,
    top_k: int | None,
    score_threshold: float | None = None,
) -> tuple[list[tuple[int, int]], list[float]]

Row-wise cosine similarity with optional top-k and score threshold filtering.

PARAMETER DESCRIPTION
X

A matrix of shape (m, n), where m is the number of rows and n is the number of columns (features).

TYPE: Matrix

Y

A matrix of shape (p, n), where p is the number of rows and n is the number of columns (features).

TYPE: Matrix

top_k

Max number of results to return.

TYPE: int | None

score_threshold

Minimum score to return.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
list[tuple[int, int]]

Two-tuples of indices (X_idx, Y_idx) indicating the respective rows in X and Y.

list[float]

The corresponding cosine similarities.

Source code in packages/graph-retriever/src/graph_retriever/utils/math.py
def cosine_similarity_top_k(
    X: Matrix,
    Y: Matrix,
    top_k: int | None,
    score_threshold: float | None = None,
) -> tuple[list[tuple[int, int]], list[float]]:
    """
    Row-wise cosine similarity with optional top-k and score threshold filtering.

    Parameters
    ----------
    X :
        A matrix of shape (m, n), where `m` is the number of rows and `n` is the
        number of columns (features).
    Y :
        A matrix of shape (p, n), where `p` is the number of rows and `n` is the
        number of columns (features).
    top_k :
        Max number of results to return. `None` (or 0) means no limit: every
        `(X_idx, Y_idx)` pair that passes the threshold is returned.
    score_threshold :
        Minimum score to return (inclusive). `None` disables the threshold.

    Returns
    -------
    list[tuple[int, int]]
        Two-tuples of indices `(X_idx, Y_idx)` indicating the respective rows in
        `X` and `Y`, ordered by decreasing similarity.
    list[float]
        The corresponding cosine similarities.
    """
    if len(X) == 0 or len(Y) == 0:
        return [], []
    score_array = cosine_similarity(X, Y)
    flat_scores = score_array.ravel()

    # Work with the flat indices of the pairs that pass the threshold. Filtering
    # by index, rather than overwriting rejected scores with a 0.0 sentinel,
    # keeps genuine zero/negative scores distinct from rejected ones. It also
    # means a threshold of exactly 0.0 is honoured.
    if score_threshold is None:
        candidates = np.arange(flat_scores.size)
    else:
        candidates = np.flatnonzero(flat_scores >= score_threshold)

    limit = candidates.size if not top_k else max(0, min(top_k, candidates.size))
    if limit == 0:
        # Nothing to return. This also guards the slicing below: `[-0:]` would
        # select *every* element rather than none.
        return [], []

    candidate_scores = flat_scores[candidates]
    if limit < candidates.size:
        # Linear-time selection of the `limit` best candidates (unordered).
        best = np.argpartition(candidate_scores, -limit)[-limit:]
    else:
        best = np.arange(candidates.size)
    # Order the selected candidates by decreasing score.
    best = best[np.argsort(candidate_scores[best])[::-1]]

    top_idxs = candidates[best]
    rows, cols = np.unravel_index(top_idxs, score_array.shape)
    return list(zip(rows.tolist(), cols.tolist())), flat_scores[top_idxs].tolist()

merge

amerge async

amerge(
    *async_iterables: AsyncIterator[T],
    queue_size: int = 10,
) -> AsyncIterator[T]

Merge async iterables into a single async iterator.

Elements are yielded in the order they become available.

PARAMETER DESCRIPTION
async_iterables

The async iterators to merge.

TYPE: AsyncIterator[T] DEFAULT: ()

queue_size

Number of elements to buffer in the queue.

TYPE: int DEFAULT: 10

YIELDS DESCRIPTION
AsyncIterator[T]

The elements of the iterators as they become available.

Source code in packages/graph-retriever/src/graph_retriever/utils/merge.py
async def amerge(
    *async_iterables: AsyncIterator[T],
    queue_size: int = 10,
) -> AsyncIterator[T]:
    """
    Merge async iterables into a single async iterator.

    Elements are yielded in the order they become available. If any source
    raises, iteration stops early and that exception is re-raised to the
    consumer once the remaining sources have been cancelled.

    Parameters
    ----------
    async_iterables :
        The async iterators to merge.
    queue_size :
        Number of elements to buffer in the queue.

    Yields
    ------
    :
        The elements of the iterators as they become available.
    """
    queue: asyncio.Queue[T | _Done] = asyncio.Queue(queue_size)

    async def pump(aiter: AsyncIterator[T]) -> None:
        # Copy one source into the shared queue, then post a `_Done` marker.
        try:
            async for item in aiter:
                await queue.put(item)
            await queue.put(_Done(exception=False))
        except asyncio.CancelledError:
            # Normally this is the consumer's cleanup (below) stopping us. The
            # queue may be full and will never be read again, so awaiting `put`
            # here would block forever and hang the `gather` in `finally`.
            # Post the marker only if there is room (deliberate best effort).
            try:
                queue.put_nowait(_Done(exception=True))
            except asyncio.QueueFull:
                pass
            raise
        except BaseException:
            # The consumer is still reading: block until it sees the marker.
            await queue.put(_Done(exception=True))
            raise

    tasks = [asyncio.create_task(pump(aiter)) for aiter in async_iterables]

    try:
        pending_count = len(async_iterables)
        while pending_count > 0:
            item = await queue.get()
            if isinstance(item, _Done):
                if item.exception:
                    # If there has been an exception, end early.
                    break
                else:
                    pending_count -= 1
            else:
                yield item
            queue.task_done()
    finally:
        cancelled_here = set()
        for task in tasks:
            if not task.done():
                task.cancel()
                cancelled_here.add(task)
        # Collect outcomes instead of letting `gather` raise the CancelledError
        # of pumps we stopped ourselves; that would make a normal early close
        # look like the consumer itself had been cancelled.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for task, outcome in zip(tasks, outcomes):
            if not isinstance(outcome, BaseException):
                continue
            if task in cancelled_here and isinstance(outcome, asyncio.CancelledError):
                continue
            # Propagate a source's failure to the consumer.
            raise outcome

run_in_executor

run_in_executor async

run_in_executor(
    executor: Executor | None,
    func: Callable[P, T],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T

Run a function in an executor.

PARAMETER DESCRIPTION
executor

The executor to run in.

TYPE: Executor | None

func

The function.

TYPE: Callable[P, T]

*args

The positional arguments to the function.

TYPE: P.args DEFAULT: ()

**kwargs

The keyword arguments to the function.

TYPE: P.kwargs DEFAULT: {}

RETURNS DESCRIPTION
T

The output of the function.

RAISES DESCRIPTION
RuntimeError

If the function raises a StopIteration.

Source code in packages/graph-retriever/src/graph_retriever/utils/run_in_executor.py
async def run_in_executor(
    executor: Executor | None,
    func: Callable[P, T],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:  # noqa: DOC502
    """
    Run a function in an executor.

    Parameters
    ----------
    executor :
        The executor to run in. When `None` (or a dict), the event loop's
        default executor is used. The call then runs inside a copy of the
        caller's `contextvars` context. An explicit executor runs the call
        without copying the context.
    func :
        The function.
    *args :
        The positional arguments to the function.
    **kwargs :
        The keyword arguments to the function.

    Returns
    -------
    :
        The output of the function.

    Raises
    ------
    RuntimeError
        If the function raises a StopIteration.
    """  # noqa: DOC502

    def call_func() -> T:
        try:
            return func(*args, **kwargs)
        except StopIteration as stop:
            # An asyncio.Future cannot hold a StopIteration: setting it raises
            # TypeError and leaves the Future pending forever. Re-raise as a
            # (chained) RuntimeError so the awaiting coroutine sees the failure.
            raise RuntimeError from stop

    loop = asyncio.get_running_loop()
    # NOTE(review): a dict is treated like `None`; presumably callers may pass a
    # config mapping here. Kept for compatibility.
    use_default_executor = executor is None or isinstance(executor, dict)
    if not use_default_executor:
        return await loop.run_in_executor(executor, call_func)

    # Default executor: run inside a snapshot of the current context.
    in_context = cast(Callable[..., T], partial(copy_context().run, call_func))
    return await loop.run_in_executor(None, in_context)

top_k

top_k

top_k(
    contents: Iterable[Content],
    *,
    embedding: list[float],
    k: int,
) -> list[Content]

Select the top-k contents from the given content.

PARAMETER DESCRIPTION
contents

The content from which to select the top-K.

TYPE: Iterable[Content]

embedding

The embedding we're looking for.

TYPE: list[float]

k

The number of items to select.

TYPE: int

RETURNS DESCRIPTION
list[Content]

Top-K by similarity. All results will have their score set.

Source code in packages/graph-retriever/src/graph_retriever/utils/top_k.py
def top_k(
    contents: Iterable[Content],
    *,
    embedding: list[float],
    k: int,
) -> list[Content]:
    """
    Select the top-k contents from the given content.

    Parameters
    ----------
    contents :
        The content from which to select the top-K. Any iterable is accepted,
        including a one-shot iterator or generator; it is consumed exactly once.
    embedding :
        The embedding we're looking for.
    k :
        The number of items to select.

    Returns
    -------
    list[Content]
        Top-K by similarity. All results will have their `score` set.
    """
    # TODO: Consider handling specially cases of already-sorted batches (merge).
    # TODO: Consider passing threshold here to limit results.

    # Materialize once. The input is scanned twice below, and a one-shot
    # iterator would otherwise be exhausted by the first scan, silently
    # dropping every unscored item.
    content_list = list(contents)

    # Use dicts to de-duplicate by ID. This ensures we choose the top K distinct
    # content (rather than K copies of the same content). An ID that already
    # has a scored copy does not need to be scored again.
    scored = {c.id: c for c in content_list if c.score is not None}
    unscored = {
        c.id: c for c in content_list if c.score is None and c.id not in scored
    }

    if unscored:
        top_unscored = _similarity_sort_top_k(
            list(unscored.values()), embedding=embedding, k=k
        )
        scored.update(top_unscored)

    return sorted(scored.values(), key=_score, reverse=True)[:k]