deep-video-discovery / dvd /video_utils.py
aghiadbakry's picture
Upload video_utils.py
51eba22 verified
import os
import shutil
import yt_dlp
from typing import Dict
from urllib.parse import urlparse
def _is_youtube_url(url: str) -> bool:
    """Return True when *url* points at a YouTube host.

    The host must be exactly ``youtube.com`` / ``youtu.be`` or a subdomain of
    one of them (``www.youtube.com``, ``m.youtube.com``, ...). The comparison
    uses the parsed hostname, so a port or credentials in the URL do not
    affect the result and look-alike domains such as ``notyoutube.com`` are
    rejected.
    """
    try:
        hostname = urlparse(url).hostname
    except ValueError:
        # urlparse rejects some malformed netlocs (e.g. unbalanced IPv6
        # brackets); such input is simply not a YouTube URL.
        return False
    if not hostname:
        return False
    hostname = hostname.lower()
    return any(
        hostname == domain or hostname.endswith('.' + domain)
        for domain in ('youtube.com', 'youtu.be')
    )
def load_video(
    video_source: str,
    with_subtitle: bool = False,
    subtitle_source: str | None = None,
) -> str:
    """
    Load video from YouTube URL or local file path.

    The video ends up in the ``raw`` sub-folder of
    ``config.VIDEO_DATABASE_FOLDER``: a YouTube URL is downloaded there with
    yt-dlp as ``<video id>.<ext>``, a local file is copied there as
    ``<file stem>.mp4``.

    Args:
        video_source: An ``http(s)://`` YouTube URL or the path of an existing
            local file.
        with_subtitle: For a URL, ask yt-dlp to also write subtitles and
            rename the resulting ``.srt`` next to the video. For a local
            file, copy ``subtitle_source`` next to the video.
        subtitle_source: Path of a subtitle file; only used for local videos
            when ``with_subtitle`` is true.

    Returns:
        The absolute path to the downloaded/copied video file.

    Raises:
        ValueError: If ``video_source`` is a non-YouTube URL, or is neither a
            URL nor an existing file.
    """
    from dvd import config

    raw_video_dir = os.path.join(config.VIDEO_DATABASE_FOLDER, "raw")
    os.makedirs(raw_video_dir, exist_ok=True)

    # ------------------- YouTube source -------------------
    if video_source.startswith(('http://', 'https://')):
        if not _is_youtube_url(video_source):
            raise ValueError("Provided URL is not a valid YouTube link.")

        mp4_filter = f'[height<={config.VIDEO_RESOLUTION}][ext=mp4]'
        # Enhanced yt-dlp options to avoid bot detection
        ydl_opts = {
            # "/" is yt-dlp's fallback operator: use the best mp4 video
            # stream within the height limit, otherwise the best combined
            # mp4 format. Without an operator between the two selectors the
            # format string is malformed.
            'format': f'bestvideo{mp4_filter}/best{mp4_filter}',
            'outtmpl': os.path.join(raw_video_dir, '%(id)s.%(ext)s'),
            'merge_output_format': 'mp4',
            # Anti-bot detection options
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'extractor_args': {
                'youtube': {
                    'player_client': ['android', 'web'],  # Try android first, fallback to web
                    'player_skip': ['webpage', 'configs'],
                }
            },
            'quiet': False,
            'no_warnings': False,
        }
        if with_subtitle:
            ydl_opts.update({
                'writesubtitles': True,
                'subtitlesformat': 'srt',
                'overwritesubtitles': True,
            })

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_source, download=True)
            video_path = ydl.prepare_filename(info)

        # rename subtitle -> "<video_file_name>.srt"
        if with_subtitle:
            video_base = os.path.splitext(video_path)[0]
            for f in os.listdir(raw_video_dir):
                if f.startswith(info["id"]) and f.endswith(".srt"):
                    shutil.move(
                        os.path.join(raw_video_dir, f),
                        f"{video_base}.srt",
                    )
                    break
        return os.path.abspath(video_path)

    # ------------------- Local source -------------------
    if os.path.isfile(video_source):
        video_id = os.path.splitext(os.path.basename(video_source))[0]
        video_destination = os.path.join(raw_video_dir, f"{video_id}.mp4")
        try:
            shutil.copy2(video_source, video_destination)
        except shutil.SameFileError:
            # The source already is the file in the raw folder; nothing to do.
            pass

        if with_subtitle and subtitle_source:
            subtitle_destination = f"{os.path.splitext(video_destination)[0]}.srt"
            try:
                shutil.copy2(subtitle_source, subtitle_destination)
            except shutil.SameFileError:
                pass
        return os.path.abspath(video_destination)

    raise ValueError(f"Video source '{video_source}' is not a valid URL or file path.")
def _extract_youtube_video_id(video_url: str) -> str:
    """Return the video ID embedded in a YouTube URL, or raise ValueError."""
    from urllib.parse import parse_qs

    parsed = urlparse(video_url)
    # Watch URLs carry the ID in the "v" query parameter. Parsing the query
    # avoids matching other parameters whose name merely ends in "v".
    query_ids = parse_qs(parsed.query).get('v')
    if query_ids:
        return query_ids[0]

    segments = [segment for segment in parsed.path.split('/') if segment]
    host = (parsed.hostname or '').lower()
    # Short links: https://youtu.be/<id>
    if segments and (host == 'youtu.be' or host.endswith('.youtu.be')):
        return segments[0]
    # Path-style links: /shorts/<id>, /embed/<id>, /live/<id>, /v/<id>
    if len(segments) >= 2 and segments[0] in ('shorts', 'embed', 'live', 'v'):
        return segments[1]
    raise ValueError(f"Could not extract video ID from {video_url}")


def download_srt_subtitle(video_url: str, output_path: str):
    """
    Downloads an SRT subtitle from a YouTube URL using youtube-transcript-api.

    An English transcript is preferred; when none exists, the first transcript
    listed for the video is used instead. The transcript is converted to SRT
    text and written to ``output_path`` as UTF-8.

    If both ``YOUTUBE_PROXY_USERNAME`` and ``YOUTUBE_PROXY_PASSWORD`` are set
    in the environment, requests go through a Webshare proxy configuration.

    Args:
        video_url: URL of the YouTube video.
        output_path: Destination file for the SRT content; its parent
            directory is created when missing.

    Raises:
        ValueError: If ``video_url`` is not a YouTube URL or contains no
            recognisable video ID.
        FileNotFoundError: If the transcript cannot be obtained or written
            (disabled, missing, request blocked, or any other failure).
    """
    from youtube_transcript_api import YouTubeTranscriptApi
    from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound

    if not _is_youtube_url(video_url):
        raise ValueError("Provided URL is not a valid YouTube link.")

    video_id = _extract_youtube_video_id(video_url)

    # A bare file name has an empty dirname, which os.makedirs rejects.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Check for proxy configuration (optional - set via environment variables)
    proxy_username = os.environ.get('YOUTUBE_PROXY_USERNAME', None)
    proxy_password = os.environ.get('YOUTUBE_PROXY_PASSWORD', None)

    try:
        # Configure YouTube Transcript API with optional proxy
        if proxy_username and proxy_password:
            from youtube_transcript_api.proxies import WebshareProxyConfig
            print(f"🌐 Using proxy for subtitle download (username: {proxy_username[:3]}***)...")
            ytt_api = YouTubeTranscriptApi(
                proxy_config=WebshareProxyConfig(
                    proxy_username=proxy_username,
                    proxy_password=proxy_password,
                )
            )
        else:
            # No proxy - use default
            print("⚠️ No proxy configured - YouTube may block cloud provider IPs")
            ytt_api = YouTubeTranscriptApi()

        print(f"🔄 Fetching transcript for video {video_id}...")
        # Try English first, then any available language
        try:
            transcript_data = ytt_api.fetch(video_id, languages=['en', 'en-US', 'en-GB'])
            print("✅ Found English transcript")
        except NoTranscriptFound:
            print("⚠️ English transcript not available, trying any language...")
            # fetch() without `languages` would only look for English again,
            # so enumerate the transcripts and take the first one on offer.
            fallback = next(iter(ytt_api.list(video_id)), None)
            if fallback is None:
                raise
            transcript_data = fallback.fetch()
            print("✅ Found transcript in available language")

        if not transcript_data:
            raise FileNotFoundError(f"No transcript data returned for video {video_id}")

        # Convert to SRT format
        srt_content = _convert_transcript_to_srt(transcript_data)

        # Write to file
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(srt_content)
        file_size = os.path.getsize(output_path)
        print(f"✅ Successfully downloaded subtitles to {output_path} ({file_size} bytes)")
    except TranscriptsDisabled as e:
        raise FileNotFoundError(f"Transcripts are disabled for video {video_id}: {e}") from e
    except NoTranscriptFound as e:
        raise FileNotFoundError(f"No transcript found for video {video_id}: {e}") from e
    except Exception as e:
        # Check if it's a RequestBlocked error (IP blocking)
        error_str = str(e).lower()
        error_type = type(e).__name__
        if 'requestblocked' in error_str or (error_type == 'RequestBlocked') or ('ip' in error_str and 'blocked' in error_str):
            error_msg = (
                f"YouTube is blocking requests from this IP (cloud provider IP).\n\n"
                f"**Solution:** Set proxy credentials via environment variables:\n"
                f"  YOUTUBE_PROXY_USERNAME=your-username\n"
                f"  YOUTUBE_PROXY_PASSWORD=your-password\n\n"
                f"Original error: {e}"
            )
            raise FileNotFoundError(error_msg) from e
        # Other errors - just pass through the original error message
        raise FileNotFoundError(f"Could not download SRT subtitle for {video_url}: {e}") from e
def _convert_transcript_to_srt(transcript_data: list) -> str:
    """Render transcript entries as SRT subtitle text.

    Every entry is either a dict with ``start`` and ``text`` keys (``duration``
    optional, defaulting to 0) or an object exposing the same names as
    attributes. Entries are numbered from 1 in iteration order, and each cue's
    end time is ``start + duration``.
    """
    cues = []
    for number, snippet in enumerate(transcript_data, start=1):
        if isinstance(snippet, dict):
            begin = snippet['start']
            length = snippet.get('duration', 0)
            caption = snippet['text'].strip()
        else:
            # Object-style snippet: read the same fields as attributes.
            begin = snippet.start
            length = getattr(snippet, 'duration', 0)
            caption = snippet.text.strip()

        finish = begin + length
        timing = f"{_seconds_to_srt_timestamp(begin)} --> {_seconds_to_srt_timestamp(finish)}"
        # One cue is "index / timing / text", each terminated by a newline.
        cues.append(f"{number}\n{timing}\n{caption}\n")

    # Joining with a newline leaves exactly one blank line between cues.
    return '\n'.join(cues)
def _seconds_to_srt_timestamp(seconds: float) -> str:
    """Convert seconds to SRT timestamp format (HH:MM:SS,mmm).

    The value is rounded once to whole milliseconds before being split into
    fields. Truncating the fractional part separately loses a millisecond to
    binary floating-point error (2.3 s would render as ``,299``). Negative
    input is clamped to zero.
    """
    total_millis = max(0, round(seconds * 1000))
    total_seconds, millis = divmod(total_millis, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def decode_video_to_frames(video_path: str) -> str:
    """
    Decode video into frames and save them to disk.

    Frames are sampled every ``int(source_fps / config.VIDEO_FPS)`` source
    frames (at least every frame) and written as ``frame_n<index>.jpg``,
    where ``<index>`` is the frame's position in the source video, under
    ``<config.VIDEO_DATABASE_FOLDER>/<video file stem>/frames``.

    Returns the path to the frames directory.
    """
    import cv2
    from tqdm import tqdm
    from dvd import config

    video_id = os.path.splitext(os.path.basename(video_path))[0]
    frames_dir = os.path.join(config.VIDEO_DATABASE_FOLDER, video_id, "frames")
    os.makedirs(frames_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Extract frame every N frames. Clamp to 1 so a source slower than
        # the target rate (or one reporting 0 fps) keeps every frame instead
        # of dividing by zero in the modulo below.
        frame_interval = max(1, int(fps / config.VIDEO_FPS))

        frame_count = 0
        with tqdm(desc=f"Decoding {video_id}") as pbar:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frame_filename = os.path.join(
                        frames_dir, f"frame_n{frame_count}.jpg"
                    )
                    cv2.imwrite(frame_filename, frame)
                    pbar.update(1)
                frame_count += 1
    finally:
        # Release the capture even if decoding or writing raised.
        cap.release()
    return frames_dir