| import os |
| import gradio as gr |
| import requests |
| import pandas as pd |
| from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, tool |
| import re |
| import json |
| import math |
| import tempfile |
| from pathlib import Path |
| from urllib.parse import urlparse, parse_qs |
| import yt_dlp |
| from PIL import Image |
| import pytesseract |
|
|
| hf_token = os.getenv("HF_TOKEN") |
| SPACE_ID = os.getenv("SPACE_ID") |
| SPACE_HOST = os.getenv("SPACE_HOST") |
| |
| @tool |
| def web_browser(url: str) -> str: |
| """ |
| Fetches content from a web URL. |
| |
| Args: |
| url: The URL to fetch content from. |
| |
| Returns: |
| Text content from the webpage. |
| """ |
| try: |
| headers = { |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' |
| } |
| response = requests.get(url, headers=headers, timeout=10) |
| response.raise_for_status() |
| |
| |
| content = response.text |
| |
| content = re.sub(r'<[^>]+>', ' ', content) |
| content = re.sub(r'\s+', ' ', content).strip() |
| |
| return content[:2000] + "..." if len(content) > 2000 else content |
| |
| except Exception as e: |
| return f"Error accessing URL: {str(e)}" |
|
|
| @tool |
| def youtube_transcript_extractor(url: str) -> str: |
| """ |
| Extracts transcript or information from YouTube videos. |
| |
| Args: |
| url: YouTube URL. |
| |
| Returns: |
| Video information and transcript if available. |
| """ |
| try: |
| |
| if "youtube.com/watch" in url: |
| video_id = parse_qs(urlparse(url).query).get('v', [None])[0] |
| elif "youtu.be/" in url: |
| video_id = urlparse(url).path[1:] |
| else: |
| return "Invalid YouTube URL format" |
| |
| if not video_id: |
| return "Could not extract video ID from URL" |
| |
| |
| ydl_opts = { |
| 'quiet': True, |
| 'no_warnings': True, |
| 'writesubtitles': True, |
| 'writeautomaticsub': True, |
| } |
| |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
| info = ydl.extract_info(f"https://www.youtube.com/watch?v={video_id}", download=False) |
| |
| result = f"Title: {info.get('title', 'N/A')}\n" |
| result += f"Description: {info.get('description', 'N/A')[:500]}...\n" |
| result += f"Duration: {info.get('duration', 'N/A')} seconds\n" |
| result += f"View count: {info.get('view_count', 'N/A')}\n" |
| |
| |
| if 'subtitles' in info and info['subtitles']: |
| result += "\n--- Transcript Available ---\n" |
| |
| |
| return result |
| |
| except Exception as e: |
| return f"Error extracting YouTube content: {str(e)}" |
|
|
| @tool |
| def image_ocr_analyzer(image_path: str) -> str: |
| """ |
| Performs OCR on images to extract text. |
| |
| Args: |
| image_path: Path to the image file. |
| |
| Returns: |
| Extracted text from the image. |
| """ |
| try: |
| |
| image = Image.open(image_path) |
| |
| |
| extracted_text = pytesseract.image_to_string(image) |
| |
| if not extracted_text.strip(): |
| return "No text found in the image" |
| |
| return f"Extracted text:\n{extracted_text.strip()}" |
| |
| except Exception as e: |
| return f"Error performing OCR: {str(e)}" |
|
|
| @tool |
| def pdf_text_extractor(file_path: str) -> str: |
| """ |
| Extracts text from PDF files. |
| |
| Args: |
| file_path: Path to the PDF file. |
| |
| Returns: |
| Extracted text from PDF. |
| """ |
| try: |
| import PyPDF2 |
| |
| with open(file_path, 'rb') as file: |
| pdf_reader = PyPDF2.PdfReader(file) |
| text = "" |
| |
| for page_num in range(len(pdf_reader.pages)): |
| page = pdf_reader.pages[page_num] |
| text += page.extract_text() + "\n" |
| |
| return text[:3000] + "..." if len(text) > 3000 else text |
| |
| except Exception as e: |
| return f"Error extracting PDF text: {str(e)}" |
|
|
| @tool |
| def veterinary_document_analyzer(text: str) -> str: |
| """ |
| Analyzes veterinary documents to extract specific information like names. |
| |
| Args: |
| text: Document text to analyze. |
| |
| Returns: |
| Extracted veterinary information. |
| """ |
| try: |
| |
| vet_patterns = [ |
| r"Dr\.?\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)", |
| r"Doctor\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)", |
| r"veterinarian\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)", |
| r"DVM\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)", |
| ] |
| |
| found_vets = [] |
| for pattern in vet_patterns: |
| matches = re.findall(pattern, text, re.IGNORECASE) |
| for match in matches: |
| full_name = f"{match[0]} {match[1]}" |
| if full_name not in found_vets: |
| found_vets.append(full_name) |
| |
| if found_vets: |
| return f"Found veterinarian(s): {', '.join(found_vets)}" |
| else: |
| return "No veterinarian names found in the document" |
| |
| except Exception as e: |
| return f"Error analyzing veterinary document: {str(e)}" |
|
|
| |
| @tool |
| def analyze_excel_file(file_path: str, analysis_type: str = "general") -> str: |
| """ |
| Analyzes Excel files with multiple analysis types. |
| """ |
| try: |
| df = pd.read_excel(file_path) |
|
|
| if analysis_type == "general": |
| return f"Excel file contains {len(df)} rows and {len(df.columns)} columns. Columns: {list(df.columns)}" |
|
|
| elif analysis_type == "food_sales": |
| if 'category' in df.columns and 'price' in df.columns and 'quantity' in df.columns: |
| food_df = df[df['category'].str.lower() == 'food'] |
| total_sales = (food_df['price'] * food_df['quantity']).sum() |
| return f"Total food sales: ${total_sales:.2f}" |
| else: |
| return "Required columns (category, price, quantity) not found" |
|
|
| elif analysis_type == "summary": |
| summary = df.describe(include='all').to_string() |
| return f"Data summary:\n{summary}" |
|
|
| elif analysis_type == "categories": |
| if 'category' in df.columns: |
| categories = df['category'].value_counts() |
| return f"Categories breakdown:\n{categories.to_string()}" |
| else: |
| return "No category column found" |
|
|
| return "Unknown analysis type" |
|
|
| except Exception as e: |
| return f"Error analyzing Excel file: {str(e)}" |
|
|
| @tool |
| def advanced_calculator(expression: str) -> str: |
| """ |
| Evaluates mathematical expressions safely, including advanced functions. |
| """ |
| try: |
| expression = expression.replace('^', '**') |
| allowed_functions = { |
| 'abs': abs, 'round': round, 'min': min, 'max': max, |
| 'sum': sum, 'len': len, |
| 'sqrt': math.sqrt, 'pow': math.pow, 'log': math.log, |
| 'sin': math.sin, 'cos': math.cos, 'tan': math.tan, |
| 'pi': math.pi, 'e': math.e, |
| 'floor': math.floor, 'ceil': math.ceil |
| } |
| result = eval(expression, {"__builtins__": {}}, allowed_functions) |
| return str(result) |
|
|
| except Exception as e: |
| return f"Error in calculation: {str(e)}" |
|
|
| @tool |
| def smart_text_analyzer(text: str, task_type: str = "general") -> str: |
| """ |
| Analyzes text with focus on GAIA-specific tasks. |
| |
| Args: |
| text: Text to analyze. |
| task_type: 'general', 'names', 'dates', 'numbers', 'veterinary'. |
| |
| Returns: |
| Analysis results. |
| """ |
| try: |
| if task_type == "names": |
| |
| name_pattern = r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b' |
| names = re.findall(name_pattern, text) |
| return f"Found names: {list(set(names))}" |
| |
| elif task_type == "veterinary": |
| return veterinary_document_analyzer(text) |
| |
| elif task_type == "dates": |
| date_patterns = [ |
| r'\d{1,2}/\d{1,2}/\d{4}', |
| r'\d{4}-\d{2}-\d{2}', |
| r'\b\w+\s+\d{1,2},\s+\d{4}\b' |
| ] |
| dates = [] |
| for pattern in date_patterns: |
| dates.extend(re.findall(pattern, text)) |
| return f"Found dates: {dates}" |
| |
| elif task_type == "numbers": |
| numbers = re.findall(r'-?\d+\.?\d*', text) |
| return f"Found numbers: {[float(n) for n in numbers if n]}" |
| |
| else: |
| return f"Characters: {len(text)}, Words: {len(text.split())}, Lines: {len(text.splitlines())}" |
|
|
| except Exception as e: |
| return f"Error in text analysis: {str(e)}" |
|
|
| |
| |
| model = HfApiModel( |
| max_tokens=2048, |
| temperature=0.1, |
| model_id='microsoft/DialoGPT-medium', |
| |
| ) |
|
|
| |
| search_tool = DuckDuckGoSearchTool() |
|
|
| |
| tools = [ |
| search_tool, |
| web_browser, |
| youtube_transcript_extractor, |
| image_ocr_analyzer, |
| pdf_text_extractor, |
| veterinary_document_analyzer, |
| smart_text_analyzer, |
| advanced_calculator, |
| analyze_excel_file, |
| ] |
|
|
| |
| agent_code = CodeAgent( |
| tools=tools, |
| model=model, |
| max_steps=15, |
| additional_authorized_imports=[ |
| "os", "tempfile", "pathlib", "re", "json", "math", "pandas", |
| "requests", "PIL", "pytesseract", "PyPDF2", "yt_dlp" |
| ] |
| ) |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
| class BasicAgent: |
| def __init__(self): |
| print("Enhanced GAIA Agent initialized with web browsing capabilities.") |
| self.agent = agent_code |
|
|
| def __call__(self, question: str) -> str: |
| try: |
| |
| enhanced_question = self._create_gaia_prompt(question) |
| |
| result = self.agent.run(enhanced_question) |
| |
| |
| cleaned_result = self._clean_gaia_result(result) |
| |
| return cleaned_result if cleaned_result else "No response generated." |
| |
| except Exception as e: |
| print(f"Agent error: {e}") |
| |
| try: |
| fallback_prompt = f""" |
| CRITICAL GAIA TASK: {question} |
| |
| Use available tools to find the answer. If it's a YouTube video, use youtube_transcript_extractor. |
| If it's about documents, use appropriate analyzers. |
| Be precise and direct in your final answer. |
| """ |
| simple_result = self.agent.run(fallback_prompt) |
| return simple_result if simple_result else f"Error: {e}" |
| except: |
| return f"Error: {e}" |
| |
| def _create_gaia_prompt(self, question: str) -> str: |
| """Crée un prompt optimisé pour GAIA.""" |
| return f""" |
| GAIA EVALUATION TASK - ANSWER PRECISELY |
| |
| Question: {question} |
| |
| INSTRUCTIONS: |
| 1. If this involves a YouTube video, use youtube_transcript_extractor tool |
| 2. If this involves web content, use web_browser tool |
| 3. If this involves documents/PDFs, use appropriate analyzers |
| 4. If this involves images, use image_ocr_analyzer |
| 5. If this needs search, use the search tool |
| 6. For calculations, use advanced_calculator |
| 7. Be EXACT and SPECIFIC in your final answer |
| 8. Don't provide explanations unless asked - just the answer |
| |
| Work step by step and use the right tools for this task. |
| """ |
|
|
|
|
|
|
|
|
|
|
| def run_and_submit_all( profile: gr.OAuthProfile | None): |
| """ |
| Fetches all questions, runs the BasicAgent on them, submits all answers, |
| and displays the results. |
| """ |
| |
| space_id = os.getenv("SPACE_ID") |
|
|
| if profile: |
| username= f"{profile.username}" |
| print(f"User logged in: {username}") |
| else: |
| print("User not logged in.") |
| return "Please Login to Hugging Face with the button.", None |
|
|
| api_url = DEFAULT_API_URL |
| questions_url = f"{api_url}/questions" |
| submit_url = f"{api_url}/submit" |
|
|
| |
| try: |
| agent = BasicAgent() |
| except Exception as e: |
| print(f"Error instantiating agent: {e}") |
| return f"Error initializing agent: {e}", None |
| |
| agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" |
| print(agent_code) |
|
|
| |
| print(f"Fetching questions from: {questions_url}") |
| try: |
| response = requests.get(questions_url, timeout=15) |
| response.raise_for_status() |
| questions_data = response.json() |
| if not questions_data: |
| print("Fetched questions list is empty.") |
| return "Fetched questions list is empty or invalid format.", None |
| print(f"Fetched {len(questions_data)} questions.") |
| except requests.exceptions.RequestException as e: |
| print(f"Error fetching questions: {e}") |
| return f"Error fetching questions: {e}", None |
| except requests.exceptions.JSONDecodeError as e: |
| print(f"Error decoding JSON response from questions endpoint: {e}") |
| print(f"Response text: {response.text[:500]}") |
| return f"Error decoding server response for questions: {e}", None |
| except Exception as e: |
| print(f"An unexpected error occurred fetching questions: {e}") |
| return f"An unexpected error occurred fetching questions: {e}", None |
|
|
| |
| results_log = [] |
| answers_payload = [] |
| print(f"Running agent on {len(questions_data)} questions...") |
| for item in questions_data: |
| task_id = item.get("task_id") |
| question_text = item.get("question") |
| if not task_id or question_text is None: |
| print(f"Skipping item with missing task_id or question: {item}") |
| continue |
| try: |
| submitted_answer = agent(question_text) |
| answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) |
| results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) |
| except Exception as e: |
| print(f"Error running agent on task {task_id}: {e}") |
| results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) |
|
|
| if not answers_payload: |
| print("Agent did not produce any answers to submit.") |
| return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) |
|
|
| |
| submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} |
| status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." |
| print(status_update) |
|
|
| |
| print(f"Submitting {len(answers_payload)} answers to: {submit_url}") |
| try: |
| response = requests.post(submit_url, json=submission_data, timeout=60) |
| response.raise_for_status() |
| result_data = response.json() |
| final_status = ( |
| f"Submission Successful!\n" |
| f"User: {result_data.get('username')}\n" |
| f"Overall Score: {result_data.get('score', 'N/A')}% " |
| f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" |
| f"Message: {result_data.get('message', 'No message received.')}" |
| ) |
| print("Submission successful.") |
| results_df = pd.DataFrame(results_log) |
| return final_status, results_df |
| except requests.exceptions.HTTPError as e: |
| error_detail = f"Server responded with status {e.response.status_code}." |
| try: |
| error_json = e.response.json() |
| error_detail += f" Detail: {error_json.get('detail', e.response.text)}" |
| except requests.exceptions.JSONDecodeError: |
| error_detail += f" Response: {e.response.text[:500]}" |
| status_message = f"Submission Failed: {error_detail}" |
| print(status_message) |
| results_df = pd.DataFrame(results_log) |
| return status_message, results_df |
| except requests.exceptions.Timeout: |
| status_message = "Submission Failed: The request timed out." |
| print(status_message) |
| results_df = pd.DataFrame(results_log) |
| return status_message, results_df |
| except requests.exceptions.RequestException as e: |
| status_message = f"Submission Failed: Network error - {e}" |
| print(status_message) |
| results_df = pd.DataFrame(results_log) |
| return status_message, results_df |
| except Exception as e: |
| status_message = f"An unexpected error occurred during submission: {e}" |
| print(status_message) |
| results_df = pd.DataFrame(results_log) |
| return status_message, results_df |
|
|
|
|
| |
| with gr.Blocks() as demo: |
| gr.Markdown("# Basic Agent Evaluation Runner") |
| gr.Markdown( |
| """ |
| **Instructions:** |
| |
| 1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ... |
| 2. Log in to your Hugging Face account using the button below. This uses your HF username for submission. |
| 3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score. |
| |
| --- |
| **Disclaimers:** |
| Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions). |
| This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async. |
| """ |
| ) |
|
|
| gr.LoginButton() |
|
|
| run_button = gr.Button("Run Evaluation & Submit All Answers") |
|
|
| status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) |
| |
| results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
| run_button.click( |
| fn=run_and_submit_all, |
| outputs=[status_output, results_table] |
| ) |
|
|
| if __name__ == "__main__": |
| print("\n" + "-"*30 + " App Starting " + "-"*30) |
| |
| space_host_startup = os.getenv("SPACE_HOST") |
| space_id_startup = os.getenv("SPACE_ID") |
|
|
| if space_host_startup: |
| print(f"✅ SPACE_HOST found: {space_host_startup}") |
| print(f" Runtime URL should be: https://{space_host_startup}.hf.space") |
| else: |
| print("ℹ️ SPACE_HOST environment variable not found (running locally?).") |
|
|
| if space_id_startup: |
| print(f"✅ SPACE_ID found: {space_id_startup}") |
| print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") |
| print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") |
| else: |
| print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") |
|
|
| print("-"*(60 + len(" App Starting ")) + "\n") |
|
|
| print("Launching Gradio Interface for Basic Agent Evaluation...") |
| demo.launch(debug=True, share=False) |