Spaces:
Sleeping
Sleeping
| """ | |
| Simple Router | |
| Routes tasks to appropriate modules and executes them | |
| Handles single-step and simple multi-step tasks | |
| """ | |
| from typing import List, Dict, Any, Optional | |
| import time | |
| import uuid | |
| from app.routing.execution_plan import ( | |
| ExecutionPlan, | |
| ExecutionStep, | |
| ExecutionResult, | |
| StepStatus | |
| ) | |
| from app.routing.module_executor import ModuleExecutor | |
| from app.modules.base import BaseModule | |
| from app.modules.registry import ModuleRegistry, ModuleSelector | |
| from app.orchestrator.models import TaskClassification | |
| from app.orchestrator.parameter_models import ExtractedParameters | |
| from app.core.logging import get_logger | |
| from app.core.exceptions import TaskProcessingError | |
| logger = get_logger(__name__) | |
class SimpleRouter:
    """
    Simple task router for single-step and linear multi-step tasks.

    Selects modules for a classified task, turns them into a linear
    execution plan (every step depends on the previous one) and runs the
    steps in order, stopping at the first failure.

    Does NOT handle complex branching or parallel execution.
    """

    # Human-readable action for each known module name. Names that are not
    # listed here fall back to "Execute <module name>".
    _STEP_ACTIONS = {
        'static_scraper': 'Scrape data from website',
        'dynamic_scraper': 'Scrape dynamic content (JavaScript)',
        'api_client': 'Fetch data from API',
        'data_processor': 'Process and filter data',
        'chart_creator': 'Create visualization',
        'csv_exporter': 'Export to CSV',
    }

    def __init__(
        self,
        module_registry: Optional[ModuleRegistry] = None,
        module_selector: Optional[ModuleSelector] = None
    ):
        """
        Initialize simple router.

        Args:
            module_registry: Module registry (creates new if None)
            module_selector: Module selector (creates new if None)
        """
        # Compare against None explicitly: an injected registry/selector that
        # happens to evaluate falsy must not be silently swapped for a new one.
        if module_registry is None:
            module_registry = ModuleRegistry()
        self.registry = module_registry

        if module_selector is None:
            module_selector = ModuleSelector(self.registry)
        self.selector = module_selector

        self.executor = ModuleExecutor()
        logger.info("SimpleRouter initialized")

    async def route_and_execute(
        self,
        classification: TaskClassification,
        parameters: ExtractedParameters,
        task_description: str
    ) -> ExecutionResult:
        """
        Route task and execute.

        Main entry point for simple routing: selects modules, builds a
        linear plan and executes it. Any exception raised along the way is
        logged and converted into a failed ExecutionResult instead of
        propagating to the caller.

        Args:
            classification: Task classification
            parameters: Extracted parameters
            task_description: Original task description

        Returns:
            ExecutionResult: Final execution result
        """
        # Short task id: first 8 characters of a random UUID.
        task_id = str(uuid.uuid4())[:8]

        # NOTE(review): the leading "π" / "β" glyphs in the log messages below
        # look like mis-encoded emoji; they are kept verbatim because the
        # intended characters cannot be recovered from this file.
        logger.info(f"π Starting task routing | Task ID: {task_id}")
        logger.info(f"Task type: {classification.primary_task.value}")
        logger.info(f"Complexity: {classification.complexity.value}")

        start_time = time.time()

        try:
            # Step 1: Select modules
            logger.info("π Step 1: Module selection")
            selected_modules = self.selector.select_modules(classification, parameters)

            if not selected_modules:
                logger.warning("No modules selected for task")
                return ExecutionResult(
                    task_id=task_id,
                    success=False,
                    total_steps=0,
                    errors=["No modules available to execute this task"],
                    # Report elapsed time here too, like every other return path.
                    execution_time=time.time() - start_time
                )

            logger.info(f"β Selected {len(selected_modules)} modules")

            # Step 2: Create execution plan
            logger.info("π Step 2: Creating execution plan")
            plan = self._create_execution_plan(
                task_id=task_id,
                modules=selected_modules,
                parameters=parameters,
                task_description=task_description
            )
            logger.info(f"β Execution plan created with {len(plan.steps)} steps")

            # Step 3: Execute plan
            logger.info("π Step 3: Executing plan")
            result = await self._execute_plan(plan, selected_modules)

            execution_time = time.time() - start_time
            result.execution_time = execution_time

            logger.info(
                f"β Task execution complete | "
                f"Success: {result.success} | "
                f"Time: {execution_time:.2f}s"
            )
            return result

        except Exception as e:
            # Top-level boundary: log with traceback and report the failure
            # through the result object rather than raising.
            execution_time = time.time() - start_time
            logger.error(f"β Task execution failed: {str(e)}", exc_info=True)
            return ExecutionResult(
                task_id=task_id,
                success=False,
                errors=[str(e)],
                execution_time=execution_time
            )

    def _create_execution_plan(
        self,
        task_id: str,
        modules: List[BaseModule],
        parameters: ExtractedParameters,
        task_description: str
    ) -> ExecutionPlan:
        """
        Create execution plan from selected modules.

        One step is created per module, numbered from 1 in the order the
        modules are given; every step after the first depends on the step
        directly before it.

        Args:
            task_id: Task identifier
            modules: Selected modules
            parameters: Extracted parameters
            task_description: Task description (truncated to 100 characters
                for the plan description)

        Returns:
            ExecutionPlan: Execution plan
        """
        total_steps = len(modules)
        steps = []
        estimated_duration = 0

        for step_number, module in enumerate(modules, 1):
            module_params = self._configure_module_parameters(
                module=module,
                parameters=parameters,
                step_number=step_number
            )
            steps.append(
                ExecutionStep(
                    step_number=step_number,
                    module_name=module.name,
                    description=self._generate_step_description(
                        module, step_number, total_steps
                    ),
                    parameters=module_params,
                    # Linear dependency on the previous step
                    depends_on=[step_number - 1] if step_number > 1 else []
                )
            )
            # Duration estimate: module cost estimate scaled by 10.
            estimated_duration += module.estimate_cost(module_params) * 10

        return ExecutionPlan(
            task_id=task_id,
            description=task_description[:100],
            steps=steps,
            estimated_duration=int(estimated_duration),
            complexity=self._assess_plan_complexity(modules)
        )

    def _configure_module_parameters(
        self,
        module: BaseModule,
        parameters: ExtractedParameters,
        step_number: int
    ) -> Dict[str, Any]:
        """
        Configure parameters for specific module.

        Builds a plain dict from whichever parts of the extracted
        parameters are present. The same settings are offered to every
        step; only the data source is restricted to step 1. ``module`` is
        currently not consulted.

        Args:
            module: Module to configure
            parameters: Extracted parameters
            step_number: Step number (1-based)

        Returns:
            Dict: Configured parameters for module
        """
        config: Dict[str, Any] = {}

        # Data source (for scrapers/API clients) - first step only
        if parameters.data_sources and step_number == 1:
            data_source = parameters.data_sources[0]
            config['url'] = data_source.location
            config['format'] = data_source.format

        # Filters (for processors)
        if parameters.filters:
            config['filters'] = [
                {'field': f.field, 'operator': f.operator, 'value': f.value}
                for f in parameters.filters
            ]

        # Columns (for data extraction)
        if parameters.columns:
            config['columns'] = [col.name for col in parameters.columns]

        # Aggregations (for processors)
        if parameters.aggregations:
            config['aggregations'] = [
                {
                    'function': agg.function,
                    'field': agg.field,
                    'group_by': agg.group_by
                }
                for agg in parameters.aggregations
            ]

        # Sorting (for processors)
        if parameters.sorting:
            config['sorting'] = [
                {'field': sort.field, 'order': sort.order}
                for sort in parameters.sorting
            ]

        # Visualizations (for visualizers) - only the first one is used
        if parameters.visualizations:
            viz = parameters.visualizations[0]
            config['chart_type'] = viz.chart_type
            config['title'] = viz.title

        # Output format (for exporters)
        if parameters.output:
            # 'format' is shared with the data source above. Do not let the
            # output format clobber the source format handed to the first
            # step; it only fills the key when no source format was set.
            if config.get('format') is None:
                config['format'] = parameters.output.format
            config['filename'] = parameters.output.filename

        return config

    def _generate_step_description(
        self,
        module: BaseModule,
        step_num: int,
        total_steps: int
    ) -> str:
        """Generate human-readable step description"""
        action = self._STEP_ACTIONS.get(module.name, f'Execute {module.name}')
        return f'Step {step_num}/{total_steps}: {action}'

    def _assess_plan_complexity(self, modules: List[BaseModule]) -> str:
        """
        Assess plan complexity from the number of modules.

        Returns:
            "simple" for at most one module, "medium" for two or three,
            "complex" otherwise.
        """
        module_count = len(modules)
        # An empty plan is treated as simple rather than falling through
        # to "medium".
        if module_count <= 1:
            return "simple"
        if module_count <= 3:
            return "medium"
        return "complex"

    async def _execute_plan(
        self,
        plan: ExecutionPlan,
        modules: List[BaseModule]
    ) -> ExecutionResult:
        """
        Execute the plan.

        Steps run sequentially in plan order. Execution stops at the first
        step that fails, raises, or names a module that is not in
        ``modules``; later steps are left untouched.

        Args:
            plan: Execution plan
            modules: Modules to execute

        Returns:
            ExecutionResult: Execution result
        """
        logger.info(f"Executing plan with {len(plan.steps)} steps")

        # Name -> module lookup. setdefault keeps the first module seen for a
        # given name, matching a front-to-back search of the list.
        modules_by_name: Dict[str, BaseModule] = {}
        for candidate in modules:
            modules_by_name.setdefault(candidate.name, candidate)

        step_results = []
        final_data = None

        for step in plan.steps:
            logger.info(f"π {step.description}")

            module = modules_by_name.get(step.module_name)
            if module is None:
                # Record an explicit step failure instead of letting a bare
                # next() raise StopIteration inside this coroutine.
                step.status = StepStatus.FAILED
                step.error = f"Module '{step.module_name}' is not available"
                step_results.append({
                    'step': step.step_number,
                    'module': step.module_name,
                    'success': False,
                    'error': step.error
                })
                logger.error(f"Step {step.step_number} failed, stopping execution")
                break

            step.status = StepStatus.RUNNING

            try:
                result = await self.executor.execute_module(
                    module=module,
                    parameters=step.parameters,
                    context=self.executor.get_context()
                )
                step.result = result

                if result.success:
                    step.status = StepStatus.COMPLETED
                    # Data of the most recent successful step becomes the
                    # final result data.
                    final_data = result.data
                    step_results.append({
                        'step': step.step_number,
                        'module': step.module_name,
                        'success': True,
                        'execution_time': result.execution_time
                    })
                else:
                    step.status = StepStatus.FAILED
                    step.error = result.error
                    step_results.append({
                        'step': step.step_number,
                        'module': step.module_name,
                        'success': False,
                        'error': result.error
                    })
                    # Stop on first failure
                    logger.error(f"Step {step.step_number} failed, stopping execution")
                    break

            except Exception as e:
                step.status = StepStatus.FAILED
                step.error = str(e)
                step_results.append({
                    'step': step.step_number,
                    'module': step.module_name,
                    'success': False,
                    'error': str(e)
                })
                # Keep the traceback in the log; the result only carries str(e).
                logger.error(f"Step {step.step_number} exception: {e}", exc_info=True)
                break

        # Create result
        completed_steps = sum(
            1 for step in plan.steps
            if step.status == StepStatus.COMPLETED
        )
        success = plan.is_completed() and not plan.has_failed()
        errors = [
            f"Step {step.step_number} ({step.module_name}): {step.error}"
            for step in plan.steps
            if step.error
        ]

        return ExecutionResult(
            task_id=plan.task_id,
            success=success,
            data=final_data,
            steps_completed=completed_steps,
            total_steps=len(plan.steps),
            step_results=step_results,
            errors=errors,
            metadata={
                'plan_complexity': plan.complexity,
                'estimated_duration': plan.estimated_duration
            }
        )

    def cleanup(self):
        """Clean up resources by clearing the executor's context"""
        self.executor.clear_context()
        logger.debug("SimpleRouter cleanup complete")