""" Pip's Brain - The emotional processing pipeline. Orchestrates all services with parallel execution for minimal latency. LLM Priority: Gemini 2.5 (primary) -> Anthropic Claude (fallback) """ import asyncio from typing import Optional, Callable, AsyncGenerator from dataclasses import dataclass import random import os from services.gemini_client import GeminiClient from services.anthropic_client import AnthropicClient from services.sambanova_client import SambanovaClient from pip_prompts import ( EMOTION_ANALYZER_PROMPT, ACTION_DECIDER_PROMPT, PROMPT_ENHANCER_PROMPT, CONVERSATION_PROMPT, INTERVENTION_PROMPT, QUICK_ACK_PROMPT, EMOTION_ANALYZER_QUICK_PROMPT ) from pip_artist import PipArtist, GeneratedImage from pip_voice import PipVoice, VoiceResponse from pip_character import emotion_to_pip_state, PipState from pip_latency import LatencyManager, StreamingContext @dataclass class PipResponse: """Complete response from Pip.""" acknowledgment: str response_text: str emotion_state: dict action: dict image: Optional[GeneratedImage] audio: Optional[VoiceResponse] pip_state: str image_prompt: Optional[str] = None @dataclass class ConversationMessage: """A message in conversation history.""" role: str # "user" or "assistant" content: str @dataclass class UserAPIKeys: """User-provided API keys for a session.""" google_api_key: Optional[str] = None anthropic_api_key: Optional[str] = None openai_api_key: Optional[str] = None elevenlabs_api_key: Optional[str] = None huggingface_token: Optional[str] = None class PipBrain: """ Pip's central brain - orchestrates emotional intelligence pipeline. LLM Priority: 1. Gemini 2.5 (primary) - fast and capable 2. Anthropic Claude (fallback) - when Gemini fails 3. SambaNova (fast acknowledgments) Processing flow: 1. Quick acknowledgment (Gemini Flash/SambaNova) - immediate 2. Emotion analysis (Gemini Pro) - parallel 3. Action decision (Gemini Flash) - after emotion 4. Prompt enhancement (Gemini Flash) - parallel with action 5. Image generation (load balanced) - after prompt 6. Full response (Gemini/Claude) - streaming """ def __init__(self, user_keys: UserAPIKeys = None): """ Initialize Pip's brain with optional user-provided API keys. 
""" # Store user keys self.user_keys = user_keys # Initialize clients with user keys if provided google_key = user_keys.google_api_key if user_keys else None anthropic_key = user_keys.anthropic_api_key if user_keys else None # Primary LLM: Gemini self.gemini = GeminiClient(api_key=google_key) # Fallback LLM: Claude (only if API key available) self.claude = AnthropicClient(api_key=anthropic_key) if os.getenv("ANTHROPIC_API_KEY") or anthropic_key else None # Fast LLM for acknowledgments self.sambanova = SambanovaClient() # Other services self.artist = PipArtist() self.voice = PipVoice() self.latency_manager = LatencyManager() # Conversation history per session self._conversations: dict[str, list[ConversationMessage]] = {} # Current mode per session self._modes: dict[str, str] = {} # "auto", "alchemist", "artist", "dream", "night" # Track which LLM to use self._gemini_available = True self._claude_available = self.claude is not None def set_mode(self, session_id: str, mode: str): """Set the interaction mode for a session.""" self._modes[session_id] = mode def get_mode(self, session_id: str) -> str: """Get current mode for session.""" return self._modes.get(session_id, "auto") def _get_conversation_history(self, session_id: str) -> list[dict]: """Get formatted conversation history.""" history = self._conversations.get(session_id, []) return [{"role": m.role, "content": m.content} for m in history[-10:]] # Last 10 messages def _add_to_history(self, session_id: str, role: str, content: str): """Add message to conversation history.""" if session_id not in self._conversations: self._conversations[session_id] = [] self._conversations[session_id].append(ConversationMessage(role=role, content=content)) async def process( self, user_input: str, session_id: str = "default", generate_voice: bool = False, on_state_change: Callable[[str], None] = None, on_text_chunk: Callable[[str], None] = None, on_acknowledgment: Callable[[str], None] = None ) -> PipResponse: """ Process user input through the emotional pipeline. NOTE: Image generation is now SEPARATE - use visualize_current_mood() for images. 
    async def process(
        self,
        user_input: str,
        session_id: str = "default",
        generate_voice: bool = False,
        on_state_change: Callable[[str], None] = None,
        on_text_chunk: Callable[[str], None] = None,
        on_acknowledgment: Callable[[str], None] = None
    ) -> PipResponse:
        """
        Process user input through the emotional pipeline.

        NOTE: Image generation is now SEPARATE - use visualize_current_mood() for images.

        Args:
            user_input: What the user said
            session_id: Session identifier for conversation continuity
            generate_voice: Whether to generate a voice response
            on_state_change: Callback for Pip state changes
            on_text_chunk: Callback for streaming text
            on_acknowledgment: Callback for the quick acknowledgment

        Returns:
            PipResponse with text response (no image unless intervention)
        """
        # Add user message to history
        self._add_to_history(session_id, "user", user_input)

        # Get current mode
        mode = self.get_mode(session_id)

        # Notify listening state
        if on_state_change:
            on_state_change("listening")

        # Phase 1 (parallel): quick acknowledgment + emotion analysis.
        # Gemini handles the quick ack, with SambaNova as fallback.
        ack_task = asyncio.create_task(
            self._quick_acknowledge_with_fallback(user_input)
        )
        # Gemini handles emotion analysis, with Claude as fallback.
        emotion_task = asyncio.create_task(
            self._analyze_emotion_with_fallback(user_input)
        )

        # Get the acknowledgment ASAP
        acknowledgment = await ack_task
        if on_acknowledgment:
            on_acknowledgment(acknowledgment)
        if on_state_change:
            on_state_change("thinking")

        # Wait for emotion analysis
        emotion_state = await emotion_task

        # Determine Pip's visual state from the emotion
        pip_visual_state = emotion_to_pip_state(
            emotion_state.get("primary_emotions", []),
            emotion_state.get("intensity", 5)
        )
        if on_state_change:
            on_state_change(pip_visual_state)

        # Phase 2: decide action (Gemini with Claude fallback)
        action = await self._decide_action_with_fallback(emotion_state)

        # Phase 3: generate the response (streaming)
        if on_state_change:
            on_state_change("speaking")

        response_text = ""

        # Check whether an intervention is needed
        if emotion_state.get("intervention_needed", False):
            async for chunk in self._generate_intervention_with_fallback(
                user_input, emotion_state
            ):
                response_text += chunk
                if on_text_chunk:
                    on_text_chunk(chunk)
        else:
            # Normal conversation - try Gemini first, then Claude, then SambaNova
            async for chunk in self._generate_response_with_fallback(
                user_input,
                emotion_state,
                action,
                self._get_conversation_history(session_id)
            ):
                response_text += chunk
                if on_text_chunk:
                    on_text_chunk(chunk)

        # Add the response to history
        self._add_to_history(session_id, "assistant", response_text)

        # Voice is generated after the full response, since it needs the final text
        voice_response = None
        if generate_voice and response_text:
            voice_response = await self._generate_voice_for_response(
                response_text, emotion_state, action
            )

        # Final state update
        if on_state_change:
            on_state_change(pip_visual_state)

        # NO IMAGE - images are generated on demand via visualize_current_mood()
        return PipResponse(
            acknowledgment=acknowledgment,
            response_text=response_text,
            emotion_state=emotion_state,
            action=action,
            image=None,  # No auto image
            audio=voice_response,
            pip_state=pip_visual_state,
            image_prompt=None
        )

    async def _generate_voice_for_response(
        self,
        text: str,
        emotion_state: dict,
        action: dict
    ) -> Optional[VoiceResponse]:
        """Helper to generate a voice response."""
        if not text:
            return None
        return await self.voice.speak(
            text,
            emotion_state.get("primary_emotions", []),
            action.get("action", "reflect"),
            emotion_state.get("intensity", 5)
        )
    # =========================================================================
    # FALLBACK METHODS - Try Gemini first, then Claude/SambaNova
    # =========================================================================

    async def _quick_acknowledge_with_fallback(self, user_input: str) -> str:
        """Quick acknowledgment with Gemini -> SambaNova fallback."""
        # Try Gemini first
        if self._gemini_available:
            try:
                result = await self.gemini.quick_acknowledge(user_input, QUICK_ACK_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Gemini quick ack failed: {e}")

        # Fallback to SambaNova
        try:
            return await self.sambanova.quick_acknowledge(user_input, QUICK_ACK_PROMPT)
        except Exception as e:
            print(f"SambaNova quick ack failed: {e}")
        return "I hear you..."

    async def _analyze_emotion_with_fallback(self, user_input: str) -> dict:
        """Emotion analysis with Gemini -> Claude fallback."""
        default_emotion = {
            "primary_emotions": ["neutral"],
            "secondary_emotions": [],
            "intensity": 5,
            "underlying_needs": ["connection"],
            "intervention_needed": False
        }

        # Try Gemini first
        if self._gemini_available:
            try:
                result = await self.gemini.analyze_emotion(user_input, EMOTION_ANALYZER_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Gemini emotion analysis failed: {e}")
                self._gemini_available = False  # Temporarily disable

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                result = await self.claude.analyze_emotion(user_input, EMOTION_ANALYZER_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Claude emotion analysis failed: {e}")

        return default_emotion

    async def _decide_action_with_fallback(self, emotion_state: dict) -> dict:
        """Action decision with Gemini -> Claude fallback."""
        default_action = {
            "action": "reflect",
            "image_style": "warm",
            "suggested_response_tone": "empathetic"
        }

        # Try Gemini first
        if self._gemini_available:
            try:
                result = await self.gemini.decide_action(emotion_state, ACTION_DECIDER_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Gemini action decision failed: {e}")

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                result = await self.claude.decide_action(emotion_state, ACTION_DECIDER_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Claude action decision failed: {e}")

        return default_action

    async def _generate_response_with_fallback(
        self,
        user_input: str,
        emotion_state: dict,
        action: dict,
        history: list
    ) -> AsyncGenerator[str, None]:
        """Generate a response with Gemini -> Claude -> SambaNova fallback."""
        # Try Gemini first
        if self._gemini_available:
            try:
                yielded = False
                async for chunk in self.gemini.generate_response_stream(
                    user_input, emotion_state, action, CONVERSATION_PROMPT, history
                ):
                    yielded = True
                    yield chunk
                if yielded:
                    return
            except Exception as e:
                print(f"Gemini response generation failed: {e}")

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                yielded = False
                async for chunk in self.claude.generate_response_stream(
                    user_input, emotion_state, action, CONVERSATION_PROMPT, history
                ):
                    yielded = True
                    yield chunk
                if yielded:
                    return
            except Exception as e:
                print(f"Claude response generation failed: {e}")

        # Final fallback to SambaNova
        try:
            async for chunk in self.sambanova.generate_response_stream(
                user_input, emotion_state, CONVERSATION_PROMPT
            ):
                yield chunk
        except Exception as e:
            print(f"All LLMs failed: {e}")
            yield "I'm here with you. Tell me more about what's on your mind."
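    # NOTE: _analyze_emotion_with_fallback flips _gemini_available to False after
    # a failure, and nothing in this module ever flips it back, so Gemini stays
    # disabled for the lifetime of the brain instance. A minimal recovery hook
    # (a sketch under that assumption, not part of the original design) could be:
    #
    #     async def _reenable_gemini_after(self, cooldown_seconds: float = 60.0):
    #         """Re-enable Gemini after a cooldown following a failure."""
    #         await asyncio.sleep(cooldown_seconds)
    #         self._gemini_available = True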
    async def _generate_intervention_with_fallback(
        self,
        user_input: str,
        emotion_state: dict
    ) -> AsyncGenerator[str, None]:
        """Generate an intervention response with fallback."""
        # Try Gemini first
        if self._gemini_available:
            try:
                yielded = False
                async for chunk in self.gemini.generate_intervention_response(
                    user_input, emotion_state, INTERVENTION_PROMPT
                ):
                    yielded = True
                    yield chunk
                if yielded:
                    return
            except Exception as e:
                print(f"Gemini intervention failed: {e}")

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                async for chunk in self.claude.generate_intervention_response(
                    user_input, emotion_state, INTERVENTION_PROMPT
                ):
                    yield chunk
                return
            except Exception as e:
                print(f"Claude intervention failed: {e}")

        # Safe default
        yield (
            "I hear that you're going through something difficult. I'm here with "
            "you, and I care about how you're feeling. If you're in crisis, please "
            "reach out to a helpline or someone you trust."
        )

    async def _generate_text_with_fallback(self, prompt: str) -> Optional[str]:
        """Generate text with Gemini -> Claude fallback."""
        # Try Gemini first
        if self._gemini_available:
            try:
                result = await self.gemini.generate_text(prompt)
                if result:
                    return result
            except Exception as e:
                print(f"Gemini text generation failed: {e}")

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                result = await self.claude.generate_text(prompt)
                if result:
                    return result
            except Exception as e:
                print(f"Claude text generation failed: {e}")

        return None

    async def visualize_current_mood(
        self,
        session_id: str = "default"
    ) -> tuple[Optional[GeneratedImage], str]:
        """
        Generate an image based on the current conversation context.
        Called explicitly by the user via the "Visualize" button.

        Uses the recent conversation history to create a contextual, meaningful image.

        Returns:
            (GeneratedImage, explanation) - the image and a one-sentence
            explanation of why it fits the conversation
        """
        history = self._conversations.get(session_id, [])
        mode = self.get_mode(session_id)

        if not history:
            # No conversation yet - generate a welcoming image
            prompt = (
                "A warm, inviting scene with soft morning light, gentle colors, "
                "a sense of new beginnings and openness, peaceful atmosphere"
            )
            image = await self.artist.generate_for_mood(prompt, "warm", "welcome")
            return image, "A fresh start, waiting to capture whatever you'd like to share."

        # Build context from the recent conversation
        recent_messages = history[-6:]  # Last 3 exchanges
        conv_summary = "\n".join(f"{m.role}: {m.content}" for m in recent_messages)

        # Get the last user message for primary context
        last_user_msg = ""
        for m in reversed(history):
            if m.role == "user":
                last_user_msg = m.content
                break

        # Analyze the emotion of the recent conversation (with fallback)
        emotion_state = await self._analyze_emotion_with_fallback(conv_summary)
        emotions = emotion_state.get("primary_emotions", ["neutral"])

        # Generate the image prompt AND explanation together
        prompt_and_explain = f"""Based on this conversation, create TWO things:

CONVERSATION:
{conv_summary}

DETECTED EMOTIONS: {', '.join(emotions)}
MODE: {mode}

1. IMAGE_PROMPT: A vivid, specific image prompt (2-3 sentences) that:
   - Captures the emotional essence of this conversation
   - Would resonate with someone feeling these emotions
   - Matches the {mode} aesthetic

2. EXPLANATION: ONE sentence (15 words max) explaining WHY this image fits the conversation.
   - Be poetic/thoughtful, not clinical
   - Help the user see the connection
   - Start with something like "Because...", "I see...", "This reflects...", "Your words painted..."

Respond in this exact format:
IMAGE_PROMPT: [your prompt here]
EXPLANATION: [your explanation here]"""

        try:
            # Try Gemini first, then Claude
            result = None
            if self._gemini_available:
                try:
                    result = await self.gemini.generate_text(prompt_and_explain)
                except Exception as e:
                    print(f"[DEBUG] Gemini failed for prompt/explain: {e}")

            if not result and self._claude_available and self.claude:
                try:
                    result = await self.claude.generate_text(prompt_and_explain)
                except Exception as e:
                    print(f"[DEBUG] Claude failed for prompt/explain: {e}")

            print(f"[DEBUG] LLM response for prompt/explain: {result[:200] if result else 'None'}...")

            # Parse the result
            image_prompt = ""
            explanation = ""

            if result and "IMAGE_PROMPT:" in result and "EXPLANATION:" in result:
                parts = result.split("EXPLANATION:")
                image_prompt = parts[0].replace("IMAGE_PROMPT:", "").strip()
                explanation = parts[1].strip()
                print(f"[DEBUG] Parsed - prompt: {image_prompt[:50]}..., explanation: {explanation}")
            else:
                # Fallback: enhance the last user message into a prompt
                print("[DEBUG] Using fallback - result didn't have expected format")
                image_prompt = await self.sambanova.enhance_prompt(
                    last_user_msg, emotion_state, mode, PROMPT_ENHANCER_PROMPT
                )
                explanation = f"I sensed {emotions[0]} in your words and wanted to reflect that back to you."

        except Exception as e:
            print(f"[DEBUG] Error generating prompt/explanation: {e}")
            traceback.print_exc()
            image_prompt = (
                f"An emotional landscape representing {', '.join(emotions)}, "
                "with soft ethereal lighting and dreamlike quality"
            )
            explanation = f"Your {emotions[0]} touched me, and I wanted to show you how I felt it."

        print(f"[DEBUG] Final explanation before image gen: '{explanation}'")

        # Generate the image
        action = emotion_state.get("suggested_action", "reflect")
        style = "dreamy" if mode == "dream" else "warm" if mode == "night" else "artistic"
        image = await self.artist.generate_for_mood(image_prompt, style, action)

        return image, explanation
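    # For reference, a well-formed LLM reply to prompt_and_explain looks like the
    # following (illustrative example, not real model output):
    #
    #     IMAGE_PROMPT: A lone sailboat gliding from stormy grey water into a
    #     band of golden light, sails catching the first calm wind.
    #     EXPLANATION: Because your words moved from overwhelm toward a hopeful horizon.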
    async def process_streaming(
        self,
        user_input: str,
        session_id: str = "default"
    ) -> AsyncGenerator[dict, None]:
        """
        Streaming version of process() that yields updates as they happen.

        Yields dicts with:
            - {"type": "state", "value": "thinking"}
            - {"type": "ack", "value": "I hear you..."}
            - {"type": "emotion", "value": dict}
            - {"type": "action", "value": dict}
            - {"type": "text_chunk", "value": "..."}
            - {"type": "image", "value": GeneratedImage}
            - {"type": "complete", "value": PipResponse}
        """
        # Add to history
        self._add_to_history(session_id, "user", user_input)
        mode = self.get_mode(session_id)

        yield {"type": "state", "value": "listening"}

        # Phase 1: quick ack + emotion analysis (parallel).
        # Use the fallback helpers so this path also works when Claude is unavailable.
        ack_task = asyncio.create_task(
            self.sambanova.quick_acknowledge(user_input, QUICK_ACK_PROMPT)
        )
        emotion_task = asyncio.create_task(
            self._analyze_emotion_with_fallback(user_input)
        )

        acknowledgment = await ack_task
        yield {"type": "ack", "value": acknowledgment}
        yield {"type": "state", "value": "thinking"}

        emotion_state = await emotion_task
        pip_state = emotion_to_pip_state(
            emotion_state.get("primary_emotions", []),
            emotion_state.get("intensity", 5)
        )
        yield {"type": "emotion", "value": emotion_state}
        yield {"type": "state", "value": pip_state}

        # Phase 2: action decision + prompt enhancement (parallel)
        action_task = asyncio.create_task(
            self._decide_action_with_fallback(emotion_state)
        )
        prompt_task = asyncio.create_task(
            self.sambanova.enhance_prompt(user_input, emotion_state, mode, PROMPT_ENHANCER_PROMPT)
        )
        action, image_prompt = await asyncio.gather(action_task, prompt_task)
        yield {"type": "action", "value": action}

        # Phase 3: start image generation
        image_task = asyncio.create_task(
            self.artist.generate_for_mood(
                image_prompt,
                action.get("image_style", "warm"),
                action.get("action", "reflect")
            )
        )

        # Phase 4: stream the response
        yield {"type": "state", "value": "speaking"}
        response_text = ""

        if emotion_state.get("intervention_needed", False):
            async for chunk in self._generate_intervention_with_fallback(
                user_input, emotion_state
            ):
                response_text += chunk
                yield {"type": "text_chunk", "value": chunk}
        elif self.claude and self._should_use_claude():
            async for chunk in self.claude.generate_response_stream(
                user_input,
                emotion_state,
                action,
                CONVERSATION_PROMPT,
                self._get_conversation_history(session_id)
            ):
                response_text += chunk
                yield {"type": "text_chunk", "value": chunk}
        else:
            async for chunk in self.sambanova.generate_response_stream(
                user_input, emotion_state, CONVERSATION_PROMPT
            ):
                response_text += chunk
                yield {"type": "text_chunk", "value": chunk}

        self._add_to_history(session_id, "assistant", response_text)

        # Wait for the image
        generated_image = await image_task
        yield {"type": "image", "value": generated_image}

        # Final state
        yield {"type": "state", "value": pip_state}

        # Complete response
        yield {
            "type": "complete",
            "value": PipResponse(
                acknowledgment=acknowledgment,
                response_text=response_text,
                emotion_state=emotion_state,
                action=action,
                image=generated_image,
                audio=None,
                pip_state=pip_state,
                image_prompt=image_prompt
            )
        }
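    # A minimal consumer sketch for process_streaming (the `ui` handler below is
    # hypothetical, not part of this project):
    #
    #     async for event in brain.process_streaming("I feel stuck", session_id="s1"):
    #         if event["type"] == "text_chunk":
    #             ui.append_text(event["value"])
    #         elif event["type"] == "state":
    #             ui.set_pip_state(event["value"])
    #         elif event["type"] == "complete":
    #             ui.finish(event["value"])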
""" self._use_claude_for_conversation = not self._use_claude_for_conversation return self._use_claude_for_conversation def _build_prompt_context(self, emotion_state: dict, mode: str) -> dict: """Build context for prompt enhancement.""" return { "emotions": emotion_state.get("primary_emotions", []), "intensity": emotion_state.get("intensity", 5), "needs": emotion_state.get("underlying_needs", []), "mode": mode } def clear_history(self, session_id: str): """Clear conversation history for a session.""" if session_id in self._conversations: del self._conversations[session_id] def get_history(self, session_id: str) -> list[dict]: """Get conversation history for display.""" return [ {"role": m.role, "content": m.content} for m in self._conversations.get(session_id, []) ] async def summarize_conversation( self, session_id: str = "default", generate_voice: bool = True ) -> dict: """ Create a memory artifact from the conversation. Uses FULL conversation context to create a deeply meaningful summary, image, and audio that captures the entire emotional journey. Returns: dict with summary, image, and audio """ history = self._conversations.get(session_id, []) mode = self.get_mode(session_id) if not history: return { "summary": "No conversation to summarize yet!", "image": None, "audio": None, "emotions_journey": [] } # Build FULL conversation text (not truncated) conv_text = "\n".join([ f"{m.role}: {m.content}" for m in history ]) # Extract key themes and emotional arc analysis_prompt = f"""Analyze this COMPLETE conversation deeply. FULL CONVERSATION: {conv_text} Identify: 1. EMOTIONAL ARC: How did the person's emotions change from start to end? 2. KEY MOMENTS: What were the most significant things they shared? 3. THEMES: What topics or concerns came up repeatedly? 4. RESOLUTION: Did they reach any realizations or feel better by the end? 5. VISUAL METAPHOR: What single image/scene could capture this entire journey? Respond in JSON: {{ "emotional_arc": "description of how emotions evolved", "key_moments": ["moment1", "moment2"], "themes": ["theme1", "theme2"], "resolution": "how conversation concluded emotionally", "visual_metaphor": "a vivid scene description that captures the journey", "dominant_emotions": ["emotion1", "emotion2", "emotion3"], "intensity_end": 1-10 }}""" try: # Deep analysis using Gemini/Claude with fallback import json analysis_raw = await self._generate_text_with_fallback(analysis_prompt) # Parse analysis try: # Try to extract JSON from response if "```json" in analysis_raw: analysis_raw = analysis_raw.split("```json")[1].split("```")[0] elif "```" in analysis_raw: analysis_raw = analysis_raw.split("```")[1].split("```")[0] analysis = json.loads(analysis_raw.strip()) except: analysis = { "emotional_arc": "A meaningful exchange", "key_moments": ["sharing feelings"], "themes": ["connection"], "resolution": "feeling heard", "visual_metaphor": "Two soft lights connecting in a gentle space", "dominant_emotions": ["reflection", "warmth"], "intensity_end": 5 } # Generate warm summary based on analysis summary_prompt = f"""You are Pip, a warm emotional companion. Create a brief (2-3 sentences) heartfelt summary of your conversation with this person. ANALYSIS OF CONVERSATION: - Emotional journey: {analysis.get('emotional_arc', 'meaningful exchange')} - Key moments: {', '.join(analysis.get('key_moments', ['connection']))} - How it ended: {analysis.get('resolution', 'feeling heard')} Write warmly, personally, as if you genuinely care about this person. 
Reference specific things they shared (but keep it brief). End with warmth.""" summary = await self._generate_text_with_fallback(summary_prompt) if not summary: summary = "We had a meaningful conversation together. I'm here whenever you want to talk again!" # Create RICH image prompt using full context visual_metaphor = analysis.get('visual_metaphor', 'A peaceful scene of connection and understanding') emotions = analysis.get('dominant_emotions', ['reflection', 'peace']) themes = analysis.get('themes', ['connection']) memory_image_prompt = f"""Create a deeply meaningful visual memory: VISUAL CONCEPT: {visual_metaphor} EMOTIONAL ESSENCE: - Emotions to convey: {', '.join(emotions)} - Themes: {', '.join(themes)} - Emotional resolution: {analysis.get('resolution', 'peace')} STYLE REQUIREMENTS: - Mode: {mode} ({'magical/ethereal' if mode == 'alchemist' else 'dreamy/surreal' if mode == 'dream' else 'calm/starlit' if mode == 'night' else 'artistic/painterly'}) - Soft, emotional lighting - Colors that match the emotional journey - Abstract elements suggesting conversation/connection - NO text, NO words, NO letters - Evocative, gallery-worthy composition This should feel like a precious memory captured in art.""" # Generate memory image with full context memory_image = await self.artist.generate_for_mood( memory_image_prompt, "dreamy", "reflect" ) # Generate audio if requested audio_response = None if generate_voice: audio_response = await self.voice.speak( summary, emotions, "reflect", analysis.get("intensity_end", 5) ) return { "summary": summary, "image": memory_image, "audio": audio_response, "emotions_journey": emotions, "analysis": analysis # Include full analysis for debugging/display } except Exception as e: print(f"Error summarizing conversation: {e}") import traceback traceback.print_exc() return { "summary": "I enjoyed our conversation! Let's chat again soon.", "image": None, "audio": None, "emotions_journey": [] } # Singleton instance for easy access _brain_instance: Optional[PipBrain] = None def get_brain() -> PipBrain: """Get or create the Pip brain instance.""" global _brain_instance if _brain_instance is None: _brain_instance = PipBrain() return _brain_instance
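
# Minimal smoke-test sketch, assuming the provider API keys are configured via
# the environment; the session id and sample input below are illustrative only.
if __name__ == "__main__":
    async def _demo():
        brain = get_brain()
        brain.set_mode("demo", "artist")
        response = await brain.process(
            "I finally finished my big project today!",
            session_id="demo",
            on_acknowledgment=lambda ack: print(f"[ack] {ack}"),
            on_state_change=lambda state: print(f"[state] {state}"),
            on_text_chunk=lambda chunk: print(chunk, end="", flush=True),
        )
        print(f"\n[emotions] {response.emotion_state.get('primary_emotions')}")

    asyncio.run(_demo())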