Spaces:
Paused
Paused
| from flask import Flask, render_template, request, jsonify, Response, stream_with_context, send_from_directory | |
| from google import genai | |
| import os | |
| from google.genai import types | |
| from PIL import Image | |
| import io | |
| import base64 | |
| import json | |
| import requests | |
| import threading | |
| import uuid | |
| import time | |
| import tempfile | |
| import subprocess | |
| import shutil | |
| import re | |
| app = Flask(__name__) | |
| GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") | |
| TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88" | |
| TELEGRAM_CHAT_ID = "-1002564204301" | |
| GENERATED_PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'generated_pdfs') | |
| if GOOGLE_API_KEY: | |
| try: | |
| client = genai.Client(api_key=GOOGLE_API_KEY) | |
| except Exception as e: | |
| print(f"Erreur client Gemini: {e}") | |
| client = None | |
| else: | |
| print("GEMINI_API_KEY non trouvé.") | |
| client = None | |
| task_results = {} | |
| def load_prompt_from_file(filename): | |
| try: | |
| prompts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'prompts') | |
| filepath = os.path.join(prompts_dir, filename) | |
| with open(filepath, 'r', encoding='utf-8') as f: | |
| return f.read() | |
| except Exception as e: | |
| print(f"Erreur chargement prompt '{filename}': {e}") | |
| return "" | |
| def get_prompt_for_style(style): | |
| return load_prompt_from_file('prompt_light.txt') if style == 'light' else load_prompt_from_file('prompt_colorful.txt') | |
| def check_latex_installation(): | |
| try: | |
| subprocess.run(["pdflatex", "-version"], capture_output=True, check=True, timeout=10) | |
| return True | |
| except Exception: | |
| return False | |
| IS_LATEX_INSTALLED = check_latex_installation() | |
| def clean_latex_code(latex_code): | |
| match_latex = re.search(r"```(?:latex|tex)\s*(.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) | |
| if match_latex: | |
| return match_latex.group(1).strip() | |
| match_generic = re.search(r"```\s*(\\documentclass.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) | |
| if match_generic: | |
| return match_generic.group(1).strip() | |
| return latex_code.strip() | |
| def latex_to_pdf(latex_code, output_filename_base, output_dir): | |
| if not IS_LATEX_INSTALLED: | |
| return None, "pdflatex non disponible." | |
| tex_filename = f"{output_filename_base}.tex" | |
| tex_path = os.path.join(output_dir, tex_filename) | |
| pdf_path = os.path.join(output_dir, f"{output_filename_base}.pdf") | |
| try: | |
| with open(tex_path, "w", encoding="utf-8") as tex_file: | |
| tex_file.write(latex_code) | |
| my_env = os.environ.copy() | |
| my_env["LC_ALL"] = "C.UTF-8" | |
| my_env["LANG"] = "C.UTF-8" | |
| last_result = None | |
| for _ in range(2): | |
| process = subprocess.run( | |
| ["pdflatex", "-interaction=nonstopmode", "-output-directory", output_dir, tex_path], | |
| capture_output=True, text=True, check=False, encoding="utf-8", errors="replace", env=my_env, | |
| ) | |
| last_result = process | |
| if not os.path.exists(pdf_path) and process.returncode != 0: | |
| break | |
| if os.path.exists(pdf_path): | |
| return pdf_path, f"PDF généré: {os.path.basename(pdf_path)}" | |
| else: | |
| error_log = last_result.stdout + "\n" + last_result.stderr if last_result else "Aucun résultat de compilation." | |
| return None, f"Erreur de compilation PDF. Log: ...{error_log[-1000:]}" | |
| except Exception as e: | |
| return None, f"Exception génération PDF: {str(e)}" | |
| def send_to_telegram(file_data, filename, caption="Nouveau fichier uploadé"): | |
| try: | |
| if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): | |
| url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto" | |
| files = {'photo': (filename, file_data)} | |
| else: | |
| url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument" | |
| files = {'document': (filename, file_data)} | |
| data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption} | |
| requests.post(url, files=files, data=data, timeout=30) | |
| except Exception as e: | |
| print(f"Erreur envoi Telegram: {e}") | |
| def process_files_background(task_id, files_data, resolution_style): | |
| uploaded_file_refs = [] | |
| try: | |
| task_results[task_id]['status'] = 'processing' | |
| if not client: | |
| raise ConnectionError("Client Gemini non initialisé.") | |
| contents = [] | |
| for file_info in files_data: | |
| if file_info['type'].startswith('image/'): | |
| img = Image.open(io.BytesIO(file_info['data'])) | |
| buffered = io.BytesIO() | |
| img.save(buffered, format="PNG") | |
| img_base64_str = base64.b64encode(buffered.getvalue()).decode() | |
| contents.append({'inline_data': {'mime_type': 'image/png', 'data': img_base64_str}}) | |
| elif file_info['type'] == 'application/pdf': | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf: | |
| temp_pdf.write(file_info['data']) | |
| temp_pdf_path = temp_pdf.name | |
| file_ref = client.files.upload(file=temp_pdf_path) | |
| uploaded_file_refs.append(file_ref) | |
| contents.append(file_ref) | |
| os.unlink(temp_pdf_path) | |
| except Exception as e: | |
| raise ValueError(f"Impossible d'uploader le PDF: {str(e)}") | |
| if not contents: | |
| raise ValueError("Aucun contenu valide.") | |
| prompt_to_use = get_prompt_for_style(resolution_style) | |
| if not prompt_to_use: | |
| raise ValueError(f"Prompt introuvable pour le style '{resolution_style}'.") | |
| contents.append(prompt_to_use) | |
| task_results[task_id]['status'] = 'generating_latex' | |
| gemini_response = client.models.generate_content( | |
| model="gemini-2.5-pro", | |
| contents=contents, | |
| config=types.GenerateContentConfig(tools=[types.Tool(code_execution=types.ToolCodeExecution)]) | |
| ) | |
| full_latex_response = "" | |
| if gemini_response.candidates and gemini_response.candidates[0].content and gemini_response.candidates[0].content.parts: | |
| for part in gemini_response.candidates[0].content.parts: | |
| if hasattr(part, 'text') and part.text: | |
| full_latex_response += part.text | |
| if not full_latex_response.strip(): | |
| raise ValueError("Gemini a retourné une réponse vide.") | |
| task_results[task_id]['status'] = 'cleaning_latex' | |
| cleaned_latex = clean_latex_code(full_latex_response) | |
| task_results[task_id]['status'] = 'generating_pdf' | |
| pdf_filename_base = f"solution_{task_id}" | |
| pdf_file_path, pdf_message = latex_to_pdf(cleaned_latex, pdf_filename_base, GENERATED_PDF_DIR) | |
| if pdf_file_path: | |
| task_results[task_id]['status'] = 'completed' | |
| task_results[task_id]['pdf_filename'] = os.path.basename(pdf_file_path) | |
| task_results[task_id]['response'] = f"PDF généré avec succès: {os.path.basename(pdf_file_path)}" | |
| else: | |
| raise RuntimeError(f"Échec de la génération PDF: {pdf_message}") | |
| except Exception as e: | |
| print(f"Task {task_id} Erreur: {e}") | |
| task_results[task_id]['status'] = 'error' | |
| task_results[task_id]['error'] = str(e) | |
| task_results[task_id]['response'] = f"Erreur: {str(e)}" | |
| finally: | |
| for file_ref in uploaded_file_refs: | |
| try: client.files.delete(file_ref) | |
| except: pass | |
| def index(): | |
| return render_template('index.html') | |
| def solve(): | |
| try: | |
| if 'user_files' not in request.files: | |
| return jsonify({'error': 'Aucun fichier fourni'}), 400 | |
| uploaded_files = request.files.getlist('user_files') | |
| if not uploaded_files or all(f.filename == '' for f in uploaded_files): | |
| return jsonify({'error': 'Aucun fichier sélectionné'}), 400 | |
| resolution_style = request.form.get('style', 'colorful') | |
| files_data = [] | |
| file_count = {'images': 0, 'pdfs': 0} | |
| for file in uploaded_files: | |
| if not file.filename: continue | |
| file_data = file.read() | |
| file_type = file.content_type or 'application/octet-stream' | |
| if file_type.startswith('image/'): | |
| file_count['images'] += 1 | |
| files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) | |
| send_to_telegram(file_data, file.filename, f"Image reçue - Style: {resolution_style}") | |
| elif file_type == 'application/pdf': | |
| if file_count['pdfs'] >= 1: | |
| return jsonify({'error': 'Un seul PDF autorisé'}), 400 | |
| file_count['pdfs'] += 1 | |
| files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) | |
| send_to_telegram(file_data, file.filename, f"PDF reçu - Style: {resolution_style}") | |
| if not files_data: | |
| return jsonify({'error': 'Aucun fichier valide (image/pdf) trouvé'}), 400 | |
| task_id = str(uuid.uuid4()) | |
| task_results[task_id] = { | |
| 'status': 'pending', | |
| 'response': '', | |
| 'error': None, | |
| 'time_started': time.time(), | |
| 'style': resolution_style, | |
| 'file_count': file_count, | |
| 'first_filename': files_data[0]['filename'] | |
| } | |
| threading.Thread(target=process_files_background, args=(task_id, files_data, resolution_style)).start() | |
| return jsonify({'task_id': task_id, 'status': 'pending', 'first_filename': files_data[0]['filename']}) | |
| except Exception as e: | |
| print(f"Erreur /solve: {e}") | |
| return jsonify({'error': f'Erreur serveur: {e}'}), 500 | |
| def get_task_status(task_id): | |
| task = task_results.get(task_id) | |
| if not task: | |
| return jsonify({'error': 'Tâche introuvable'}), 404 | |
| response_data = {'status': task['status'], 'response': task.get('response'), 'error': task.get('error')} | |
| if task['status'] == 'completed': | |
| response_data['download_url'] = f"/download/{task_id}" | |
| return jsonify(response_data) | |
| def stream_task_progress(task_id): | |
| def generate(): | |
| last_status_sent = None | |
| while True: | |
| task = task_results.get(task_id) | |
| if not task: | |
| yield f'data: {json.dumps({"error": "Tâche disparue", "status": "error"})}\n\n' | |
| break | |
| current_status = task['status'] | |
| if current_status != last_status_sent: | |
| data_to_send = {"status": current_status} | |
| if current_status == 'completed': | |
| data_to_send["response"] = task.get("response", "") | |
| data_to_send["download_url"] = f"/download/{task_id}" | |
| elif current_status == 'error': | |
| data_to_send["error"] = task.get("error", "Erreur inconnue") | |
| yield f'data: {json.dumps(data_to_send)}\n\n' | |
| last_status_sent = current_status | |
| if current_status in ['completed', 'error']: | |
| break | |
| time.sleep(1) | |
| return Response(stream_with_context(generate()), mimetype='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}) | |
| def download_pdf(task_id): | |
| task = task_results.get(task_id) | |
| if not task or task['status'] != 'completed' or 'pdf_filename' not in task: | |
| return "Fichier non trouvé ou non prêt.", 404 | |
| try: | |
| return send_from_directory(GENERATED_PDF_DIR, task['pdf_filename'], as_attachment=True) | |
| except FileNotFoundError: | |
| return "Fichier introuvable sur le serveur.", 404 | |
| if __name__ == '__main__': | |
| os.makedirs(GENERATED_PDF_DIR, exist_ok=True) | |
| if not GOOGLE_API_KEY: | |
| print("CRITICAL: GOOGLE_API_KEY non défini.") | |
| if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: | |
| print("CRITICAL: Clés Telegram non définies.") | |
| app.run(debug=True, host='0.0.0.0', port=5000) |