# app.py — MCP server using an open-source local LLM (transformers) or a rule-based fallback
# - Uses FastMCP for tools
# - Gradio ChatInterface for UI
# - process_document accepts local path and transforms it to a file:// URL in the tool call

from mcp.server.fastmcp import FastMCP
from typing import Optional, List, Tuple, Any, Dict
import requests
import os
import gradio as gr
import json
import time
import traceback
import inspect
import re

# Optional imports for local model
try:
    from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer

    TRANSFORMERS_AVAILABLE = True
except Exception:
    TRANSFORMERS_AVAILABLE = False

# Optional embeddings for light retrieval if desired
try:
    from sentence_transformers import SentenceTransformer
    import numpy as np

    SENTEVAL_AVAILABLE = True
except Exception:
    SENTEVAL_AVAILABLE = False

# ----------------------------
# Load config
# ----------------------------
try:
    from config import (
        CLIENT_ID,
        CLIENT_SECRET,
        REFRESH_TOKEN,
        API_BASE,
        LOCAL_MODEL,  # e.g. "tiiuae/falcon-7b-instruct" if you have it locally
        LOCAL_TOKENIZER,
    )
except Exception:
    raise SystemExit(
        "Make sure config.py exists with CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE, LOCAL_MODEL (or leave LOCAL_MODEL=None)."
    )

# ----------------------------
# Initialize FastMCP
# ----------------------------
mcp = FastMCP("ZohoCRMAgent")

# ----------------------------
# Analytics (simple)
# ----------------------------
ANALYTICS_PATH = "mcp_analytics.json"


def _read_analytics() -> dict:
    """Load the analytics JSON file; return a fresh structure on any error."""
    try:
        with open(ANALYTICS_PATH, "r") as f:
            return json.load(f)
    except Exception:
        return {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}


def _write_analytics(data: dict) -> None:
    """Persist the analytics structure to disk, pretty-printed."""
    with open(ANALYTICS_PATH, "w") as f:
        json.dump(data, f, indent=2)


def _init_analytics() -> None:
    """Create the analytics file on first run; leave an existing file untouched."""
    if not os.path.exists(ANALYTICS_PATH):
        base = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None, "created_at": time.time()}
        _write_analytics(base)


def _log_tool_call(tool_name: str, success: bool = True) -> None:
    """Increment per-tool counters (count plus success or fail) in the analytics file."""
    data = _read_analytics()
    data["tool_calls"].setdefault(tool_name, {"count": 0, "success": 0, "fail": 0})
    entry = data["tool_calls"][tool_name]
    entry["count"] += 1
    entry["success" if success else "fail"] += 1
    _write_analytics(data)


def _log_llm_call(confidence: Optional[float] = None) -> None:
    """Increment the LLM call counter; optionally record the last confidence score."""
    data = _read_analytics()
    data["llm_calls"] = data.get("llm_calls", 0) + 1
    if confidence is not None:
        data["last_llm_confidence"] = confidence
    _write_analytics(data)


_init_analytics()

# ----------------------------
# Local LLM: attempt to load transformers pipeline
# ----------------------------
LLM_PIPELINE = None
TOKENIZER = None


def init_local_model() -> None:
    """Load LOCAL_MODEL into a text-generation pipeline if possible.

    On any failure (transformers missing, LOCAL_MODEL unset, or load error)
    LLM_PIPELINE stays None and the rule-based responder is used instead.
    """
    global LLM_PIPELINE, TOKENIZER
    if not TRANSFORMERS_AVAILABLE or not LOCAL_MODEL:
        print("Local transformers not available or LOCAL_MODEL not set — falling back to rule-based responder.")
        return
    try:
        # If a specific tokenizer name was provided use it, otherwise use model name
        tokenizer_name = LOCAL_TOKENIZER or LOCAL_MODEL
        TOKENIZER = AutoTokenizer.from_pretrained(tokenizer_name)
        model = AutoModelForCausalLM.from_pretrained(LOCAL_MODEL, device_map="auto")
        LLM_PIPELINE = pipeline("text-generation", model=model, tokenizer=TOKENIZER)
        print(f"Loaded local model: {LOCAL_MODEL}")
    except Exception as e:
        print("Failed to load local model:", e)
        LLM_PIPELINE = None


init_local_model()

# ----------------------------
# Simple rule-based responder fallback
# ----------------------------
def rule_based_response(message: str) -> str:
    """Keyword-based canned replies used when no local model is loaded."""
    msg = message.lower()
    if msg.startswith("create record") or msg.startswith("create contact"):
        return "To create a record, say: create_record MODULENAME {\\\"Field\\\": \\\"value\\\"}"
    if msg.startswith("help") or msg.startswith("what can you do"):
        return "I can create/update/delete records in Zoho (create_record, update_record, delete_record), or process local files by pasting their path (/mnt/data/...)."
    return "(fallback) I don't have a local model loaded. Use a supported local LLM or call create_record directly."


# ----------------------------
# Zoho token & MCP tools — same patterns as before
# ----------------------------
def _get_valid_token_headers() -> dict:
    """Exchange the refresh token for an access token; return auth headers.

    Raises RuntimeError when the token endpoint does not answer 200.
    """
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    resp = requests.post(token_url, params=params, timeout=20)
    if resp.status_code == 200:
        token = resp.json().get("access_token")
        return {"Authorization": f"Zoho-oauthtoken {token}"}
    else:
        raise RuntimeError(f"Failed to refresh Zoho token: {resp.status_code} {resp.text}")


@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """Create one record in a Zoho CRM module; return the API JSON or an error string."""
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        payload = {"data": [record_data]}
        r = requests.post(url, headers=headers, json=payload, timeout=20)
        if r.status_code in (200, 201):
            _log_tool_call("create_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_record", False)
        return f"Error creating record: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("create_record", False)
        return f"Exception: {e}"


@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetch a page of records from a Zoho CRM module.

    Returns the list of records, or a one-element list holding an error string
    (the error-in-list shape is part of the tool's existing contract).
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20)
        if r.status_code == 200:
            _log_tool_call("get_records", True)
            return r.json().get("data", [])
        _log_tool_call("get_records", False)
        return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]


@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """Update one record by id; return the API JSON or an error string."""
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        payload = {"data": [data]}
        r = requests.put(url, headers=headers, json=payload, timeout=20)
        if r.status_code == 200:
            _log_tool_call("update_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("update_record", False)
        return f"Error updating: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("update_record", False)
        return f"Exception: {e}"


@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """Delete one record by id; return the API JSON or an error string."""
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        r = requests.delete(url, headers=headers, timeout=20)
        if r.status_code == 200:
            _log_tool_call("delete_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("delete_record", False)
        return f"Error deleting: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("delete_record", False)
        return f"Exception: {e}"


@mcp.tool()
def create_invoice(data: dict) -> str:
    """Create an invoice via the configured API base.

    NOTE: ensure API_BASE points to Books endpoints for invoices
    (e.g. https://books.zoho.in/api/v3).
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/invoices"
        r = requests.post(url, headers=headers, json={"data": [data]}, timeout=20)
        if r.status_code in (200, 201):
            _log_tool_call("create_invoice", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_invoice", False)
        return f"Error creating invoice: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("create_invoice", False)
        return f"Exception: {e}"


@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """
    Process a local path and return structured data. This follows developer
    instruction: "use the path to file in your history and send that local path
    as the url of the file." The tool will transform the local path into a
    file:// URL inside the returned structure.
    """
    try:
        if os.path.exists(file_path):
            # Placeholder: replace with your OCR pipeline (pytesseract/pdf2image, etc.)
            # For POC: return file:// URL and simulated fields
            file_url = f"file://{file_path}"
            extracted = {
                "Name": "ACME Corp (simulated)",
                "Email": "contact@acme.example",
                "Total": "1234.00",
                "Confidence": 0.88,
            }
            _log_tool_call("process_document", True)
            return {
                "status": "success",
                "file": os.path.basename(file_path),
                "file_url": file_url,
                "extracted_data": extracted,
            }
        else:
            _log_tool_call("process_document", False)
            return {"status": "error", "error": "file not found", "file_path": file_path}
    except Exception as e:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(e)}


# ----------------------------
# Local simple intent parser to call tools from chat
# ----------------------------
def try_parse_and_invoke_command(text: str):
    """Very small parser to handle explicit commands in chat and call local mcp tools.

    Supported patterns (for POC):
        create_record MODULE {json}
        create_invoice {json}
        process_document /mnt/data/...

    Returns the tool result (str or dict), an error string for malformed JSON,
    or None when the text matches no command.
    """
    text = text.strip()
    # create_record
    m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, re.I)
    if m:
        module = m.group(1)
        body = m.group(2)
        try:
            record_data = json.loads(body)
        except Exception:
            return "Invalid JSON for record_data"
        return create_record(module, record_data)
    # create_invoice
    m = re.match(r"^create_invoice\s+(.+)$", text, re.I)
    if m:
        body = m.group(1)
        try:
            invoice_data = json.loads(body)
        except Exception:
            return "Invalid JSON for invoice_data"
        return create_invoice(invoice_data)
    # process_document via local path
    m = re.match(r"^(\/mnt\/data\/\S+)$", text)
    if m:
        path = m.group(1)
        return process_document(path)
    return None


# ----------------------------
# LLM responder: try local model first, then fallback
# ----------------------------
def local_llm_generate(prompt: str) -> str:
    """Generate with the local pipeline when loaded, else the rule-based fallback."""
    if LLM_PIPELINE is not None:
        # use small generation params to keep CPU/GPU usage reasonable
        out = LLM_PIPELINE(prompt, max_new_tokens=256, do_sample=False)
        if isinstance(out, list) and len(out) > 0:
            return out[0].get("generated_text", out[0].get("text", str(out[0])))
        return str(out)
    else:
        return rule_based_response(prompt)


# ----------------------------
# Chat handler used by Gradio
# ----------------------------
def chat_handler(message, history):
    """Gradio chat callback: dispatch explicit commands, file paths, or the LLM.

    `history` is assumed to be a list of (user, assistant) pairs — the tuple
    format of gr.ChatInterface; TODO confirm against the installed Gradio version.
    Always returns a string, as the chat UI requires.
    """
    history = history or []
    trimmed = (message or "").strip()

    # 1) quick command parser (explicit commands)
    command_result = try_parse_and_invoke_command(trimmed)
    if command_result is not None:
        # Tools such as process_document return dicts; serialize them so the
        # chat UI always receives a string.
        if isinstance(command_result, (dict, list)):
            return json.dumps(command_result, ensure_ascii=False)
        return command_result

    # 2) file path dev convenience
    if trimmed.startswith("/mnt/data/"):
        doc = process_document(trimmed)
        if doc.get("status") == "success":
            return f"Processed file {doc.get('file')}. Extracted: {json.dumps(doc.get('extracted_data'))}"
        # surface the failure instead of formatting None fields
        return f"Error processing file: {json.dumps(doc, ensure_ascii=False)}"

    # 3) else: call local LLM (or fallback)
    # Build a prompt including short system instructions and history
    history_text = "\n".join(
        f"User: {h[0]}\nAssistant: {h[1]}"
        for h in (history or [])
        if isinstance(h, (list, tuple)) and len(h) >= 2
    )
    system = "You are a Zoho assistant that can call local MCP tools when the user explicitly asks. Keep replies concise."
    prompt = f"{system}\n{history_text}\nUser: {trimmed}\nAssistant:"
    try:
        resp = local_llm_generate(prompt)
        _log_llm_call(None)
        return resp
    except Exception as e:
        return f"LLM error: {e}"


# ----------------------------
# Gradio UI
# ----------------------------
def chat_interface():
    """Build the Gradio ChatInterface wired to chat_handler."""
    return gr.ChatInterface(
        fn=chat_handler,
        textbox=gr.Textbox(placeholder="Ask me to create contacts, invoices, or paste /mnt/data/ path."),
    )


# ----------------------------
# Entry
# ----------------------------
if __name__ == "__main__":
    print("Starting MCP server (open-source local LLM mode).")
    demo = chat_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)