import cv2
import numpy as np
from PIL import Image
import moviepy.editor as mp
import time
import os
from typing import Dict, Tuple, Optional, List
import torch
from transformers import pipeline
import requests
from io import BytesIO
import multiprocessing as mp_proc
from concurrent.futures import ProcessPoolExecutor, as_completed
import tempfile
import subprocess
import json
import hashlib
from collections import defaultdict
import gc

# Removed model_manager import to avoid GIL issues

class VideoProcessor:
    def __init__(self, processing_resolution: Tuple[int, int] = (1280, 720),
                 upscale_factor: float = 1.5, enable_ai_upscaling: bool = True,
                 depth_fps: int = 24, enable_frame_interpolation: bool = True,
                 chunk_duration: int = 15, max_workers: int = 4,
                 enable_depth_caching: bool = True, scene_change_threshold: float = 0.1):
        self.depth_estimator = None
        self.upscaler = None
        self.processing_resolution = processing_resolution
        self.upscale_factor = upscale_factor
        self.enable_ai_upscaling = enable_ai_upscaling
        self.depth_fps = depth_fps
        self.enable_frame_interpolation = enable_frame_interpolation
        self.chunk_duration = chunk_duration  # seconds per chunk
        self.max_workers = max_workers
        self.enable_depth_caching = enable_depth_caching
        self.scene_change_threshold = scene_change_threshold

        # Memory optimization and caching
        self.depth_cache = {}    # Cache for depth maps
        self.frame_cache = {}    # Cache for processed frames
        self.scene_changes = []  # Track scene change points
        self.last_frame_hash = None
        self.memory_usage = 0
        self.max_memory_mb = 2048  # Maximum memory usage in MB

        self._load_models()

    def _load_models(self):
        """Load depth estimation and upscaling models directly"""
        try:
            # Load depth estimation model directly
            print("Loading depth estimation model...")
            self.depth_estimator = pipeline(
                "depth-estimation",
                model="Intel/dpt-hybrid-midas",
                device="cuda" if torch.cuda.is_available() else "cpu",
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
            )
            print("✅ Depth estimation model loaded")
        except Exception as e:
            print(f"❌ Error loading depth model: {e}")
            self.depth_estimator = None

        # Load upscaler (bicubic fallback for now)
        self._load_upscaler()

    def _load_upscaler(self):
        """Load AI upscaling model"""
        try:
            # For now, we'll use simple bicubic upscaling
            # In production, you could integrate Real-ESRGAN or similar
            self.upscaler = "bicubic"  # Placeholder for now
            print("AI upscaler loaded (bicubic fallback)")
        except Exception as e:
            print(f"Error loading upscaler: {e}")
            self.upscaler = None

    def downscale_image(self, image: np.ndarray, target_resolution: Tuple[int, int]) -> np.ndarray:
        """Downscale image to target resolution for processing"""
        height, width = image.shape[:2]
        target_width, target_height = target_resolution

        # Calculate scaling factor
        scale_factor = min(target_width / width, target_height / height)
        if scale_factor >= 1.0:
            return image  # No downscaling needed

        # Calculate new dimensions
        new_width = int(width * scale_factor)
        new_height = int(height * scale_factor)

        # Use high-quality downscaling
        downscaled = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
        return downscaled

    def upscale_image(self, image: np.ndarray, target_resolution: Tuple[int, int]) -> np.ndarray:
        """Upscale image using AI or high-quality interpolation"""
        height, width = image.shape[:2]
        target_width, target_height = target_resolution

        # Calculate scaling factor
        scale_factor = min(target_width / width, target_height / height)
        if scale_factor <= 1.0:
            return image  # No upscaling needed

        # Calculate new dimensions
        new_width = int(width * scale_factor)
        new_height = int(height * scale_factor)

        if self.upscaler and self.upscaler != "bicubic":
            # Use AI upscaling if available
            return self._ai_upscale(image, (new_width, new_height))
        else:
            # Use high-quality bicubic upscaling
            return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_CUBIC)

    def _ai_upscale(self, image: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray:
        """AI-powered upscaling (placeholder for Real-ESRGAN integration)"""
        # For now, use bicubic as fallback
        # In production, integrate Real-ESRGAN or similar
        return cv2.resize(image, target_size, interpolation=cv2.INTER_CUBIC)

    def _get_frame_hash(self, frame: np.ndarray) -> str:
        """Generate a hash of the frame content to detect scene changes"""
        # Resize frame to a small size for faster hashing
        small_frame = cv2.resize(frame, (64, 64))
        frame_bytes = small_frame.tobytes()
        return hashlib.md5(frame_bytes).hexdigest()

    def _detect_scene_change(self, current_frame: np.ndarray) -> bool:
        """Detect if there's a significant scene change"""
        if self.last_frame_hash is None:
            self.last_frame_hash = self._get_frame_hash(current_frame)
            self.last_frame = current_frame.copy()
            return True

        current_hash = self._get_frame_hash(current_frame)

        # Calculate structural similarity
        if self.enable_depth_caching:
            # Convert to grayscale for comparison (compare against the stored last frame, not its hash)
            gray_current = cv2.cvtColor(current_frame, cv2.COLOR_RGB2GRAY)
            gray_last = cv2.cvtColor(self.last_frame, cv2.COLOR_RGB2GRAY) if hasattr(self, 'last_frame') else None
            if gray_last is not None and gray_last.shape == gray_current.shape:
                # Calculate structural similarity index
                from skimage.metrics import structural_similarity as ssim
                try:
                    similarity = ssim(gray_current, gray_last)
                    scene_change = similarity < (1.0 - self.scene_change_threshold)
                except Exception:
                    # Fallback to hash comparison
                    scene_change = current_hash != self.last_frame_hash
            else:
                scene_change = current_hash != self.last_frame_hash
        else:
            scene_change = current_hash != self.last_frame_hash

        if scene_change:
            self.last_frame_hash = current_hash
            self.last_frame = current_frame.copy()
        return scene_change

    def _get_memory_usage_mb(self) -> float:
        """Get current memory usage in MB"""
        import psutil
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024

    def _manage_memory(self):
        """Manage memory usage by clearing caches if needed"""
        current_memory = self._get_memory_usage_mb()
        if current_memory > self.max_memory_mb:
            # Clear oldest cache entries
            if len(self.depth_cache) > 10:
                # Remove oldest 50% of depth cache
                keys_to_remove = list(self.depth_cache.keys())[:len(self.depth_cache) // 2]
                for key in keys_to_remove:
                    del self.depth_cache[key]
            if len(self.frame_cache) > 5:
                # Remove oldest 50% of frame cache
                keys_to_remove = list(self.frame_cache.keys())[:len(self.frame_cache) // 2]
                for key in keys_to_remove:
                    del self.frame_cache[key]

            # Force garbage collection
            gc.collect()
            print(f"Memory management: Cleared caches, current usage: {self._get_memory_usage_mb():.1f}MB")

    def _get_cached_depth(self, frame_hash: str, frame: np.ndarray) -> Optional[np.ndarray]:
        """Get cached depth map if available"""
        if not self.enable_depth_caching or frame_hash not in self.depth_cache:
            return None

        # Verify the cached depth is still valid
        cached_depth = self.depth_cache[frame_hash]
        if cached_depth.shape[:2] == frame.shape[:2]:
            return cached_depth.copy()
        return None

    def _cache_depth(self, frame_hash: str, depth: np.ndarray):
        """Cache depth map for reuse"""
        if self.enable_depth_caching:
            self.depth_cache[frame_hash] = depth.copy()
            self.memory_usage += depth.nbytes / 1024 / 1024  # Track memory usage

    def _get_cached_frame(self, frame_hash: str) -> Optional[np.ndarray]:
        """Get cached processed frame if available"""
        if frame_hash in self.frame_cache:
            return self.frame_cache[frame_hash].copy()
        return None

    def _cache_frame(self, frame_hash: str, frame: np.ndarray):
        """Cache processed frame for reuse"""
        self.frame_cache[frame_hash] = frame.copy()
        self.memory_usage += frame.nbytes / 1024 / 1024

    def create_video_chunks(self, input_path: str, temp_dir: str) -> List[Dict]:
        """Split video into chunks for parallel processing"""
        try:
            video = mp.VideoFileClip(input_path)
            duration = video.duration
            fps = video.fps
            chunks = []
            chunk_count = int(np.ceil(duration / self.chunk_duration))
            print(f"Creating {chunk_count} chunks of {self.chunk_duration}s each")

            for i in range(chunk_count):
                start_time = i * self.chunk_duration
                end_time = min((i + 1) * self.chunk_duration, duration)
                chunk_path = os.path.join(temp_dir, f"chunk_{i:03d}.mp4")

                # Extract chunk
                chunk = video.subclip(start_time, end_time)
                chunk.write_videofile(chunk_path, verbose=False, logger=None)
                chunk.close()

                chunks.append({
                    'index': i,
                    'path': chunk_path,
                    'start_time': start_time,
                    'end_time': end_time,
                    'duration': end_time - start_time
                })

            video.close()
            return chunks
        except Exception as e:
            print(f"Error creating video chunks: {e}")
            return []

    def process_chunk_worker(self, chunk_info: Dict, config: Dict) -> Dict:
        """Worker function for processing a single chunk"""
        try:
            # Create a new processor instance for this worker
            processor = VideoProcessor(
                processing_resolution=tuple(config['processing_resolution']),
                upscale_factor=config['upscale_factor'],
                enable_ai_upscaling=config['enable_ai_upscaling'],
                depth_fps=config['depth_fps'],
                enable_frame_interpolation=config['enable_frame_interpolation']
            )

            # Process the chunk as a single unit; never re-chunk inside a worker process
            output_path = chunk_info['path'].replace('.mp4', '_processed.mp4')
            result = processor.process_video(
                chunk_info['path'],
                output_path,
                use_chunking=False
            )

            if result['success']:
                return {
                    'success': True,
                    'chunk_index': chunk_info['index'],
                    'output_path': output_path,
                    'start_time': chunk_info['start_time'],
                    'end_time': chunk_info['end_time']
                }
            else:
                return {
                    'success': False,
                    'chunk_index': chunk_info['index'],
                    'error': result['error']
                }
        except Exception as e:
            return {
                'success': False,
                'chunk_index': chunk_info['index'],
                'error': str(e)
            }

    def stitch_chunks_with_ffmpeg(self, chunk_results: List[Dict], output_path: str, fps: float) -> bool:
        """Stitch processed chunks back together using FFmpeg"""
        try:
            # Filter successful chunks and sort by index
            successful_chunks = [r for r in chunk_results if r['success']]
            successful_chunks.sort(key=lambda x: x['chunk_index'])

            if not successful_chunks:
                print("No successful chunks to stitch")
                return False

            # Create file list for FFmpeg concat
            concat_file = os.path.join(os.path.dirname(output_path), "concat_list.txt")
            with open(concat_file, 'w') as f:
                for chunk in successful_chunks:
                    f.write(f"file '{chunk['output_path']}'\n")

            # Use FFmpeg to concatenate chunks
            cmd = [
                'ffmpeg', '-y',   # -y to overwrite output file
                '-f', 'concat',
                '-safe', '0',
                '-i', concat_file,
                '-c', 'copy',     # Copy streams without re-encoding
                '-r', str(fps),   # Set output frame rate
                output_path
            ]
            print(f"Stitching chunks with FFmpeg: {' '.join(cmd)}")
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print("Successfully stitched chunks")
                # Clean up concat file
                os.remove(concat_file)
                return True
            else:
                print(f"FFmpeg error: {result.stderr}")
                return False
        except Exception as e:
            print(f"Error stitching chunks: {e}")
            return False

    def estimate_depth(self, image: np.ndarray, frame_hash: str = None) -> np.ndarray:
        """Estimate depth map for a single frame with caching"""
        # Check cache first
        if frame_hash and self.enable_depth_caching:
            cached_depth = self._get_cached_depth(frame_hash, image)
            if cached_depth is not None:
                return cached_depth

        if self.depth_estimator is None:
            # Fallback: create a simple depth map based on image gradients
            gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
            depth = cv2.Laplacian(gray, cv2.CV_64F)
            depth = np.abs(depth)
            depth = cv2.GaussianBlur(depth, (5, 5), 0)
            depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)
        else:
            try:
                # Frames from moviepy are already RGB, so no channel swap is needed
                pil_image = Image.fromarray(image)

                # Get depth prediction
                result = self.depth_estimator(pil_image)
                depth = np.array(result['depth'])

                # Normalize depth map
                depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)
            except Exception as e:
                print(f"Error in depth estimation: {e}")
                # Fallback to gradient-based depth
                gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
                depth = cv2.Laplacian(gray, cv2.CV_64F)
                depth = np.abs(depth)
                depth = cv2.GaussianBlur(depth, (5, 5), 0)
                depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)

        # Cache the result
        if frame_hash and self.enable_depth_caching:
            self._cache_depth(frame_hash, depth)
        return depth

    def create_stereo_pair(self, image: np.ndarray, depth: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Create left and right eye views for VR180"""
        height, width = image.shape[:2]

        # Create disparity map (inverse of depth for stereo effect)
        disparity = 1.0 - depth
        disparity = disparity * 30  # Scale disparity

        # Create left and right views
        left_view = image.copy()
        right_view = image.copy()

        # Apply horizontal shift based on disparity
        for y in range(height):
            for x in range(width):
                shift = int(disparity[y, x])
                # Left view: shift pixels to the right
                if x + shift < width:
                    left_view[y, x] = image[y, min(x + shift, width - 1)]
                # Right view: shift pixels to the left
                if x - shift >= 0:
                    right_view[y, x] = image[y, max(x - shift, 0)]

        return left_view, right_view

    def create_vr180_frame(self, left_view: np.ndarray, right_view: np.ndarray) -> np.ndarray:
        """Combine left and right views into VR180 format"""
        height, width = left_view.shape[:2]

        # Create VR180 frame (side-by-side)
        vr180_frame = np.zeros((height, width * 2, 3), dtype=np.uint8)

        # Place left view on the left half
        vr180_frame[:, :width] = left_view
        # Place right view on the right half
        vr180_frame[:, width:] = right_view

        return vr180_frame

    def process_video(self, input_path: str, output_path: str, progress_callback=None, use_chunking: bool = True) -> Dict:
        """Process video from 2D to VR180 with chunking, downscaling and upscaling"""
        try:
            # Load video to get metadata
            video = mp.VideoFileClip(input_path)
            fps = video.fps
            duration = video.duration
            original_size = video.size
            video.close()

            print(f"Processing video: {input_path}")
            print(f"Original resolution: {original_size}")
            print(f"Processing resolution: {self.processing_resolution}")
            print(f"Duration: {duration}s, FPS: {fps}")

            # Decide whether to use chunking based on video duration
            if use_chunking and duration > self.chunk_duration:
                return self._process_video_chunked(input_path, output_path, fps, original_size, progress_callback)
            else:
                return self._process_video_single(input_path, output_path, fps, original_size, progress_callback)
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _process_video_chunked(self, input_path: str, output_path: str, fps: float, original_size: Tuple, progress_callback=None) -> Dict:
        """Process video using chunked parallel processing"""
        start_time = time.time()
        try:
            # Create temporary directory for chunks
            with tempfile.TemporaryDirectory() as temp_dir:
                print(f"Using temporary directory: {temp_dir}")

                # Step 1: Create video chunks
                if progress_callback:
                    progress_callback(0.1, desc="Creating video chunks...")
                chunks = self.create_video_chunks(input_path, temp_dir)
                if not chunks:
                    return {'success': False, 'error': 'Failed to create video chunks'}
                print(f"Created {len(chunks)} chunks")

                # Step 2: Process chunks in parallel
                if progress_callback:
                    progress_callback(0.2, desc="Processing chunks in parallel...")

                # Prepare configuration for workers
                config = {
                    'processing_resolution': self.processing_resolution,
                    'upscale_factor': self.upscale_factor,
                    'enable_ai_upscaling': self.enable_ai_upscaling,
                    'depth_fps': self.depth_fps,
                    'enable_frame_interpolation': self.enable_frame_interpolation
                }

                # Process chunks in parallel
                chunk_results = []
                with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
                    # Submit all chunk processing tasks
                    future_to_chunk = {
                        executor.submit(self.process_chunk_worker, chunk, config): chunk
                        for chunk in chunks
                    }

                    # Collect results as they complete
                    completed_chunks = 0
                    for future in as_completed(future_to_chunk):
                        result = future.result()
                        chunk_results.append(result)
                        completed_chunks += 1

                        if progress_callback:
                            progress = 0.2 + (0.6 * completed_chunks / len(chunks))
                            progress_callback(progress, desc=f"Processed {completed_chunks}/{len(chunks)} chunks")

                        if result['success']:
                            print(f"✅ Chunk {result['chunk_index']} completed successfully")
                        else:
                            print(f"❌ Chunk {result['chunk_index']} failed: {result['error']}")

                # Step 3: Stitch chunks together
                if progress_callback:
                    progress_callback(0.8, desc="Stitching chunks together...")
                success = self.stitch_chunks_with_ffmpeg(chunk_results, output_path, fps)
                if not success:
                    return {'success': False, 'error': 'Failed to stitch chunks together'}

                # Step 4: Clean up chunk files
                for chunk in chunks:
                    if os.path.exists(chunk['path']):
                        os.remove(chunk['path'])
                    processed_path = chunk['path'].replace('.mp4', '_processed.mp4')
                    if os.path.exists(processed_path):
                        os.remove(processed_path)

                processing_time = time.time() - start_time
                if progress_callback:
                    progress_callback(1.0, desc="Processing complete!")

                return {
                    'success': True,
                    'processing_time': processing_time,
                    'output_path': output_path,
                    'original_resolution': original_size,
                    'processing_resolution': self.processing_resolution,
                    'chunks_processed': len([r for r in chunk_results if r['success']]),
                    'total_chunks': len(chunks),
                    'upscale_factor': self.upscale_factor
                }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _process_video_single(self, input_path: str, output_path: str, fps: float, original_size: Tuple, progress_callback=None) -> Dict:
        """Process video as a single unit (fallback for short videos)"""
        start_time = time.time()
        try:
            # Load video
            video = mp.VideoFileClip(input_path)

            # Calculate target output resolution
            target_width = int(original_size[0] * self.upscale_factor)
            target_height = int(original_size[1] * self.upscale_factor)
            target_resolution = (target_width, target_height)
            print(f"Target output resolution: {target_resolution}")

            # Process frames
            processed_frames = []
            total_frames = int(video.duration * fps)

            for i, frame in enumerate(video.iter_frames()):
                if i % 10 == 0:  # Print progress every 10 frames
                    progress = i / total_frames
                    if progress_callback:
                        progress_callback(progress, desc=f"Processing frame {i}/{total_frames}")
                    print(f"Processing frame {i}/{total_frames}")

                # Convert frame to numpy array
                frame_array = np.array(frame)

                # Generate frame hash for caching
                frame_hash = self._get_frame_hash(frame_array)

                # Check if we can reuse cached frame
                cached_frame = self._get_cached_frame(frame_hash)
                if cached_frame is not None:
                    processed_frames.append(cached_frame)
                    continue

                # Detect scene change for depth caching optimization
                scene_changed = self._detect_scene_change(frame_array)

                # Step 1: Downscale for processing
                downscaled_frame = self.downscale_image(frame_array, self.processing_resolution)

                # Step 2: Estimate depth on downscaled frame (with caching)
                depth = self.estimate_depth(downscaled_frame, frame_hash if scene_changed else None)

                # Step 3: Create stereo pair on downscaled frame
                left_view, right_view = self.create_stereo_pair(downscaled_frame, depth)

                # Step 4: Upscale the stereo pair to target resolution
                left_upscaled = self.upscale_image(left_view, target_resolution)
                right_upscaled = self.upscale_image(right_view, target_resolution)

                # Step 5: Create VR180 frame
                vr180_frame = self.create_vr180_frame(left_upscaled, right_upscaled)

                # Cache the processed frame
                self._cache_frame(frame_hash, vr180_frame)
                processed_frames.append(vr180_frame)

                # Manage memory usage
                if i % 50 == 0:  # Check memory every 50 frames
                    self._manage_memory()

            video.close()

            if not processed_frames:
                return {'success': False, 'error': 'No frames were processed'}

            # Save processed video
            if progress_callback:
                progress_callback(0.9, desc="Saving processed video...")

            # Create video writer
            height, width = processed_frames[0].shape[:2]
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            for frame in processed_frames:
                # Convert RGB to BGR for OpenCV
                frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                out.write(frame_bgr)
            out.release()

            if progress_callback:
                progress_callback(1.0, desc="Processing complete!")

            processing_time = time.time() - start_time
            return {
                'success': True,
                'processing_time': processing_time,
                'output_path': output_path,
                'original_resolution': original_size,
                'processing_resolution': self.processing_resolution,
                'output_resolution': (width, height),
                'upscale_factor': self.upscale_factor
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def create_preview_frame(self, input_path: str) -> np.ndarray:
        """Create a preview frame for the UI"""
        try:
            video = mp.VideoFileClip(input_path)
            frame = video.get_frame(0)  # Get first frame
            video.close()

            # Process the frame
            frame_array = np.array(frame)
            depth = self.estimate_depth(frame_array)
            left_view, right_view = self.create_stereo_pair(frame_array, depth)
            vr180_frame = self.create_vr180_frame(left_view, right_view)
            return vr180_frame
        except Exception as e:
            print(f"Error creating preview: {e}")
            return None
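

# Minimal usage sketch: the file names ("input.mp4", "output_vr180.mp4"), the
# print-based progress callback, and the chosen settings below are illustrative
# placeholders, not part of the processing pipeline itself.
if __name__ == "__main__":
    def print_progress(fraction: float, desc: str = ""):
        # Stand-in for a UI progress callback (e.g., a Gradio progress handler)
        print(f"[{fraction * 100:5.1f}%] {desc}")

    processor = VideoProcessor(
        processing_resolution=(1280, 720),  # frames are downscaled to this size for depth estimation
        upscale_factor=1.5,                 # stereo views are upscaled by this factor before stitching
        chunk_duration=15,                  # videos longer than this are split and processed in parallel
        max_workers=4,
    )

    result = processor.process_video("input.mp4", "output_vr180.mp4", progress_callback=print_progress)
    if result["success"]:
        print(f"Done in {result['processing_time']:.1f}s -> {result['output_path']}")
    else:
        print(f"Processing failed: {result['error']}")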