File size: 3,295 Bytes
baa1efb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""
Execution Context Manager
Manages shared state and data flow between components
"""

from typing import Dict, Any, Optional, List
from datetime import datetime
import uuid

from pydantic import BaseModel, Field
from app.core.logging import get_logger

logger = get_logger(__name__)


class ExecutionContext(BaseModel):
    """
    Execution context shared across all orchestration steps
    Contains state, intermediate results, and metadata
    """
    
    # Identification
    task_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    execution_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    
    # Task information
    original_task: str = Field(description="Original task description")
    task_url: Optional[str] = Field(None, description="Task URL if applicable")
    
    # Execution state
    started_at: datetime = Field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    status: str = Field(default="initialized", description="Execution status")
    
    # Intermediate results from each step
    step_results: Dict[str, Any] = Field(
        default_factory=dict,
        description="Results from each orchestration step"
    )
    
    # Shared data between modules
    shared_data: Dict[str, Any] = Field(
        default_factory=dict,
        description="Data shared between execution modules"
    )
    
    # Metadata
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata"
    )
    
    # Execution log
    execution_log: List[str] = Field(
        default_factory=list,
        description="Execution event log"
    )
    
    class Config:
        arbitrary_types_allowed = True
    
    def log_event(self, event: str):
        """Log an execution event"""
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        log_entry = f"[{timestamp}] {event}"
        self.execution_log.append(log_entry)
        logger.debug(log_entry)
    
    def set_step_result(self, step_name: str, result: Any):
        """Store result from a step"""
        self.step_results[step_name] = result
        self.log_event(f"Step '{step_name}' completed")
    
    def get_step_result(self, step_name: str) -> Optional[Any]:
        """Get result from a step"""
        return self.step_results.get(step_name)
    
    def set_shared_data(self, key: str, value: Any):
        """Set shared data"""
        self.shared_data[key] = value
    
    def get_shared_data(self, key: str, default: Any = None) -> Any:
        """Get shared data"""
        return self.shared_data.get(key, default)
    
    def mark_completed(self):
        """Mark execution as completed"""
        self.completed_at = datetime.now()
        self.status = "completed"
        self.log_event("Execution completed")
    
    def mark_failed(self, error: str):
        """Mark execution as failed"""
        self.completed_at = datetime.now()
        self.status = "failed"
        self.log_event(f"Execution failed: {error}")
    
    def get_duration(self) -> float:
        """Get execution duration in seconds"""
        if self.completed_at:
            return (self.completed_at - self.started_at).total_seconds()
        return (datetime.now() - self.started_at).total_seconds()