import cv2
import numpy as np
from PIL import Image
import moviepy.editor as mp
import time
import os
from typing import Dict, Tuple, Optional, List
import torch
from transformers import pipeline
import requests
from io import BytesIO
import multiprocessing as mp_proc
from concurrent.futures import ProcessPoolExecutor, as_completed
import tempfile
import subprocess
import json
import hashlib
from collections import defaultdict
import gc

# Removed model_manager import to avoid GIL issues


class VideoProcessor:
    def __init__(self,
                 processing_resolution: Tuple[int, int] = (1280, 720),
                 upscale_factor: float = 1.5,
                 enable_ai_upscaling: bool = True,
                 depth_fps: int = 24,
                 enable_frame_interpolation: bool = True,
                 chunk_duration: int = 15,
                 max_workers: int = 4,
                 enable_depth_caching: bool = True,
                 scene_change_threshold: float = 0.1):
        self.depth_estimator = None
        self.upscaler = None
        self.processing_resolution = processing_resolution
        self.upscale_factor = upscale_factor
        self.enable_ai_upscaling = enable_ai_upscaling
        self.depth_fps = depth_fps
        self.enable_frame_interpolation = enable_frame_interpolation
        self.chunk_duration = chunk_duration  # seconds per chunk
        self.max_workers = max_workers
        self.enable_depth_caching = enable_depth_caching
        self.scene_change_threshold = scene_change_threshold

        # Memory optimization and caching
        self.depth_cache = {}    # Cache for depth maps
        self.frame_cache = {}    # Cache for processed frames
        self.scene_changes = []  # Track scene change points
        self.last_frame_hash = None
        self.memory_usage = 0
        self.max_memory_mb = 2048  # Maximum memory usage in MB

        self._load_models()

    def _load_models(self):
        """Load depth estimation and upscaling models directly"""
        try:
            # Load depth estimation model directly
            print("Loading depth estimation model...")
            self.depth_estimator = pipeline(
                "depth-estimation",
                model="Intel/dpt-hybrid-midas",
                device="cuda" if torch.cuda.is_available() else "cpu",
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
            )
            print("✅ Depth estimation model loaded")
        except Exception as e:
            print(f"❌ Error loading depth model: {e}")
            self.depth_estimator = None

        # Load upscaler (bicubic fallback for now)
        self._load_upscaler()

    def _load_upscaler(self):
        """Load AI upscaling model"""
        try:
            # For now, we'll use simple bicubic upscaling
            # In production, you could integrate Real-ESRGAN or similar
            self.upscaler = "bicubic"  # Placeholder for now
            print("AI upscaler loaded (bicubic fallback)")
        except Exception as e:
            print(f"Error loading upscaler: {e}")
            self.upscaler = None

    def downscale_image(self, image: np.ndarray, target_resolution: Tuple[int, int]) -> np.ndarray:
        """Downscale image to target resolution for processing"""
        height, width = image.shape[:2]
        target_width, target_height = target_resolution

        # Calculate scaling factor
        scale_factor = min(target_width / width, target_height / height)
        if scale_factor >= 1.0:
            return image  # No downscaling needed

        # Calculate new dimensions
        new_width = int(width * scale_factor)
        new_height = int(height * scale_factor)

        # Use high-quality downscaling
        downscaled = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
        return downscaled

    def upscale_image(self, image: np.ndarray, target_resolution: Tuple[int, int]) -> np.ndarray:
        """Upscale image using AI or high-quality interpolation"""
        height, width = image.shape[:2]
        target_width, target_height = target_resolution

        # Calculate scaling factor
        scale_factor = min(target_width / width, target_height / height)
        if scale_factor <= 1.0:
            return image  # No upscaling needed

        # Calculate new dimensions
        new_width = int(width * scale_factor)
        new_height = int(height * scale_factor)

        if self.upscaler and self.upscaler != "bicubic":
            # Use AI upscaling if available
            return self._ai_upscale(image, (new_width, new_height))
        else:
            # Use high-quality bicubic upscaling
            return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
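    # upscale_image above routes any non-bicubic upscaler through _ai_upscale
    # (defined below), which currently still falls back to bicubic. The helper
    # below is a minimal sketch of one way to plug in a learned
    # super-resolution model via OpenCV's dnn_superres module (requires the
    # opencv-contrib-python build). The model path and the ESPCN x2 choice are
    # illustrative assumptions; this helper is not called anywhere in the
    # current pipeline.
    def _dnn_upscale(self, image: np.ndarray, target_size: Tuple[int, int],
                     model_path: str = "models/ESPCN_x2.pb") -> np.ndarray:
        """Sketch: upscale with a dnn_superres model, then resize to the exact target size."""
        if not hasattr(self, "_sr_model"):
            sr = cv2.dnn_superres.DnnSuperResImpl_create()
            sr.readModel(model_path)  # hypothetical path to a downloaded ESPCN .pb file
            sr.setModel("espcn", 2)   # model name and fixed 2x scale
            self._sr_model = sr
        upscaled = self._sr_model.upsample(image)
        # dnn_superres only supports fixed integer scales, so finish with a resize
        return cv2.resize(upscaled, target_size, interpolation=cv2.INTER_CUBIC)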
    def _ai_upscale(self, image: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray:
        """AI-powered upscaling (placeholder for Real-ESRGAN integration)"""
        # For now, use bicubic as fallback
        # In production, integrate Real-ESRGAN or similar
        return cv2.resize(image, target_size, interpolation=cv2.INTER_CUBIC)

    def _get_frame_hash(self, frame: np.ndarray) -> str:
        """Generate a hash of the frame content to detect scene changes"""
        # Resize frame to a small size for faster hashing
        small_frame = cv2.resize(frame, (64, 64))
        frame_bytes = small_frame.tobytes()
        return hashlib.md5(frame_bytes).hexdigest()

    def _detect_scene_change(self, current_frame: np.ndarray) -> bool:
        """Detect whether there is a significant scene change"""
        if self.last_frame_hash is None:
            self.last_frame_hash = self._get_frame_hash(current_frame)
            return True

        current_hash = self._get_frame_hash(current_frame)

        # Calculate structural similarity
        if self.enable_depth_caching:
            # Convert to grayscale for comparison
            gray_current = cv2.cvtColor(current_frame, cv2.COLOR_RGB2GRAY)
            gray_last = (cv2.cvtColor(self.last_frame, cv2.COLOR_RGB2GRAY)
                         if hasattr(self, 'last_frame') else None)

            if gray_last is not None:
                try:
                    # Calculate structural similarity index
                    from skimage.metrics import structural_similarity as ssim
                    similarity = ssim(gray_current, gray_last)
                    scene_change = similarity < (1.0 - self.scene_change_threshold)
                except Exception:
                    # Fallback to hash comparison
                    scene_change = current_hash != self.last_frame_hash
            else:
                scene_change = current_hash != self.last_frame_hash
        else:
            scene_change = current_hash != self.last_frame_hash

        if scene_change:
            self.last_frame_hash = current_hash
            self.last_frame = current_frame.copy()

        return scene_change

    def _get_memory_usage_mb(self) -> float:
        """Get current memory usage in MB"""
        import psutil
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024

    def _manage_memory(self):
        """Manage memory usage by clearing caches if needed"""
        current_memory = self._get_memory_usage_mb()

        if current_memory > self.max_memory_mb:
            # Clear oldest cache entries
            if len(self.depth_cache) > 10:
                # Remove oldest 50% of depth cache
                keys_to_remove = list(self.depth_cache.keys())[:len(self.depth_cache) // 2]
                for key in keys_to_remove:
                    del self.depth_cache[key]

            if len(self.frame_cache) > 5:
                # Remove oldest 50% of frame cache
                keys_to_remove = list(self.frame_cache.keys())[:len(self.frame_cache) // 2]
                for key in keys_to_remove:
                    del self.frame_cache[key]

            # Force garbage collection
            gc.collect()
            print(f"Memory management: Cleared caches, current usage: {self._get_memory_usage_mb():.1f}MB")

    def _get_cached_depth(self, frame_hash: str, frame: np.ndarray) -> Optional[np.ndarray]:
        """Get cached depth map if available"""
        if not self.enable_depth_caching or frame_hash not in self.depth_cache:
            return None

        # Verify the cached depth is still valid
        cached_depth = self.depth_cache[frame_hash]
        if cached_depth.shape[:2] == frame.shape[:2]:
            return cached_depth.copy()
        return None

    def _cache_depth(self, frame_hash: str, depth: np.ndarray):
        """Cache depth map for reuse"""
        if self.enable_depth_caching:
            self.depth_cache[frame_hash] = depth.copy()
            self.memory_usage += depth.nbytes / 1024 / 1024  # Track memory usage
    def _get_cached_frame(self, frame_hash: str) -> Optional[np.ndarray]:
        """Get cached processed frame if available"""
        if frame_hash in self.frame_cache:
            return self.frame_cache[frame_hash].copy()
        return None

    def _cache_frame(self, frame_hash: str, frame: np.ndarray):
        """Cache processed frame for reuse"""
        self.frame_cache[frame_hash] = frame.copy()
        self.memory_usage += frame.nbytes / 1024 / 1024

    def create_video_chunks(self, input_path: str, temp_dir: str) -> List[Dict]:
        """Split video into chunks for parallel processing"""
        try:
            video = mp.VideoFileClip(input_path)
            duration = video.duration
            fps = video.fps

            chunks = []
            chunk_count = int(np.ceil(duration / self.chunk_duration))
            print(f"Creating {chunk_count} chunks of {self.chunk_duration}s each")

            for i in range(chunk_count):
                start_time = i * self.chunk_duration
                end_time = min((i + 1) * self.chunk_duration, duration)
                chunk_path = os.path.join(temp_dir, f"chunk_{i:03d}.mp4")

                # Extract chunk
                chunk = video.subclip(start_time, end_time)
                chunk.write_videofile(chunk_path, verbose=False, logger=None)
                chunk.close()

                chunks.append({
                    'index': i,
                    'path': chunk_path,
                    'start_time': start_time,
                    'end_time': end_time,
                    'duration': end_time - start_time
                })

            video.close()
            return chunks
        except Exception as e:
            print(f"Error creating video chunks: {e}")
            return []

    # Static so it can be submitted to a ProcessPoolExecutor without pickling
    # the parent instance (and its loaded models).
    @staticmethod
    def process_chunk_worker(chunk_info: Dict, config: Dict) -> Dict:
        """Worker function for processing a single chunk"""
        try:
            # Create a new processor instance for this worker
            processor = VideoProcessor(
                processing_resolution=tuple(config['processing_resolution']),
                upscale_factor=config['upscale_factor'],
                enable_ai_upscaling=config['enable_ai_upscaling'],
                depth_fps=config['depth_fps'],
                enable_frame_interpolation=config['enable_frame_interpolation']
            )

            # Process the chunk as a single unit (no nested chunking)
            output_path = chunk_info['path'].replace('.mp4', '_processed.mp4')
            result = processor.process_video(chunk_info['path'], output_path, use_chunking=False)

            if result['success']:
                return {
                    'success': True,
                    'chunk_index': chunk_info['index'],
                    'output_path': output_path,
                    'start_time': chunk_info['start_time'],
                    'end_time': chunk_info['end_time']
                }
            else:
                return {
                    'success': False,
                    'chunk_index': chunk_info['index'],
                    'error': result['error']
                }
        except Exception as e:
            return {
                'success': False,
                'chunk_index': chunk_info['index'],
                'error': str(e)
            }

    def stitch_chunks_with_ffmpeg(self, chunk_results: List[Dict], output_path: str, fps: float) -> bool:
        """Stitch processed chunks back together using FFmpeg"""
        try:
            # Filter successful chunks and sort by index
            successful_chunks = [r for r in chunk_results if r['success']]
            successful_chunks.sort(key=lambda x: x['chunk_index'])

            if not successful_chunks:
                print("No successful chunks to stitch")
                return False

            # Create file list for FFmpeg concat
            concat_file = os.path.join(os.path.dirname(output_path), "concat_list.txt")
            with open(concat_file, 'w') as f:
                for chunk in successful_chunks:
                    f.write(f"file '{chunk['output_path']}'\n")

            # Use FFmpeg to concatenate chunks
            cmd = [
                'ffmpeg', '-y',      # -y to overwrite output file
                '-f', 'concat',
                '-safe', '0',
                '-i', concat_file,
                '-c', 'copy',        # Copy streams without re-encoding
                '-r', str(fps),      # Set output frame rate
                output_path
            ]

            print(f"Stitching chunks with FFmpeg: {' '.join(cmd)}")
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print("Successfully stitched chunks")
                # Clean up concat file
                os.remove(concat_file)
                return True
            else:
                print(f"FFmpeg error: {result.stderr}")
                return False
        except Exception as e:
            print(f"Error stitching chunks: {e}")
            return False
    def estimate_depth(self, image: np.ndarray, frame_hash: Optional[str] = None) -> np.ndarray:
        """Estimate depth map for a single frame with caching"""
        # Check cache first
        if frame_hash and self.enable_depth_caching:
            cached_depth = self._get_cached_depth(frame_hash, image)
            if cached_depth is not None:
                return cached_depth

        if self.depth_estimator is None:
            # Fallback: create a simple depth map based on image gradients
            gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
            depth = cv2.Laplacian(gray, cv2.CV_64F)
            depth = np.abs(depth)
            depth = cv2.GaussianBlur(depth, (5, 5), 0)
            depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)
        else:
            try:
                # Convert to PIL Image for the model (frames from moviepy are already RGB)
                pil_image = Image.fromarray(image)

                # Get depth prediction
                result = self.depth_estimator(pil_image)
                depth = np.array(result['depth'])

                # Normalize depth map
                depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)
            except Exception as e:
                print(f"Error in depth estimation: {e}")
                # Fallback to gradient-based depth
                gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
                depth = cv2.Laplacian(gray, cv2.CV_64F)
                depth = np.abs(depth)
                depth = cv2.GaussianBlur(depth, (5, 5), 0)
                depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)

        # Cache the result
        if frame_hash and self.enable_depth_caching:
            self._cache_depth(frame_hash, depth)

        return depth

    def create_stereo_pair(self, image: np.ndarray, depth: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Create left and right eye views for VR180"""
        height, width = image.shape[:2]

        # Create disparity map (inverse of depth for stereo effect)
        disparity = 1.0 - depth
        disparity = disparity * 30  # Scale disparity

        # Create left and right views
        left_view = image.copy()
        right_view = image.copy()

        # Apply horizontal shift based on disparity
        for y in range(height):
            for x in range(width):
                shift = int(disparity[y, x])

                # Left view: shift pixels to the right
                if x + shift < width:
                    left_view[y, x] = image[y, min(x + shift, width - 1)]

                # Right view: shift pixels to the left
                if x - shift >= 0:
                    right_view[y, x] = image[y, max(x - shift, 0)]

        return left_view, right_view
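    # The per-pixel Python loop in create_stereo_pair is simple but slow
    # (H*W interpreter iterations per frame). The helper below is a minimal
    # vectorized sketch of the same horizontal-shift idea using NumPy fancy
    # indexing; it is a hypothetical alternative that is not wired into the
    # pipeline, and it assumes the same inputs (RGB frame plus a depth map
    # normalized to [0, 1]). Near the frame edges it samples the clipped
    # border column rather than leaving the original pixel untouched, a minor
    # difference from the loop version.
    def _create_stereo_pair_fast(self, image: np.ndarray, depth: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Vectorized variant of create_stereo_pair (sketch)."""
        height, width = image.shape[:2]

        # Same disparity definition as create_stereo_pair
        disparity = ((1.0 - depth) * 30).astype(np.int32)

        # Per-pixel source columns, shifted and clipped to the frame
        cols = np.arange(width)[None, :]                      # shape (1, W)
        left_cols = np.clip(cols + disparity, 0, width - 1)   # sample to the right
        right_cols = np.clip(cols - disparity, 0, width - 1)  # sample to the left

        rows = np.arange(height)[:, None]                     # shape (H, 1)
        left_view = image[rows, left_cols]
        right_view = image[rows, right_cols]
        return left_view, right_view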
    def create_vr180_frame(self, left_view: np.ndarray, right_view: np.ndarray) -> np.ndarray:
        """Combine left and right views into VR180 format"""
        height, width = left_view.shape[:2]

        # Create VR180 frame (side-by-side)
        vr180_frame = np.zeros((height, width * 2, 3), dtype=np.uint8)

        # Place left view on the left half
        vr180_frame[:, :width] = left_view
        # Place right view on the right half
        vr180_frame[:, width:] = right_view

        return vr180_frame

    def process_video(self, input_path: str, output_path: str, progress_callback=None,
                      use_chunking: bool = True) -> Dict:
        """Process video from 2D to VR180 with chunking, downscaling and upscaling"""
        try:
            # Load video to get metadata
            video = mp.VideoFileClip(input_path)
            fps = video.fps
            duration = video.duration
            original_size = video.size
            video.close()

            print(f"Processing video: {input_path}")
            print(f"Original resolution: {original_size}")
            print(f"Processing resolution: {self.processing_resolution}")
            print(f"Duration: {duration}s, FPS: {fps}")

            # Decide whether to use chunking based on video duration
            if use_chunking and duration > self.chunk_duration:
                return self._process_video_chunked(input_path, output_path, fps, original_size, progress_callback)
            else:
                return self._process_video_single(input_path, output_path, fps, original_size, progress_callback)
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _process_video_chunked(self, input_path: str, output_path: str, fps: float,
                               original_size: Tuple, progress_callback=None) -> Dict:
        """Process video using chunked parallel processing"""
        start_time = time.time()

        try:
            # Create temporary directory for chunks
            with tempfile.TemporaryDirectory() as temp_dir:
                print(f"Using temporary directory: {temp_dir}")

                # Step 1: Create video chunks
                if progress_callback:
                    progress_callback(0.1, desc="Creating video chunks...")

                chunks = self.create_video_chunks(input_path, temp_dir)
                if not chunks:
                    return {'success': False, 'error': 'Failed to create video chunks'}

                print(f"Created {len(chunks)} chunks")

                # Step 2: Process chunks in parallel
                if progress_callback:
                    progress_callback(0.2, desc="Processing chunks in parallel...")

                # Prepare configuration for workers
                config = {
                    'processing_resolution': self.processing_resolution,
                    'upscale_factor': self.upscale_factor,
                    'enable_ai_upscaling': self.enable_ai_upscaling,
                    'depth_fps': self.depth_fps,
                    'enable_frame_interpolation': self.enable_frame_interpolation
                }

                # Process chunks in parallel
                chunk_results = []
                with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
                    # Submit all chunk processing tasks
                    future_to_chunk = {
                        executor.submit(self.process_chunk_worker, chunk, config): chunk
                        for chunk in chunks
                    }

                    # Collect results as they complete
                    completed_chunks = 0
                    for future in as_completed(future_to_chunk):
                        result = future.result()
                        chunk_results.append(result)
                        completed_chunks += 1

                        if progress_callback:
                            progress = 0.2 + (0.6 * completed_chunks / len(chunks))
                            progress_callback(progress, desc=f"Processed {completed_chunks}/{len(chunks)} chunks")

                        if result['success']:
                            print(f"✅ Chunk {result['chunk_index']} completed successfully")
                        else:
                            print(f"❌ Chunk {result['chunk_index']} failed: {result['error']}")

                # Step 3: Stitch chunks together
                if progress_callback:
                    progress_callback(0.8, desc="Stitching chunks together...")

                success = self.stitch_chunks_with_ffmpeg(chunk_results, output_path, fps)
                if not success:
                    return {'success': False, 'error': 'Failed to stitch chunks together'}

                # Step 4: Clean up chunk files
                for chunk in chunks:
                    if os.path.exists(chunk['path']):
                        os.remove(chunk['path'])
                    processed_path = chunk['path'].replace('.mp4', '_processed.mp4')
                    if os.path.exists(processed_path):
                        os.remove(processed_path)

            processing_time = time.time() - start_time

            if progress_callback:
                progress_callback(1.0, desc="Processing complete!")

            return {
                'success': True,
                'processing_time': processing_time,
                'output_path': output_path,
                'original_resolution': original_size,
                'processing_resolution': self.processing_resolution,
                'chunks_processed': len([r for r in chunk_results if r['success']]),
                'total_chunks': len(chunks),
                'upscale_factor': self.upscale_factor
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}
    def _process_video_single(self, input_path: str, output_path: str, fps: float,
                              original_size: Tuple, progress_callback=None) -> Dict:
        """Process video as a single unit (fallback for short videos)"""
        start_time = time.time()

        try:
            # Load video
            video = mp.VideoFileClip(input_path)

            # Calculate target output resolution
            target_width = int(original_size[0] * self.upscale_factor)
            target_height = int(original_size[1] * self.upscale_factor)
            target_resolution = (target_width, target_height)
            print(f"Target output resolution: {target_resolution}")

            # Process frames
            processed_frames = []
            total_frames = int(video.duration * fps)

            for i, frame in enumerate(video.iter_frames()):
                if i % 10 == 0:  # Report progress every 10 frames
                    progress = i / total_frames
                    if progress_callback:
                        progress_callback(progress, desc=f"Processing frame {i}/{total_frames}")
                    print(f"Processing frame {i}/{total_frames}")

                # Convert frame to numpy array
                frame_array = np.array(frame)

                # Generate frame hash for caching
                frame_hash = self._get_frame_hash(frame_array)

                # Check if we can reuse a cached frame
                cached_frame = self._get_cached_frame(frame_hash)
                if cached_frame is not None:
                    processed_frames.append(cached_frame)
                    continue

                # Detect scene change for depth caching optimization
                scene_changed = self._detect_scene_change(frame_array)

                # Step 1: Downscale for processing
                downscaled_frame = self.downscale_image(frame_array, self.processing_resolution)

                # Step 2: Estimate depth on downscaled frame (with caching)
                depth = self.estimate_depth(downscaled_frame, frame_hash if scene_changed else None)

                # Step 3: Create stereo pair on downscaled frame
                left_view, right_view = self.create_stereo_pair(downscaled_frame, depth)

                # Step 4: Upscale the stereo pair to target resolution
                left_upscaled = self.upscale_image(left_view, target_resolution)
                right_upscaled = self.upscale_image(right_view, target_resolution)

                # Step 5: Create VR180 frame
                vr180_frame = self.create_vr180_frame(left_upscaled, right_upscaled)

                # Cache the processed frame
                self._cache_frame(frame_hash, vr180_frame)
                processed_frames.append(vr180_frame)

                # Manage memory usage
                if i % 50 == 0:  # Check memory every 50 frames
                    self._manage_memory()

            # Save processed video
            output_resolution = None
            if processed_frames:
                if progress_callback:
                    progress_callback(0.9, desc="Saving processed video...")

                # Create video writer
                height, width = processed_frames[0].shape[:2]
                output_resolution = (width, height)
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

                for frame in processed_frames:
                    # Convert RGB to BGR for OpenCV
                    frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    out.write(frame_bgr)

                out.release()

            if progress_callback:
                progress_callback(1.0, desc="Processing complete!")

            video.close()
            processing_time = time.time() - start_time

            return {
                'success': True,
                'processing_time': processing_time,
                'output_path': output_path,
                'original_resolution': original_size,
                'processing_resolution': self.processing_resolution,
                'output_resolution': output_resolution,
                'upscale_factor': self.upscale_factor
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def create_preview_frame(self, input_path: str) -> Optional[np.ndarray]:
        """Create a preview frame for the UI"""
        try:
            video = mp.VideoFileClip(input_path)
            frame = video.get_frame(0)  # Get first frame
            video.close()

            # Process the frame
            frame_array = np.array(frame)
            depth = self.estimate_depth(frame_array)
            left_view, right_view = self.create_stereo_pair(frame_array, depth)
            vr180_frame = self.create_vr180_frame(left_view, right_view)

            return vr180_frame
        except Exception as e:
            print(f"Error creating preview: {e}")
            return None
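
# Minimal usage sketch. The input/output paths below are placeholders and the
# guarded __main__ block is illustrative only; the class is normally driven by
# the surrounding application rather than run directly. The guard also matters
# because the chunked path spawns worker processes via ProcessPoolExecutor.
if __name__ == "__main__":
    processor = VideoProcessor(
        processing_resolution=(1280, 720),
        upscale_factor=1.5,
        chunk_duration=15,
        max_workers=4,
    )
    result = processor.process_video("input_2d.mp4", "output_vr180.mp4")
    if result["success"]:
        print(f"Done in {result['processing_time']:.1f}s -> {result['output_path']}")
    else:
        print(f"Processing failed: {result['error']}")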