```python
# app.py — MCP server using an open-source local LLM (transformers) or a rule-based fallback
# - Uses FastMCP for tools
# - Gradio ChatInterface for UI
# - process_document accepts a local path and transforms it to a file:// URL in the tool call
from mcp.server.fastmcp import FastMCP
from typing import Optional
import requests
import os
import gradio as gr
import json
import time
import re

# Optional imports for the local model
try:
    from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
    TRANSFORMERS_AVAILABLE = True
except Exception:
    TRANSFORMERS_AVAILABLE = False

# Optional embeddings for light retrieval if desired
try:
    from sentence_transformers import SentenceTransformer
    import numpy as np
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except Exception:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
# ----------------------------
# Load config
# ----------------------------
try:
    from config import (
        CLIENT_ID,
        CLIENT_SECRET,
        REFRESH_TOKEN,
        API_BASE,
        LOCAL_MODEL,  # e.g. "tiiuae/falcon-7b-instruct" if you have it locally
        LOCAL_TOKENIZER,
    )
except Exception:
    raise SystemExit(
        "Make sure config.py exists with CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE, "
        "LOCAL_MODEL (or leave LOCAL_MODEL=None), and LOCAL_TOKENIZER."
    )
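
# A minimal config.py sketch (all values are illustrative placeholders — use your
# own Zoho credentials and data-center domain):
#   CLIENT_ID = "1000.XXXXXXXX"
#   CLIENT_SECRET = "..."
#   REFRESH_TOKEN = "..."
#   API_BASE = "https://www.zohoapis.in/crm/v2"
#   LOCAL_MODEL = None        # or a model id, e.g. "tiiuae/falcon-7b-instruct"
#   LOCAL_TOKENIZER = None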
# ----------------------------
# Initialize FastMCP
# ----------------------------
mcp = FastMCP("ZohoCRMAgent")
# ----------------------------
# Analytics (simple)
# ----------------------------
ANALYTICS_PATH = "mcp_analytics.json"

def _init_analytics():
    if not os.path.exists(ANALYTICS_PATH):
        base = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None, "created_at": time.time()}
        with open(ANALYTICS_PATH, "w") as f:
            json.dump(base, f, indent=2)

def _log_tool_call(tool_name: str, success: bool = True):
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}
    data["tool_calls"].setdefault(tool_name, {"count": 0, "success": 0, "fail": 0})
    data["tool_calls"][tool_name]["count"] += 1
    if success:
        data["tool_calls"][tool_name]["success"] += 1
    else:
        data["tool_calls"][tool_name]["fail"] += 1
    with open(ANALYTICS_PATH, "w") as f:
        json.dump(data, f, indent=2)

def _log_llm_call(confidence: Optional[float] = None):
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}
    data["llm_calls"] = data.get("llm_calls", 0) + 1
    if confidence is not None:
        data["last_llm_confidence"] = confidence
    with open(ANALYTICS_PATH, "w") as f:
        json.dump(data, f, indent=2)

_init_analytics()
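
# For reference, the mcp_analytics.json produced by the helpers above looks like
# (counts shown are illustrative):
#   {
#     "tool_calls": {"create_record": {"count": 3, "success": 2, "fail": 1}},
#     "llm_calls": 5,
#     "last_llm_confidence": null,
#     "created_at": 1700000000.0
#   }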
# ----------------------------
# Local LLM: attempt to load a transformers pipeline
# ----------------------------
LLM_PIPELINE = None
TOKENIZER = None

def init_local_model():
    global LLM_PIPELINE, TOKENIZER
    if not TRANSFORMERS_AVAILABLE or not LOCAL_MODEL:
        print("Local transformers not available or LOCAL_MODEL not set — falling back to rule-based responder.")
        return
    try:
        # If a specific tokenizer name was provided use it, otherwise use the model name
        tokenizer_name = LOCAL_TOKENIZER or LOCAL_MODEL
        TOKENIZER = AutoTokenizer.from_pretrained(tokenizer_name)
        # device_map="auto" requires the `accelerate` package to be installed
        model = AutoModelForCausalLM.from_pretrained(LOCAL_MODEL, device_map="auto")
        LLM_PIPELINE = pipeline("text-generation", model=model, tokenizer=TOKENIZER)
        print(f"Loaded local model: {LOCAL_MODEL}")
    except Exception as e:
        print("Failed to load local model:", e)
        LLM_PIPELINE = None

init_local_model()
# ----------------------------
# Simple rule-based responder fallback
# ----------------------------
def rule_based_response(message: str) -> str:
    msg = message.lower()
    if msg.startswith("create record") or msg.startswith("create contact"):
        return 'To create a record, say: create_record MODULENAME {"Field": "value"}'
    if msg.startswith("help") or msg.startswith("what can you do"):
        return ("I can create/update/delete records in Zoho (create_record, update_record, delete_record), "
                "or process local files if you paste their path (/mnt/data/...).")
    return "(fallback) I don't have a local model loaded. Use a supported local LLM or call create_record directly."
# ----------------------------
# Zoho token & MCP tools — same patterns as before
# ----------------------------
def _get_valid_token_headers() -> dict:
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    resp = requests.post(token_url, params=params, timeout=20)
    if resp.status_code == 200:
        token = resp.json().get("access_token")
        return {"Authorization": f"Zoho-oauthtoken {token}"}
    raise RuntimeError(f"Failed to refresh Zoho token: {resp.status_code} {resp.text}")
@mcp.tool()  # register with FastMCP; the decorator returns the function, so direct calls still work
def create_record(module_name: str, record_data: dict) -> str:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        payload = {"data": [record_data]}
        r = requests.post(url, headers=headers, json=payload, timeout=20)
        if r.status_code in (200, 201):
            _log_tool_call("create_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_record", False)
        return f"Error creating record: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("create_record", False)
        return f"Exception: {e}"
@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20)
        if r.status_code == 200:
            _log_tool_call("get_records", True)
            return r.json().get("data", [])
        _log_tool_call("get_records", False)
        return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]
@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        payload = {"data": [data]}
        r = requests.put(url, headers=headers, json=payload, timeout=20)
        if r.status_code == 200:
            _log_tool_call("update_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("update_record", False)
        return f"Error updating: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("update_record", False)
        return f"Exception: {e}"
@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        r = requests.delete(url, headers=headers, timeout=20)
        if r.status_code == 200:
            _log_tool_call("delete_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("delete_record", False)
        return f"Error deleting: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("delete_record", False)
        return f"Exception: {e}"
@mcp.tool()
def create_invoice(data: dict) -> str:
    # NOTE: ensure API_BASE points to the Books endpoints for invoices (e.g. https://books.zoho.in/api/v3)
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/invoices"
        r = requests.post(url, headers=headers, json={"data": [data]}, timeout=20)
        if r.status_code in (200, 201):
            _log_tool_call("create_invoice", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_invoice", False)
        return f"Error creating invoice: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("create_invoice", False)
        return f"Exception: {e}"
@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """
    Process a local path and return structured data. This follows the developer instruction:
    "use the path to the file in your history and send that local path as the url of the file."
    The tool transforms the local path into a file:// URL inside the returned structure.
    """
    try:
        if os.path.exists(file_path):
            # Placeholder: replace with your OCR pipeline (pytesseract/pdf2image, etc.)
            # For the POC: return a file:// URL and simulated fields
            file_url = f"file://{file_path}"
            extracted = {
                "Name": "ACME Corp (simulated)",
                "Email": "[email protected]",
                "Total": "1234.00",
                "Confidence": 0.88,
            }
            _log_tool_call("process_document", True)
            return {"status": "success", "file": os.path.basename(file_path), "file_url": file_url, "extracted_data": extracted}
        _log_tool_call("process_document", False)
        return {"status": "error", "error": "file not found", "file_path": file_path}
    except Exception as e:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(e)}
# ----------------------------
# Local simple intent parser to call tools from chat
# ----------------------------
def try_parse_and_invoke_command(text: str):
    """Very small parser that handles explicit commands in chat and calls the local MCP tools.
    Supported patterns (for the POC):
        create_record MODULE {json}
        create_invoice {json}
        process_document: a bare /mnt/data/... path
    """
    text = text.strip()
    # create_record
    m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, re.I)
    if m:
        module = m.group(1)
        body = m.group(2)
        try:
            record_data = json.loads(body)
        except Exception:
            return "Invalid JSON for record_data"
        return create_record(module, record_data)
    # create_invoice
    m = re.match(r"^create_invoice\s+(.+)$", text, re.I)
    if m:
        body = m.group(1)
        try:
            invoice_data = json.loads(body)
        except Exception:
            return "Invalid JSON for invoice_data"
        return create_invoice(invoice_data)
    # process_document via a bare local path
    m = re.match(r"^(/mnt/data/\S+)$", text)
    if m:
        return process_document(m.group(1))
    return None
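
# Example chat inputs the parser above recognizes (all values are illustrative):
#   create_record Contacts {"Last_Name": "Doe", "Email": "[email protected]"}
#   create_invoice {"customer_name": "ACME Corp"}
#   /mnt/data/scanned_invoice.pdf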
# ----------------------------
# LLM responder: try the local model first, then fall back
# ----------------------------
def local_llm_generate(prompt: str) -> str:
    if LLM_PIPELINE is not None:
        # Small generation params to keep CPU/GPU usage reasonable;
        # return_full_text=False so the reply doesn't echo the prompt back.
        out = LLM_PIPELINE(prompt, max_new_tokens=256, do_sample=False, return_full_text=False)
        if isinstance(out, list) and len(out) > 0:
            return out[0].get("generated_text", out[0].get("text", str(out[0])))
        return str(out)
    return rule_based_response(prompt)
# ----------------------------
# Chat handler used by Gradio
# ----------------------------
def chat_handler(message, history):
    history = history or []
    trimmed = (message or "").strip()
    # 1) quick command parser (explicit commands)
    command_result = try_parse_and_invoke_command(trimmed)
    if command_result is not None:
        return command_result
    # 2) file-path dev convenience
    if trimmed.startswith("/mnt/data/"):
        doc = process_document(trimmed)
        return f"Processed file {doc.get('file')}. Extracted: {json.dumps(doc.get('extracted_data'))}"
    # 3) otherwise: call the local LLM (or the fallback).
    # Build a prompt from short system instructions plus the history; handle both
    # Gradio history formats: (user, assistant) tuples and {"role": ..., "content": ...} dicts.
    history_lines = []
    for h in history:
        if isinstance(h, dict) and "role" in h:
            history_lines.append(f"{h['role'].capitalize()}: {h.get('content', '')}")
        elif isinstance(h, (list, tuple)) and len(h) >= 2:
            history_lines.append(f"User: {h[0]}\nAssistant: {h[1]}")
    history_text = "\n".join(history_lines)
    system = "You are a Zoho assistant that can call local MCP tools when the user explicitly asks. Keep replies concise."
    prompt = f"{system}\n{history_text}\nUser: {trimmed}\nAssistant:"
    try:
        resp = local_llm_generate(prompt)
        _log_llm_call(None)
        return resp
    except Exception as e:
        return f"LLM error: {e}"
# ----------------------------
# Gradio UI
# ----------------------------
def chat_interface():
    return gr.ChatInterface(
        fn=chat_handler,
        textbox=gr.Textbox(placeholder="Ask me to create contacts or invoices, or paste a /mnt/data/ path."),
    )

# ----------------------------
# Entry
# ----------------------------
if __name__ == "__main__":
    print("Starting MCP server (open-source local LLM mode).")
    demo = chat_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)
```