Source code for langchain_community.document_loaders.youtube

"""Loads YouTube transcript."""
from __future__ import annotations

import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union
from urllib.parse import parse_qs, urlparse

from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.pydantic_v1.dataclasses import dataclass

from langchain_community.document_loaders.base import BaseLoader

logger = logging.getLogger(__name__)

SCOPES = ["https://www.googleapis.com/auth/youtube.readonly"]


[docs]@dataclass class GoogleApiClient: """Generic Google API Client. To use, you should have the ``google_auth_oauthlib,youtube_transcript_api,google`` python package installed. As the google api expects credentials you need to set up a google account and register your Service. "https://developers.google.com/docs/api/quickstart/python" Example: .. code-block:: python from langchain_community.document_loaders import GoogleApiClient google_api_client = GoogleApiClient( service_account_path=Path("path_to_your_sec_file.json") ) """ credentials_path: Path = Path.home() / ".credentials" / "credentials.json" service_account_path: Path = Path.home() / ".credentials" / "credentials.json" token_path: Path = Path.home() / ".credentials" / "token.json" def __post_init__(self) -> None: self.creds = self._load_credentials()
[docs] @root_validator def validate_channel_or_videoIds_is_set( cls, values: Dict[str, Any] ) -> Dict[str, Any]: """Validate that either folder_id or document_ids is set, but not both.""" if not values.get("credentials_path") and not values.get( "service_account_path" ): raise ValueError("Must specify either channel_name or video_ids") return values
def _load_credentials(self) -> Any: """Load credentials.""" # Adapted from https://developers.google.com/drive/api/v3/quickstart/python try: from google.auth.transport.requests import Request from google.oauth2 import service_account from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from youtube_transcript_api import YouTubeTranscriptApi # noqa: F401 except ImportError: raise ImportError( "You must run" "`pip install --upgrade " "google-api-python-client google-auth-httplib2 " "google-auth-oauthlib " "youtube-transcript-api` " "to use the Google Drive loader" ) creds = None if self.service_account_path.exists(): return service_account.Credentials.from_service_account_file( str(self.service_account_path) ) if self.token_path.exists(): creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( str(self.credentials_path), SCOPES ) creds = flow.run_local_server(port=0) with open(self.token_path, "w") as token: token.write(creds.to_json()) return creds
ALLOWED_SCHEMAS = {"http", "https"} ALLOWED_NETLOCK = { "youtu.be", "m.youtube.com", "youtube.com", "www.youtube.com", "www.youtube-nocookie.com", "vid.plus", } def _parse_video_id(url: str) -> Optional[str]: """Parse a youtube url and return the video id if valid, otherwise None.""" parsed_url = urlparse(url) if parsed_url.scheme not in ALLOWED_SCHEMAS: return None if parsed_url.netloc not in ALLOWED_NETLOCK: return None path = parsed_url.path if path.endswith("/watch"): query = parsed_url.query parsed_query = parse_qs(query) if "v" in parsed_query: ids = parsed_query["v"] video_id = ids if isinstance(ids, str) else ids[0] else: return None else: path = parsed_url.path.lstrip("/") video_id = path.split("/")[-1] if len(video_id) != 11: # Video IDs are 11 characters long return None return video_id
[docs]class YoutubeLoader(BaseLoader): """Load `YouTube` transcripts."""
[docs] def __init__( self, video_id: str, add_video_info: bool = False, language: Union[str, Sequence[str]] = "en", translation: Optional[str] = None, continue_on_failure: bool = False, ): """Initialize with YouTube video ID.""" self.video_id = video_id self.add_video_info = add_video_info self.language = language if isinstance(language, str): self.language = [language] else: self.language = language self.translation = translation self.continue_on_failure = continue_on_failure
[docs] @staticmethod def extract_video_id(youtube_url: str) -> str: """Extract video id from common YT urls.""" video_id = _parse_video_id(youtube_url) if not video_id: raise ValueError( f"Could not determine the video ID for the URL {youtube_url}" ) return video_id
[docs] @classmethod def from_youtube_url(cls, youtube_url: str, **kwargs: Any) -> YoutubeLoader: """Given youtube URL, load video.""" video_id = cls.extract_video_id(youtube_url) return cls(video_id, **kwargs)
[docs] def load(self) -> List[Document]: """Load documents.""" try: from youtube_transcript_api import ( NoTranscriptFound, TranscriptsDisabled, YouTubeTranscriptApi, ) except ImportError: raise ImportError( "Could not import youtube_transcript_api python package. " "Please install it with `pip install youtube-transcript-api`." ) metadata = {"source": self.video_id} if self.add_video_info: # Get more video meta info # Such as title, description, thumbnail url, publish_date video_info = self._get_video_info() metadata.update(video_info) try: transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id) except TranscriptsDisabled: return [] try: transcript = transcript_list.find_transcript(self.language) except NoTranscriptFound: transcript = transcript_list.find_transcript(["en"]) if self.translation is not None: transcript = transcript.translate(self.translation) transcript_pieces = transcript.fetch() transcript = " ".join([t["text"].strip(" ") for t in transcript_pieces]) return [Document(page_content=transcript, metadata=metadata)]
def _get_video_info(self) -> dict: """Get important video information. Components are: - title - description - thumbnail url, - publish_date - channel_author - and more. """ try: from pytube import YouTube except ImportError: raise ImportError( "Could not import pytube python package. " "Please install it with `pip install pytube`." ) yt = YouTube(f"https://www.youtube.com/watch?v={self.video_id}") video_info = { "title": yt.title or "Unknown", "description": yt.description or "Unknown", "view_count": yt.views or 0, "thumbnail_url": yt.thumbnail_url or "Unknown", "publish_date": yt.publish_date.strftime("%Y-%m-%d %H:%M:%S") if yt.publish_date else "Unknown", "length": yt.length or 0, "author": yt.author or "Unknown", } return video_info
[docs]@dataclass class GoogleApiYoutubeLoader(BaseLoader): """Load all Videos from a `YouTube` Channel. To use, you should have the ``googleapiclient,youtube_transcript_api`` python package installed. As the service needs a google_api_client, you first have to initialize the GoogleApiClient. Additionally you have to either provide a channel name or a list of videoids "https://developers.google.com/docs/api/quickstart/python" Example: .. code-block:: python from langchain_community.document_loaders import GoogleApiClient from langchain_community.document_loaders import GoogleApiYoutubeLoader google_api_client = GoogleApiClient( service_account_path=Path("path_to_your_sec_file.json") ) loader = GoogleApiYoutubeLoader( google_api_client=google_api_client, channel_name = "CodeAesthetic" ) load.load() """ google_api_client: GoogleApiClient channel_name: Optional[str] = None video_ids: Optional[List[str]] = None add_video_info: bool = True captions_language: str = "en" continue_on_failure: bool = False def __post_init__(self) -> None: self.youtube_client = self._build_youtube_client(self.google_api_client.creds) def _build_youtube_client(self, creds: Any) -> Any: try: from googleapiclient.discovery import build from youtube_transcript_api import YouTubeTranscriptApi # noqa: F401 except ImportError: raise ImportError( "You must run" "`pip install --upgrade " "google-api-python-client google-auth-httplib2 " "google-auth-oauthlib " "youtube-transcript-api` " "to use the Google Drive loader" ) return build("youtube", "v3", credentials=creds)
[docs] @root_validator def validate_channel_or_videoIds_is_set( cls, values: Dict[str, Any] ) -> Dict[str, Any]: """Validate that either folder_id or document_ids is set, but not both.""" if not values.get("channel_name") and not values.get("video_ids"): raise ValueError("Must specify either channel_name or video_ids") return values
def _get_transcripe_for_video_id(self, video_id: str) -> str: from youtube_transcript_api import NoTranscriptFound, YouTubeTranscriptApi transcript_list = YouTubeTranscriptApi.list_transcripts(video_id) try: transcript = transcript_list.find_transcript([self.captions_language]) except NoTranscriptFound: for available_transcript in transcript_list: transcript = available_transcript.translate(self.captions_language) continue transcript_pieces = transcript.fetch() return " ".join([t["text"].strip(" ") for t in transcript_pieces]) def _get_document_for_video_id(self, video_id: str, **kwargs: Any) -> Document: captions = self._get_transcripe_for_video_id(video_id) video_response = ( self.youtube_client.videos() .list( part="id,snippet", id=video_id, ) .execute() ) return Document( page_content=captions, metadata=video_response.get("items")[0], ) def _get_channel_id(self, channel_name: str) -> str: request = self.youtube_client.search().list( part="id", q=channel_name, type="channel", maxResults=1, # we only need one result since channel names are unique ) response = request.execute() channel_id = response["items"][0]["id"]["channelId"] return channel_id def _get_document_for_channel(self, channel: str, **kwargs: Any) -> List[Document]: try: from youtube_transcript_api import ( NoTranscriptFound, TranscriptsDisabled, ) except ImportError: raise ImportError( "You must run" "`pip install --upgrade " "youtube-transcript-api` " "to use the youtube loader" ) channel_id = self._get_channel_id(channel) request = self.youtube_client.search().list( part="id,snippet", channelId=channel_id, maxResults=50, # adjust this value to retrieve more or fewer videos ) video_ids = [] while request is not None: response = request.execute() # Add each video ID to the list for item in response["items"]: if not item["id"].get("videoId"): continue meta_data = {"videoId": item["id"]["videoId"]} if self.add_video_info: item["snippet"].pop("thumbnails") meta_data.update(item["snippet"]) try: page_content = self._get_transcripe_for_video_id( item["id"]["videoId"] ) video_ids.append( Document( page_content=page_content, metadata=meta_data, ) ) except (TranscriptsDisabled, NoTranscriptFound) as e: if self.continue_on_failure: logger.error( "Error fetching transscript " + f" {item['id']['videoId']}, exception: {e}" ) else: raise e pass request = self.youtube_client.search().list_next(request, response) return video_ids
[docs] def load(self) -> List[Document]: """Load documents.""" document_list = [] if self.channel_name: document_list.extend(self._get_document_for_channel(self.channel_name)) elif self.video_ids: document_list.extend( [ self._get_document_for_video_id(video_id) for video_id in self.video_ids ] ) else: raise ValueError("Must specify either channel_name or video_ids") return document_list