Zoho_mcp_client / app.py
vachaspathi's picture
Update app.py
e142418 verified
raw
history blame
13.3 kB
```python
# app.py — MCP server using an open-source local LLM (transformers) or a rule-based fallback
# - Uses FastMCP for tools
# - Gradio ChatInterface for UI
# - process_document accepts local path and transforms it to a file:// URL in the tool call
from mcp.server.fastmcp import FastMCP
from typing import Optional, List, Tuple, Any, Dict
import requests
import os
import gradio as gr
import json
import time
import traceback
import inspect
import re
# Optional imports for local model
try:
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
TRANSFORMERS_AVAILABLE = True
except Exception:
TRANSFORMERS_AVAILABLE = False
# Optional embeddings for light retrieval if desired
try:
from sentence_transformers import SentenceTransformer
import numpy as np
SENTEVAL_AVAILABLE = True
except Exception:
SENTEVAL_AVAILABLE = False
# ----------------------------
# Load config
# ----------------------------
# config.py must provide the Zoho credentials and API base URL. The local-model
# settings are optional: when they are absent the app uses the rule-based responder.
_CONFIG_HELP = (
    "Make sure config.py exists with CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE, LOCAL_MODEL (or leave LOCAL_MODEL=None)."
)
try:
    import config as _config
except ImportError as exc:
    raise SystemExit(_CONFIG_HELP) from exc
try:
    # Required settings: a missing one is a configuration error, so exit with guidance.
    CLIENT_ID = _config.CLIENT_ID
    CLIENT_SECRET = _config.CLIENT_SECRET
    REFRESH_TOKEN = _config.REFRESH_TOKEN
    API_BASE = _config.API_BASE
except AttributeError as exc:
    raise SystemExit(_CONFIG_HELP) from exc
# Optional settings: default to None instead of aborting start-up when omitted.
LOCAL_MODEL = getattr(_config, "LOCAL_MODEL", None)  # e.g. "tiiuae/falcon-7b-instruct" if you have it locally
LOCAL_TOKENIZER = getattr(_config, "LOCAL_TOKENIZER", None)
# ----------------------------
# Initialize FastMCP
# ----------------------------
mcp = FastMCP("ZohoCRMAgent")
# ----------------------------
# Analytics (simple)
# ----------------------------
ANALYTICS_PATH = "mcp_analytics.json"
def _init_analytics():
    """Create the analytics JSON file with zeroed counters if it does not exist yet."""
    if os.path.exists(ANALYTICS_PATH):
        # An existing file is left untouched so counters survive restarts.
        return
    initial_state = {
        "tool_calls": {},
        "llm_calls": 0,
        "last_llm_confidence": None,
        "created_at": time.time(),
    }
    with open(ANALYTICS_PATH, "w") as handle:
        json.dump(initial_state, handle, indent=2)
def _log_tool_call(tool_name: str, success: bool = True):
    """Record one invocation of ``tool_name`` in the analytics JSON file.

    Increments the tool's ``count`` plus either ``success`` or ``fail``.
    Analytics are auxiliary: an unreadable or malformed file is replaced by a
    fresh structure, and a failed write is reported on stdout rather than
    raised, so a logging problem never changes the outcome of the tool call
    that triggered it.

    Args:
        tool_name: Name of the tool that was invoked.
        success: Whether the invocation succeeded.
    """
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        data = None
    if not isinstance(data, dict):
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}
    # Tolerate files that parse as JSON but lack (or corrupt) the per-tool section.
    tool_calls = data.get("tool_calls")
    if not isinstance(tool_calls, dict):
        tool_calls = {}
        data["tool_calls"] = tool_calls
    stats = tool_calls.get(tool_name)
    if not isinstance(stats, dict):
        stats = {"count": 0, "success": 0, "fail": 0}
        tool_calls[tool_name] = stats
    outcome = "success" if success else "fail"
    stats["count"] = stats.get("count", 0) + 1
    stats[outcome] = stats.get(outcome, 0) + 1
    try:
        with open(ANALYTICS_PATH, "w") as f:
            json.dump(data, f, indent=2)
    except OSError as exc:
        # The tools call this inside their own try blocks; raising here would
        # turn an already-successful API call into a reported failure.
        print(f"Warning: could not write analytics to {ANALYTICS_PATH}: {exc}")
def _log_llm_call(confidence: Optional[float] = None):
    """Bump the LLM call counter in the analytics file.

    Args:
        confidence: When not None, stored as ``last_llm_confidence``.
    """
    try:
        with open(ANALYTICS_PATH, "r") as source:
            stats = json.load(source)
    except Exception:
        # Unreadable or missing file: start again from an empty structure.
        stats = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}

    previous_calls = stats.get("llm_calls", 0)
    stats["llm_calls"] = previous_calls + 1
    if confidence is not None:
        stats["last_llm_confidence"] = confidence

    with open(ANALYTICS_PATH, "w") as sink:
        json.dump(stats, sink, indent=2)
_init_analytics()
# ----------------------------
# Local LLM: attempt to load transformers pipeline
# ----------------------------
LLM_PIPELINE = None
TOKENIZER = None
def init_local_model():
    """Try to load the configured local model into the module-level pipeline.

    Sets the globals ``TOKENIZER`` and ``LLM_PIPELINE``. When transformers is
    unavailable or no model is configured the function returns without
    loading anything; when loading fails, ``LLM_PIPELINE`` is reset to None.
    """
    global LLM_PIPELINE, TOKENIZER
    can_load = TRANSFORMERS_AVAILABLE and LOCAL_MODEL
    if not can_load:
        print("Local transformers not available or LOCAL_MODEL not set — falling back to rule-based responder.")
        return
    try:
        # An explicitly configured tokenizer wins; otherwise reuse the model name.
        TOKENIZER = AutoTokenizer.from_pretrained(LOCAL_TOKENIZER or LOCAL_MODEL)
        causal_lm = AutoModelForCausalLM.from_pretrained(LOCAL_MODEL, device_map="auto")
        LLM_PIPELINE = pipeline("text-generation", model=causal_lm, tokenizer=TOKENIZER)
        print(f"Loaded local model: {LOCAL_MODEL}")
    except Exception as load_error:
        print("Failed to load local model:", load_error)
        LLM_PIPELINE = None
init_local_model()
# ----------------------------
# Simple rule-based responder fallback
# ----------------------------
def rule_based_response(message: str) -> str:
    """Return a canned reply when no local model is loaded.

    Matching is case-insensitive and looks only at how the message starts.

    Args:
        message: The user's text; None is treated as an empty message.

    Returns:
        A usage hint for record creation, a capability summary for help
        requests, or a generic fallback notice.
    """
    msg = (message or "").lower()
    if msg.startswith(("create record", "create contact")):
        # The example must be valid JSON as shown, because the command parser
        # feeds it straight to json.loads — so no backslashes before the quotes.
        return 'To create a record, say: create_record MODULENAME {"Field": "value"}'
    if msg.startswith(("help", "what can you do")):
        return "I can create/update/delete records in Zoho (create_record, update_record, delete_record), or process local files by pasting their path (/mnt/data/...)."
    return "(fallback) I don't have a local model loaded. Use a supported local LLM or call create_record directly."
# ----------------------------
# Zoho OAuth token refresh & MCP tools (record/invoice helpers exposed via FastMCP)
# ----------------------------
def _get_valid_token_headers() -> dict:
    """Exchange the configured refresh token for an access token.

    Returns:
        A headers dict carrying the ``Authorization: Zoho-oauthtoken <token>``
        entry used by the Zoho API calls in this module.

    Raises:
        RuntimeError: If the token endpoint answers with a non-200 status, or
            with a 200 response that carries no usable ``access_token``.
    """
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token"
    }
    resp = requests.post(token_url, params=params, timeout=20)
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to refresh Zoho token: {resp.status_code} {resp.text}")
    try:
        body = resp.json()
    except ValueError:
        body = None
    token = body.get("access_token") if isinstance(body, dict) else None
    if not token:
        # A 200 response can still lack a token (for example an error payload);
        # without this check the header would read "Zoho-oauthtoken None".
        raise RuntimeError(f"Failed to refresh Zoho token: {resp.status_code} {resp.text}")
    return {"Authorization": f"Zoho-oauthtoken {token}"}
@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """Create one record in the given module via POST to ``{API_BASE}/{module_name}``.

    Returns the JSON response as a string on HTTP 200/201, otherwise an error
    string. Exceptions are reported as an ``Exception: ...`` string instead of
    being raised. Every outcome is recorded through ``_log_tool_call``.
    """
    tool = "create_record"
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.post(
            f"{API_BASE}/{module_name}",
            headers=auth_headers,
            json={"data": [record_data]},
            timeout=20,
        )
        if response.status_code not in (200, 201):
            _log_tool_call(tool, False)
            return f"Error creating record: {response.status_code} {response.text}"
        _log_tool_call(tool, True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call(tool, False)
        return f"Exception: {exc}"
@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetch one page of records via GET to ``{API_BASE}/{module_name}``.

    Returns the ``data`` list of the JSON response on HTTP 200 (an empty list
    when the key is absent). On any other status, or on an exception, returns
    a one-element list holding an error string. Every outcome is recorded
    through ``_log_tool_call``.
    """
    tool = "get_records"
    try:
        auth_headers = _get_valid_token_headers()
        paging = {"page": page, "per_page": per_page}
        response = requests.get(
            f"{API_BASE}/{module_name}",
            headers=auth_headers,
            params=paging,
            timeout=20,
        )
        if response.status_code != 200:
            _log_tool_call(tool, False)
            return [f"Error retrieving {module_name}: {response.status_code} {response.text}"]
        _log_tool_call(tool, True)
        return response.json().get("data", [])
    except Exception as exc:
        _log_tool_call(tool, False)
        return [f"Exception: {exc}"]
@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """Update one record via PUT to ``{API_BASE}/{module_name}/{record_id}``.

    Returns the JSON response as a string on HTTP 200, otherwise an error
    string. Exceptions are reported as an ``Exception: ...`` string instead of
    being raised. Every outcome is recorded through ``_log_tool_call``.
    """
    tool = "update_record"
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.put(
            f"{API_BASE}/{module_name}/{record_id}",
            headers=auth_headers,
            json={"data": [data]},
            timeout=20,
        )
        if response.status_code != 200:
            _log_tool_call(tool, False)
            return f"Error updating: {response.status_code} {response.text}"
        _log_tool_call(tool, True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call(tool, False)
        return f"Exception: {exc}"
@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """Delete one record via DELETE to ``{API_BASE}/{module_name}/{record_id}``.

    Returns the JSON response as a string on HTTP 200, otherwise an error
    string. Exceptions are reported as an ``Exception: ...`` string instead of
    being raised. Every outcome is recorded through ``_log_tool_call``.
    """
    tool = "delete_record"
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.delete(
            f"{API_BASE}/{module_name}/{record_id}",
            headers=auth_headers,
            timeout=20,
        )
        if response.status_code != 200:
            _log_tool_call(tool, False)
            return f"Error deleting: {response.status_code} {response.text}"
        _log_tool_call(tool, True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call(tool, False)
        return f"Exception: {exc}"
@mcp.tool()
def create_invoice(data: dict) -> str:
    """Create an invoice via POST to ``{API_BASE}/invoices``.

    Returns the JSON response as a string on HTTP 200/201, otherwise an error
    string. Exceptions are reported as an ``Exception: ...`` string instead of
    being raised. Every outcome is recorded through ``_log_tool_call``.
    """
    # NOTE: ensure API_BASE points to Books endpoints for invoices (e.g. https://books.zoho.in/api/v3)
    # NOTE(review): the body is wrapped as {"data": [...]} like the record tools —
    # confirm this is the shape the configured invoice endpoint expects.
    tool = "create_invoice"
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.post(
            f"{API_BASE}/invoices",
            headers=auth_headers,
            json={"data": [data]},
            timeout=20,
        )
        if response.status_code not in (200, 201):
            _log_tool_call(tool, False)
            return f"Error creating invoice: {response.status_code} {response.text}"
        _log_tool_call(tool, True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call(tool, False)
        return f"Exception: {exc}"
@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Process a local document and return structured data for it.

    The local path is converted into a ``file://`` URL that is included in the
    result. Field extraction is currently simulated (placeholder values), and
    ``target_module`` is accepted but not used yet.

    Args:
        file_path: Path to a file on the local filesystem.
        target_module: Intended destination module (currently unused).

    Returns:
        On success: ``{"status": "success", "file", "file_url", "extracted_data"}``.
        On failure: ``{"status": "error", "error", ...}``.
    """
    from pathlib import Path  # local import keeps this block self-contained

    try:
        # isfile (not exists): a directory is not a document that can be processed.
        if os.path.isfile(file_path):
            # Placeholder: replace with your OCR pipeline (pytesseract/pdf2image, etc.)
            # For POC: return file:// URL and simulated fields
            # as_uri() needs an absolute path and percent-encodes spaces and other
            # special characters, which plain "file://" + path does not.
            file_url = Path(os.path.abspath(file_path)).as_uri()
            extracted = {
                "Name": "ACME Corp (simulated)",
                "Email": "[email protected]",  # simulated placeholder value
                "Total": "1234.00",
                "Confidence": 0.88
            }
            _log_tool_call("process_document", True)
            return {"status": "success", "file": os.path.basename(file_path), "file_url": file_url, "extracted_data": extracted}
        else:
            _log_tool_call("process_document", False)
            return {"status": "error", "error": "file not found", "file_path": file_path}
    except Exception as e:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(e)}
# ----------------------------
# Local simple intent parser to call tools from chat
# ----------------------------
def try_parse_and_invoke_command(text: str):
    """Very small parser to handle explicit commands in chat and call local mcp tools.

    Supported patterns (for POC):
        create_record MODULE {json}
        create_invoice {json}
        process_document /mnt/data/...   (or just the bare /mnt/data/... path)

    The JSON body may span several lines and must decode to a JSON object.

    Args:
        text: Raw chat message; None is treated as empty.

    Returns:
        The tool's return value when a command matched, an error string when
        the command's JSON is unusable, or None when no pattern matched.
    """
    text = (text or "").strip()
    # re.S lets the JSON body span multiple lines (pretty-printed payloads).
    command_flags = re.I | re.S

    # create_record
    m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, command_flags)
    if m:
        module = m.group(1)
        try:
            record_data = json.loads(m.group(2))
        except (ValueError, RecursionError):
            return "Invalid JSON for record_data"
        if not isinstance(record_data, dict):
            # The tool wraps this value as a record, so a list or scalar is never valid.
            return "Invalid JSON for record_data: expected a JSON object"
        return create_record(module, record_data)

    # create_invoice
    m = re.match(r"^create_invoice\s+(.+)$", text, command_flags)
    if m:
        try:
            invoice_data = json.loads(m.group(1))
        except (ValueError, RecursionError):
            return "Invalid JSON for invoice_data"
        if not isinstance(invoice_data, dict):
            return "Invalid JSON for invoice_data: expected a JSON object"
        return create_invoice(invoice_data)

    # process_document via local path; the command word is optional and
    # case-insensitive, while the path itself stays case-sensitive.
    m = re.match(r"^(?:(?i:process_document)\s+)?(/mnt/data/\S+)$", text)
    if m:
        return process_document(m.group(1))
    return None
# ----------------------------
# LLM responder: try local model first, then fallback
# ----------------------------
def local_llm_generate(prompt: str) -> str:
    """Generate a reply for ``prompt`` with the local pipeline, or fall back.

    Args:
        prompt: Full text prompt handed to the model.

    Returns:
        The newly generated text when a pipeline is loaded, otherwise the
        rule-based response for ``prompt``.
    """
    if LLM_PIPELINE is None:
        return rule_based_response(prompt)
    # use small generation params to keep CPU/GPU usage reasonable
    # return_full_text=False: by default the text-generation pipeline prepends
    # the prompt to generated_text, which would echo the system instructions
    # and chat history back to the user.
    out = LLM_PIPELINE(prompt, max_new_tokens=256, do_sample=False, return_full_text=False)
    if isinstance(out, list) and len(out) > 0:
        return out[0].get("generated_text", out[0].get("text", str(out[0])))
    return str(out)
# ----------------------------
# Chat handler used by Gradio
# ----------------------------
def _render_tool_result(result) -> str:
    """Convert a tool return value into text that can be shown in the chat."""
    if isinstance(result, str):
        return result
    if isinstance(result, dict) and "status" in result:
        # Shape returned by process_document.
        if result.get("status") == "success":
            return f"Processed file {result.get('file')}. Extracted: {json.dumps(result.get('extracted_data'))}"
        return f"Could not process file: {result.get('error')}"
    return json.dumps(result, ensure_ascii=False, default=str)


def _history_lines(history) -> list:
    """Flatten chat history into ``User:`` / ``Assistant:`` transcript lines."""
    lines = []
    for turn in history:
        if isinstance(turn, (list, tuple)) and len(turn) >= 2:
            lines.append(f"User: {turn[0]}\nAssistant: {turn[1]}")
        elif isinstance(turn, dict) and "content" in turn:
            # Assumes role/content message dicts, as used by Gradio's
            # "messages" history format — TODO confirm against the Gradio version in use.
            speaker = "User" if turn.get("role") == "user" else "Assistant"
            lines.append(f"{speaker}: {turn.get('content')}")
    return lines


def chat_handler(message, history):
    """Answer one chat message for the Gradio UI.

    Order of handling: explicit tool commands, then a pasted ``/mnt/data/``
    path, then the local LLM (or the rule-based fallback when none is loaded).

    Args:
        message: The user's text; None is treated as empty.
        history: Previous turns as supplied by Gradio; may be None.

    Returns:
        The reply text. Tool results are always rendered as strings.
    """
    history = history or []
    trimmed = (message or "").strip()
    # 1) quick command parser (explicit commands)
    command_result = try_parse_and_invoke_command(trimmed)
    if command_result is not None:
        # Tools may return dicts; the chat reply has to be text.
        return _render_tool_result(command_result)
    # 2) file path dev convenience (reached for paths the parser rejects, e.g. with spaces)
    if trimmed.startswith("/mnt/data/"):
        return _render_tool_result(process_document(trimmed))
    # 3) else: call local LLM (or fallback)
    # Build a prompt including short system instructions and history
    history_text = "\n".join(_history_lines(history))
    system = "You are a Zoho assistant that can call local MCP tools when the user explicitly asks. Keep replies concise."
    prompt = f"{system}\n{history_text}\nUser: {trimmed}\nAssistant:"
    try:
        if LLM_PIPELINE is None:
            # The rule-based responder matches on how the user's message starts,
            # so it must see the message itself, not the assembled prompt.
            resp = rule_based_response(trimmed)
        else:
            resp = local_llm_generate(prompt)
        _log_llm_call(None)
        return resp
    except Exception as e:
        return f"LLM error: {e}"
# ----------------------------
# Gradio UI
# ----------------------------
def chat_interface():
    """Build the Gradio chat UI, wired to ``chat_handler``."""
    prompt_box = gr.Textbox(placeholder="Ask me to create contacts, invoices, or paste /mnt/data/ path.")
    return gr.ChatInterface(fn=chat_handler, textbox=prompt_box)
# ----------------------------
# Entry
# ----------------------------
if __name__ == "__main__":
    # Script entry point: builds the Gradio chat UI and serves it on all
    # interfaces at port 7860.
    # NOTE(review): only the Gradio launch is visible here; nothing in this
    # block starts the FastMCP server itself — confirm that is intended.
    print("Starting MCP server (open-source local LLM mode).")
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo = chat_interface()
    demo.launch(**launch_options)
```