| """ | |
| Pip's Brain - The emotional processing pipeline. | |
| Orchestrates all services with parallel execution for minimal latency. | |
| LLM Priority: Gemini 2.5 (primary) -> Anthropic Claude (fallback) | |
| """ | |
| import asyncio | |
| from typing import Optional, Callable, AsyncGenerator | |
| from dataclasses import dataclass | |
| import random | |
| import os | |
| from services.gemini_client import GeminiClient | |
| from services.anthropic_client import AnthropicClient | |
| from services.sambanova_client import SambanovaClient | |
| from pip_prompts import ( | |
| EMOTION_ANALYZER_PROMPT, | |
| ACTION_DECIDER_PROMPT, | |
| PROMPT_ENHANCER_PROMPT, | |
| CONVERSATION_PROMPT, | |
| INTERVENTION_PROMPT, | |
| QUICK_ACK_PROMPT, | |
| EMOTION_ANALYZER_QUICK_PROMPT | |
| ) | |
| from pip_artist import PipArtist, GeneratedImage | |
| from pip_voice import PipVoice, VoiceResponse | |
| from pip_character import emotion_to_pip_state, PipState | |
| from pip_latency import LatencyManager, StreamingContext | |


@dataclass
class PipResponse:
    """Complete response from Pip."""
    acknowledgment: str
    response_text: str
    emotion_state: dict
    action: dict
    image: Optional[GeneratedImage]
    audio: Optional[VoiceResponse]
    pip_state: str
    image_prompt: Optional[str] = None


@dataclass
class ConversationMessage:
    """A message in conversation history."""
    role: str  # "user" or "assistant"
    content: str


@dataclass
class UserAPIKeys:
    """User-provided API keys for a session."""
    google_api_key: Optional[str] = None
    anthropic_api_key: Optional[str] = None
    openai_api_key: Optional[str] = None
    elevenlabs_api_key: Optional[str] = None
    huggingface_token: Optional[str] = None


class PipBrain:
    """
    Pip's central brain - orchestrates the emotional intelligence pipeline.

    LLM Priority:
    1. Gemini 2.5 (primary) - fast and capable
    2. Anthropic Claude (fallback) - when Gemini fails
    3. SambaNova (fast acknowledgments)

    Processing flow:
    1. Quick acknowledgment (Gemini Flash/SambaNova) - immediate
    2. Emotion analysis (Gemini Pro) - parallel
    3. Action decision (Gemini Flash) - after emotion
    4. Prompt enhancement (Gemini Flash) - parallel with action
    5. Image generation (load balanced) - after prompt
    6. Full response (Gemini/Claude) - streaming
    """

    def __init__(self, user_keys: UserAPIKeys = None):
        """Initialize Pip's brain with optional user-provided API keys."""
        # Store user keys
        self.user_keys = user_keys

        # Initialize clients with user keys if provided
        google_key = user_keys.google_api_key if user_keys else None
        anthropic_key = user_keys.anthropic_api_key if user_keys else None

        # Primary LLM: Gemini
        self.gemini = GeminiClient(api_key=google_key)

        # Fallback LLM: Claude (only if an API key is available)
        self.claude = AnthropicClient(api_key=anthropic_key) if os.getenv("ANTHROPIC_API_KEY") or anthropic_key else None

        # Fast LLM for acknowledgments
        self.sambanova = SambanovaClient()

        # Other services
        self.artist = PipArtist()
        self.voice = PipVoice()
        self.latency_manager = LatencyManager()

        # Conversation history per session
        self._conversations: dict[str, list[ConversationMessage]] = {}

        # Current mode per session
        self._modes: dict[str, str] = {}  # "auto", "alchemist", "artist", "dream", "night"

        # Track which LLM to use
        self._gemini_available = True
        self._claude_available = self.claude is not None

        # Alternation flag used by _should_use_claude() for load balancing
        self._use_claude_for_conversation = False

    def set_mode(self, session_id: str, mode: str):
        """Set the interaction mode for a session."""
        self._modes[session_id] = mode

    def get_mode(self, session_id: str) -> str:
        """Get current mode for session."""
        return self._modes.get(session_id, "auto")

    def _get_conversation_history(self, session_id: str) -> list[dict]:
        """Get formatted conversation history."""
        history = self._conversations.get(session_id, [])
        return [{"role": m.role, "content": m.content} for m in history[-10:]]  # Last 10 messages

    def _add_to_history(self, session_id: str, role: str, content: str):
        """Add message to conversation history."""
        if session_id not in self._conversations:
            self._conversations[session_id] = []
        self._conversations[session_id].append(ConversationMessage(role=role, content=content))

    async def process(
        self,
        user_input: str,
        session_id: str = "default",
        generate_voice: bool = False,
        on_state_change: Callable[[str], None] = None,
        on_text_chunk: Callable[[str], None] = None,
        on_acknowledgment: Callable[[str], None] = None
    ) -> PipResponse:
        """
        Process user input through the emotional pipeline.

        NOTE: Image generation is now SEPARATE - use visualize_current_mood() for images.

        Args:
            user_input: What the user said
            session_id: Session identifier for conversation continuity
            generate_voice: Whether to generate a voice response
            on_state_change: Callback for Pip state changes
            on_text_chunk: Callback for streaming text
            on_acknowledgment: Callback for the quick acknowledgment

        Returns:
            PipResponse with the text response (image is always None; use
            visualize_current_mood() to generate an image on demand)
        """
        # Add user message to history
        self._add_to_history(session_id, "user", user_input)

        # Get current mode
        mode = self.get_mode(session_id)

        # Notify listening state
        if on_state_change:
            on_state_change("listening")

        # Phase 1: Parallel - Quick ack + Emotion analysis
        # Use Gemini for quick ack, with SambaNova fallback
        ack_task = asyncio.create_task(
            self._quick_acknowledge_with_fallback(user_input)
        )
        # Use Gemini for emotion analysis, with Claude fallback
        emotion_task = asyncio.create_task(
            self._analyze_emotion_with_fallback(user_input)
        )

        # Get acknowledgment ASAP
        acknowledgment = await ack_task
        if on_acknowledgment:
            on_acknowledgment(acknowledgment)

        if on_state_change:
            on_state_change("thinking")

        # Wait for emotion analysis
        emotion_state = await emotion_task

        # Determine Pip's visual state from emotion
        pip_visual_state = emotion_to_pip_state(
            emotion_state.get("primary_emotions", []),
            emotion_state.get("intensity", 5)
        )
        if on_state_change:
            on_state_change(pip_visual_state)

        # Phase 2: Decide action (using Gemini with fallback)
        action = await self._decide_action_with_fallback(emotion_state)

        # Start voice generation early if enabled (parallel with response)
        voice_task = None
        if generate_voice:
            voice_task = asyncio.create_task(
                self._generate_voice_for_response(
                    "",
                    emotion_state,
                    action
                )
            )

        # Phase 3: Generate response (streaming)
        if on_state_change:
            on_state_change("speaking")

        response_text = ""

        # Check if intervention is needed
        if emotion_state.get("intervention_needed", False):
            # Use intervention prompt with fallback
            async for chunk in self._generate_intervention_with_fallback(
                user_input, emotion_state
            ):
                response_text += chunk
                if on_text_chunk:
                    on_text_chunk(chunk)
        else:
            # Normal conversation - try Gemini first, then Claude, then SambaNova
            async for chunk in self._generate_response_with_fallback(
                user_input,
                emotion_state,
                action,
                self._get_conversation_history(session_id)
            ):
                response_text += chunk
                if on_text_chunk:
                    on_text_chunk(chunk)

        # Add response to history
        self._add_to_history(session_id, "assistant", response_text)

        # Generate voice for the full response now
        voice_response = None
        if generate_voice and response_text:
            # Cancel the early task if it was started
            if voice_task:
                voice_task.cancel()
            voice_response = await self.voice.speak(
                response_text,
                emotion_state.get("primary_emotions", []),
                action.get("action", "reflect"),
                emotion_state.get("intensity", 5)
            )

        # Final state update
        if on_state_change:
            on_state_change(pip_visual_state)

        # NO IMAGE - images are now generated on demand via visualize_current_mood()
        return PipResponse(
            acknowledgment=acknowledgment,
            response_text=response_text,
            emotion_state=emotion_state,
            action=action,
            image=None,  # No auto image
            audio=voice_response,
            pip_state=pip_visual_state,
            image_prompt=None
        )

    async def _generate_voice_for_response(
        self,
        text: str,
        emotion_state: dict,
        action: dict
    ) -> Optional[VoiceResponse]:
        """Helper to generate voice response."""
        if not text:
            return None
        return await self.voice.speak(
            text,
            emotion_state.get("primary_emotions", []),
            action.get("action", "reflect"),
            emotion_state.get("intensity", 5)
        )

    # =========================================================================
    # FALLBACK METHODS - Try Gemini first, then Claude/SambaNova
    # =========================================================================

    async def _quick_acknowledge_with_fallback(self, user_input: str) -> str:
        """Quick acknowledgment with Gemini -> SambaNova fallback."""
        # Try Gemini first
        if self._gemini_available:
            try:
                result = await self.gemini.quick_acknowledge(user_input, QUICK_ACK_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Gemini quick ack failed: {e}")

        # Fallback to SambaNova
        try:
            return await self.sambanova.quick_acknowledge(user_input, QUICK_ACK_PROMPT)
        except Exception as e:
            print(f"SambaNova quick ack failed: {e}")
            return "I hear you..."

    async def _analyze_emotion_with_fallback(self, user_input: str) -> dict:
        """Emotion analysis with Gemini -> Claude fallback."""
        default_emotion = {
            "primary_emotions": ["neutral"],
            "secondary_emotions": [],
            "intensity": 5,
            "underlying_needs": ["connection"],
            "intervention_needed": False
        }

        # Try Gemini first
        if self._gemini_available:
            try:
                result = await self.gemini.analyze_emotion(user_input, EMOTION_ANALYZER_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Gemini emotion analysis failed: {e}")
                self._gemini_available = False  # Temporarily disable

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                result = await self.claude.analyze_emotion(user_input, EMOTION_ANALYZER_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Claude emotion analysis failed: {e}")

        return default_emotion

    async def _decide_action_with_fallback(self, emotion_state: dict) -> dict:
        """Action decision with Gemini -> Claude fallback."""
        default_action = {
            "action": "reflect",
            "image_style": "warm",
            "suggested_response_tone": "empathetic"
        }

        # Try Gemini first
        if self._gemini_available:
            try:
                result = await self.gemini.decide_action(emotion_state, ACTION_DECIDER_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Gemini action decision failed: {e}")

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                result = await self.claude.decide_action(emotion_state, ACTION_DECIDER_PROMPT)
                if result:
                    return result
            except Exception as e:
                print(f"Claude action decision failed: {e}")

        return default_action

    async def _generate_response_with_fallback(
        self,
        user_input: str,
        emotion_state: dict,
        action: dict,
        history: list
    ) -> AsyncGenerator[str, None]:
        """Generate response with Gemini -> Claude -> SambaNova fallback."""
        # Try Gemini first
        if self._gemini_available:
            try:
                yielded = False
                async for chunk in self.gemini.generate_response_stream(
                    user_input, emotion_state, action, CONVERSATION_PROMPT, history
                ):
                    yielded = True
                    yield chunk
                if yielded:
                    return
            except Exception as e:
                print(f"Gemini response generation failed: {e}")

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                yielded = False
                async for chunk in self.claude.generate_response_stream(
                    user_input, emotion_state, action, CONVERSATION_PROMPT, history
                ):
                    yielded = True
                    yield chunk
                if yielded:
                    return
            except Exception as e:
                print(f"Claude response generation failed: {e}")

        # Final fallback to SambaNova
        try:
            async for chunk in self.sambanova.generate_response_stream(
                user_input, emotion_state, CONVERSATION_PROMPT
            ):
                yield chunk
        except Exception as e:
            print(f"All LLMs failed: {e}")
            yield "I'm here with you. Tell me more about what's on your mind."

    async def _generate_intervention_with_fallback(
        self,
        user_input: str,
        emotion_state: dict
    ) -> AsyncGenerator[str, None]:
        """Generate intervention response with fallback."""
        # Try Gemini first
        if self._gemini_available:
            try:
                yielded = False
                async for chunk in self.gemini.generate_intervention_response(
                    user_input, emotion_state, INTERVENTION_PROMPT
                ):
                    yielded = True
                    yield chunk
                if yielded:
                    return
            except Exception as e:
                print(f"Gemini intervention failed: {e}")

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                async for chunk in self.claude.generate_intervention_response(
                    user_input, emotion_state, INTERVENTION_PROMPT
                ):
                    yield chunk
                return
            except Exception as e:
                print(f"Claude intervention failed: {e}")

        # Safe default
        yield "I hear that you're going through something difficult. I'm here with you, and I care about how you're feeling. If you're in crisis, please reach out to a helpline or someone you trust."

    async def _generate_text_with_fallback(self, prompt: str) -> Optional[str]:
        """Generate text with Gemini -> Claude fallback."""
        # Try Gemini first
        if self._gemini_available:
            try:
                result = await self.gemini.generate_text(prompt)
                if result:
                    return result
            except Exception as e:
                print(f"Gemini text generation failed: {e}")

        # Fallback to Claude
        if self._claude_available and self.claude:
            try:
                result = await self.claude.generate_text(prompt)
                if result:
                    return result
            except Exception as e:
                print(f"Claude text generation failed: {e}")

        return None

    async def visualize_current_mood(
        self,
        session_id: str = "default"
    ) -> tuple[Optional[GeneratedImage], str]:
        """
        Generate an image based on the current conversation context.
        Called explicitly by the user via the "Visualize" button.
        Uses the full conversation history to create a contextual, meaningful image.

        Returns:
            (GeneratedImage, explanation) - The image and a one-sentence explanation
            of why it fits the conversation
        """
        history = self._conversations.get(session_id, [])
        mode = self.get_mode(session_id)

        if not history:
            # No conversation yet - generate a welcoming image
            prompt = "A warm, inviting scene with soft morning light, gentle colors, a sense of new beginnings and openness, peaceful atmosphere"
            image = await self.artist.generate_for_mood(prompt, "warm", "welcome")
            return image, "A fresh start, waiting to capture whatever you'd like to share."

        # Build context from recent conversation
        recent_messages = history[-6:]  # Last 3 exchanges
        conv_summary = "\n".join([
            f"{m.role}: {m.content}" for m in recent_messages
        ])

        # Get the last user message for primary context
        last_user_msg = ""
        for m in reversed(history):
            if m.role == "user":
                last_user_msg = m.content
                break

        # Analyze emotion of recent conversation (using fallback)
        emotion_state = await self._analyze_emotion_with_fallback(conv_summary)
        emotions = emotion_state.get('primary_emotions', ['neutral'])

        # Generate image prompt AND explanation together
        prompt_and_explain = f"""Based on this conversation, create TWO things:

CONVERSATION:
{conv_summary}

DETECTED EMOTIONS: {', '.join(emotions)}
MODE: {mode}

1. IMAGE_PROMPT: A vivid, specific image prompt (2-3 sentences) that:
   - Captures the emotional essence of this conversation
   - Would resonate with someone feeling these emotions
   - Matches the {mode} aesthetic

2. EXPLANATION: ONE sentence (15 words max) explaining WHY this image fits the conversation.
   - Be poetic/thoughtful, not clinical
   - Help the user see the connection
   - Start with something like "Because...", "I see...", "This reflects...", "Your words painted..."

Respond in this exact format:
IMAGE_PROMPT: [your prompt here]
EXPLANATION: [your explanation here]"""

        try:
            # Try Gemini first, then Claude
            result = None
            if self._gemini_available:
                try:
                    result = await self.gemini.generate_text(prompt_and_explain)
                except Exception as e:
                    print(f"[DEBUG] Gemini failed for prompt/explain: {e}")

            if not result and self._claude_available and self.claude:
                try:
                    result = await self.claude.generate_text(prompt_and_explain)
                except Exception as e:
                    print(f"[DEBUG] Claude failed for prompt/explain: {e}")

            print(f"[DEBUG] LLM response for prompt/explain: {result[:200] if result else 'None'}...")

            # Parse the result
            image_prompt = ""
            explanation = ""

            if result and "IMAGE_PROMPT:" in result and "EXPLANATION:" in result:
                parts = result.split("EXPLANATION:")
                image_prompt = parts[0].replace("IMAGE_PROMPT:", "").strip()
                explanation = parts[1].strip()
                print(f"[DEBUG] Parsed - prompt: {image_prompt[:50]}..., explanation: {explanation}")
            else:
                # Fallback
                print("[DEBUG] Using fallback - result didn't have expected format")
                image_prompt = await self.sambanova.enhance_prompt(
                    last_user_msg,
                    emotion_state,
                    mode,
                    PROMPT_ENHANCER_PROMPT
                )
                explanation = f"I sensed {emotions[0]} in your words and wanted to reflect that back to you."

        except Exception as e:
            print(f"[DEBUG] Error generating prompt/explanation: {e}")
            import traceback
            traceback.print_exc()
            image_prompt = f"An emotional landscape representing {', '.join(emotions)}, with soft ethereal lighting and dreamlike quality"
            explanation = f"Your {emotions[0]} touched me, and I wanted to show you how I felt it."

        print(f"[DEBUG] Final explanation before image gen: '{explanation}'")

        # Generate the image
        action = emotion_state.get("suggested_action", "reflect")
        style = "dreamy" if mode == "dream" else "warm" if mode == "night" else "artistic"

        image = await self.artist.generate_for_mood(image_prompt, style, action)

        return image, explanation

    async def process_streaming(
        self,
        user_input: str,
        session_id: str = "default"
    ) -> AsyncGenerator[dict, None]:
        """
        Streaming version of process() that yields updates as they happen.

        NOTE: This path calls Claude and SambaNova directly (no Gemini fallback),
        so it assumes those clients are configured.

        Yields dicts with:
        - {"type": "state", "value": "thinking"}
        - {"type": "ack", "value": "I hear you..."}
        - {"type": "text_chunk", "value": "..."}
        - {"type": "image", "value": GeneratedImage}
        - {"type": "complete", "value": PipResponse}
        """
        # Add to history
        self._add_to_history(session_id, "user", user_input)
        mode = self.get_mode(session_id)

        yield {"type": "state", "value": "listening"}

        # Phase 1: Quick ack + emotion (parallel)
        ack_task = asyncio.create_task(
            self.sambanova.quick_acknowledge(user_input, QUICK_ACK_PROMPT)
        )
        emotion_task = asyncio.create_task(
            self.claude.analyze_emotion(user_input, EMOTION_ANALYZER_PROMPT)
        )

        acknowledgment = await ack_task
        yield {"type": "ack", "value": acknowledgment}
        yield {"type": "state", "value": "thinking"}

        emotion_state = await emotion_task
        pip_state = emotion_to_pip_state(
            emotion_state.get("primary_emotions", []),
            emotion_state.get("intensity", 5)
        )
        yield {"type": "emotion", "value": emotion_state}
        yield {"type": "state", "value": pip_state}

        # Phase 2: Action + Prompt (parallel)
        action_task = asyncio.create_task(
            self.claude.decide_action(emotion_state, ACTION_DECIDER_PROMPT)
        )
        prompt_task = asyncio.create_task(
            self.sambanova.enhance_prompt(user_input, emotion_state, mode, PROMPT_ENHANCER_PROMPT)
        )
        action, image_prompt = await asyncio.gather(action_task, prompt_task)
        yield {"type": "action", "value": action}

        # Phase 3: Start image generation
        image_task = asyncio.create_task(
            self.artist.generate_for_mood(
                image_prompt,
                action.get("image_style", "warm"),
                action.get("action", "reflect")
            )
        )

        # Phase 4: Stream response
        yield {"type": "state", "value": "speaking"}

        response_text = ""
        if emotion_state.get("intervention_needed", False):
            async for chunk in self.claude.generate_intervention_response(
                user_input, emotion_state, INTERVENTION_PROMPT
            ):
                response_text += chunk
                yield {"type": "text_chunk", "value": chunk}
        else:
            if self._should_use_claude():
                async for chunk in self.claude.generate_response_stream(
                    user_input, emotion_state, action, CONVERSATION_PROMPT,
                    self._get_conversation_history(session_id)
                ):
                    response_text += chunk
                    yield {"type": "text_chunk", "value": chunk}
            else:
                async for chunk in self.sambanova.generate_response_stream(
                    user_input, emotion_state, CONVERSATION_PROMPT
                ):
                    response_text += chunk
                    yield {"type": "text_chunk", "value": chunk}

        self._add_to_history(session_id, "assistant", response_text)

        # Wait for image
        generated_image = await image_task
        yield {"type": "image", "value": generated_image}

        # Final state
        yield {"type": "state", "value": pip_state}

        # Complete response
        yield {
            "type": "complete",
            "value": PipResponse(
                acknowledgment=acknowledgment,
                response_text=response_text,
                emotion_state=emotion_state,
                action=action,
                image=generated_image,
                audio=None,
                pip_state=pip_state,
                image_prompt=image_prompt
            )
        }

    def _should_use_claude(self) -> bool:
        """
        Decide whether to use Claude or SambaNova for conversation.
        Simple alternation for load balancing.
        """
        self._use_claude_for_conversation = not self._use_claude_for_conversation
        return self._use_claude_for_conversation

    def _build_prompt_context(self, emotion_state: dict, mode: str) -> dict:
        """Build context for prompt enhancement."""
        return {
            "emotions": emotion_state.get("primary_emotions", []),
            "intensity": emotion_state.get("intensity", 5),
            "needs": emotion_state.get("underlying_needs", []),
            "mode": mode
        }

    def clear_history(self, session_id: str):
        """Clear conversation history for a session."""
        if session_id in self._conversations:
            del self._conversations[session_id]

    def get_history(self, session_id: str) -> list[dict]:
        """Get conversation history for display."""
        return [
            {"role": m.role, "content": m.content}
            for m in self._conversations.get(session_id, [])
        ]

    async def summarize_conversation(
        self,
        session_id: str = "default",
        generate_voice: bool = True
    ) -> dict:
        """
        Create a memory artifact from the conversation.
        Uses FULL conversation context to create a deeply meaningful summary,
        image, and audio that captures the entire emotional journey.

        Returns:
            dict with summary, image, and audio
        """
        history = self._conversations.get(session_id, [])
        mode = self.get_mode(session_id)

        if not history:
            return {
                "summary": "No conversation to summarize yet!",
                "image": None,
                "audio": None,
                "emotions_journey": []
            }

        # Build FULL conversation text (not truncated)
        conv_text = "\n".join([
            f"{m.role}: {m.content}" for m in history
        ])

        # Extract key themes and emotional arc
        analysis_prompt = f"""Analyze this COMPLETE conversation deeply.

FULL CONVERSATION:
{conv_text}

Identify:
1. EMOTIONAL ARC: How did the person's emotions change from start to end?
2. KEY MOMENTS: What were the most significant things they shared?
3. THEMES: What topics or concerns came up repeatedly?
4. RESOLUTION: Did they reach any realizations or feel better by the end?
5. VISUAL METAPHOR: What single image/scene could capture this entire journey?

Respond in JSON:
{{
    "emotional_arc": "description of how emotions evolved",
    "key_moments": ["moment1", "moment2"],
    "themes": ["theme1", "theme2"],
    "resolution": "how conversation concluded emotionally",
    "visual_metaphor": "a vivid scene description that captures the journey",
    "dominant_emotions": ["emotion1", "emotion2", "emotion3"],
    "intensity_end": 1-10
}}"""

        try:
            # Deep analysis using Gemini/Claude with fallback
            import json
            analysis_raw = await self._generate_text_with_fallback(analysis_prompt)

            # Parse analysis
            try:
                # Try to extract JSON from response
                if "```json" in analysis_raw:
                    analysis_raw = analysis_raw.split("```json")[1].split("```")[0]
                elif "```" in analysis_raw:
                    analysis_raw = analysis_raw.split("```")[1].split("```")[0]
                analysis = json.loads(analysis_raw.strip())
            except Exception:
                analysis = {
                    "emotional_arc": "A meaningful exchange",
                    "key_moments": ["sharing feelings"],
                    "themes": ["connection"],
                    "resolution": "feeling heard",
                    "visual_metaphor": "Two soft lights connecting in a gentle space",
                    "dominant_emotions": ["reflection", "warmth"],
                    "intensity_end": 5
                }

            # Generate warm summary based on analysis
            summary_prompt = f"""You are Pip, a warm emotional companion. Create a brief (2-3 sentences) heartfelt summary of your conversation with this person.

ANALYSIS OF CONVERSATION:
- Emotional journey: {analysis.get('emotional_arc', 'meaningful exchange')}
- Key moments: {', '.join(analysis.get('key_moments', ['connection']))}
- How it ended: {analysis.get('resolution', 'feeling heard')}

Write warmly, personally, as if you genuinely care about this person. Reference specific things they shared (but keep it brief). End with warmth."""

            summary = await self._generate_text_with_fallback(summary_prompt)
            if not summary:
                summary = "We had a meaningful conversation together. I'm here whenever you want to talk again!"

            # Create RICH image prompt using full context
            visual_metaphor = analysis.get('visual_metaphor', 'A peaceful scene of connection and understanding')
            emotions = analysis.get('dominant_emotions', ['reflection', 'peace'])
            themes = analysis.get('themes', ['connection'])

            memory_image_prompt = f"""Create a deeply meaningful visual memory:

VISUAL CONCEPT: {visual_metaphor}

EMOTIONAL ESSENCE:
- Emotions to convey: {', '.join(emotions)}
- Themes: {', '.join(themes)}
- Emotional resolution: {analysis.get('resolution', 'peace')}

STYLE REQUIREMENTS:
- Mode: {mode} ({'magical/ethereal' if mode == 'alchemist' else 'dreamy/surreal' if mode == 'dream' else 'calm/starlit' if mode == 'night' else 'artistic/painterly'})
- Soft, emotional lighting
- Colors that match the emotional journey
- Abstract elements suggesting conversation/connection
- NO text, NO words, NO letters
- Evocative, gallery-worthy composition

This should feel like a precious memory captured in art."""

            # Generate memory image with full context
            memory_image = await self.artist.generate_for_mood(
                memory_image_prompt,
                "dreamy",
                "reflect"
            )

            # Generate audio if requested
            audio_response = None
            if generate_voice:
                audio_response = await self.voice.speak(
                    summary,
                    emotions,
                    "reflect",
                    analysis.get("intensity_end", 5)
                )

            return {
                "summary": summary,
                "image": memory_image,
                "audio": audio_response,
                "emotions_journey": emotions,
                "analysis": analysis  # Include full analysis for debugging/display
            }

        except Exception as e:
            print(f"Error summarizing conversation: {e}")
            import traceback
            traceback.print_exc()
            return {
                "summary": "I enjoyed our conversation! Let's chat again soon.",
                "image": None,
                "audio": None,
                "emotions_journey": []
            }


# Singleton instance for easy access
_brain_instance: Optional[PipBrain] = None


def get_brain() -> PipBrain:
    """Get or create the Pip brain instance."""
    global _brain_instance
    if _brain_instance is None:
        _brain_instance = PipBrain()
    return _brain_instance
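

# Minimal local usage sketch (not part of the pipeline). It assumes the service
# modules imported above are available and their API keys are configured via the
# environment or a UserAPIKeys instance; the sample text is illustrative only.
if __name__ == "__main__":
    async def _demo():
        brain = get_brain()
        # Print the quick acknowledgment and the streamed text as they arrive.
        response = await brain.process(
            "I've been feeling overwhelmed this week",
            session_id="demo",
            on_acknowledgment=lambda ack: print(f"[ack] {ack}"),
            on_text_chunk=lambda chunk: print(chunk, end="", flush=True),
        )
        print(f"\n[state] {response.pip_state}")

    asyncio.run(_demo())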