Spaces:
Sleeping
Sleeping
| """ | |
| Task Processing Service | |
| Simplified with unified LLM analysis in task_fetcher + AnswerSubmitter integration | |
| """ | |
| from typing import Dict, Any, Optional | |
| import asyncio | |
| from app.models.request import ManualTriggeredRequestBody | |
| from app.core.logging import get_logger | |
| from app.core.exceptions import TaskProcessingError | |
| from app.orchestrator.orchestrator_engine import OrchestratorEngine | |
| from app.modules import get_fully_loaded_registry # β AUTO-REGISTRATION | |
| from app.services.task_fetcher import TaskFetcher | |
| from app.modules.submitters.answer_submitter import AnswerSubmitter # β NEW | |
| from app.services.answer_generator import AnswerGenerator | |
| from app.utils.llm_client import get_llm_client | |
| from app.utils.submit_answer import submit_answer | |
| logger = get_logger(__name__) | |
| class TaskProcessor: | |
| """ | |
| Service class for processing TDS quiz tasks | |
| Uses unified LLM analysis from task_fetcher + AnswerSubmitter | |
| """ | |
| def __init__(self): | |
| """Initialize task processor with auto-registration""" | |
| logger.info("π Initializing TaskProcessor") | |
| # β AUTO-REGISTER ALL MODULES | |
| self.registry = get_fully_loaded_registry() | |
| self.answer_submitter = AnswerSubmitter() | |
| self.llm_client = get_llm_client() | |
| self.answer_generator = AnswerGenerator(self.llm_client) | |
| # Initialize orchestrator engine | |
| self.orchestrator = OrchestratorEngine(self.registry) | |
| logger.info(f"β TaskProcessor initialized with {len(self.registry.modules)} modules") | |
| async def process(self, task_data: ManualTriggeredRequestBody) -> Dict[str, Any]: | |
| """ | |
| Process TDS quiz task - COMPLETE END-TO-END FLOW | |
| Flow: | |
| 1. Fetch and analyze Request URL | |
| 2. Execute orchestration (scrape β extract β answer) | |
| 3. Extract answer | |
| 4. Submit to TDS β NEW | |
| 5. Handle chained quizzes β NEW | |
| 6. Build response | |
| """ | |
| logger.info("=" * 80) | |
| # logger.info(f"π Processing task for: {task_data.email}") | |
| logger.info(f"π Request URL: {task_data.url}") | |
| logger.info("=" * 80) | |
| request_url = str(task_data.url) | |
| question_url = None | |
| submission_url = None | |
| try: | |
| # =================================================================== | |
| # STEP 1: FETCH AND ANALYZE REQUEST URL | |
| # =================================================================== | |
| logger.info("\n" + "=" * 80) | |
| logger.info("STEP 1: FETCHING & ANALYZING REQUEST URL") | |
| logger.info("=" * 80) | |
| # β FIXED: Use proper async context manager pattern | |
| async with TaskFetcher() as fetcher: | |
| result = await fetcher.fetch_and_analyze(url=request_url) | |
| print("========") | |
| print("analysis") | |
| print(result) | |
| # Initialize answer generator if needed | |
| if not getattr(self.answer_generator, "_generator_agent", None): | |
| await self.answer_generator.initialize() | |
| answer = await self.answer_generator.generate( | |
| analysis=result["analysis"], | |
| question_metadata=result["question_metadata"], | |
| base_url=result["base_url"], | |
| user_email=result["user_email"], | |
| downloaded_files=result["downloaded_files"] | |
| ) | |
| print("================================= answer") | |
| print(answer) | |
| return submit_answer( | |
| submit_url="https://tds-llm-analysis.s-anand.net/submit", | |
| answer=answer, | |
| req_url=request_url, | |
| background_tasks=None | |
| ) | |
| except Exception as e: | |
| logger.error(f"β Task processing failed: {str(e)}", exc_info=True) | |
| raise TaskProcessingError(f"Failed to process task: {str(e)}") | |
| async def _process_chained_quiz(self, email: str, next_url: str, submission_url: str) -> Dict: | |
| """Process chained quiz in background""" | |
| try: | |
| logger.info(f"π Processing chained quiz: {next_url}") | |
| async with TaskFetcher() as fetcher: | |
| analysis = await fetcher.fetch_and_analyze(url=next_url) | |
| orchestration_result = await self.orchestrator.execute_task( | |
| task_input=analysis['task_description'], | |
| task_url=next_url, | |
| context={'email': email, 'submission_url': submission_url} | |
| ) | |
| answer = self._extract_answer(orchestration_result) | |
| submission_result = await self.answer_submitter.execute({ | |
| 'submission_url': submission_url, | |
| 'email': email, | |
| 'secret': str(answer), | |
| 'quiz_url': next_url, | |
| 'answer': answer | |
| }) | |
| logger.info(f"β Chained quiz {next_url} completed: {submission_result.success}") | |
| return { | |
| 'next_url': next_url, | |
| 'success': getattr(submission_result, 'success', False), | |
| 'answer': answer, | |
| 'correct': getattr(submission_result, 'data', {}).get('correct') | |
| } | |
| except Exception as e: | |
| logger.error(f"β Chained quiz failed: {next_url} - {e}", exc_info=True) | |
| return {'next_url': next_url, 'success': False, 'error': str(e)} | |
| def _extract_answer(self, orchestration_result: Dict[str, Any]) -> Any: | |
| """Extract final answer from orchestration result""" | |
| if not orchestration_result.get('success'): | |
| return None | |
| data = orchestration_result.get('data', {}) | |
| if not isinstance(data, dict): | |
| return data | |
| # Priority extraction fields | |
| priority_fields = ['secret_code', 'answer', 'extracted', 'result', 'secret'] | |
| for field in priority_fields: | |
| if field in data: | |
| return data[field] | |
| if f'{field}s' in data: | |
| values = data.get(f'{field}s', {}) | |
| if values: | |
| return list(values.values())[-1] | |
| if data.get('extracted_values'): | |
| extracted = data['extracted_values'] | |
| return list(extracted.values())[-1] if extracted else data | |
| return data | |
| def _build_response( | |
| self, | |
| task_data: ManualTriggeredRequestBody, | |
| request_url: str, | |
| question_url: str, | |
| submission_url: str, | |
| analysis: Dict, | |
| orchestration_result: Dict, | |
| submission_result: Optional[Dict], | |
| answer: Any | |
| ) -> Dict[str, Any]: | |
| """Build comprehensive API response""" | |
| overall_success = ( | |
| orchestration_result.get('success', False) and | |
| (submission_result.get('success', False) if submission_result else False) | |
| ) | |
| return { | |
| 'success': overall_success, | |
| 'status': 'completed' if overall_success else 'failed', | |
| 'email': task_data.email, | |
| 'urls': { | |
| 'request_url': request_url, | |
| 'question_url': question_url, | |
| 'submission_url': submission_url | |
| }, | |
| 'task_id': orchestration_result.get('task_id'), | |
| 'execution_id': orchestration_result.get('execution_id'), | |
| 'duration': orchestration_result.get('duration'), | |
| 'answer': answer, | |
| 'submission': getattr(submission_result, 'data', None), | |
| 'orchestration': { | |
| 'success': orchestration_result.get('success'), | |
| 'modules_used': orchestration_result.get('modules_used', []), | |
| 'strategy': orchestration_result.get('strategy') | |
| }, | |
| 'task_details': { | |
| 'description': analysis.get('task_description', '')[:200], | |
| 'instructions': len(analysis.get('instructions', [])) | |
| } | |
| } | |
| async def cleanup(self): | |
| """Clean up resources""" | |
| try: | |
| await self.orchestrator.cleanup() | |
| await self.answer_submitter.cleanup() | |
| logger.info("β TaskProcessor cleanup complete") | |
| except Exception as e: | |
| logger.error(f"Cleanup failed: {e}") | |