""" Transcript Service for YouTube Videos This service extracts transcripts from YouTube videos using multiple methods: 1. First, try youtube_transcript_api (works well on cloud platforms) 2. Then try yt-dlp subtitle extraction 3. If no subtitles available, fallback to audio extraction + Whisper transcription The fallback uses the SpeechToTextService for local Whisper transcription. """ import re import os import tempfile import logging from typing import Optional, Tuple, List # Try to import youtube_transcript_api (more reliable for cloud deployments) try: from youtube_transcript_api import YouTubeTranscriptApi from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound HAS_YOUTUBE_TRANSCRIPT_API = True except ImportError: HAS_YOUTUBE_TRANSCRIPT_API = False import yt_dlp # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TranscriptService: """ Service for extracting transcripts from YouTube videos. Supports two methods: 1. Subtitle extraction (fast, no ML models) 2. Audio transcription via Whisper (slower, requires SpeechToTextService) """ def __init__(self): """Initialize the transcript service.""" self._speech_to_text = None # Lazy-loaded def _get_speech_to_text_service(self): """Lazy-load the SpeechToTextService to avoid loading Whisper unless needed.""" if self._speech_to_text is None: from services.speech_to_text import SpeechToTextService self._speech_to_text = SpeechToTextService() return self._speech_to_text def extract_video_id(self, url: str) -> str: """ Extract video ID from YouTube URL. Args: url: YouTube URL in various formats Returns: 11-character video ID Raises: ValueError: If URL is invalid """ regex = r"(?:v=|\/|youtu\.be\/)([0-9A-Za-z_-]{11}).*" match = re.search(regex, url) if match: return match.group(1) raise ValueError("Invalid YouTube URL") def clean_autogen_transcript(self, text: str) -> str: """ Clean auto-generated YouTube captions. Removes: - ... tags - Timestamps like <00:00:06.480> - Multiple spaces Args: text: Raw VTT subtitle text Returns: Cleaned transcript text """ # Remove ... tags text = re.sub(r"", "", text) # Remove timestamps like <00:00:06.480> text = re.sub(r"<\d{2}:\d{2}:\d{2}\.\d{3}>", "", text) # Collapse multiple spaces text = re.sub(r"\s+", " ", text).strip() return text def get_transcript_api(self, video_id: str) -> Optional[dict]: """ Get transcript using youtube_transcript_api (works better on cloud platforms). Args: video_id: YouTube video ID Returns: Dictionary with transcript and language, or None if not available """ if not HAS_YOUTUBE_TRANSCRIPT_API: logger.info("youtube_transcript_api not installed, skipping...") return None try: # Try to get transcript in preferred languages preferred_langs = ['en', 'en-IN', 'hi', 'ta', 'te', 'kn', 'ml', 'gu', 'bn', 'mr', 'pa', 'ur'] try: transcript_list = YouTubeTranscriptApi.list_transcripts(video_id) # Try to find a manual transcript first, then auto-generated transcript = None detected_lang = "eng" # First try manual transcripts for lang in preferred_langs: try: transcript = transcript_list.find_manually_created_transcript([lang]) detected_lang = lang break except: pass # Then try auto-generated if not transcript: for lang in preferred_langs: try: transcript = transcript_list.find_generated_transcript([lang]) detected_lang = lang break except: pass # If still no transcript, try to get any available if not transcript: for t in transcript_list: transcript = t detected_lang = t.language_code break if transcript: # Fetch the actual transcript transcript_data = transcript.fetch() # Combine all text text_parts = [entry['text'] for entry in transcript_data] full_text = ' '.join(text_parts) # Clean the text clean_text = self.clean_autogen_transcript(full_text) if len(clean_text.strip()) < 50: logger.info("Transcript too short") return None # Normalize language code lang_map = { "en": "eng", "en-IN": "eng", "en-US": "eng", "en-GB": "eng", "hi": "hin", "hi-IN": "hin", "ta": "tam", "ta-IN": "tam", "te": "tel", "te-IN": "tel", "kn": "kan", "kn-IN": "kan", "ml": "mal", "ml-IN": "mal", "gu": "guj", "gu-IN": "guj", "bn": "ben", "bn-IN": "ben", "mr": "mar", "mr-IN": "mar", "pa": "pan", "pa-IN": "pan", "ur": "urd", "ur-PK": "urd", } normalized_lang = lang_map.get(detected_lang, detected_lang) logger.info(f"Transcript fetched via API (language: {normalized_lang})") return { "transcript": clean_text, "language": normalized_lang, "source": "youtube_api", "word_count": len(clean_text.split()) } except TranscriptsDisabled: logger.info("Transcripts are disabled for this video") return None except NoTranscriptFound: logger.info("No transcript found for this video") return None except Exception as e: logger.warning(f"youtube_transcript_api failed: {e}") return None return None def get_subtitles(self, url: str, lang: str = "en") -> Optional[dict]: """ Try to get existing subtitles from YouTube using yt-dlp. Args: url: YouTube video URL lang: Preferred language code (default: "en") Returns: Dictionary with transcript and language, or None if no subtitles """ with tempfile.TemporaryDirectory() as temp_dir: ydl_opts = { "skip_download": True, "writesubtitles": True, "writeautomaticsub": True, "subtitlesformat": "vtt", "outtmpl": os.path.join(temp_dir, "%(id)s.%(ext)s"), "quiet": True, } try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) ydl.download([url]) # Find subtitle file video_id = info["id"] sub_file = None detected_lang = "eng" for file in os.listdir(temp_dir): if file.startswith(video_id) and file.endswith(".vtt"): sub_file = os.path.join(temp_dir, file) # Try to extract language from filename # Format: videoId.lang.vtt parts = file.split(".") if len(parts) >= 3: detected_lang = parts[-2] break if not sub_file: logger.info("No subtitle file found") return None # Read and clean VTT file lines = [] with open(sub_file, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue if line.startswith("WEBVTT"): continue if "-->" in line: continue if re.match(r"^\d+$", line): continue lines.append(line) raw_text = " ".join(lines) clean_text = self.clean_autogen_transcript(raw_text) if not clean_text or len(clean_text.strip()) < 50: logger.info("Extracted subtitles too short") return None # Map common language codes lang_map = { "en": "eng", "en-US": "eng", "en-GB": "eng", "hi": "hin", "hi-IN": "hin", "ta": "tam", "ta-IN": "tam", "te": "tel", "te-IN": "tel", "kn": "kan", "kn-IN": "kan", "ml": "mal", "ml-IN": "mal", "gu": "guj", "gu-IN": "guj", "bn": "ben", "bn-IN": "ben", "mr": "mar", "mr-IN": "mar", "pa": "pan", "pa-IN": "pan", "ur": "urd", "ur-PK": "urd", } normalized_lang = lang_map.get(detected_lang, detected_lang) logger.info(f"Subtitles extracted successfully (language: {normalized_lang})") return { "transcript": clean_text, "language": normalized_lang, "source": "subtitles", "word_count": len(clean_text.split()) } except Exception as e: logger.warning(f"Subtitle extraction failed: {e}") return None def get_video_transcript(self, url: str, use_whisper_fallback: bool = True) -> dict: """ Get transcript from a YouTube video. Tries multiple methods in order: 1. youtube_transcript_api (works best on cloud platforms) 2. yt-dlp subtitle extraction 3. Whisper transcription (fallback) Args: url: YouTube video URL use_whisper_fallback: Whether to use Whisper if no subtitles (default: True) Returns: Dictionary with: - transcript: The transcript text - language: Detected/extracted language code - source: "youtube_api", "subtitles", or "whisper" - word_count: Number of words Raises: Exception: If transcript cannot be obtained """ # Extract video ID for API-based methods video_id = self.extract_video_id(url) # Method 1: Try youtube_transcript_api first (best for cloud platforms) logger.info("Attempting to get transcript via YouTube API...") result = self.get_transcript_api(video_id) if result: return result # Method 2: Try yt-dlp subtitle extraction logger.info("Attempting to get subtitles via yt-dlp...") result = self.get_subtitles(url) if result: return result # Fallback to Whisper transcription if use_whisper_fallback: logger.info("No subtitles found. Falling back to Whisper transcription...") try: stt_service = self._get_speech_to_text_service() whisper_result = stt_service.transcribe_youtube_video(url) return { "transcript": whisper_result["text"], "language": whisper_result["language"], "source": "whisper", "word_count": whisper_result["word_count"] } except Exception as e: logger.error(f"Whisper transcription failed: {e}") raise Exception(f"Could not retrieve transcript: {str(e)}") raise Exception("No subtitles available and Whisper fallback is disabled") def get_video_transcript_legacy(self, url: str, lang: str = "en") -> str: """ Legacy method for backward compatibility. Returns only the transcript text (no language info). """ result = self.get_video_transcript(url, use_whisper_fallback=True) return result["transcript"]