"""
Pip's Latency Manager - Streaming coordinator for responsive interactions.
Manages progressive responses and Pip's state changes during conversation.
"""

import asyncio
from typing import Callable, Optional, AsyncGenerator
from dataclasses import dataclass, field
from enum import Enum
import time


class PipState(Enum):
    """Pip's visual/behavioral states."""

    IDLE = "neutral"
    LISTENING = "listening"
    ATTENTIVE = "attentive"
    THINKING = "thinking"
    RESPONDING = "speaking"
    HAPPY = "happy"
    SAD = "sad"
    CONCERNED = "concerned"
    EXCITED = "excited"
    SLEEPY = "sleepy"


@dataclass
class StreamingContext:
    """Context for a streaming interaction.

    Holds the progress flags of one interaction plus the optional callbacks
    that ``LatencyManager`` invokes as the interaction advances.
    """

    # Wall-clock timestamp (time.time()) taken when the context is created.
    start_time: float = field(default_factory=time.time)
    user_input: str = ""
    current_state: PipState = PipState.IDLE
    acknowledgment_sent: bool = False
    emotion_analyzed: bool = False
    image_generating: bool = False
    response_streaming: bool = False
    completed: bool = False

    # Callbacks
    on_state_change: Optional[Callable[[PipState], None]] = None
    on_text_chunk: Optional[Callable[[str], None]] = None
    on_acknowledgment: Optional[Callable[[str], None]] = None
    on_image_ready: Optional[Callable[[str], None]] = None

    def elapsed_ms(self) -> int:
        """Get elapsed time since ``start_time`` in milliseconds."""
        return int((time.time() - self.start_time) * 1000)


class LatencyManager:
    """
    Manages streaming responses and state transitions
    for minimal perceived latency.

    Key strategies:
    1. Immediate acknowledgment (< 500ms)
    2. Progressive state changes to show engagement
    3. Parallel processing where possible
    4. Streaming responses as they generate
    """

    # Timing thresholds (ms)
    # NOTE: ACK_DEADLINE is not referenced by the code in this module; the
    # acknowledgment wait below uses its own 1.0 s timeout.
    ACK_DEADLINE = 500  # Send acknowledgment within this
    ATTENTIVE_THRESHOLD = 2000  # Switch to attentive after this
    THINKING_THRESHOLD = 3000  # Switch to thinking after this

    # Lower-cased primary emotion -> Pip state. Unknown emotions fall back
    # to THINKING in _emotion_to_state.
    _EMOTION_STATE_MAP = {
        "happy": PipState.HAPPY,
        "joy": PipState.HAPPY,
        "excited": PipState.EXCITED,
        "sad": PipState.SAD,
        "melancholy": PipState.SAD,
        "anxious": PipState.CONCERNED,
        "worried": PipState.CONCERNED,
        "tired": PipState.SLEEPY,
        "peaceful": PipState.SLEEPY,
    }

    def __init__(self):
        # session_id -> context of the interaction currently in flight
        self._active_contexts: dict[str, StreamingContext] = {}

    def create_context(
        self,
        session_id: str,
        user_input: str,
        on_state_change: Optional[Callable[[PipState], None]] = None,
        on_text_chunk: Optional[Callable[[str], None]] = None,
        on_acknowledgment: Optional[Callable[[str], None]] = None,
        on_image_ready: Optional[Callable[[str], None]] = None
    ) -> StreamingContext:
        """
        Create a new streaming context for an interaction.

        The context starts in the LISTENING state and is registered under
        ``session_id``. If a context is already registered for that session
        it is marked completed before being replaced, so that a
        ``_progress_states`` loop still bound to it stops instead of driving
        the new context. ``on_state_change`` (if given) is called
        immediately with ``PipState.LISTENING``.

        Returns:
            The newly registered context.
        """
        previous = self._active_contexts.get(session_id)
        if previous is not None:
            previous.completed = True

        context = StreamingContext(
            user_input=user_input,
            current_state=PipState.LISTENING,
            on_state_change=on_state_change,
            on_text_chunk=on_text_chunk,
            on_acknowledgment=on_acknowledgment,
            on_image_ready=on_image_ready
        )
        self._active_contexts[session_id] = context

        # Notify initial state
        if on_state_change:
            on_state_change(PipState.LISTENING)

        return context

    def get_context(self, session_id: str) -> Optional[StreamingContext]:
        """Get active context for session, or None if there is none."""
        return self._active_contexts.get(session_id)

    def update_state(self, session_id: str, new_state: PipState):
        """Update Pip's state and notify.

        Does nothing when the session has no active context or the context
        is already in ``new_state`` (so the callback never sees duplicates).
        """
        context = self._active_contexts.get(session_id)
        if context and context.current_state != new_state:
            context.current_state = new_state
            if context.on_state_change:
                context.on_state_change(new_state)

    def complete_context(self, session_id: str):
        """Mark context as complete and clean up."""
        context = self._active_contexts.pop(session_id, None)
        if context is not None:
            context.completed = True

    async def run_with_progressive_states(
        self,
        session_id: str,
        acknowledgment_task: asyncio.Task,
        emotion_task: asyncio.Task,
        prompt_task: asyncio.Task,
        response_generator: AsyncGenerator[str, None],
        image_task: asyncio.Task
    ) -> dict:
        """
        Orchestrate all tasks with progressive state updates.

        This is the main coordination function that:
        1. Sends acknowledgment ASAP
        2. Updates state as time passes
        3. Streams response chunks
        4. Delivers image when ready

        Each task is awaited through ``asyncio.wait_for``; on timeout
        ``wait_for`` cancels that task and this method carries on with a
        fallback value. Exceptions other than the timeout propagate to the
        caller.

        Returns dict with all results (keys ``acknowledgment``, ``emotion``,
        ``prompt``, ``response``, ``image``), or ``{"error": ...}`` when the
        session has no active context.
        """
        context = self._active_contexts.get(session_id)
        if not context:
            return {"error": "No active context"}

        results = {
            "acknowledgment": None,
            "emotion": None,
            "prompt": None,
            "response": "",
            "image": None
        }

        # Start state progression task
        state_task = asyncio.create_task(
            self._progress_states(session_id)
        )

        try:
            # Wait for acknowledgment (should be fastest). Only the wait is
            # inside the try so that a TimeoutError raised by a callback is
            # not mistaken for a task timeout.
            try:
                ack = await asyncio.wait_for(acknowledgment_task, timeout=1.0)
            except asyncio.TimeoutError:
                # Acknowledgment took too long, continue anyway
                pass
            else:
                results["acknowledgment"] = ack
                context.acknowledgment_sent = True
                if context.on_acknowledgment:
                    context.on_acknowledgment(ack)

            # Update to thinking state
            self.update_state(session_id, PipState.THINKING)

            # Wait for emotion analysis
            try:
                emotion = await asyncio.wait_for(emotion_task, timeout=5.0)
            except asyncio.TimeoutError:
                # Use default emotion if analysis times out
                results["emotion"] = {
                    "primary_emotions": ["neutral"],
                    "intensity": 5
                }
            else:
                results["emotion"] = emotion
                context.emotion_analyzed = True
                # Update state based on emotion
                self.update_state(session_id, self._emotion_to_state(emotion))

            # Get prompt (should be ready by now)
            try:
                results["prompt"] = await asyncio.wait_for(
                    prompt_task, timeout=3.0
                )
            except asyncio.TimeoutError:
                results["prompt"] = None

            # Start image generation (don't wait, will arrive later)
            context.image_generating = True

            # Stream response
            self.update_state(session_id, PipState.RESPONDING)
            context.response_streaming = True

            chunks = []
            async for chunk in response_generator:
                chunks.append(chunk)
                if context.on_text_chunk:
                    context.on_text_chunk(chunk)

            results["response"] = "".join(chunks)
            context.response_streaming = False

            # Wait for image
            try:
                image = await asyncio.wait_for(image_task, timeout=30.0)
            except asyncio.TimeoutError:
                results["image"] = None
                context.image_generating = False
            else:
                # Image wait is over; mirror how response_streaming is reset.
                context.image_generating = False
                results["image"] = image
                if context.on_image_ready:
                    context.on_image_ready(image)

        finally:
            # The progression loop is only useful while this call is running.
            state_task.cancel()
            try:
                await state_task
            except asyncio.CancelledError:
                pass

        return results

    async def _progress_states(self, session_id: str):
        """
        Progressively update states based on elapsed time.
        Shows Pip is engaged during long operations.

        Polls every 0.5 s until the context is completed or is no longer the
        one registered for ``session_id`` (i.e. it was replaced or removed).
        """
        context = self._active_contexts.get(session_id)
        if not context:
            return

        while (
            not context.completed
            and self._active_contexts.get(session_id) is context
        ):
            elapsed = context.elapsed_ms()

            # Only progress if not in a higher-priority state
            if context.current_state == PipState.LISTENING:
                if elapsed > self.ATTENTIVE_THRESHOLD:
                    self.update_state(session_id, PipState.ATTENTIVE)
            elif context.current_state == PipState.ATTENTIVE:
                if (
                    elapsed > self.THINKING_THRESHOLD
                    and not context.response_streaming
                ):
                    self.update_state(session_id, PipState.THINKING)

            await asyncio.sleep(0.5)

    def _emotion_to_state(self, emotion: dict) -> PipState:
        """Convert emotion analysis to Pip state.

        Reads ``primary_emotions`` (first entry, case-insensitive) and
        ``intensity`` from ``emotion``. Missing, empty, unknown or
        non-string emotions map to THINKING. A HAPPY result with a numeric
        intensity of 8 or more is upgraded to EXCITED; a missing or
        non-numeric intensity never triggers the upgrade.
        """
        if not emotion:
            return PipState.THINKING

        emotions = emotion.get("primary_emotions", [])
        if not emotions:
            return PipState.THINKING

        primary = emotions[0]
        if not isinstance(primary, str):
            # Malformed analysis output: treat like an unknown emotion.
            return PipState.THINKING

        state = self._EMOTION_STATE_MAP.get(primary.lower(), PipState.THINKING)

        # High intensity happy -> excited
        intensity = emotion.get("intensity", 5)
        if (
            state == PipState.HAPPY
            and isinstance(intensity, (int, float))
            and intensity >= 8
        ):
            return PipState.EXCITED

        return state


class ListeningProgressManager:
    """
    Manages Pip's engagement signals while user is speaking/typing.
    Shows progressive interest during long inputs.
    """

    # Timing (seconds)
    IDLE_TIMEOUT_S = 2.0  # Stop the loop after this long without activity
    ATTENTIVE_AFTER_S = 5.0  # Signal ATTENTIVE after listening this long
    POLL_INTERVAL_S = 0.5  # Delay between engagement checks

    def __init__(
        self,
        on_state_change: Optional[Callable[[PipState], None]] = None
    ):
        self.on_state_change = on_state_change
        self._listening_start: Optional[float] = None
        self._last_activity: Optional[float] = None

    def start_listening(self):
        """Called when user starts input."""
        self._listening_start = time.time()
        self._last_activity = time.time()
        if self.on_state_change:
            self.on_state_change(PipState.LISTENING)

    def activity(self):
        """Called on user activity (typing, speaking)."""
        self._last_activity = time.time()

    async def run_engagement_loop(self):
        """
        Run engagement animations while listening.
        Shows Pip getting more engaged over time.

        Returns immediately if listening has not started. Otherwise polls
        until listening is stopped or the user has been idle for more than
        ``IDLE_TIMEOUT_S``. ATTENTIVE is signalled at most once per
        listening session (a later ``start_listening`` re-arms it).
        """
        if not self._listening_start:
            return

        # Start timestamp of the session ATTENTIVE was already sent for.
        attentive_sent_for = None

        while True:
            started = self._listening_start
            last_activity = self._last_activity
            # Either may be cleared by stop_listening() (and _last_activity
            # re-set by a late activity()) while this loop is sleeping.
            if started is None or last_activity is None:
                break

            now = time.time()

            # If user stopped typing for > 2s, they might be done
            if now - last_activity > self.IDLE_TIMEOUT_S:
                break

            # Progressive engagement: after 5s, show more attentive
            if (
                self.on_state_change
                and attentive_sent_for != started
                and now - started > self.ATTENTIVE_AFTER_S
            ):
                self.on_state_change(PipState.ATTENTIVE)
                attentive_sent_for = started

            await asyncio.sleep(self.POLL_INTERVAL_S)

    def stop_listening(self):
        """Called when user finishes input."""
        self._listening_start = None
        self._last_activity = None