Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| import uuid | |
| import os | |
| from datetime import datetime | |
| from config import config | |
| from database import db | |
| from ai_model import ai_models | |
| import uvicorn | |
| import json | |
| app = FastAPI( | |
| title="MobileDoc API", | |
| description="Mobile Doctor Backend MVP", | |
| version="1.0.0" | |
| ) | |
| # ---------------- Middleware ---------------- | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # ---------------- File Handling ---------------- | |
| os.makedirs(config.UPLOAD_DIR, exist_ok=True) | |
| app.mount("/uploads", StaticFiles(directory=config.UPLOAD_DIR), name="uploads") | |
| # ---------------- Pydantic Models ---------------- | |
| class UserProfile(BaseModel): | |
| username: str | |
| email: str | |
| age: int | |
| gender: str | |
| allergies: str = "" | |
| conditions: str = "" | |
| class LoginRequest(BaseModel): | |
| username: str | |
| class SymptomsRequest(BaseModel): | |
| user_id: str | |
| symptoms: str | |
| class AnalysisResponse(BaseModel): | |
| success: bool | |
| data: dict | |
| message: str = "" | |
| # ---------------- Routes ---------------- | |
| async def root(): | |
| return {"status": "AI Health Diagnostics API Running", "timestamp": datetime.now().isoformat()} | |
| # ---------- Create User Profile ---------- | |
| async def create_profile(profile: UserProfile): | |
| try: | |
| user_id = str(uuid.uuid4()) | |
| user_data = { | |
| "id": user_id, | |
| "username": profile.username.strip(), | |
| "email": profile.email.strip(), | |
| "age": profile.age, | |
| "gender": profile.gender, | |
| "allergies": profile.allergies, | |
| "conditions": profile.conditions | |
| } | |
| db.create_user(user_data) | |
| return AnalysisResponse( | |
| success=True, | |
| data={"user_id": user_id}, | |
| message="Profile created successfully" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ---------- Check User Profile ---------- | |
| async def check_profile(request: LoginRequest): | |
| username = request.username.strip() | |
| try: | |
| response = db.client.table("users").select("*").eq("username", username).execute() | |
| users = response.data or [] | |
| if not users: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| user = users[0] | |
| return AnalysisResponse( | |
| success=True, | |
| data={"user_id": user["id"], "username": user["username"]}, | |
| message="Profile found" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ---------- Symptom Check ---------- | |
| async def symptom_check(request: SymptomsRequest): | |
| try: | |
| # Fetch user from Supabase | |
| user_response = db.client.table("users").select("*").eq("id", request.user_id).execute() | |
| users = user_response.data or [] | |
| if not users: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| user_profile = users[0] | |
| # Run AI analysis | |
| analysis_result = ai_models.analyze_symptoms(request.symptoms, user_profile) | |
| # Log analysis | |
| db.log_symptom_analysis({ | |
| "id": str(uuid.uuid4()), | |
| "user_id": request.user_id, | |
| "symptoms": request.symptoms, | |
| "result": json.dumps(analysis_result) | |
| }) | |
| return AnalysisResponse( | |
| success=True, | |
| data=analysis_result, | |
| message="Symptoms analyzed successfully" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ---------- Image Analysis ---------- | |
| async def analyze_image( | |
| user_id: str = Form(...), | |
| image_type: str = Form("skin"), | |
| file: UploadFile = File(...) | |
| ): | |
| try: | |
| allowed_types = ["image/jpeg", "image/png", "image/jpg"] | |
| if file.content_type not in allowed_types: | |
| raise HTTPException(status_code=400, detail="Invalid image format") | |
| image_data = await file.read() | |
| if len(image_data) > config.MAX_IMAGE_SIZE: | |
| raise HTTPException(status_code=400, detail="Image too large") | |
| analysis_result = ai_models.analyze_image(image_data, image_type) | |
| filename = f"{uuid.uuid4()}_{file.filename}" | |
| file_path = os.path.join(config.UPLOAD_DIR, filename) | |
| with open(file_path, "wb") as f: | |
| f.write(image_data) | |
| db.log_image_analysis({ | |
| "id": str(uuid.uuid4()), | |
| "user_id": user_id, | |
| "filename": filename, | |
| "result": json.dumps(analysis_result), | |
| "confidence": analysis_result.get("confidence", 0.0) | |
| }) | |
| return AnalysisResponse( | |
| success=True, | |
| data=analysis_result, | |
| message="Image analyzed successfully" | |
| ) | |
| except Exception as e: | |
| print("🔥 SERVER ERROR:", repr(e)) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ---------- User History ---------- | |
| async def get_user_history(user_id: str): | |
| try: | |
| symptoms = db.client.table("symptoms_history").select("*").eq("user_id", user_id).order("created_at", desc=True).limit(10).execute() | |
| images = db.client.table("image_analysis").select("*").eq("user_id", user_id).order("created_at", desc=True).limit(10).execute() | |
| return AnalysisResponse( | |
| success=True, | |
| data={ | |
| "symptom_checks": symptoms.data or [], | |
| "image_analyses": images.data or [] | |
| }, | |
| message="History retrieved successfully" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ---------- Run Server ---------- | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=8000) |