import os
import shutil
from urllib.parse import urlparse

import yt_dlp


def _is_youtube_url(url: str) -> bool:
    """Check if URL is a YouTube link."""
    parsed_url = urlparse(url)
    return parsed_url.netloc.lower().endswith(('youtube.com', 'youtu.be'))
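
# Example (sketch, placeholder IDs): _is_youtube_url("https://youtu.be/<VIDEO_ID>") and
# _is_youtube_url("https://www.youtube.com/watch?v=<VIDEO_ID>") both return True;
# URLs on other hosts return False.
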
def load_video(
    video_source: str,
    with_subtitle: bool = False,
    subtitle_source: str | None = None,
) -> str:
    """
    Load video from YouTube URL or local file path.

    Returns the path to the downloaded/loaded video file.
    """
    from dvd import config

    raw_video_dir = os.path.join(config.VIDEO_DATABASE_FOLDER, "raw")
    os.makedirs(raw_video_dir, exist_ok=True)

    # ------------------- YouTube source -------------------
    if video_source.startswith(('http://', 'https://')):
        if not _is_youtube_url(video_source):
            raise ValueError("Provided URL is not a valid YouTube link.")
        # Enhanced yt-dlp options to avoid bot detection
        ydl_opts = {
            # Capped-resolution mp4 video merged with m4a audio, falling back to
            # a progressive mp4 stream at the same cap.
            'format': (
                f'bestvideo[height<={config.VIDEO_RESOLUTION}][ext=mp4]+bestaudio[ext=m4a]/'
                f'best[height<={config.VIDEO_RESOLUTION}][ext=mp4]'
            ),
            'outtmpl': os.path.join(raw_video_dir, '%(id)s.%(ext)s'),
            'merge_output_format': 'mp4',
            # Anti-bot detection options
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'extractor_args': {
                'youtube': {
                    'player_client': ['android', 'web'],  # Try android first, fall back to web
                    'player_skip': ['webpage', 'configs'],
                }
            },
            'quiet': False,
            'no_warnings': False,
        }
        if with_subtitle:
            ydl_opts.update({
                'writesubtitles': True,
                'subtitlesformat': 'srt',
                'overwritesubtitles': True,
            })

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_source, download=True)
            video_path = ydl.prepare_filename(info)

        # Rename subtitle -> "<video_file_name>.srt"
        if with_subtitle:
            video_base = os.path.splitext(video_path)[0]
            for f in os.listdir(raw_video_dir):
                if f.startswith(info["id"]) and f.endswith(".srt"):
                    shutil.move(
                        os.path.join(raw_video_dir, f),
                        f"{video_base}.srt",
                    )
                    break

        return os.path.abspath(video_path)
    # ------------------- Local source -------------------
    elif os.path.isfile(video_source):
        video_id = os.path.splitext(os.path.basename(video_source))[0]
        video_destination = os.path.join(raw_video_dir, f"{video_id}.mp4")
        os.makedirs(os.path.dirname(video_destination), exist_ok=True)
        shutil.copy2(video_source, video_destination)

        if with_subtitle and subtitle_source:
            subtitle_destination = f"{os.path.splitext(video_destination)[0]}.srt"
            os.makedirs(os.path.dirname(subtitle_destination), exist_ok=True)
            shutil.copy2(subtitle_source, subtitle_destination)

        return os.path.abspath(video_destination)

    else:
        raise ValueError(f"Video source '{video_source}' is not a valid URL or file path.")
def download_srt_subtitle(video_url: str, output_path: str):
    """
    Downloads an SRT subtitle from a YouTube URL using youtube-transcript-api.

    This is a simple and reliable approach that handles all the complexity internally.
    """
    from youtube_transcript_api import YouTubeTranscriptApi
    from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound

    if not _is_youtube_url(video_url):
        raise ValueError("Provided URL is not a valid YouTube link.")

    # Extract the video ID from the URL
    if 'v=' in video_url:
        video_id = video_url.split('v=')[1].split('&')[0]
    elif 'youtu.be/' in video_url:
        video_id = video_url.split('youtu.be/')[1].split('?')[0]
    else:
        raise ValueError(f"Could not extract video ID from {video_url}")
    output_dir = os.path.dirname(output_path)
    if output_dir:  # output_path may be a bare filename with no directory component
        os.makedirs(output_dir, exist_ok=True)

    # Check for proxy configuration (optional - set via environment variables)
    proxy_username = os.environ.get('YOUTUBE_PROXY_USERNAME', None)
    proxy_password = os.environ.get('YOUTUBE_PROXY_PASSWORD', None)
    try:
        # Configure the YouTube Transcript API with an optional proxy
        if proxy_username and proxy_password:
            from youtube_transcript_api.proxies import WebshareProxyConfig

            print(f"🌐 Using proxy for subtitle download (username: {proxy_username[:3]}***)...")
            ytt_api = YouTubeTranscriptApi(
                proxy_config=WebshareProxyConfig(
                    proxy_username=proxy_username,
                    proxy_password=proxy_password,
                )
            )
        else:
            # No proxy - use the default client
            print("⚠️ No proxy configured - YouTube may block cloud provider IPs")
            ytt_api = YouTubeTranscriptApi()
        # Fetch the transcript (prefer English, but fall back to any available language)
        print(f"🔄 Fetching transcript for video {video_id}...")

        transcript_data = None
        try:
            transcript_data = ytt_api.fetch(video_id, languages=['en', 'en-US', 'en-GB'])
            print("✅ Found English transcript")
        except Exception:
            # No English transcript - try any available language; if this also
            # fails, the error propagates to the outer handlers below.
            print("⚠️ English transcript not available, trying any language...")
            transcript_data = ytt_api.fetch(video_id)
            print("✅ Found transcript in an available language")

        if not transcript_data:
            raise FileNotFoundError(f"No transcript data returned for video {video_id}")

        # Convert to SRT format
        srt_content = _convert_transcript_to_srt(transcript_data)

        # Write to file
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(srt_content)

        file_size = os.path.getsize(output_path)
        print(f"✅ Successfully downloaded subtitles to {output_path} ({file_size} bytes)")
    except TranscriptsDisabled as e:
        raise FileNotFoundError(f"Transcripts are disabled for video {video_id}: {e}")
    except NoTranscriptFound as e:
        raise FileNotFoundError(f"No transcript found for video {video_id}: {e}")
    except Exception as e:
        # Check whether this is a RequestBlocked error (IP blocking)
        error_str = str(e).lower()
        error_type = type(e).__name__
        if 'requestblocked' in error_str or error_type == 'RequestBlocked' or ('ip' in error_str and 'blocked' in error_str):
            error_msg = (
                f"YouTube is blocking requests from this IP (cloud provider IP).\n\n"
                f"**Solution:** Set proxy credentials via environment variables:\n"
                f"  YOUTUBE_PROXY_USERNAME=your-username\n"
                f"  YOUTUBE_PROXY_PASSWORD=your-password\n\n"
                f"Original error: {e}"
            )
            raise FileNotFoundError(error_msg)
        else:
            # Other errors - pass through the original error message
            raise FileNotFoundError(f"Could not download SRT subtitle for {video_url}: {e}")
def _convert_transcript_to_srt(transcript_data: list) -> str:
    """Convert YouTube transcript API data to SRT format.

    Handles both dictionary entries and FetchedTranscriptSnippet objects.
    """
    srt_lines = []
    for index, entry in enumerate(transcript_data, start=1):
        # Handle both dict and object formats
        if isinstance(entry, dict):
            start_time = entry['start']
            duration = entry.get('duration', 0)
            text = entry['text'].strip()
        else:
            # FetchedTranscriptSnippet object - use attributes
            start_time = entry.start
            duration = getattr(entry, 'duration', 0)
            text = entry.text.strip()

        end_time = start_time + duration

        # Convert seconds to SRT timestamp format (HH:MM:SS,mmm)
        start_srt = _seconds_to_srt_timestamp(start_time)
        end_srt = _seconds_to_srt_timestamp(end_time)

        srt_lines.append(str(index))
        srt_lines.append(f"{start_srt} --> {end_srt}")
        srt_lines.append(text)
        srt_lines.append('')  # Blank line between entries

    return '\n'.join(srt_lines)
def _seconds_to_srt_timestamp(seconds: float) -> str:
    """Convert seconds to SRT timestamp format (HH:MM:SS,mmm)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    millis = int((seconds % 1) * 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def decode_video_to_frames(video_path: str) -> str:
    """
    Decode video into frames and save them to disk.

    Returns the path to the frames directory.
    """
    import cv2
    from tqdm import tqdm
    from dvd import config

    video_id = os.path.splitext(os.path.basename(video_path))[0]
    frames_dir = os.path.join(config.VIDEO_DATABASE_FOLDER, video_id, "frames")
    os.makedirs(frames_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    # Extract a frame every N source frames; guard against a zero interval when
    # the source fps is at or below the target config.VIDEO_FPS.
    frame_interval = max(1, int(fps / config.VIDEO_FPS))

    frame_count = 0
    saved_count = 0
    with tqdm(desc=f"Decoding {video_id}") as pbar:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % frame_interval == 0:
                frame_filename = os.path.join(
                    frames_dir, f"frame_n{saved_count * frame_interval}.jpg"
                )
                cv2.imwrite(frame_filename, frame)
                saved_count += 1
                pbar.update(1)
            frame_count += 1
    cap.release()

    return frames_dir
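
# Example usage (sketch): decode a previously loaded video; the variable below
# is the path returned by load_video, and frames land under
# config.VIDEO_DATABASE_FOLDER/<video_id>/frames.
#
#   frames_dir = decode_video_to_frames(video_path)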