Spaces:
Running
Running
"""
Pip's Latency Manager - Streaming coordinator for responsive interactions.
Manages progressive responses and Pip's state changes during conversation.
"""
| import asyncio | |
| from typing import Callable, Optional, AsyncGenerator | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| import time | |
class PipState(Enum):
    """Pip's visual/behavioral states.

    Each member's value is the string identifier associated with the state.
    Note that two member names differ from their values: ``IDLE`` maps to
    ``"neutral"`` and ``RESPONDING`` maps to ``"speaking"``.
    """

    # Conversation-flow states.
    IDLE = "neutral"
    LISTENING = "listening"
    ATTENTIVE = "attentive"
    THINKING = "thinking"
    RESPONDING = "speaking"

    # Emotion-driven states.
    HAPPY = "happy"
    SAD = "sad"
    CONCERNED = "concerned"
    EXCITED = "excited"
    SLEEPY = "sleepy"
@dataclass
class StreamingContext:
    """Context for a streaming interaction.

    Holds the user's input, Pip's current state, progress flags, and the
    optional callbacks used to notify the caller as the interaction advances.

    The ``@dataclass`` decorator is required: the fields below rely on
    ``field(default_factory=...)`` and instances are built with keyword
    arguments.
    """

    # Wall-clock timestamp (``time.time()``) taken when the instance is created.
    start_time: float = field(default_factory=time.time)
    user_input: str = ""
    current_state: PipState = PipState.IDLE

    # Progress flags; all start out False.
    acknowledgment_sent: bool = False
    emotion_analyzed: bool = False
    image_generating: bool = False
    response_streaming: bool = False
    completed: bool = False

    # Callbacks (each may be None, in which case no notification is sent)
    on_state_change: Optional[Callable[[PipState], None]] = None
    on_text_chunk: Optional[Callable[[str], None]] = None
    on_acknowledgment: Optional[Callable[[str], None]] = None
    on_image_ready: Optional[Callable[[str], None]] = None

    def elapsed_ms(self) -> int:
        """Get elapsed time since ``start_time`` in whole milliseconds.

        Uses ``time.time()``, so the value follows the wall clock. The result
        is truncated (not rounded) to an int.
        """
        return int((time.time() - self.start_time) * 1000)
class LatencyManager:
    """
    Manages streaming responses and state transitions for minimal perceived latency.

    Key strategies:
    1. Immediate acknowledgment (< 500ms)
    2. Progressive state changes to show engagement
    3. Parallel processing where possible
    4. Streaming responses as they generate

    Contexts are stored per session id; creating a context for a session id
    that already has one replaces the previous context.
    """

    # Timing thresholds (ms)
    # NOTE: ACK_DEADLINE is not referenced inside this class; the actual wait
    # for the acknowledgment task is bounded by ACK_TIMEOUT below.
    ACK_DEADLINE = 500  # Send acknowledgment within this
    ATTENTIVE_THRESHOLD = 2000  # Switch to attentive after this
    THINKING_THRESHOLD = 3000  # Switch to thinking after this

    # Per-stage wait limits (seconds) used by run_with_progressive_states.
    ACK_TIMEOUT = 1.0
    EMOTION_TIMEOUT = 5.0
    PROMPT_TIMEOUT = 3.0
    IMAGE_TIMEOUT = 30.0

    # Sleep between iterations of the background state-progression loop (seconds).
    STATE_POLL_INTERVAL = 0.5

    # Intensity at or above which a HAPPY result is promoted to EXCITED.
    EXCITED_INTENSITY = 8

    # Lower-cased emotion label -> Pip state. Built once instead of per call.
    _EMOTION_STATE_MAP = {
        "happy": PipState.HAPPY,
        "joy": PipState.HAPPY,
        "excited": PipState.EXCITED,
        "sad": PipState.SAD,
        "melancholy": PipState.SAD,
        "anxious": PipState.CONCERNED,
        "worried": PipState.CONCERNED,
        "tired": PipState.SLEEPY,
        "peaceful": PipState.SLEEPY,
    }

    def __init__(self):
        """Create a manager with no active contexts."""
        self._active_contexts: dict[str, StreamingContext] = {}

    def create_context(
        self,
        session_id: str,
        user_input: str,
        on_state_change: Optional[Callable[[PipState], None]] = None,
        on_text_chunk: Optional[Callable[[str], None]] = None,
        on_acknowledgment: Optional[Callable[[str], None]] = None,
        on_image_ready: Optional[Callable[[str], None]] = None,
    ) -> StreamingContext:
        """
        Create a new streaming context for an interaction.

        The context starts in the LISTENING state, is registered under
        ``session_id`` (replacing any existing entry), and
        ``on_state_change`` - if given - is called immediately with
        ``PipState.LISTENING``.

        Returns:
            The newly created StreamingContext.
        """
        context = StreamingContext(
            user_input=user_input,
            current_state=PipState.LISTENING,
            on_state_change=on_state_change,
            on_text_chunk=on_text_chunk,
            on_acknowledgment=on_acknowledgment,
            on_image_ready=on_image_ready,
        )
        self._active_contexts[session_id] = context

        # Notify initial state
        if on_state_change:
            on_state_change(PipState.LISTENING)

        return context

    def get_context(self, session_id: str) -> Optional[StreamingContext]:
        """Get active context for session, or None if there is none."""
        return self._active_contexts.get(session_id)

    def update_state(self, session_id: str, new_state: PipState):
        """Update Pip's state and notify.

        Does nothing when the session has no active context or when the
        context is already in ``new_state`` (so the callback only fires on an
        actual change).
        """
        context = self._active_contexts.get(session_id)
        if context is None or context.current_state == new_state:
            return
        context.current_state = new_state
        if context.on_state_change:
            context.on_state_change(new_state)

    def complete_context(self, session_id: str):
        """Mark context as complete and clean up.

        Setting ``completed`` lets a running ``_progress_states`` loop that
        still holds the context exit. Unknown session ids are ignored.
        """
        context = self._active_contexts.pop(session_id, None)
        if context is not None:
            context.completed = True

    async def run_with_progressive_states(
        self,
        session_id: str,
        acknowledgment_task: asyncio.Task,
        emotion_task: asyncio.Task,
        prompt_task: asyncio.Task,
        response_generator: AsyncGenerator[str, None],
        image_task: asyncio.Task,
    ) -> dict:
        """
        Orchestrate all tasks with progressive state updates.

        This is the main coordination function that:
        1. Sends acknowledgment ASAP
        2. Updates state as time passes
        3. Streams response chunks
        4. Delivers image when ready

        Each awaited stage is bounded by the corresponding ``*_TIMEOUT``
        class constant. A timeout is tolerated: the stage's result stays
        ``None`` (or, for emotion, a neutral default is used). Any other
        exception raised by a task, the generator, or a callback propagates
        to the caller after the background state task has been cancelled.

        Returns:
            ``{"error": "No active context"}`` when the session has no
            context; otherwise a dict with the keys ``acknowledgment``,
            ``emotion``, ``prompt``, ``response`` and ``image``.
        """
        context = self._active_contexts.get(session_id)
        if context is None:
            return {"error": "No active context"}

        results = {
            "acknowledgment": None,
            "emotion": None,
            "prompt": None,
            "response": "",
            "image": None,
        }

        # Start state progression task
        state_task = asyncio.create_task(self._progress_states(session_id))

        try:
            # Wait for acknowledgment (should be fastest)
            try:
                ack = await asyncio.wait_for(
                    acknowledgment_task, timeout=self.ACK_TIMEOUT
                )
            except asyncio.TimeoutError:
                # Acknowledgment took too long, continue anyway
                pass
            else:
                results["acknowledgment"] = ack
                context.acknowledgment_sent = True
                if context.on_acknowledgment:
                    context.on_acknowledgment(ack)

            # Update to thinking state
            self.update_state(session_id, PipState.THINKING)

            # Wait for emotion analysis
            try:
                emotion = await asyncio.wait_for(
                    emotion_task, timeout=self.EMOTION_TIMEOUT
                )
            except asyncio.TimeoutError:
                # Use default emotion if analysis times out
                results["emotion"] = {
                    "primary_emotions": ["neutral"],
                    "intensity": 5,
                }
            else:
                results["emotion"] = emotion
                context.emotion_analyzed = True
                # Update state based on emotion
                self.update_state(session_id, self._emotion_to_state(emotion))

            # Get prompt (should be ready by now)
            try:
                results["prompt"] = await asyncio.wait_for(
                    prompt_task, timeout=self.PROMPT_TIMEOUT
                )
            except asyncio.TimeoutError:
                results["prompt"] = None

            # Image generation runs in image_task; it is only awaited after
            # the response has been streamed.
            context.image_generating = True

            # Stream response
            self.update_state(session_id, PipState.RESPONDING)
            context.response_streaming = True
            chunks = []
            try:
                async for chunk in response_generator:
                    chunks.append(chunk)
                    if context.on_text_chunk:
                        context.on_text_chunk(chunk)
            finally:
                # Reset the flag even when the generator or a callback raises,
                # so the context never claims to be streaming after the loop.
                context.response_streaming = False
            results["response"] = "".join(chunks)

            # Wait for image
            try:
                image = await asyncio.wait_for(
                    image_task, timeout=self.IMAGE_TIMEOUT
                )
            except asyncio.TimeoutError:
                results["image"] = None
            else:
                results["image"] = image
                if context.on_image_ready:
                    context.on_image_ready(image)
        finally:
            # Always stop the background progression loop and wait for it to
            # finish unwinding.
            state_task.cancel()
            try:
                await state_task
            except asyncio.CancelledError:
                pass

        return results

    async def _progress_states(self, session_id: str):
        """
        Progressively update states based on elapsed time.
        Shows Pip is engaged during long operations.

        Runs until the context is marked completed (or the task is
        cancelled), polling every ``STATE_POLL_INTERVAL`` seconds. Returns
        immediately if the session has no active context.
        """
        context = self._active_contexts.get(session_id)
        if context is None:
            return

        while not context.completed:
            elapsed = context.elapsed_ms()

            # Only progress if not in a higher-priority state:
            # LISTENING -> ATTENTIVE -> THINKING; any other state is left alone.
            if context.current_state == PipState.LISTENING:
                if elapsed > self.ATTENTIVE_THRESHOLD:
                    self.update_state(session_id, PipState.ATTENTIVE)
            elif context.current_state == PipState.ATTENTIVE:
                if (
                    elapsed > self.THINKING_THRESHOLD
                    and not context.response_streaming
                ):
                    self.update_state(session_id, PipState.THINKING)

            await asyncio.sleep(self.STATE_POLL_INTERVAL)

    def _emotion_to_state(self, emotion: dict) -> PipState:
        """Convert emotion analysis to Pip state.

        Reads ``primary_emotions`` (first entry only, case-insensitive) and
        ``intensity`` (default 5) from ``emotion``. Falls back to
        ``PipState.THINKING`` when the input is empty, has no emotions, the
        first label is not a string, or the label is not in the map.
        A HAPPY result with intensity >= ``EXCITED_INTENSITY`` becomes EXCITED.
        """
        if not emotion:
            return PipState.THINKING

        emotions = emotion.get("primary_emotions", [])
        if not emotions:
            return PipState.THINKING

        primary = emotions[0]
        if not isinstance(primary, str):
            # Malformed label (e.g. None): treat as unknown instead of crashing.
            return PipState.THINKING

        state = self._EMOTION_STATE_MAP.get(primary.lower(), PipState.THINKING)

        # High intensity happy -> excited. A non-numeric intensity is treated
        # as "not high" rather than raising on the comparison.
        intensity = emotion.get("intensity", 5)
        if (
            state == PipState.HAPPY
            and isinstance(intensity, (int, float))
            and intensity >= self.EXCITED_INTENSITY
        ):
            return PipState.EXCITED

        return state
class ListeningProgressManager:
    """
    Manages Pip's engagement signals while user is speaking/typing.
    Shows progressive interest during long inputs.
    """

    # Seconds without activity after which the engagement loop stops.
    IDLE_TIMEOUT = 2.0
    # Seconds of listening after which ATTENTIVE is signalled.
    ATTENTIVE_AFTER = 5.0
    # Sleep between loop iterations (seconds).
    POLL_INTERVAL = 0.5

    def __init__(
        self, on_state_change: Optional[Callable[[PipState], None]] = None
    ):
        """Store the optional state-change callback; listening is not started."""
        self.on_state_change = on_state_change
        self._listening_start: Optional[float] = None
        self._last_activity: Optional[float] = None

    def start_listening(self):
        """Called when user starts input.

        Records the start/activity timestamps and, if a callback is set,
        notifies it with ``PipState.LISTENING``.
        """
        now = time.time()
        self._listening_start = now
        self._last_activity = now
        if self.on_state_change:
            self.on_state_change(PipState.LISTENING)

    def activity(self):
        """Called on user activity (typing, speaking)."""
        self._last_activity = time.time()

    async def run_engagement_loop(self):
        """
        Run engagement animations while listening.
        Shows Pip getting more engaged over time.

        Returns immediately if listening has not been started. Otherwise
        polls every ``POLL_INTERVAL`` seconds until listening is stopped or
        the user has been idle for more than ``IDLE_TIMEOUT`` seconds.
        ``PipState.ATTENTIVE`` is signalled once per listening session, after
        ``ATTENTIVE_AFTER`` seconds.
        """
        if self._listening_start is None:
            return

        # Start timestamp of the session ATTENTIVE was last signalled for, so
        # the callback is not re-fired on every poll but does fire again if
        # start_listening() begins a new session while the loop is running.
        attentive_sent_for: Optional[float] = None

        while True:
            # Snapshot both timestamps: stop_listening()/activity() may run
            # while this coroutine sleeps, leaving either one None.
            started = self._listening_start
            last_activity = self._last_activity
            if started is None or last_activity is None:
                break

            now = time.time()

            # If user stopped typing for > 2s, they might be done
            if now - last_activity > self.IDLE_TIMEOUT:
                break

            # Progressive engagement: after 5s, show more attentive
            if (
                self.on_state_change
                and attentive_sent_for != started
                and now - started > self.ATTENTIVE_AFTER
            ):
                self.on_state_change(PipState.ATTENTIVE)
                attentive_sent_for = started

            await asyncio.sleep(self.POLL_INTERVAL)

    def stop_listening(self):
        """Called when user finishes input.

        Clears both timestamps, which makes a running engagement loop exit
        on its next iteration.
        """
        self._listening_start = None
        self._last_activity = None