# pipV1 / pip_latency.py
# files for v1 — commit cd35cc5 (verified), uploaded by Itsjustamit
"""
Pip's Latency Manager - Streaming coordinator for responsive interactions.
Manages progressive responses and Pip's state changes during conversation.
"""
import asyncio
from typing import Callable, Optional, AsyncGenerator
from dataclasses import dataclass, field
from enum import Enum
import time
class PipState(Enum):
    """Pip's visual/behavioral states."""
    # NOTE(review): member values are lowercase display strings, presumably
    # consumed by a rendering/UI layer — confirm against the consumer. Two
    # values differ from their member names: IDLE -> "neutral" and
    # RESPONDING -> "speaking".
    IDLE = "neutral"
    LISTENING = "listening"      # user is actively providing input
    ATTENTIVE = "attentive"      # prolonged listening; heightened engagement
    THINKING = "thinking"        # processing / waiting on slow tasks
    RESPONDING = "speaking"      # streaming the reply
    HAPPY = "happy"
    SAD = "sad"
    CONCERNED = "concerned"
    EXCITED = "excited"
    SLEEPY = "sleepy"
@dataclass
class StreamingContext:
    """Context for a streaming interaction.

    One instance tracks the lifecycle of a single user interaction:
    timing, Pip's current state, per-phase progress flags, and the
    callbacks used to push updates out as each phase completes.
    """
    # Wall-clock start of the interaction; basis for elapsed_ms().
    # NOTE(review): time.time() is not monotonic — a system clock jump
    # would skew elapsed_ms(); confirm whether monotonic timing is needed.
    start_time: float = field(default_factory=time.time)
    user_input: str = ""
    current_state: PipState = PipState.IDLE
    # Per-phase progress flags, flipped by the coordinator as work completes.
    acknowledgment_sent: bool = False
    emotion_analyzed: bool = False
    image_generating: bool = False
    response_streaming: bool = False
    completed: bool = False
    # Callbacks (all optional; invoked synchronously by the coordinator)
    on_state_change: Optional[Callable[[PipState], None]] = None
    on_text_chunk: Optional[Callable[[str], None]] = None
    on_acknowledgment: Optional[Callable[[str], None]] = None
    on_image_ready: Optional[Callable[[str], None]] = None

    def elapsed_ms(self) -> int:
        """Get elapsed time in milliseconds since start_time."""
        return int((time.time() - self.start_time) * 1000)
class LatencyManager:
    """
    Manages streaming responses and state transitions for minimal perceived latency.
    Key strategies:
    1. Immediate acknowledgment (< 500ms)
    2. Progressive state changes to show engagement
    3. Parallel processing where possible
    4. Streaming responses as they generate
    """
    # Timing thresholds (ms)
    # NOTE(review): ACK_DEADLINE is not referenced anywhere below — the
    # acknowledgment wait hard-codes a 1.0 s timeout; confirm which value
    # is intended.
    ACK_DEADLINE = 500  # Send acknowledgment within this
    ATTENTIVE_THRESHOLD = 2000  # Switch to attentive after this
    THINKING_THRESHOLD = 3000  # Switch to thinking after this

    def __init__(self) -> None:
        # Active StreamingContext objects keyed by session id.
        self._active_contexts: dict[str, StreamingContext] = {}

    def create_context(
        self,
        session_id: str,
        user_input: str,
        on_state_change: Optional[Callable[[PipState], None]] = None,
        on_text_chunk: Optional[Callable[[str], None]] = None,
        on_acknowledgment: Optional[Callable[[str], None]] = None,
        on_image_ready: Optional[Callable[[str], None]] = None
    ) -> StreamingContext:
        """
        Create a new streaming context for an interaction.

        The context starts in LISTENING and the state-change callback (if
        any) is invoked immediately with that state. Any existing context
        stored under session_id is silently replaced.
        """
        context = StreamingContext(
            user_input=user_input,
            current_state=PipState.LISTENING,
            on_state_change=on_state_change,
            on_text_chunk=on_text_chunk,
            on_acknowledgment=on_acknowledgment,
            on_image_ready=on_image_ready
        )
        self._active_contexts[session_id] = context
        # Notify initial state
        if on_state_change:
            on_state_change(PipState.LISTENING)
        return context

    def get_context(self, session_id: str) -> Optional[StreamingContext]:
        """Get active context for session, or None if there is none."""
        return self._active_contexts.get(session_id)

    def update_state(self, session_id: str, new_state: PipState) -> None:
        """Update Pip's state and notify.

        No-op when the session has no active context or the state is
        unchanged, so callbacks never fire redundantly.
        """
        context = self._active_contexts.get(session_id)
        if context and context.current_state != new_state:
            context.current_state = new_state
            if context.on_state_change:
                context.on_state_change(new_state)

    def complete_context(self, session_id: str) -> None:
        """Mark context as complete and clean up.

        ``completed`` is set before removal so any loop still holding a
        reference to the context (e.g. _progress_states) can observe it
        and exit.
        """
        if session_id in self._active_contexts:
            self._active_contexts[session_id].completed = True
            del self._active_contexts[session_id]

    async def run_with_progressive_states(
        self,
        session_id: str,
        acknowledgment_task: asyncio.Task,
        emotion_task: asyncio.Task,
        prompt_task: asyncio.Task,
        response_generator: AsyncGenerator[str, None],
        image_task: asyncio.Task
    ) -> dict:
        """
        Orchestrate all tasks with progressive state updates.
        This is the main coordination function that:
        1. Sends acknowledgment ASAP
        2. Updates state as time passes
        3. Streams response chunks
        4. Delivers image when ready
        Returns dict with all results.

        All timeouts degrade gracefully: a timed-out phase is recorded as
        None (or a neutral default for emotion) and orchestration continues.
        Note asyncio.wait_for cancels the awaited task on timeout.
        """
        context = self._active_contexts.get(session_id)
        if not context:
            return {"error": "No active context"}
        results = {
            "acknowledgment": None,
            "emotion": None,
            "prompt": None,
            "response": "",
            "image": None
        }
        # Start state progression task (runs concurrently; cancelled in
        # the finally block below).
        state_task = asyncio.create_task(
            self._progress_states(session_id)
        )
        try:
            # Wait for acknowledgment (should be fastest)
            try:
                ack = await asyncio.wait_for(acknowledgment_task, timeout=1.0)
                results["acknowledgment"] = ack
                context.acknowledgment_sent = True
                if context.on_acknowledgment:
                    context.on_acknowledgment(ack)
            except asyncio.TimeoutError:
                # Acknowledgment took too long, continue anyway
                pass
            # Update to thinking state
            self.update_state(session_id, PipState.THINKING)
            # Wait for emotion analysis
            try:
                emotion = await asyncio.wait_for(emotion_task, timeout=5.0)
                results["emotion"] = emotion
                context.emotion_analyzed = True
                # Update state based on emotion
                pip_state = self._emotion_to_state(emotion)
                self.update_state(session_id, pip_state)
            except asyncio.TimeoutError:
                # Use default emotion if analysis times out
                results["emotion"] = {"primary_emotions": ["neutral"], "intensity": 5}
            # Get prompt (should be ready by now)
            try:
                results["prompt"] = await asyncio.wait_for(prompt_task, timeout=3.0)
            except asyncio.TimeoutError:
                results["prompt"] = None
            # Start image generation (don't wait, will arrive later)
            context.image_generating = True
            # Stream response
            self.update_state(session_id, PipState.RESPONDING)
            context.response_streaming = True
            full_response = ""
            async for chunk in response_generator:
                full_response += chunk
                if context.on_text_chunk:
                    context.on_text_chunk(chunk)
            results["response"] = full_response
            context.response_streaming = False
            # Wait for image
            try:
                image = await asyncio.wait_for(image_task, timeout=30.0)
                results["image"] = image
                if context.on_image_ready:
                    context.on_image_ready(image)
            except asyncio.TimeoutError:
                results["image"] = None
        finally:
            # Always stop the background state-progression loop, even if a
            # task above raised; swallow only its CancelledError.
            state_task.cancel()
            try:
                await state_task
            except asyncio.CancelledError:
                pass
        return results

    async def _progress_states(self, session_id: str) -> None:
        """
        Progressively update states based on elapsed time.
        Shows Pip is engaged during long operations.

        Polls every 0.5 s until the context is marked completed (or it is
        cancelled by run_with_progressive_states).
        """
        context = self._active_contexts.get(session_id)
        if not context:
            return
        while not context.completed:
            elapsed = context.elapsed_ms()
            # Only progress if not in a higher-priority state
            # (emotion-driven states like HAPPY/SAD are never overridden here)
            if context.current_state == PipState.LISTENING:
                if elapsed > self.ATTENTIVE_THRESHOLD:
                    self.update_state(session_id, PipState.ATTENTIVE)
            elif context.current_state == PipState.ATTENTIVE:
                if elapsed > self.THINKING_THRESHOLD and not context.response_streaming:
                    self.update_state(session_id, PipState.THINKING)
            await asyncio.sleep(0.5)

    def _emotion_to_state(self, emotion: dict) -> PipState:
        """Convert emotion analysis to Pip state.

        Expects a dict with optional keys "primary_emotions" (list of
        strings, first entry wins) and "intensity" (int, defaults to 5).
        Falls back to THINKING for empty/unknown input.
        """
        if not emotion:
            return PipState.THINKING
        emotions = emotion.get("primary_emotions", [])
        intensity = emotion.get("intensity", 5)
        if not emotions:
            return PipState.THINKING
        primary = emotions[0].lower()
        # Map emotions to states
        emotion_state_map = {
            "happy": PipState.HAPPY,
            "joy": PipState.HAPPY,
            "excited": PipState.EXCITED,
            "sad": PipState.SAD,
            "melancholy": PipState.SAD,
            "anxious": PipState.CONCERNED,
            "worried": PipState.CONCERNED,
            "tired": PipState.SLEEPY,
            "peaceful": PipState.SLEEPY,
        }
        state = emotion_state_map.get(primary, PipState.THINKING)
        # High intensity happy -> excited
        if state == PipState.HAPPY and intensity >= 8:
            return PipState.EXCITED
        return state
class ListeningProgressManager:
    """
    Manages Pip's engagement signals while user is speaking/typing.
    Shows progressive interest during long inputs.
    """

    # Seconds of user inactivity after which input is assumed finished.
    IDLE_DONE_THRESHOLD = 2.0
    # Seconds of continuous listening after which Pip turns ATTENTIVE.
    ATTENTIVE_AFTER = 5.0

    def __init__(self, on_state_change: Optional[Callable[[PipState], None]] = None) -> None:
        # Fixed: the callback default is None, so the annotation must be Optional.
        self.on_state_change = on_state_change
        self._listening_start: Optional[float] = None
        self._last_activity: Optional[float] = None

    def start_listening(self) -> None:
        """Called when user starts input; resets timers and signals LISTENING."""
        now = time.time()
        self._listening_start = now
        self._last_activity = now
        if self.on_state_change:
            self.on_state_change(PipState.LISTENING)

    def activity(self) -> None:
        """Called on user activity (typing, speaking); refreshes the idle timer."""
        self._last_activity = time.time()

    async def run_engagement_loop(self) -> None:
        """
        Run engagement animations while listening.
        Shows Pip getting more engaged over time.

        Exits when the user has been idle longer than IDLE_DONE_THRESHOLD
        or when stop_listening() clears the timers. The ATTENTIVE
        transition is signalled at most once per listening session
        (previously it re-fired on every 0.5 s poll).
        """
        if self._listening_start is None:
            return
        attentive_sent = False
        while True:
            # Snapshot both timers: stop_listening() may clear them at any
            # point (previously only _last_activity was checked, so a
            # cleared _listening_start raised TypeError below).
            start = self._listening_start
            last = self._last_activity
            if start is None or last is None:
                break
            now = time.time()
            # If user stopped typing for > IDLE_DONE_THRESHOLD, they might be done
            if now - last > self.IDLE_DONE_THRESHOLD:
                break
            # Progressive engagement: after ATTENTIVE_AFTER seconds, show
            # more attentive — but only signal the transition once.
            if (not attentive_sent
                    and now - start > self.ATTENTIVE_AFTER
                    and self.on_state_change):
                self.on_state_change(PipState.ATTENTIVE)
                attentive_sent = True
            await asyncio.sleep(0.5)

    def stop_listening(self) -> None:
        """Called when user finishes input; clears timers so the loop exits."""
        self._listening_start = None
        self._last_activity = None