| """ | |
| SambaNova client for Pip's fast responses. | |
| Handles: Quick acknowledgments, prompt enhancement, load-balanced conversation. | |
| Uses OpenAI-compatible API. | |
| """ | |
| import os | |
| import asyncio | |
| from typing import AsyncGenerator | |
| from openai import AsyncOpenAI | |
class SambanovaClient:
    """SambaNova-powered fast inference for Pip."""

    def __init__(self):
        api_key = os.getenv("SAMBANOVA_API_KEY")
        self.available = bool(api_key)
        if self.available:
            self.client = AsyncOpenAI(
                api_key=api_key,
                base_url=os.getenv("SAMBANOVA_BASE_URL", "https://api.sambanova.ai/v1")
            )
        else:
            self.client = None
            print("⚠️ SambaNova: No API key found - service disabled")
        # Using Llama 3.1 or DeepSeek on SambaNova
        self.model = "Meta-Llama-3.1-8B-Instruct"
        self._rate_limited = False
        self._rate_limit_reset = 0
    async def _check_rate_limit(self) -> bool:
        """Return True if we're still inside a rate-limit cooldown."""
        if self._rate_limited and time.time() < self._rate_limit_reset:
            return True
        self._rate_limited = False
        return False

    async def _handle_rate_limit(self):
        """Mark the client as rate limited for 60 seconds."""
        self._rate_limited = True
        self._rate_limit_reset = time.time() + 60  # Reset after 60 seconds
        print("SambaNova rate limited - will use fallback for 60 seconds")
    async def quick_acknowledge(self, user_input: str, system_prompt: str) -> str:
        """
        Generate a quick acknowledgment while heavier processing happens.
        This should be FAST - just a brief "I hear you" type response.
        """
        fallback = "I hear you..."
        # If unavailable or rate limited, return the fallback immediately.
        if not self.available or not self.client:
            return fallback
        if await self._check_rate_limit():
            return fallback
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                max_tokens=50,  # Keep it short for speed
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_input}
                ]
            )
            # message.content is Optional in the SDK; fall back if empty.
            return response.choices[0].message.content or fallback
        except Exception as e:
            error_str = str(e).lower()
            if "429" in error_str or "rate" in error_str:
                await self._handle_rate_limit()
            print(f"SambaNova quick_acknowledge error: {e}")
            return fallback
    async def enhance_prompt(
        self,
        user_input: str,
        emotion_state: dict,
        mode: str,
        system_prompt: str
    ) -> str:
        """
        Transform user context into a detailed, vivid image prompt.
        This is where user-specific imagery is crafted.
        """
        emotions = emotion_state.get('primary_emotions', ['peaceful'])
        fallback = (
            f"A beautiful, calming scene representing {emotions[0] if emotions else 'peace'}, "
            f"soft colors, dreamy atmosphere"
        )
        # If unavailable or rate limited, return the simple fallback prompt.
        if not self.available or not self.client:
            return fallback
        if await self._check_rate_limit():
            return fallback
        context = f"""
User said: "{user_input}"
Detected emotions: {emotion_state.get('primary_emotions', [])}
Emotional intensity: {emotion_state.get('intensity', 5)}/10
Current mode: {mode}
Action: {emotion_state.get('action', 'reflect')}
Generate a vivid, specific image prompt based on THIS user's context.
"""
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                max_tokens=300,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": context}
                ]
            )
            # message.content is Optional in the SDK; fall back if empty.
            return response.choices[0].message.content or fallback
        except Exception as e:
            error_str = str(e).lower()
            if "429" in error_str or "rate" in error_str:
                await self._handle_rate_limit()
            print(f"SambaNova enhance_prompt error: {e}")
            return fallback
    async def generate_response_stream(
        self,
        user_input: str,
        emotion_state: dict,
        system_prompt: str
    ) -> AsyncGenerator[str, None]:
        """
        Generate a conversational response with streaming.
        Used for load-balanced conversation when Claude is busy.
        """
        # If unavailable or rate limited, yield a fallback and stop.
        if not self.available or not self.client:
            yield "I understand how you're feeling. Let me take a moment to think about this..."
            return
        if await self._check_rate_limit():
            yield "I understand how you're feeling. Let me take a moment to think about this..."
            return
        context = f"""
User's emotions: {emotion_state.get('primary_emotions', [])}
Intensity: {emotion_state.get('intensity', 5)}/10
User said: {user_input}
"""
        try:
            stream = await self.client.chat.completions.create(
                model=self.model,
                max_tokens=512,
                stream=True,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": context}
                ]
            )
            async for chunk in stream:
                # Some stream chunks carry no choices or no delta content; skip those.
                if chunk.choices and chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
        except Exception as e:
            error_str = str(e).lower()
            if "429" in error_str or "rate" in error_str:
                await self._handle_rate_limit()
            print(f"SambaNova generate_response_stream error: {e}")
            yield "I understand how you're feeling. Let me think about the best way to respond..."
    async def analyze_emotion_fast(self, user_input: str, system_prompt: str) -> dict:
        """
        Quick emotion-analysis fallback when Claude is overloaded.
        Less nuanced but faster.
        """
        default_response = {
            "primary_emotions": ["neutral"],
            "intensity": 5,
            "pip_expression": "neutral",
            "intervention_needed": False
        }
        # If unavailable or rate limited, return the basic analysis.
        if not self.available or not self.client:
            return default_response
        if await self._check_rate_limit():
            return default_response
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                max_tokens=256,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_input}
                ]
            )
            content = response.choices[0].message.content or ""
            # Strip markdown code fences if the model wrapped its JSON in them.
            if "```json" in content:
                content = content.split("```json")[1].split("```")[0]
            elif "```" in content:
                content = content.split("```")[1].split("```")[0]
            return json.loads(content.strip())
        except Exception as e:
            error_str = str(e).lower()
            if "429" in error_str or "rate" in error_str:
                await self._handle_rate_limit()
            print(f"SambaNova analyze_emotion_fast error: {e}")
            return default_response
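

# A minimal usage sketch, not part of the client itself. It assumes
# SAMBANOVA_API_KEY is set in the environment; the system prompts and the
# emotion_state dict below are hypothetical placeholders (the real prompts
# and emotion analysis live elsewhere in the app). Without an API key the
# client degrades to its built-in fallback strings, so this still runs.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        client = SambanovaClient()
        # Quick acknowledgment while heavier processing would run elsewhere.
        ack = await client.quick_acknowledge(
            "I've been feeling overwhelmed at work lately.",
            "You are Pip. Reply with one short, warm acknowledgment."
        )
        print("ack:", ack)
        # Streamed conversational response with a hypothetical emotion state.
        emotion_state = {"primary_emotions": ["stress"], "intensity": 7}
        async for token in client.generate_response_stream(
            "I've been feeling overwhelmed at work lately.",
            emotion_state,
            "You are Pip, a gentle emotional-support companion."
        ):
            print(token, end="", flush=True)
        print()

    asyncio.run(_demo())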