Source code for langchain_community.utilities.github

"""Util that calls GitHub."""
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import requests
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.utils import get_from_dict_or_env

if TYPE_CHECKING:
    from github.Issue import Issue
    from github.PullRequest import PullRequest


def _import_tiktoken() -> Any:
    """Import tiktoken."""
    try:
        import tiktoken
    except ImportError:
        raise ImportError(
            "tiktoken is not installed. "
            "Please install it with `pip install tiktoken`"
        )
    return tiktoken


[docs]class GitHubAPIWrapper(BaseModel): """Wrapper for GitHub API.""" github: Any #: :meta private: github_repo_instance: Any #: :meta private: github_repository: Optional[str] = None github_app_id: Optional[str] = None github_app_private_key: Optional[str] = None active_branch: Optional[str] = None github_base_branch: Optional[str] = None class Config: """Configuration for this pydantic object.""" extra = Extra.forbid @root_validator() def validate_environment(cls, values: Dict) -> Dict: """Validate that api key and python package exists in environment.""" github_repository = get_from_dict_or_env( values, "github_repository", "GITHUB_REPOSITORY" ) github_app_id = get_from_dict_or_env(values, "github_app_id", "GITHUB_APP_ID") github_app_private_key = get_from_dict_or_env( values, "github_app_private_key", "GITHUB_APP_PRIVATE_KEY" ) try: from github import Auth, GithubIntegration except ImportError: raise ImportError( "PyGithub is not installed. " "Please install it with `pip install PyGithub`" ) try: # interpret the key as a file path # fallback to interpreting as the key itself with open(github_app_private_key, "r") as f: private_key = f.read() except Exception: private_key = github_app_private_key auth = Auth.AppAuth( github_app_id, private_key, ) gi = GithubIntegration(auth=auth) installation = gi.get_installations()[0] # create a GitHub instance: g = installation.get_github_for_installation() repo = g.get_repo(github_repository) github_base_branch = get_from_dict_or_env( values, "github_base_branch", "GITHUB_BASE_BRANCH", default=repo.default_branch, ) active_branch = get_from_dict_or_env( values, "active_branch", "ACTIVE_BRANCH", default=repo.default_branch, ) values["github"] = g values["github_repo_instance"] = repo values["github_repository"] = github_repository values["github_app_id"] = github_app_id values["github_app_private_key"] = github_app_private_key values["active_branch"] = active_branch values["github_base_branch"] = github_base_branch return values
[docs] def parse_issues(self, issues: List[Issue]) -> List[dict]: """ Extracts title and number from each Issue and puts them in a dictionary Parameters: issues(List[Issue]): A list of Github Issue objects Returns: List[dict]: A dictionary of issue titles and numbers """ parsed = [] for issue in issues: title = issue.title number = issue.number opened_by = issue.user.login if issue.user else None issue_dict = {"title": title, "number": number} if opened_by is not None: issue_dict["opened_by"] = opened_by parsed.append(issue_dict) return parsed
[docs] def parse_pull_requests(self, pull_requests: List[PullRequest]) -> List[dict]: """ Extracts title and number from each Issue and puts them in a dictionary Parameters: issues(List[Issue]): A list of Github Issue objects Returns: List[dict]: A dictionary of issue titles and numbers """ parsed = [] for pr in pull_requests: parsed.append( { "title": pr.title, "number": pr.number, "commits": str(pr.commits), "comments": str(pr.comments), } ) return parsed
[docs] def get_issues(self) -> str: """ Fetches all open issues from the repo excluding pull requests Returns: str: A plaintext report containing the number of issues and each issue's title and number. """ issues = self.github_repo_instance.get_issues(state="open") # Filter out pull requests (part of GH issues object) issues = [issue for issue in issues if not issue.pull_request] if issues: parsed_issues = self.parse_issues(issues) parsed_issues_str = ( "Found " + str(len(parsed_issues)) + " issues:\n" + str(parsed_issues) ) return parsed_issues_str else: return "No open issues available"
[docs] def list_open_pull_requests(self) -> str: """ Fetches all open PRs from the repo Returns: str: A plaintext report containing the number of PRs and each PR's title and number. """ # issues = self.github_repo_instance.get_issues(state="open") pull_requests = self.github_repo_instance.get_pulls(state="open") if pull_requests.totalCount > 0: parsed_prs = self.parse_pull_requests(pull_requests) parsed_prs_str = ( "Found " + str(len(parsed_prs)) + " pull requests:\n" + str(parsed_prs) ) return parsed_prs_str else: return "No open pull requests available"
[docs] def list_files_in_main_branch(self) -> str: """ Fetches all files in the main branch of the repo. Returns: str: A plaintext report containing the paths and names of the files. """ files: List[str] = [] try: contents = self.github_repo_instance.get_contents( "", ref=self.github_base_branch ) for content in contents: if content.type == "dir": files.extend(self.get_files_from_directory(content.path)) else: files.append(content.path) if files: files_str = "\n".join(files) return f"Found {len(files)} files in the main branch:\n{files_str}" else: return "No files found in the main branch" except Exception as e: return str(e)
[docs] def set_active_branch(self, branch_name: str) -> str: """Equivalent to `git checkout branch_name` for this Agent. Clones formatting from Github. Returns an Error (as a string) if branch doesn't exist. """ curr_branches = [ branch.name for branch in self.github_repo_instance.get_branches() ] if branch_name in curr_branches: self.active_branch = branch_name return f"Switched to branch `{branch_name}`" else: return ( f"Error {branch_name} does not exist," f"in repo with current branches: {str(curr_branches)}" )
[docs] def list_branches_in_repo(self) -> str: """ Fetches a list of all branches in the repository. Returns: str: A plaintext report containing the names of the branches. """ try: branches = [ branch.name for branch in self.github_repo_instance.get_branches() ] if branches: branches_str = "\n".join(branches) return ( f"Found {len(branches)} branches in the repository:" f"\n{branches_str}" ) else: return "No branches found in the repository" except Exception as e: return str(e)
[docs] def create_branch(self, proposed_branch_name: str) -> str: """ Create a new branch, and set it as the active bot branch. Equivalent to `git switch -c proposed_branch_name` If the proposed branch already exists, we append _v1 then _v2... until a unique name is found. Returns: str: A plaintext success message. """ from github import GithubException i = 0 new_branch_name = proposed_branch_name base_branch = self.github_repo_instance.get_branch( self.github_repo_instance.default_branch ) for i in range(1000): try: self.github_repo_instance.create_git_ref( ref=f"refs/heads/{new_branch_name}", sha=base_branch.commit.sha ) self.active_branch = new_branch_name return ( f"Branch '{new_branch_name}' " "created successfully, and set as current active branch." ) except GithubException as e: if e.status == 422 and "Reference already exists" in e.data["message"]: i += 1 new_branch_name = f"{proposed_branch_name}_v{i}" else: # Handle any other exceptions print(f"Failed to create branch. Error: {e}") raise Exception( "Unable to create branch name from proposed_branch_name: " f"{proposed_branch_name}" ) return ( "Unable to create branch. " "At least 1000 branches exist with named derived from " f"proposed_branch_name: `{proposed_branch_name}`" )
[docs] def list_files_in_bot_branch(self) -> str: """ Fetches all files in the active branch of the repo, the branch the bot uses to make changes. Returns: str: A plaintext list containing the the filepaths in the branch. """ files: List[str] = [] try: contents = self.github_repo_instance.get_contents( "", ref=self.active_branch ) for content in contents: if content.type == "dir": files.extend(self.get_files_from_directory(content.path)) else: files.append(content.path) if files: files_str = "\n".join(files) return ( f"Found {len(files)} files in branch `{self.active_branch}`:\n" f"{files_str}" ) else: return f"No files found in branch: `{self.active_branch}`" except Exception as e: return f"Error: {e}"
[docs] def get_files_from_directory(self, directory_path: str) -> str: """ Recursively fetches files from a directory in the repo. Parameters: directory_path (str): Path to the directory Returns: str: List of file paths, or an error message. """ from github import GithubException files: List[str] = [] try: contents = self.github_repo_instance.get_contents( directory_path, ref=self.active_branch ) except GithubException as e: return f"Error: status code {e.status}, {e.message}" for content in contents: if content.type == "dir": files.extend(self.get_files_from_directory(content.path)) else: files.append(content.path) return str(files)
[docs] def get_issue(self, issue_number: int) -> Dict[str, Any]: """ Fetches a specific issue and its first 10 comments Parameters: issue_number(int): The number for the github issue Returns: dict: A dictionary containing the issue's title, body, comments as a string, and the username of the user who opened the issue """ issue = self.github_repo_instance.get_issue(number=issue_number) page = 0 comments: List[dict] = [] while len(comments) <= 10: comments_page = issue.get_comments().get_page(page) if len(comments_page) == 0: break for comment in comments_page: comments.append({"body": comment.body, "user": comment.user.login}) page += 1 opened_by = None if issue.user and issue.user.login: opened_by = issue.user.login return { "number": issue_number, "title": issue.title, "body": issue.body, "comments": str(comments), "opened_by": str(opened_by), }
[docs] def list_pull_request_files(self, pr_number: int) -> List[Dict[str, Any]]: """Fetches the full text of all files in a PR. Truncates after first 3k tokens. # TODO: Enhancement to summarize files with ctags if they're getting long. Args: pr_number(int): The number of the pull request on Github Returns: dict: A dictionary containing the issue's title, body, and comments as a string """ tiktoken = _import_tiktoken() MAX_TOKENS_FOR_FILES = 3_000 pr_files = [] pr = self.github_repo_instance.get_pull(number=int(pr_number)) total_tokens = 0 page = 0 while True: # or while (total_tokens + tiktoken()) < MAX_TOKENS_FOR_FILES: files_page = pr.get_files().get_page(page) if len(files_page) == 0: break for file in files_page: try: file_metadata_response = requests.get(file.contents_url) if file_metadata_response.status_code == 200: download_url = json.loads(file_metadata_response.text)[ "download_url" ] else: print(f"Failed to download file: {file.contents_url}, skipping") continue file_content_response = requests.get(download_url) if file_content_response.status_code == 200: # Save the content as a UTF-8 string file_content = file_content_response.text else: print( "Failed downloading file content " f"(Error {file_content_response.status_code}). Skipping" ) continue file_tokens = len( tiktoken.get_encoding("cl100k_base").encode( file_content + file.filename + "file_name file_contents" ) ) if (total_tokens + file_tokens) < MAX_TOKENS_FOR_FILES: pr_files.append( { "filename": file.filename, "contents": file_content, "additions": file.additions, "deletions": file.deletions, } ) total_tokens += file_tokens except Exception as e: print(f"Error when reading files from a PR on github. {e}") page += 1 return pr_files
[docs] def get_pull_request(self, pr_number: int) -> Dict[str, Any]: """ Fetches a specific pull request and its first 10 comments, limited by max_tokens. Parameters: pr_number(int): The number for the Github pull max_tokens(int): The maximum number of tokens in the response Returns: dict: A dictionary containing the pull's title, body, and comments as a string """ max_tokens = 2_000 pull = self.github_repo_instance.get_pull(number=pr_number) total_tokens = 0 def get_tokens(text: str) -> int: tiktoken = _import_tiktoken() return len(tiktoken.get_encoding("cl100k_base").encode(text)) def add_to_dict(data_dict: Dict[str, Any], key: str, value: str) -> None: nonlocal total_tokens # Declare total_tokens as nonlocal tokens = get_tokens(value) if total_tokens + tokens <= max_tokens: data_dict[key] = value total_tokens += tokens # Now this will modify the outer variable response_dict: Dict[str, str] = {} add_to_dict(response_dict, "title", pull.title) add_to_dict(response_dict, "number", str(pr_number)) add_to_dict(response_dict, "body", pull.body) comments: List[str] = [] page = 0 while len(comments) <= 10: comments_page = pull.get_issue_comments().get_page(page) if len(comments_page) == 0: break for comment in comments_page: comment_str = str({"body": comment.body, "user": comment.user.login}) if total_tokens + get_tokens(comment_str) > max_tokens: break comments.append(comment_str) total_tokens += get_tokens(comment_str) page += 1 add_to_dict(response_dict, "comments", str(comments)) commits: List[str] = [] page = 0 while len(commits) <= 10: commits_page = pull.get_commits().get_page(page) if len(commits_page) == 0: break for commit in commits_page: commit_str = str({"message": commit.commit.message}) if total_tokens + get_tokens(commit_str) > max_tokens: break commits.append(commit_str) total_tokens += get_tokens(commit_str) page += 1 add_to_dict(response_dict, "commits", str(commits)) return response_dict
[docs] def create_pull_request(self, pr_query: str) -> str: """ Makes a pull request from the bot's branch to the base branch Parameters: pr_query(str): a string which contains the PR title and the PR body. The title is the first line in the string, and the body are the rest of the string. For example, "Updated README\nmade changes to add info" Returns: str: A success or failure message """ if self.github_base_branch == self.active_branch: return """Cannot make a pull request because commits are already in the main or master branch.""" else: try: title = pr_query.split("\n")[0] body = pr_query[len(title) + 2 :] pr = self.github_repo_instance.create_pull( title=title, body=body, head=self.active_branch, base=self.github_base_branch, ) return f"Successfully created PR number {pr.number}" except Exception as e: return "Unable to make pull request due to error:\n" + str(e)
[docs] def comment_on_issue(self, comment_query: str) -> str: """ Adds a comment to a github issue Parameters: comment_query(str): a string which contains the issue number, two newlines, and the comment. for example: "1\n\nWorking on it now" adds the comment "working on it now" to issue 1 Returns: str: A success or failure message """ issue_number = int(comment_query.split("\n\n")[0]) comment = comment_query[len(str(issue_number)) + 2 :] try: issue = self.github_repo_instance.get_issue(number=issue_number) issue.create_comment(comment) return "Commented on issue " + str(issue_number) except Exception as e: return "Unable to make comment due to error:\n" + str(e)
[docs] def create_file(self, file_query: str) -> str: """ Creates a new file on the Github repo Parameters: file_query(str): a string which contains the file path and the file contents. The file path is the first line in the string, and the contents are the rest of the string. For example, "hello_world.md\n# Hello World!" Returns: str: A success or failure message """ if self.active_branch == self.github_base_branch: return ( "You're attempting to commit to the directly to the" f"{self.github_base_branch} branch, which is protected. " "Please create a new branch and try again." ) file_path = file_query.split("\n")[0] file_contents = file_query[len(file_path) + 2 :] try: try: file = self.github_repo_instance.get_contents( file_path, ref=self.active_branch ) if file: return ( f"File already exists at `{file_path}` " f"on branch `{self.active_branch}`. You must use " "`update_file` to modify it." ) except Exception: # expected behavior, file shouldn't exist yet pass self.github_repo_instance.create_file( path=file_path, message="Create " + file_path, content=file_contents, branch=self.active_branch, ) return "Created file " + file_path except Exception as e: return "Unable to make file due to error:\n" + str(e)
[docs] def read_file(self, file_path: str) -> str: """ Read a file from this agent's branch, defined by self.active_branch, which supports PR branches. Parameters: file_path(str): the file path Returns: str: The file decoded as a string, or an error message if not found """ try: file = self.github_repo_instance.get_contents( file_path, ref=self.active_branch ) return file.decoded_content.decode("utf-8") except Exception as e: return ( f"File not found `{file_path}` on branch" f"`{self.active_branch}`. Error: {str(e)}" )
[docs] def update_file(self, file_query: str) -> str: """ Updates a file with new content. Parameters: file_query(str): Contains the file path and the file contents. The old file contents is wrapped in OLD <<<< and >>>> OLD The new file contents is wrapped in NEW <<<< and >>>> NEW For example: /test/hello.txt OLD <<<< Hello Earth! >>>> OLD NEW <<<< Hello Mars! >>>> NEW Returns: A success or failure message """ if self.active_branch == self.github_base_branch: return ( "You're attempting to commit to the directly" f"to the {self.github_base_branch} branch, which is protected. " "Please create a new branch and try again." ) try: file_path: str = file_query.split("\n")[0] old_file_contents = ( file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip() ) new_file_contents = ( file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip() ) file_content = self.read_file(file_path) updated_file_content = file_content.replace( old_file_contents, new_file_contents ) if file_content == updated_file_content: return ( "File content was not updated because old content was not found." "It may be helpful to use the read_file action to get " "the current file contents." ) self.github_repo_instance.update_file( path=file_path, message="Update " + str(file_path), content=updated_file_content, branch=self.active_branch, sha=self.github_repo_instance.get_contents( file_path, ref=self.active_branch ).sha, ) return "Updated file " + str(file_path) except Exception as e: return "Unable to update file due to error:\n" + str(e)
[docs] def delete_file(self, file_path: str) -> str: """ Deletes a file from the repo Parameters: file_path(str): Where the file is Returns: str: Success or failure message """ if self.active_branch == self.github_base_branch: return ( "You're attempting to commit to the directly" f"to the {self.github_base_branch} branch, which is protected. " "Please create a new branch and try again." ) try: self.github_repo_instance.delete_file( path=file_path, message="Delete " + file_path, branch=self.active_branch, sha=self.github_repo_instance.get_contents( file_path, ref=self.active_branch ).sha, ) return "Deleted file " + file_path except Exception as e: return "Unable to delete file due to error:\n" + str(e)
[docs] def search_issues_and_prs(self, query: str) -> str: """ Searches issues and pull requests in the repository. Parameters: query(str): The search query Returns: str: A string containing the first 5 issues and pull requests """ search_result = self.github.search_issues(query, repo=self.github_repository) max_items = min(5, len(search_result)) results = [f"Top {max_items} results:"] for issue in search_result[:max_items]: results.append( f"Title: {issue.title}, Number: {issue.number}, State: {issue.state}" ) return "\n".join(results)
[docs] def search_code(self, query: str) -> str: """ Searches code in the repository. # Todo: limit total tokens returned... Parameters: query(str): The search query Returns: str: A string containing, at most, the top 5 search results """ search_result = self.github.search_code( query=query, repo=self.github_repository ) if search_result.totalCount == 0: return "0 results found." max_results = min(5, search_result.totalCount) results = [f"Showing top {max_results} of {search_result.totalCount} results:"] count = 0 for code in search_result: if count >= max_results: break # Get the file content using the PyGithub get_contents method file_content = self.github_repo_instance.get_contents( code.path, ref=self.active_branch ).decoded_content.decode() results.append( f"Filepath: `{code.path}`\nFile contents: " f"{file_content}\n<END OF FILE>" ) count += 1 return "\n".join(results)
[docs] def create_review_request(self, reviewer_username: str) -> str: """ Creates a review request on *THE* open pull request that matches the current active_branch. Parameters: reviewer_username(str): The username of the person who is being requested Returns: str: A message confirming the creation of the review request """ pull_requests = self.github_repo_instance.get_pulls( state="open", sort="created" ) # find PR against active_branch pr = next( (pr for pr in pull_requests if pr.head.ref == self.active_branch), None ) if pr is None: return ( "No open pull request found for the " f"current branch `{self.active_branch}`" ) try: pr.create_review_request(reviewers=[reviewer_username]) return ( f"Review request created for user {reviewer_username} " f"on PR #{pr.number}" ) except Exception as e: return f"Failed to create a review request with error {e}"
[docs] def run(self, mode: str, query: str) -> str: if mode == "get_issue": return json.dumps(self.get_issue(int(query))) elif mode == "get_pull_request": return json.dumps(self.get_pull_request(int(query))) elif mode == "list_pull_request_files": return json.dumps(self.list_pull_request_files(int(query))) elif mode == "get_issues": return self.get_issues() elif mode == "comment_on_issue": return self.comment_on_issue(query) elif mode == "create_file": return self.create_file(query) elif mode == "create_pull_request": return self.create_pull_request(query) elif mode == "read_file": return self.read_file(query) elif mode == "update_file": return self.update_file(query) elif mode == "delete_file": return self.delete_file(query) elif mode == "list_open_pull_requests": return self.list_open_pull_requests() elif mode == "list_files_in_main_branch": return self.list_files_in_main_branch() elif mode == "list_files_in_bot_branch": return self.list_files_in_bot_branch() elif mode == "list_branches_in_repo": return self.list_branches_in_repo() elif mode == "set_active_branch": return self.set_active_branch(query) elif mode == "create_branch": return self.create_branch(query) elif mode == "get_files_from_directory": return self.get_files_from_directory(query) elif mode == "search_issues_and_prs": return self.search_issues_and_prs(query) elif mode == "search_code": return self.search_code(query) elif mode == "create_review_request": return self.create_review_request(query) else: raise ValueError("Invalid mode" + mode)