""" SambaNova client for Pip's fast responses. Handles: Quick acknowledgments, prompt enhancement, load-balanced conversation. Uses OpenAI-compatible API. """ import os import asyncio from typing import AsyncGenerator from openai import AsyncOpenAI class SambanovaClient: """SambaNova-powered fast inference for Pip.""" def __init__(self): api_key = os.getenv("SAMBANOVA_API_KEY") self.available = bool(api_key) if self.available: self.client = AsyncOpenAI( api_key=api_key, base_url=os.getenv("SAMBANOVA_BASE_URL", "https://api.sambanova.ai/v1") ) else: self.client = None print("⚠️ SambaNova: No API key found - service disabled") # Using Llama 3.1 or DeepSeek on SambaNova self.model = "Meta-Llama-3.1-8B-Instruct" self._rate_limited = False self._rate_limit_reset = 0 async def _check_rate_limit(self): """Check if we're currently rate limited.""" import time if self._rate_limited and time.time() < self._rate_limit_reset: return True self._rate_limited = False return False async def _handle_rate_limit(self): """Mark as rate limited for 60 seconds.""" import time self._rate_limited = True self._rate_limit_reset = time.time() + 60 # Reset after 60 seconds print("SambaNova rate limited - will use fallback for 60 seconds") async def quick_acknowledge(self, user_input: str, system_prompt: str) -> str: """ Generate a quick acknowledgment while heavier processing happens. This should be FAST - just a brief "I hear you" type response. """ # If not available or rate limited, return a fallback if not self.available or not self.client: return "I hear you..." if await self._check_rate_limit(): return "I hear you..." try: response = await self.client.chat.completions.create( model=self.model, max_tokens=50, # Keep it short for speed messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_input} ] ) return response.choices[0].message.content except Exception as e: error_str = str(e).lower() if "429" in error_str or "rate" in error_str: await self._handle_rate_limit() print(f"SambaNova quick_acknowledge error: {e}") return "I hear you..." # Fallback async def enhance_prompt( self, user_input: str, emotion_state: dict, mode: str, system_prompt: str ) -> str: """ Transform user context into a detailed, vivid image prompt. This is where user-specific imagery is crafted. """ emotions = emotion_state.get('primary_emotions', ['peaceful']) fallback = f"A beautiful, calming scene representing {emotions[0] if emotions else 'peace'}, soft colors, dreamy atmosphere" # If not available or rate limited, return a simple prompt if not self.available or not self.client: return fallback if await self._check_rate_limit(): return fallback context = f""" User said: "{user_input}" Detected emotions: {emotion_state.get('primary_emotions', [])} Emotional intensity: {emotion_state.get('intensity', 5)}/10 Current mode: {mode} Action: {emotion_state.get('action', 'reflect')} Generate a vivid, specific image prompt based on THIS user's context. """ try: response = await self.client.chat.completions.create( model=self.model, max_tokens=300, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": context} ] ) return response.choices[0].message.content except Exception as e: error_str = str(e).lower() if "429" in error_str or "rate" in error_str: await self._handle_rate_limit() print(f"SambaNova enhance_prompt error: {e}") emotions = emotion_state.get('primary_emotions', ['peaceful']) return f"A beautiful, calming scene representing {emotions[0] if emotions else 'peace'}, soft colors, dreamy atmosphere" async def generate_response_stream( self, user_input: str, emotion_state: dict, system_prompt: str ) -> AsyncGenerator[str, None]: """ Generate conversational response with streaming. Used for load-balanced conversation when Claude is busy. """ # If not available or rate limited, yield a fallback if not self.available or not self.client: yield "I understand how you're feeling. Let me take a moment to think about this..." return if await self._check_rate_limit(): yield "I understand how you're feeling. Let me take a moment to think about this..." return context = f""" User's emotions: {emotion_state.get('primary_emotions', [])} Intensity: {emotion_state.get('intensity', 5)}/10 User said: {user_input} """ try: stream = await self.client.chat.completions.create( model=self.model, max_tokens=512, stream=True, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": context} ] ) async for chunk in stream: if chunk.choices[0].delta.content: yield chunk.choices[0].delta.content except Exception as e: error_str = str(e).lower() if "429" in error_str or "rate" in error_str: await self._handle_rate_limit() print(f"SambaNova generate_response_stream error: {e}") yield "I understand how you're feeling. Let me think about the best way to respond..." async def analyze_emotion_fast(self, user_input: str, system_prompt: str) -> dict: """ Quick emotion analysis fallback when Claude is overloaded. Less nuanced but faster. """ import json default_response = { "primary_emotions": ["neutral"], "intensity": 5, "pip_expression": "neutral", "intervention_needed": False } # If not available or rate limited, return basic analysis if not self.available or not self.client: return default_response if await self._check_rate_limit(): return default_response try: response = await self.client.chat.completions.create( model=self.model, max_tokens=256, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_input} ] ) content = response.choices[0].message.content if "```json" in content: content = content.split("```json")[1].split("```")[0] elif "```" in content: content = content.split("```")[1].split("```")[0] return json.loads(content.strip()) except Exception as e: error_str = str(e).lower() if "429" in error_str or "rate" in error_str: await self._handle_rate_limit() print(f"SambaNova analyze_emotion_fast error: {e}") return { "primary_emotions": ["neutral"], "intensity": 5, "pip_expression": "neutral", "intervention_needed": False }