Spaces:
Sleeping
Sleeping
| """ | |
| Async video processor with WebSocket support for real-time updates | |
| """ | |
| import asyncio | |
| import json | |
| import time | |
| from typing import Dict, Optional | |
| from fastapi import WebSocket, WebSocketDisconnect | |
| from fastapi.responses import StreamingResponse | |
| import cv2 | |
| import numpy as np | |
| from video_processor import VideoProcessor | |
| from model_manager import model_manager | |
| import tempfile | |
| import os | |
| from pathlib import Path | |
| class AsyncVideoProcessor: | |
| def __init__(self): | |
| self.active_connections: Dict[str, WebSocket] = {} | |
| self.processing_jobs: Dict[str, Dict] = {} | |
| # Wait for models to be ready before creating processor | |
| if not model_manager.wait_for_models(timeout=30): | |
| print("Warning: Models not ready, using fallback mode") | |
| self.processor = VideoProcessor( | |
| processing_resolution=(1280, 720), | |
| upscale_factor=1.5, | |
| enable_depth_caching=True, | |
| chunk_duration=10, | |
| max_workers=4 | |
| ) | |
| async def connect(self, websocket: WebSocket, job_id: str): | |
| """Connect a WebSocket for real-time updates""" | |
| await websocket.accept() | |
| self.active_connections[job_id] = websocket | |
| def disconnect(self, job_id: str): | |
| """Disconnect a WebSocket""" | |
| if job_id in self.active_connections: | |
| del self.active_connections[job_id] | |
| async def send_progress(self, job_id: str, progress: float, message: str): | |
| """Send progress update to connected client""" | |
| if job_id in self.active_connections: | |
| try: | |
| await self.active_connections[job_id].send_text( | |
| json.dumps({ | |
| "type": "progress", | |
| "progress": progress, | |
| "message": message, | |
| "timestamp": time.time() | |
| }) | |
| ) | |
| except: | |
| self.disconnect(job_id) | |
| async def send_error(self, job_id: str, error: str): | |
| """Send error message to connected client""" | |
| if job_id in self.active_connections: | |
| try: | |
| await self.active_connections[job_id].send_text( | |
| json.dumps({ | |
| "type": "error", | |
| "error": error, | |
| "timestamp": time.time() | |
| }) | |
| ) | |
| except: | |
| self.disconnect(job_id) | |
| async def send_completion(self, job_id: str, result: Dict): | |
| """Send completion message to connected client""" | |
| if job_id in self.active_connections: | |
| try: | |
| await self.active_connections[job_id].send_text( | |
| json.dumps({ | |
| "type": "complete", | |
| "result": result, | |
| "timestamp": time.time() | |
| }) | |
| ) | |
| except: | |
| self.disconnect(job_id) | |
| async def process_video_async(self, input_path: str, output_path: str, job_id: str): | |
| """Process video asynchronously with real-time updates""" | |
| try: | |
| # Update job status | |
| self.processing_jobs[job_id] = { | |
| "status": "processing", | |
| "start_time": time.time(), | |
| "input_path": input_path, | |
| "output_path": output_path | |
| } | |
| await self.send_progress(0.0, "Starting video processing...") | |
| # Process video with progress callback | |
| def progress_callback(progress: float, desc: str = ""): | |
| asyncio.create_task(self.send_progress(job_id, progress, desc)) | |
| result = self.processor.process_video( | |
| input_path, | |
| output_path, | |
| progress_callback=progress_callback | |
| ) | |
| if result['success']: | |
| self.processing_jobs[job_id]["status"] = "completed" | |
| self.processing_jobs[job_id]["end_time"] = time.time() | |
| await self.send_completion(job_id, result) | |
| else: | |
| self.processing_jobs[job_id]["status"] = "failed" | |
| await self.send_error(job_id, result.get('error', 'Unknown error')) | |
| except Exception as e: | |
| self.processing_jobs[job_id]["status"] = "failed" | |
| await self.send_error(job_id, str(e)) | |
| finally: | |
| # Clean up after 5 minutes | |
| asyncio.create_task(self._cleanup_job(job_id, 300)) | |
| async def _cleanup_job(self, job_id: str, delay: int): | |
| """Clean up job data after delay""" | |
| await asyncio.sleep(delay) | |
| if job_id in self.processing_jobs: | |
| del self.processing_jobs[job_id] | |
| self.disconnect(job_id) | |
| def get_job_status(self, job_id: str) -> Optional[Dict]: | |
| """Get current job status""" | |
| return self.processing_jobs.get(job_id) | |
| def generate_thumbnail(self, video_path: str, timestamp: float = 0.0) -> str: | |
| """Generate thumbnail for video preview""" | |
| try: | |
| import moviepy.editor as mp | |
| video = mp.VideoFileClip(video_path) | |
| frame = video.get_frame(timestamp) | |
| video.close() | |
| # Convert to PIL Image and save | |
| from PIL import Image | |
| pil_image = Image.fromarray(frame) | |
| # Create thumbnails directory | |
| thumb_dir = Path("thumbnails") | |
| thumb_dir.mkdir(exist_ok=True) | |
| # Generate thumbnail filename | |
| video_name = Path(video_path).stem | |
| thumb_path = thumb_dir / f"{video_name}_thumb.jpg" | |
| # Resize and save | |
| pil_image.thumbnail((320, 180), Image.Resampling.LANCZOS) | |
| pil_image.save(thumb_path, "JPEG", quality=85) | |
| return str(thumb_path) | |
| except Exception as e: | |
| print(f"Error generating thumbnail: {e}") | |
| return None | |
| def get_video_info(self, video_path: str) -> Dict: | |
| """Get video metadata""" | |
| try: | |
| import moviepy.editor as mp | |
| video = mp.VideoFileClip(video_path) | |
| info = { | |
| "duration": video.duration, | |
| "fps": video.fps, | |
| "size": video.size, | |
| "width": video.w, | |
| "height": video.h, | |
| "file_size": os.path.getsize(video_path) | |
| } | |
| video.close() | |
| return info | |
| except Exception as e: | |
| return {"error": str(e)} | |
| # Global instance | |
| async_processor = AsyncVideoProcessor() | |