
graph_rag_example_helpers

datasets

animals

fetch_documents

fetch_documents() -> list[Document]

Download and parse a list of Documents for use with Graph Retriever.

This is a small example dataset with useful links.

This method downloads the dataset each time -- generally it is preferable to invoke this only once and store the documents in memory or a vector store.

RETURNS DESCRIPTION
list[Document]

The fetched animal documents.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/datasets/animals/fetch.py
def fetch_documents() -> list[Document]:
    """
    Download and parse a list of Documents for use with Graph Retriever.

    This is a small example dataset with useful links.

    This method downloads the dataset each time -- generally it is preferable
    to invoke this only once and store the documents in memory or a vector
    store.

    Returns
    -------
    :
        The fetched animal documents.
    """
    response = requests.get(ANIMALS_JSONL_URL)
    response.raise_for_status()  # Ensure we got a valid response

    return [
        Document(id=data["id"], page_content=data["text"], metadata=data["metadata"])
        for line in response.text.splitlines()
        if (data := json.loads(line))
    ]
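
Example (a usage sketch, not from the package's own docs; the import path is assumed from the module layout above): fetch once and keep the result around, since every call re-downloads the file.

from graph_rag_example_helpers.datasets.animals import fetch_documents

# Fetch once and reuse -- each call downloads the JSONL dataset again.
animal_docs = fetch_documents()
print(len(animal_docs), animal_docs[0].id)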


astrapy

fetch_documents

fetch_documents() -> list[Document]

Download and parse a list of Documents for use with Graph Retriever.

This dataset contains the documentation for the AstraPy project as of version 1.5.2.

This method downloads the dataset each time -- generally it is preferable to invoke this only once and store the documents in memory or a vector store.

RETURNS DESCRIPTION
list[Document]

The fetched astra-py documentation Documents.

Notes
  • The dataset is set up so that the path of the item is the id, the pydoc description is the page_content, and the item's other attributes are stored in the metadata.
  • Many documents contain an id and metadata, but no page_content.
Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/datasets/astrapy/fetch.py
def fetch_documents() -> list[Document]:
    """
    Download and parse a list of Documents for use with Graph Retriever.

    This dataset contains the documentation for the AstraPy project as of version 1.5.2.

    This method downloads the dataset each time -- generally it is preferable
    to invoke this only once and store the documents in memory or a vector
    store.

    Returns
    -------
    :
        The fetched astra-py documentation Documents.

    Notes
    -----
    - The dataset is set up so that the path of the item is the `id`, the pydoc
      description is the `page_content`, and the item's other attributes are stored
      in the `metadata`.
    - There are many documents that contain an id and metadata, but no page_content.
    """
    response = requests.get(ASTRAPY_JSONL_URL)
    response.raise_for_status()  # Ensure we got a valid response

    return [
        Document(id=data["id"], page_content=data["text"], metadata=data["metadata"])
        for line in response.text.splitlines()
        if (data := json.loads(line))
    ]
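
Example (a usage sketch; the import path is assumed from the module layout above): since some documents carry only an id and metadata, it can be useful to check how many actually have text before embedding them.

from graph_rag_example_helpers.datasets.astrapy import fetch_documents

astrapy_docs = fetch_documents()
with_text = [doc for doc in astrapy_docs if doc.page_content]
print(f"{len(with_text)} of {len(astrapy_docs)} documents have page_content")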


wikimultihop

BatchPreparer module-attribute

BatchPreparer = Callable[
    [Iterator[bytes]], Iterator[Document]
]

Function to apply to batches of lines to produce the document.

aload_2wikimultihop async

aload_2wikimultihop(
    limit: int | None,
    *,
    full_para_with_hyperlink_zip_path: str,
    store: VectorStore,
    batch_prepare: BatchPreparer,
) -> None

Load 2wikimultihop data into the given VectorStore.

PARAMETER DESCRIPTION
limit

Maximum number of lines to load. If a number less than one thousand, limits loading to the given number of lines. If None, loads all content.

TYPE: int | None

full_para_with_hyperlink_zip_path

Path to para_with_hyperlink.zip downloaded following the instructions in 2wikimultihop.

TYPE: str

store

The VectorStore to populate.

TYPE: VectorStore

batch_prepare

Function to apply to batches of lines to produce the document.

TYPE: BatchPreparer

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/datasets/wikimultihop/load.py
async def aload_2wikimultihop(
    limit: int | None,
    *,
    full_para_with_hyperlink_zip_path: str,
    store: VectorStore,
    batch_prepare: BatchPreparer,
) -> None:
    """
    Load 2wikimultihop data into the given `VectorStore`.

    Parameters
    ----------
    limit :
        Maximum number of lines to load.
        If a number less than one thousand, limits loading to the given number of lines.
        If `None`, loads all content.
    full_para_with_hyperlink_zip_path :
        Path to `para_with_hyperlink.zip` downloaded following the instructions
        in
        [2wikimultihop](https://github.com/Alab-NII/2wikimultihop?tab=readme-ov-file#new-update-april-7-2021).
    store :
        The VectorStore to populate.
    batch_prepare :
        Function to apply to batches of lines to produce the document.
    """
    if limit is None or limit > LINES_IN_FILE:
        limit = LINES_IN_FILE

    if limit <= 1000:
        local_path = "../../data/para_with_hyperlink_short.jsonl"
        if os.path.isfile(local_path):
            for batch in batched(
                itertools.islice(open(local_path, "rb").readlines(), limit), BATCH_SIZE
            ):
                docs = batch_prepare(iter(batch))
                store.add_documents(list(docs))
            print(f"Loaded from {local_path}")  # noqa: T201
        else:
            print(f"{local_path} not found, fetching short dataset")  # noqa: T201
            response = requests.get(SHORT_URL)
            response.raise_for_status()  # Ensure we get a valid response

            for batch in batched(
                itertools.islice(response.content.splitlines(), limit), BATCH_SIZE
            ):
                docs = batch_prepare(iter(batch))
                store.add_documents(list(docs))
            print(f"Loaded from {SHORT_URL}")  # noqa: T201
        return

    assert os.path.isfile(full_para_with_hyperlink_zip_path)
    persistence = PersistentIteration(
        journal_name="load_2wikimultihop.jrnl",
        iterator=batched(
            itertools.islice(wikipedia_lines(full_para_with_hyperlink_zip_path), limit),
            BATCH_SIZE,
        ),
    )
    total_batches = ceil(limit / BATCH_SIZE) - persistence.completed_count()
    if persistence.completed_count() > 0:
        print(  # noqa: T201
            f"Resuming loading with {persistence.completed_count()}"
            f" completed, {total_batches} remaining"
        )

    @backoff.on_exception(
        backoff.expo,
        EXCEPTIONS_TO_RETRY,
        max_tries=MAX_RETRIES,
    )
    async def add_docs(batch_docs, offset) -> None:
        from astrapy.exceptions import InsertManyException

        try:
            await store.aadd_documents(batch_docs)
            persistence.ack(offset)
        except InsertManyException as err:
            for err_desc in err.error_descriptors:
                if err_desc.error_code != "DOCUMENT_ALREADY_EXISTS":
                    print(err_desc)  # noqa: T201
            raise

    # We can't use asyncio.TaskGroup in 3.10. This would be simpler with that.
    tasks: list[asyncio.Task] = []

    for offset, batch_lines in tqdm(persistence, total=total_batches):
        batch_docs = batch_prepare(batch_lines)
        if batch_docs:
            task = asyncio.create_task(add_docs(batch_docs, offset))

            # It is OK if tasks are lost upon failure since that means we're
            # aborting the loading.
            tasks.append(task)

            while len(tasks) >= MAX_IN_FLIGHT:
                completed, pending = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED
                )
                for complete in completed:
                    if (e := complete.exception()) is not None:
                        print(f"Exception in task: {e}")  # noqa: T201
                tasks = list(pending)
        else:
            persistence.ack(offset)

    # Make sure all the tasks are done.
    # This wouldn't be necessary if we used a taskgroup, but that is Python 3.11+.
    while len(tasks) > 0:
        completed, pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED
        )
        for complete in completed:
            if (e := complete.exception()) is not None:
                print(f"Exception in task: {e}")  # noqa: T201
        tasks = list(pending)

    assert len(tasks) == 0
    assert persistence.pending_count() == 0
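
Example (a sketch only: batch_prepare below is a hypothetical preparer and may not match the real line format of para_with_hyperlink.jsonl; store stands in for any LangChain VectorStore configured elsewhere; the import path is assumed from the module layout above).

import asyncio
import json

from langchain_core.documents import Document

from graph_rag_example_helpers.datasets.wikimultihop import aload_2wikimultihop


def batch_prepare(lines):
    # Hypothetical BatchPreparer: parse each JSONL line (bytes) into a Document.
    for line in lines:
        data = json.loads(line)
        yield Document(
            id=data["id"],
            page_content=data.get("text", ""),
            metadata=data.get("metadata", {}),
        )


# `store` is a VectorStore created elsewhere (e.g. an AstraDBVectorStore).
# asyncio.run(
#     aload_2wikimultihop(
#         limit=1000,
#         full_para_with_hyperlink_zip_path="para_with_hyperlink.zip",
#         store=store,
#         batch_prepare=batch_prepare,
#     )
# )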

load

wikipedia_lines
wikipedia_lines(
    para_with_hyperlink_zip_path: str,
) -> Iterable[bytes]

Return iterable of lines from the wikipedia file.

PARAMETER DESCRIPTION
para_with_hyperlink_zip_path

Path to para_with_hyperlink.zip downloaded following the instructions in 2wikimultihop.

TYPE: str

YIELDS DESCRIPTION
bytes

Lines from the Wikipedia file.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/datasets/wikimultihop/load.py
def wikipedia_lines(para_with_hyperlink_zip_path: str) -> Iterable[bytes]:
    """
    Return iterable of lines from the wikipedia file.

    Parameters
    ----------
    para_with_hyperlink_zip_path :
        Path to `para_with_hyperlink.zip` downloaded following the instructions
        in
        [2wikimultihop](https://github.com/Alab-NII/2wikimultihop?tab=readme-ov-file#new-update-april-7-2021).

    Yields
    ------
    bytes
        Lines from the Wikipedia file.
    """
    with zipfile.ZipFile(para_with_hyperlink_zip_path, "r") as archive:
        with archive.open("para_with_hyperlink.jsonl", "r") as para_with_hyperlink:
            yield from para_with_hyperlink
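
Example (a sketch, assuming para_with_hyperlink.zip has already been downloaded and that the function is importable from the load module shown above): peek at a few raw lines without extracting the archive.

import itertools

from graph_rag_example_helpers.datasets.wikimultihop.load import wikipedia_lines

for raw_line in itertools.islice(wikipedia_lines("para_with_hyperlink.zip"), 3):
    print(raw_line[:80])  # raw JSONL bytes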

env

NON_SECRETS module-attribute

NON_SECRETS = {
    "ASTRA_DB_API_ENDPOINT",
    "ASTRA_DB_DATABASE_ID",
}

Environment variables that can use input instead of getpass.

Environment

Bases: Enum

Enumeration of supported environments for examples.

ASTRAPY class-attribute instance-attribute

ASTRAPY = auto()

Environment variables for connecting to AstraDB via AstraPy

CASSIO class-attribute instance-attribute

CASSIO = auto()

Environment variables for connecting to AstraDB via CassIO

required_envvars

required_envvars() -> list[str]

Return the required environment variables for this environment.

RETURNS DESCRIPTION
list[str]

The environment variables required in this environment.

RAISES DESCRIPTION
ValueError

If the environment isn't recognized.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/env.py
def required_envvars(self) -> list[str]:
    """
    Return the required environment variables for this environment.

    Returns
    -------
    :
        The environment variables required in this environment.

    Raises
    ------
    ValueError
        If the environment isn't recognized.
    """
    required = ["OPENAI_API_KEY", "ASTRA_DB_APPLICATION_TOKEN"]
    if self == Environment.CASSIO:
        required.append("ASTRA_DB_DATABASE_ID")
    elif self == Environment.ASTRAPY:
        required.append("ASTRA_DB_API_ENDPOINT")
    else:
        raise ValueError(f"Unrecognized environment '{self}")
    return required
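
Example (a minimal sketch; the import path follows the source location above): the two environments differ only in which Astra DB variable they require.

from graph_rag_example_helpers.env import Environment

print(Environment.CASSIO.required_envvars())
# ['OPENAI_API_KEY', 'ASTRA_DB_APPLICATION_TOKEN', 'ASTRA_DB_DATABASE_ID']
print(Environment.ASTRAPY.required_envvars())
# ['OPENAI_API_KEY', 'ASTRA_DB_APPLICATION_TOKEN', 'ASTRA_DB_API_ENDPOINT']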

initialize_environment

initialize_environment(env: Environment = CASSIO)

Initialize the environment variables.

PARAMETER DESCRIPTION
env

The environment to initialize

TYPE: Environment DEFAULT: CASSIO

Notes
This uses the following:

1. If a `.env` file is found, load environment variables from that.
2. If not, and running in colab, set necessary environment variables from
    secrets.
3. If necessary variables aren't set by the above, then prompts the user.
Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/env.py
def initialize_environment(env: Environment = Environment.CASSIO):
    """
    Initialize the environment variables.

    Parameters
    ----------
    env :
        The environment to initialize

    Notes
    -----
        This uses the following:

        1. If a `.env` file is found, load environment variables from that.
        2. If not, and running in colab, set necessary environment variables from
            secrets.
        3. If necessary variables aren't set by the above, then prompts the user.
    """
    # 1. If a `.env` file is found, load environment variables from that.
    if dotenv_path := find_dotenv():
        load_dotenv(dotenv_path)
        verify_environment(env)
        return

    # 2. If not, and running in colab, set necessary environment variables from secrets.
    try:
        initialize_from_colab_userdata(env)
        verify_environment(env)
        return
    except (ImportError, ModuleNotFoundError):
        pass

    # 3. Initialize from prompts.
    initialize_from_prompts(env)
    verify_environment(env)
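
Example (a typical notebook sketch; the import path follows the source location above): call this once at the top of an example notebook, before constructing stores or models, so later cells can rely on the variables being set.

from graph_rag_example_helpers.env import Environment, initialize_environment

# Loads a .env file if present, falls back to Colab secrets, and finally
# prompts for anything still missing (secrets are read via getpass).
initialize_environment(Environment.ASTRAPY)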

initialize_from_colab_userdata

initialize_from_colab_userdata(env: Environment = CASSIO)

Try to initialize environment from colab userdata.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/env.py
def initialize_from_colab_userdata(env: Environment = Environment.CASSIO):
    """Try to initialize environment from colab `userdata`."""
    from google.colab import userdata  # type: ignore[import-untyped]

    for required in env.required_envvars():
        os.environ[required] = userdata.get(required)

    try:
        os.environ["ASTRA_DB_KEYSPACE"] = userdata.get("ASTRA_DB_KEYSPACE")
    except userdata.SecretNotFoundError as _:
        # User doesn't have a keyspace set, so use the default.
        os.environ.pop("ASTRA_DB_KEYSPACE", None)

    try:
        os.environ["LANGCHAIN_API_KEY"] = userdata.get("LANGCHAIN_API_KEY")
        os.environ["LANGCHAIN_TRACING_V2"] = "True"
    except (userdata.SecretNotFoundError, userdata.NotebookAccessError):
        print("Colab Secret not set / accessible. Not configuring tracing")  # noqa: T201
        os.environ.pop("LANGCHAIN_API_KEY")
        os.environ.pop("LANGCHAIN_TRACING_V2")

initialize_from_prompts

initialize_from_prompts(env: Environment = CASSIO)

Initialize the environment by prompting the user.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/env.py
def initialize_from_prompts(env: Environment = Environment.CASSIO):
    """Initialize the environment by prompting the user."""
    import getpass

    for required in env.required_envvars():
        if required in os.environ:
            continue
        elif required in NON_SECRETS:
            os.environ[required] = input(required)
        else:
            os.environ[required] = getpass.getpass(required)

verify_environment

verify_environment(env: Environment = CASSIO)

Verify the necessary environment variables are set.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/env.py
def verify_environment(env: Environment = Environment.CASSIO):
    """Verify the necessary environment variables are set."""
    for required in env.required_envvars():
        assert required in os.environ, f'"{required}" not defined in environment'

examples

code_generation

format_docs

format_docs(docs: list[Document]) -> str

Format documents as documentation for including as context in a LLM query.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/examples/code_generation/format.py
def format_docs(docs: list[Document]) -> str:
    """Format documents as documentation for including as context in a LLM query."""
    return "\n---\n".join(format_document(doc) for doc in docs)

format_document

format_document(doc: Document, debug: bool = False) -> str

Format a document as documentation for including as context in a LLM query.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/examples/code_generation/format.py
def format_document(doc: Document, debug: bool = False) -> str:
    """Format a document as documentation for including as context in a LLM query."""
    metadata = doc.metadata
    text = f"{metadata['name']} ({metadata['kind']})\n\n"

    text += f"path: \n\t{metadata['path']}\n\n"

    for key in ["bases", "exports", "implemented_by"]:
        if key in metadata:
            values = "\n".join(metadata[key])
            text += f"{key}: \n\t{_add_tabs(values)}\n\n"

    if "properties" in metadata:
        props = [f"{k}: {v}" for k, v in metadata["properties"].items()]
        values = "\n".join(props)
        text += f"properties: \n\t{_add_tabs(values)}\n\n"

    if doc.page_content != "":
        text += f"description: \n\t{_add_tabs(doc.page_content)}\n\n"
    elif "value" in metadata:
        text += f"{metadata['value']}\n\n"

    for key in ["attributes", "parameters"]:
        if key in metadata:
            values = "\n\n".join([_format_parameter(v) for v in metadata[key]])
            text += f"{key}: \n\t{_add_tabs(values)}\n\n"

    for key in ["returns", "yields"]:
        if key in metadata:
            values = "\n\n".join([_format_return(v) for v in metadata[key]])
            text += f"{key}: \n\t{_add_tabs(values)}\n\n"

    for key in ["note", "example"]:
        if key in metadata:
            text += f"{key}: \n\t{_add_tabs(metadata[key])}\n\n"

    if debug:
        if "imports" in metadata:
            imports = []
            for as_name, real_name in metadata["imports"].items():
                if real_name == as_name:
                    imports.append(real_name)
                else:
                    imports.append(f"{real_name} as {as_name}")
            values = "\n".join(imports)
            text += f"imports: \n\t{_add_tabs(values)}\n\n"

        for key in ["references", "gathered_types"]:
            if key in metadata:
                values = "\n".join(metadata[key])
                text += f"{key}: \n\t{_add_tabs(values)}\n\n"

        if "parent" in metadata:
            text += f"parent: {metadata['parent']}\n\n"

    return text
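
Example (a sketch with hand-built, illustrative metadata; the import path follows the source location above, and the metadata keys mirror those produced by the converter below):

from langchain_core.documents import Document

from graph_rag_example_helpers.examples.code_generation.format import (
    format_docs,
    format_document,
)

doc = Document(
    id="example_pkg.client.Client",
    page_content="Client for talking to the example service.",
    metadata={"name": "Client", "kind": "class", "path": "example_pkg.client.Client"},
)
print(format_document(doc))
print(format_docs([doc]))  # joins multiple formatted documents with "\n---\n"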

converter

convert
convert(
    package_name: str,
    search_paths: list[str],
    docstring_parser: DocstringStyle,
    output_path: str,
) -> None

Load and convert a package's objects and documentation into a JSONL file.

This method converts the internal documentation of modules, classes, functions, and attributes of a package into a format that is better suited for RAG (and GraphRAG in particular).

The code uses the griffe library, which is a Python code analysis tool that extracts information from Python code and docstrings.

The JSONL file contains one JSON object per line, with the following structure:

  • id: the path to the object in the package
  • text: the description of the object (if any, can be empty)
  • metadata: always includes the name, path, and kind keys. The remaining keys below are included when available.
      • name: the name of the object
      • path: the path to the object in the package
      • kind: either module, class, function, or attribute
      • parameters: the parameters for a class or function, including type information, default values, and descriptions
      • attributes: the attributes on a class or module, including type information and descriptions
      • gathered_types: list of non-standard types in the parameters and attributes
      • imports: list of non-standard types imported by the class or module
      • exports: list of non-standard types exported by the module
      • properties: list of boolean properties about the module
      • example: any code examples for the class, function, or module
      • references: list of any non-standard types used in the example code
      • returns: the return type and description
      • yields: the yield type and description
      • bases: list of base types inherited by the class
      • implemented_by: list of types that implement the base class

PARAMETER DESCRIPTION
package_name

The name of the package to convert.

TYPE: str

search_paths

The paths to search for the package.

TYPE: list[str]

docstring_parser

The docstring parser to use.

TYPE: DocstringStyle

output_path

The path to save the JSONL file.

TYPE: str

Examples:

from graph_rag_example_helpers.examples.code_generation.converter import convert
convert("astrapy", [".venv/lib/python3.12/site-packages"], "google", "data")

Notes
  • This code was written for the code-generation example and astrapy==1.5.2. It will probably need tweaking for use with other Python packages. Use at your own risk.
Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/examples/code_generation/converter.py
def convert(
    package_name: str,
    search_paths: list[str],
    docstring_parser: griffe.DocstringStyle,
    output_path: str,
) -> None:
    """
    Load and convert a package's objects and documentation into a JSONL file.

    This method converts the internal documentation of modules, classes, functions, and
    attributes of a package into a format that is better suited for RAG (and GraphRAG
    in particular).

    The code uses the `griffe` library, which is a Python code analysis tool that
    extracts information from Python code and docstrings.

    The JSONL file contains one JSON object per line, with the following structure:
        id: the path to the object in the package
        text: the description of the object (if any, can be empty)
        metadata: Always includes `name`, `path`, `kind` keys.
                  The remaining keys below are included when available.
            name: the name of the object
            path: the path to the object in the package
            kind: either `module`, `class`, `function`, or `attribute`
            parameters: the parameters for a class or function. Includes type
                information, default values, and descriptions
            attributes: the attributes on a class or module. Includes type
                information and descriptions
            gathered_types: list of non-standard types in the parameters and attributes
            imports: list of non-standard types imported by the class or module
            exports: list of non-standard types exported by the module
            properties: list of boolean properties about the module
            example: any code examples for the class, function, or module
            references: list of any non-standard types used in the example code
            returns: the return type and description
            yields: the yield type and description
            bases: list of base types inherited by the class
            implemented_by: list of types that implement the base class


    Parameters
    ----------
    package_name :
        The name of the package to convert.
    search_paths :
        The paths to search for the package.
    docstring_parser :
        The docstring parser to use.
    output_path :
        The path to save the JSONL file.


    Examples
    --------
    from graph_rag_example_helpers.examples.code_generation.converter import convert
    convert("astrapy", [".venv/lib/python3.12/site-packages"], "google", "data")


    Notes
    -----
    - This code was written for the `code-generation` example and `astrapy==1.5.2`. It
      will probably need tweaking for use with other Python packages. Use at your own risk.
    """
    my_package = griffe.load(
        package_name, search_paths=search_paths, docstring_parser=docstring_parser
    )

    converter = _Converter()
    items = converter._convert(package_name, my_package)

    with open(os.path.join(output_path, f"{package_name}.jsonl"), "w") as f:
        for item in items:
            text = item.pop("text", "")
            id = item.get("path")
            metadata = item
            for key, value in metadata.items():
                if isinstance(value, set):
                    metadata[key] = list(value)
            f.write(json.dumps({"id": id, "text": text, "metadata": metadata}))
            f.write("\n")


persistent_iteration

Offset dataclass

Offset(index: int)

Class for tracking a position in the iteration.

PersistentIteration

PersistentIteration(
    journal_name: str, iterator: Iterator[T]
)

Bases: Generic[T]

Create a persistent iteration.

This creates a journal file with the name journal_name containing the indices of completed items. When resuming iteration, the already processed indices will be skipped.

PARAMETER DESCRIPTION
journal_name

Name of the journal file to use. If it doesn't exist it will be created. The indices of completed items will be written to the journal.

TYPE: str

iterator

The iterator to process persistently. It must be deterministic -- elements should always be returned in the same order on restarts.

TYPE: Iterator[T]

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/persistent_iteration.py
def __init__(self, journal_name: str, iterator: Iterator[T]) -> None:
    self.iterator = enumerate(iterator)
    self.pending: dict[Offset, T] = {}

    self._completed = set()
    try:
        read_journal = open(journal_name)
        for line in read_journal:
            self._completed.add(Offset(index=int(line)))
    except FileNotFoundError:
        pass

    self._write_journal = open(journal_name, "a")
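
Example (a resumable-loading sketch; the import path follows the source location above, and process() is a hypothetical step that persists each item): acknowledge an offset only after its element has been stored, so a rerun with the same journal skips it.

from graph_rag_example_helpers.persistent_iteration import PersistentIteration

items = iter(["a", "b", "c", "d"])
persistence = PersistentIteration(journal_name="example.jrnl", iterator=items)
for offset, item in persistence:
    process(item)            # hypothetical: write the item somewhere durable
    persistence.ack(offset)  # record completion; restarts skip this offset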

__iter__

__iter__() -> Iterator[tuple[Offset, T]]

Iterate over pairs of offsets and elements.

RETURNS DESCRIPTION
Iterator[tuple[Offset, T]]

The iterator itself, yielding (offset, element) pairs.
Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/persistent_iteration.py
def __iter__(self) -> Iterator[tuple[Offset, T]]:
    """
    Iterate over pairs of offsets and elements.

    Returns
    -------
    :
    """
    return self

__next__

__next__() -> tuple[Offset, T]

Return the next offset and item.

RETURNS DESCRIPTION
offset

The offset of the next item. Should be acknowledged after the item has finished processing.

TYPE: Offset

item

The next item.

TYPE: T

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/persistent_iteration.py
def __next__(self) -> tuple[Offset, T]:
    """
    Return the next offset and item.

    Returns
    -------
    offset :
        The offset of the next item. Should be acknowledged after the item
        has finished processing.
    item :
        The next item.
    """
    index, item = next(self.iterator)
    offset = Offset(index)

    while offset in self._completed:
        index, item = next(self.iterator)
        offset = Offset(index)

    self.pending[offset] = item
    return (offset, item)

ack

ack(offset: Offset) -> int

Acknowledge the given offset.

This should only be called after the elements in that offset have been persisted.

PARAMETER DESCRIPTION
offset

The offset to acknowledge.

TYPE: Offset

RETURNS DESCRIPTION
int

The number of pending elements.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/persistent_iteration.py
def ack(self, offset: Offset) -> int:
    """
    Acknowledge the given offset.

    This should only be called after the elements in that offset have been
    persisted.

    Parameters
    ----------
    offset :
        The offset to acknowledge.

    Returns
    -------
    :
        The number of pending elements.
    """
    self._write_journal.write(f"{offset.index}\n")
    self._write_journal.flush()
    self._completed.add(offset)

    self.pending.pop(offset)
    return len(self.pending)

completed_count

completed_count() -> int

Return the number of completed elements.

RETURNS DESCRIPTION
int

The number of completed elements.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/persistent_iteration.py
def completed_count(self) -> int:
    """
    Return the number of completed elements.

    Returns
    -------
    :
        The number of completed elements.
    """
    return len(self._completed)

pending_count

pending_count() -> int

Return the number of pending (not processed) elements.

RETURNS DESCRIPTION
int

The number of pending elements.

Source code in packages/graph-rag-example-helpers/src/graph_rag_example_helpers/persistent_iteration.py
def pending_count(self) -> int:
    """
    Return the number of pending (not processed) elements.

    Returns
    -------
    :
        The number of pending elements.
    """
    return len(self.pending)