# vr180-converter/backend/video_processor.py
import cv2
import numpy as np
from PIL import Image
import moviepy.editor as mp
import time
import os
from typing import Dict, Tuple, Optional, List
import torch
from transformers import pipeline
import requests
from io import BytesIO
import multiprocessing as mp_proc
from concurrent.futures import ProcessPoolExecutor, as_completed
import tempfile
import subprocess
import json
import hashlib
from collections import defaultdict
import gc
# Removed model_manager import to avoid GIL issues
class VideoProcessor:
def __init__(self, processing_resolution: Tuple[int, int] = (1280, 720),
upscale_factor: float = 1.5, enable_ai_upscaling: bool = True,
depth_fps: int = 24, enable_frame_interpolation: bool = True,
chunk_duration: int = 15, max_workers: int = 4,
enable_depth_caching: bool = True, scene_change_threshold: float = 0.1):
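        """Configure the processor.

        processing_resolution  -- working size frames are downscaled to before depth estimation
        upscale_factor         -- multiplier applied to the source resolution for the final output
        chunk_duration         -- seconds of video per chunk when chunked processing is used
        max_workers            -- number of worker processes for parallel chunk processing
        enable_depth_caching   -- reuse depth maps for visually identical frames
        scene_change_threshold -- SSIM drop from 1.0 treated as a scene change
        """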
self.depth_estimator = None
self.upscaler = None
self.processing_resolution = processing_resolution
self.upscale_factor = upscale_factor
self.enable_ai_upscaling = enable_ai_upscaling
self.depth_fps = depth_fps
self.enable_frame_interpolation = enable_frame_interpolation
self.chunk_duration = chunk_duration # seconds per chunk
self.max_workers = max_workers
self.enable_depth_caching = enable_depth_caching
self.scene_change_threshold = scene_change_threshold
# Memory optimization and caching
self.depth_cache = {} # Cache for depth maps
self.frame_cache = {} # Cache for processed frames
self.scene_changes = [] # Track scene change points
self.last_frame_hash = None
self.memory_usage = 0
self.max_memory_mb = 2048 # Maximum memory usage in MB
self._load_models()
def _load_models(self):
"""Load depth estimation and upscaling models directly"""
try:
# Load depth estimation model directly
print("Loading depth estimation model...")
self.depth_estimator = pipeline(
"depth-estimation",
model="Intel/dpt-hybrid-midas",
device="cuda" if torch.cuda.is_available() else "cpu",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
print("✅ Depth estimation model loaded")
except Exception as e:
print(f"❌ Error loading depth model: {e}")
self.depth_estimator = None
# Load upscaler (bicubic fallback for now)
self._load_upscaler()
def _load_upscaler(self):
"""Load AI upscaling model"""
try:
# For now, we'll use a simple bicubic upscaling
# In production, you could integrate Real-ESRGAN or similar
self.upscaler = "bicubic" # Placeholder for now
print("AI upscaler loaded (bicubic fallback)")
except Exception as e:
print(f"Error loading upscaler: {e}")
self.upscaler = None
def downscale_image(self, image: np.ndarray, target_resolution: Tuple[int, int]) -> np.ndarray:
"""Downscale image to target resolution for processing"""
height, width = image.shape[:2]
target_width, target_height = target_resolution
# Calculate scaling factor
scale_factor = min(target_width / width, target_height / height)
if scale_factor >= 1.0:
return image # No downscaling needed
# Calculate new dimensions
new_width = int(width * scale_factor)
new_height = int(height * scale_factor)
# Use high-quality downscaling
downscaled = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
return downscaled
def upscale_image(self, image: np.ndarray, target_resolution: Tuple[int, int]) -> np.ndarray:
"""Upscale image using AI or high-quality interpolation"""
height, width = image.shape[:2]
target_width, target_height = target_resolution
# Calculate scaling factor
scale_factor = min(target_width / width, target_height / height)
if scale_factor <= 1.0:
return image # No upscaling needed
# Calculate new dimensions
new_width = int(width * scale_factor)
new_height = int(height * scale_factor)
if self.upscaler and self.upscaler != "bicubic":
# Use AI upscaling if available
return self._ai_upscale(image, (new_width, new_height))
else:
# Use high-quality bicubic upscaling
return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
def _ai_upscale(self, image: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray:
"""AI-powered upscaling (placeholder for Real-ESRGAN integration)"""
# For now, use bicubic as fallback
# In production, integrate Real-ESRGAN or similar
return cv2.resize(image, target_size, interpolation=cv2.INTER_CUBIC)
def _get_frame_hash(self, frame: np.ndarray) -> str:
"""Generate a hash for frame content to detect scene changes"""
# Resize frame to small size for faster hashing
small_frame = cv2.resize(frame, (64, 64))
frame_bytes = small_frame.tobytes()
return hashlib.md5(frame_bytes).hexdigest()
def _detect_scene_change(self, current_frame: np.ndarray) -> bool:
"""Detect if there's a significant scene change"""
if self.last_frame_hash is None:
self.last_frame_hash = self._get_frame_hash(current_frame)
return True
current_hash = self._get_frame_hash(current_frame)
# Calculate structural similarity
if self.enable_depth_caching:
# Convert to grayscale for comparison
gray_current = cv2.cvtColor(current_frame, cv2.COLOR_RGB2GRAY)
            gray_last = cv2.cvtColor(self.last_frame, cv2.COLOR_RGB2GRAY) if hasattr(self, 'last_frame') else None
if gray_last is not None:
                # Calculate structural similarity index; fall back to hash
                # comparison if scikit-image is unavailable or SSIM fails
                try:
                    from skimage.metrics import structural_similarity as ssim
                    similarity = ssim(gray_current, gray_last)
                    scene_change = similarity < (1.0 - self.scene_change_threshold)
                except Exception:
                    scene_change = current_hash != self.last_frame_hash
else:
scene_change = current_hash != self.last_frame_hash
else:
scene_change = current_hash != self.last_frame_hash
if scene_change:
self.last_frame_hash = current_hash
self.last_frame = current_frame.copy()
return scene_change
def _get_memory_usage_mb(self) -> float:
"""Get current memory usage in MB"""
import psutil
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024
def _manage_memory(self):
"""Manage memory usage by clearing caches if needed"""
current_memory = self._get_memory_usage_mb()
if current_memory > self.max_memory_mb:
# Clear oldest cache entries
if len(self.depth_cache) > 10:
# Remove oldest 50% of depth cache
keys_to_remove = list(self.depth_cache.keys())[:len(self.depth_cache)//2]
for key in keys_to_remove:
del self.depth_cache[key]
if len(self.frame_cache) > 5:
# Remove oldest 50% of frame cache
keys_to_remove = list(self.frame_cache.keys())[:len(self.frame_cache)//2]
for key in keys_to_remove:
del self.frame_cache[key]
# Force garbage collection
gc.collect()
print(f"Memory management: Cleared caches, current usage: {self._get_memory_usage_mb():.1f}MB")
def _get_cached_depth(self, frame_hash: str, frame: np.ndarray) -> Optional[np.ndarray]:
"""Get cached depth map if available"""
if not self.enable_depth_caching or frame_hash not in self.depth_cache:
return None
# Verify the cached depth is still valid
cached_depth = self.depth_cache[frame_hash]
if cached_depth.shape[:2] == frame.shape[:2]:
return cached_depth.copy()
return None
def _cache_depth(self, frame_hash: str, depth: np.ndarray):
"""Cache depth map for reuse"""
if self.enable_depth_caching:
self.depth_cache[frame_hash] = depth.copy()
self.memory_usage += depth.nbytes / 1024 / 1024 # Track memory usage
def _get_cached_frame(self, frame_hash: str) -> Optional[np.ndarray]:
"""Get cached processed frame if available"""
if frame_hash in self.frame_cache:
return self.frame_cache[frame_hash].copy()
return None
def _cache_frame(self, frame_hash: str, frame: np.ndarray):
"""Cache processed frame for reuse"""
self.frame_cache[frame_hash] = frame.copy()
self.memory_usage += frame.nbytes / 1024 / 1024
def create_video_chunks(self, input_path: str, temp_dir: str) -> List[Dict]:
"""Split video into chunks for parallel processing"""
try:
video = mp.VideoFileClip(input_path)
duration = video.duration
fps = video.fps
chunks = []
chunk_count = int(np.ceil(duration / self.chunk_duration))
print(f"Creating {chunk_count} chunks of {self.chunk_duration}s each")
for i in range(chunk_count):
start_time = i * self.chunk_duration
end_time = min((i + 1) * self.chunk_duration, duration)
chunk_path = os.path.join(temp_dir, f"chunk_{i:03d}.mp4")
# Extract chunk
chunk = video.subclip(start_time, end_time)
chunk.write_videofile(chunk_path, verbose=False, logger=None)
chunk.close()
chunks.append({
'index': i,
'path': chunk_path,
'start_time': start_time,
'end_time': end_time,
'duration': end_time - start_time
})
video.close()
return chunks
except Exception as e:
print(f"Error creating video chunks: {e}")
return []
def process_chunk_worker(self, chunk_info: Dict, config: Dict) -> Dict:
"""Worker function for processing a single chunk"""
try:
# Create a new processor instance for this worker
processor = VideoProcessor(
processing_resolution=tuple(config['processing_resolution']),
upscale_factor=config['upscale_factor'],
enable_ai_upscaling=config['enable_ai_upscaling'],
depth_fps=config['depth_fps'],
enable_frame_interpolation=config['enable_frame_interpolation']
)
            # Process the chunk as a single unit (avoid re-chunking inside a worker)
            output_path = chunk_info['path'].replace('.mp4', '_processed.mp4')
            result = processor.process_video(
                chunk_info['path'],
                output_path,
                use_chunking=False
            )
if result['success']:
return {
'success': True,
'chunk_index': chunk_info['index'],
'output_path': output_path,
'start_time': chunk_info['start_time'],
'end_time': chunk_info['end_time']
}
else:
return {
'success': False,
'chunk_index': chunk_info['index'],
'error': result['error']
}
except Exception as e:
return {
'success': False,
'chunk_index': chunk_info['index'],
'error': str(e)
}
def stitch_chunks_with_ffmpeg(self, chunk_results: List[Dict], output_path: str, fps: float) -> bool:
"""Stitch processed chunks back together using FFmpeg"""
try:
# Filter successful chunks and sort by index
successful_chunks = [r for r in chunk_results if r['success']]
successful_chunks.sort(key=lambda x: x['chunk_index'])
if not successful_chunks:
print("No successful chunks to stitch")
return False
# Create file list for FFmpeg concat
concat_file = os.path.join(os.path.dirname(output_path), "concat_list.txt")
with open(concat_file, 'w') as f:
for chunk in successful_chunks:
f.write(f"file '{chunk['output_path']}'\n")
# Use FFmpeg to concatenate chunks
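            # The concat demuxer reads the file list written above; '-safe 0'
            # allows paths outside the working directory (e.g. absolute paths),
            # and '-c copy' joins the chunks without re-encoding them.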
cmd = [
'ffmpeg', '-y', # -y to overwrite output file
'-f', 'concat',
'-safe', '0',
'-i', concat_file,
'-c', 'copy', # Copy streams without re-encoding
'-r', str(fps), # Set output frame rate
output_path
]
print(f"Stitching chunks with FFmpeg: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("Successfully stitched chunks")
# Clean up concat file
os.remove(concat_file)
return True
else:
print(f"FFmpeg error: {result.stderr}")
return False
except Exception as e:
print(f"Error stitching chunks: {e}")
return False
def estimate_depth(self, image: np.ndarray, frame_hash: str = None) -> np.ndarray:
"""Estimate depth map for a single frame with caching"""
# Check cache first
if frame_hash and self.enable_depth_caching:
cached_depth = self._get_cached_depth(frame_hash, image)
if cached_depth is not None:
return cached_depth
if self.depth_estimator is None:
# Fallback: create a simple depth map based on image gradients
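            # The Laplacian highlights local intensity change, so edge-rich
            # regions get larger values; blurred and normalised, this serves
            # as a rough pseudo-depth when the DPT model is unavailable.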
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
depth = cv2.Laplacian(gray, cv2.CV_64F)
depth = np.abs(depth)
depth = cv2.GaussianBlur(depth, (5, 5), 0)
            depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)  # avoid div-by-zero on flat frames
else:
try:
                # Frames arrive as RGB (MoviePy and the rest of this pipeline
                # use RGB), so they can be handed to the model directly
                pil_image = Image.fromarray(image)
# Get depth prediction
result = self.depth_estimator(pil_image)
depth = np.array(result['depth'])
# Normalize depth map
                depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)
except Exception as e:
print(f"Error in depth estimation: {e}")
# Fallback to gradient-based depth
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
depth = cv2.Laplacian(gray, cv2.CV_64F)
depth = np.abs(depth)
depth = cv2.GaussianBlur(depth, (5, 5), 0)
                depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)
# Cache the result
if frame_hash and self.enable_depth_caching:
self._cache_depth(frame_hash, depth)
return depth
def create_stereo_pair(self, image: np.ndarray, depth: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Create left and right eye views for VR180"""
height, width = image.shape[:2]
# Create disparity map (inverse of depth for stereo effect)
disparity = 1.0 - depth
disparity = disparity * 30 # Scale disparity
# Create left and right views
left_view = image.copy()
right_view = image.copy()
# Apply horizontal shift based on disparity
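        # Simple depth-image-based rendering: each pixel is shifted
        # horizontally by its disparity value, in opposite directions for the
        # left and right views, producing the binocular parallax of the stereo
        # pair. This per-pixel Python loop is the dominant per-frame cost.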
for y in range(height):
for x in range(width):
shift = int(disparity[y, x])
# Left view: shift pixels to the right
if x + shift < width:
left_view[y, x] = image[y, min(x + shift, width - 1)]
# Right view: shift pixels to the left
if x - shift >= 0:
right_view[y, x] = image[y, max(x - shift, 0)]
return left_view, right_view
def create_vr180_frame(self, left_view: np.ndarray, right_view: np.ndarray) -> np.ndarray:
"""Combine left and right views into VR180 format"""
height, width = left_view.shape[:2]
# Create VR180 frame (side-by-side)
vr180_frame = np.zeros((height, width * 2, 3), dtype=np.uint8)
# Place left view on the left half
vr180_frame[:, :width] = left_view
# Place right view on the right half
vr180_frame[:, width:] = right_view
return vr180_frame
def process_video(self, input_path: str, output_path: str, progress_callback=None, use_chunking: bool = True) -> Dict:
"""Process video from 2D to VR180 with chunking, downscaling and upscaling"""
start_time = time.time()
try:
# Load video to get metadata
video = mp.VideoFileClip(input_path)
fps = video.fps
duration = video.duration
original_size = video.size
video.close()
print(f"Processing video: {input_path}")
print(f"Original resolution: {original_size}")
print(f"Processing resolution: {self.processing_resolution}")
print(f"Duration: {duration}s, FPS: {fps}")
# Decide whether to use chunking based on video duration
if use_chunking and duration > self.chunk_duration:
return self._process_video_chunked(input_path, output_path, fps, original_size, progress_callback)
else:
return self._process_video_single(input_path, output_path, fps, original_size, progress_callback)
except Exception as e:
return {
'success': False,
'error': str(e)
}
def _process_video_chunked(self, input_path: str, output_path: str, fps: float, original_size: Tuple, progress_callback=None) -> Dict:
"""Process video using chunked parallel processing"""
        start_time = time.time()
        try:
# Create temporary directory for chunks
with tempfile.TemporaryDirectory() as temp_dir:
print(f"Using temporary directory: {temp_dir}")
# Step 1: Create video chunks
if progress_callback:
progress_callback(0.1, desc="Creating video chunks...")
chunks = self.create_video_chunks(input_path, temp_dir)
if not chunks:
return {'success': False, 'error': 'Failed to create video chunks'}
print(f"Created {len(chunks)} chunks")
# Step 2: Process chunks in parallel
if progress_callback:
progress_callback(0.2, desc="Processing chunks in parallel...")
# Prepare configuration for workers
config = {
'processing_resolution': self.processing_resolution,
'upscale_factor': self.upscale_factor,
'enable_ai_upscaling': self.enable_ai_upscaling,
'depth_fps': self.depth_fps,
'enable_frame_interpolation': self.enable_frame_interpolation
}
# Process chunks in parallel
chunk_results = []
with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all chunk processing tasks
future_to_chunk = {
executor.submit(self.process_chunk_worker, chunk, config): chunk
for chunk in chunks
}
# Collect results as they complete
completed_chunks = 0
for future in as_completed(future_to_chunk):
result = future.result()
chunk_results.append(result)
completed_chunks += 1
if progress_callback:
progress = 0.2 + (0.6 * completed_chunks / len(chunks))
progress_callback(progress, desc=f"Processed {completed_chunks}/{len(chunks)} chunks")
if result['success']:
print(f"✅ Chunk {result['chunk_index']} completed successfully")
else:
print(f"❌ Chunk {result['chunk_index']} failed: {result['error']}")
# Step 3: Stitch chunks together
if progress_callback:
progress_callback(0.8, desc="Stitching chunks together...")
success = self.stitch_chunks_with_ffmpeg(chunk_results, output_path, fps)
if not success:
return {'success': False, 'error': 'Failed to stitch chunks together'}
# Step 4: Clean up chunk files
for chunk in chunks:
if os.path.exists(chunk['path']):
os.remove(chunk['path'])
processed_path = chunk['path'].replace('.mp4', '_processed.mp4')
if os.path.exists(processed_path):
os.remove(processed_path)
processing_time = time.time() - start_time
if progress_callback:
progress_callback(1.0, desc="Processing complete!")
return {
'success': True,
'processing_time': processing_time,
'output_path': output_path,
'original_resolution': original_size,
'processing_resolution': self.processing_resolution,
'chunks_processed': len([r for r in chunk_results if r['success']]),
'total_chunks': len(chunks),
'upscale_factor': self.upscale_factor
}
except Exception as e:
return {'success': False, 'error': str(e)}
def _process_video_single(self, input_path: str, output_path: str, fps: float, original_size: Tuple, progress_callback=None) -> Dict:
"""Process video as a single unit (fallback for short videos)"""
        start_time = time.time()
        try:
# Load video
video = mp.VideoFileClip(input_path)
# Calculate target output resolution
target_width = int(original_size[0] * self.upscale_factor)
target_height = int(original_size[1] * self.upscale_factor)
target_resolution = (target_width, target_height)
print(f"Target output resolution: {target_resolution}")
# Process frames
processed_frames = []
total_frames = int(video.duration * fps)
for i, frame in enumerate(video.iter_frames()):
if i % 10 == 0: # Print progress every 10 frames
progress = i / total_frames
if progress_callback:
progress_callback(progress, desc=f"Processing frame {i}/{total_frames}")
print(f"Processing frame {i}/{total_frames}")
# Convert frame to numpy array
frame_array = np.array(frame)
# Generate frame hash for caching
frame_hash = self._get_frame_hash(frame_array)
# Check if we can reuse cached frame
cached_frame = self._get_cached_frame(frame_hash)
if cached_frame is not None:
processed_frames.append(cached_frame)
continue
# Detect scene change for depth caching optimization
scene_changed = self._detect_scene_change(frame_array)
# Step 1: Downscale for processing
downscaled_frame = self.downscale_image(frame_array, self.processing_resolution)
# Step 2: Estimate depth on downscaled frame (with caching)
depth = self.estimate_depth(downscaled_frame, frame_hash if scene_changed else None)
# Step 3: Create stereo pair on downscaled frame
left_view, right_view = self.create_stereo_pair(downscaled_frame, depth)
# Step 4: Upscale the stereo pair to target resolution
left_upscaled = self.upscale_image(left_view, target_resolution)
right_upscaled = self.upscale_image(right_view, target_resolution)
# Step 5: Create VR180 frame
vr180_frame = self.create_vr180_frame(left_upscaled, right_upscaled)
# Cache the processed frame
self._cache_frame(frame_hash, vr180_frame)
processed_frames.append(vr180_frame)
# Manage memory usage
if i % 50 == 0: # Check memory every 50 frames
self._manage_memory()
# Save processed video
if processed_frames:
if progress_callback:
progress_callback(0.9, desc="Saving processed video...")
# Create video writer
height, width = processed_frames[0].shape[:2]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
for frame in processed_frames:
# Convert RGB to BGR for OpenCV
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
out.write(frame_bgr)
out.release()
if progress_callback:
progress_callback(1.0, desc="Processing complete!")
            video.close()
            if not processed_frames:
                return {'success': False, 'error': 'No frames were processed'}
            processing_time = time.time() - start_time
            return {
'success': True,
'processing_time': processing_time,
'output_path': output_path,
'original_resolution': original_size,
'processing_resolution': self.processing_resolution,
'output_resolution': (width, height),
'upscale_factor': self.upscale_factor
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
    def create_preview_frame(self, input_path: str) -> Optional[np.ndarray]:
"""Create a preview frame for the UI"""
try:
video = mp.VideoFileClip(input_path)
frame = video.get_frame(0) # Get first frame
video.close()
# Process the frame
frame_array = np.array(frame)
depth = self.estimate_depth(frame_array)
left_view, right_view = self.create_stereo_pair(frame_array, depth)
vr180_frame = self.create_vr180_frame(left_view, right_view)
return vr180_frame
except Exception as e:
print(f"Error creating preview: {e}")
return None
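

# Minimal usage sketch (illustrative only -- the file names and settings below
# are assumptions, not part of the module): build a processor and convert a clip.
if __name__ == "__main__":
    processor = VideoProcessor(
        processing_resolution=(1280, 720),
        upscale_factor=1.5,
        chunk_duration=15,
        max_workers=4,
    )
    result = processor.process_video(
        "input_2d.mp4",          # hypothetical 2D source clip
        "output_vr180.mp4",      # side-by-side VR180 output
        progress_callback=lambda p, desc="": print(f"{p:.0%} {desc}"),
    )
    print(result)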