""" Execution Context Manager Manages shared state and data flow between components """ from typing import Dict, Any, Optional, List from datetime import datetime import uuid from pydantic import BaseModel, Field from app.core.logging import get_logger logger = get_logger(__name__) class ExecutionContext(BaseModel): """ Execution context shared across all orchestration steps Contains state, intermediate results, and metadata """ # Identification task_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8]) execution_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8]) # Task information original_task: str = Field(description="Original task description") task_url: Optional[str] = Field(None, description="Task URL if applicable") # Execution state started_at: datetime = Field(default_factory=datetime.now) completed_at: Optional[datetime] = None status: str = Field(default="initialized", description="Execution status") # Intermediate results from each step step_results: Dict[str, Any] = Field( default_factory=dict, description="Results from each orchestration step" ) # Shared data between modules shared_data: Dict[str, Any] = Field( default_factory=dict, description="Data shared between execution modules" ) # Metadata metadata: Dict[str, Any] = Field( default_factory=dict, description="Additional metadata" ) # Execution log execution_log: List[str] = Field( default_factory=list, description="Execution event log" ) class Config: arbitrary_types_allowed = True def log_event(self, event: str): """Log an execution event""" timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] log_entry = f"[{timestamp}] {event}" self.execution_log.append(log_entry) logger.debug(log_entry) def set_step_result(self, step_name: str, result: Any): """Store result from a step""" self.step_results[step_name] = result self.log_event(f"Step '{step_name}' completed") def get_step_result(self, step_name: str) -> Optional[Any]: """Get result from a step""" return self.step_results.get(step_name) def set_shared_data(self, key: str, value: Any): """Set shared data""" self.shared_data[key] = value def get_shared_data(self, key: str, default: Any = None) -> Any: """Get shared data""" return self.shared_data.get(key, default) def mark_completed(self): """Mark execution as completed""" self.completed_at = datetime.now() self.status = "completed" self.log_event("Execution completed") def mark_failed(self, error: str): """Mark execution as failed""" self.completed_at = datetime.now() self.status = "failed" self.log_event(f"Execution failed: {error}") def get_duration(self) -> float: """Get execution duration in seconds""" if self.completed_at: return (self.completed_at - self.started_at).total_seconds() return (datetime.now() - self.started_at).total_seconds()