"""Round-robin proxy over multiple Cloudflare Workers AI accounts.

Exposes OpenAI-compatible (/v1/chat/completions, /v1/models) and
Anthropic-compatible (/v1/messages) endpoints. Requests are load-balanced
across CF accounts loaded from CF_1..CF_100 env vars; rate-limited or
failing accounts are skipped and the next free account is retried.
"""

import os
import json
import time
import uuid
import asyncio

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect

app = FastAPI()

# =====================================================
# CONFIG
# =====================================================
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")

# Default CF Workers AI model (can override via request body)
DEFAULT_CF_MODEL = os.getenv(
    "DEFAULT_CF_MODEL", "@cf/meta/llama-3.3-70b-instruct-fp8-fast"
)

# =====================================================
# LOAD CF CREDENTIALS
# Env format: CF_1=account_id,api_key
# =====================================================
CF_ACCOUNTS = []  # list of {"account_id": ..., "api_key": ...}
for i in range(1, 101):
    raw = os.getenv(f"CF_{i}")
    if not raw:
        continue
    # Split on the first comma only — API keys may themselves contain commas.
    parts = raw.split(",", 1)
    if len(parts) != 2:
        print(f"[WARN] CF_{i} format invalid, expected 'account_id,api_key' — skipped")
        continue
    account_id, api_key = parts[0].strip(), parts[1].strip()
    if account_id and api_key:
        CF_ACCOUNTS.append({"account_id": account_id, "api_key": api_key})

if not CF_ACCOUNTS:
    # Keep the app bootable even with no credentials configured.
    print("[WARN] No CF credentials found, inserting dummy")
    CF_ACCOUNTS.append({"account_id": "dummy", "api_key": "dummy"})

# =====================================================
# KEY STATUS
# Per-account bookkeeping, keyed by account_id.
# =====================================================
key_status = {}
for idx, acc in enumerate(CF_ACCOUNTS, 1):
    kid = acc["account_id"]
    key_status[kid] = {
        "index": idx,
        "healthy": True,
        "busy": False,   # currently serving a request
        "success": 0,
        "fail": 0,
    }

rr_index = 0                 # round-robin cursor into CF_ACCOUNTS
_key_lock = asyncio.Lock()   # guards key_status / rr_index


# =====================================================
# HELPERS
# =====================================================
def log(x):
    """Timestamped, flushed stdout log line."""
    print(f"[{time.strftime('%H:%M:%S')}] {x}", flush=True)


def sse(obj):
    """Serialize *obj* as a single SSE `data:` event."""
    return "data: " + json.dumps(obj, ensure_ascii=False) + "\n\n"


def auth_ok(req: Request) -> bool:
    """Check the Bearer token against MASTER_API_KEY.

    FIX: strip only the "Bearer " prefix — the previous
    ``.replace("Bearer ", "")`` removed the substring anywhere in the
    token, mangling tokens that happen to contain it.
    """
    token = req.headers.get("Authorization", "")
    if token.startswith("Bearer "):
        token = token[len("Bearer "):]
    return token == MASTER_API_KEY


CF_AI_BASE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/v1"


def cf_base(account_id: str) -> str:
    """OpenAI-compatible API base URL for one CF account."""
    return CF_AI_BASE.format(account_id=account_id)


async def get_key(exclude=None):
    """Reserve the next healthy, idle account (round-robin).

    Marks the chosen account busy and returns it, or None when every
    account is busy/unhealthy/excluded.
    """
    global rr_index
    if exclude is None:
        exclude = set()
    async with _key_lock:
        for _ in range(len(CF_ACCOUNTS)):
            rr_index = (rr_index + 1) % len(CF_ACCOUNTS)
            acc = CF_ACCOUNTS[rr_index]
            kid = acc["account_id"]
            st = key_status[kid]
            if st["healthy"] and not st["busy"] and kid not in exclude:
                st["busy"] = True
                return acc
    return None


async def release_key(acc):
    """Mark an account idle again (pairs with get_key)."""
    async with _key_lock:
        kid = acc["account_id"]
        if kid in key_status:
            key_status[kid]["busy"] = False


async def mark_fail(acc):
    """Record one failure for the account."""
    async with _key_lock:
        kid = acc["account_id"]
        if kid in key_status:
            key_status[kid]["fail"] += 1


async def mark_ok(acc):
    """Record one success and reset the failure counter."""
    async with _key_lock:
        kid = acc["account_id"]
        if kid in key_status:
            key_status[kid]["success"] += 1
            key_status[kid]["fail"] = 0


async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
    """Poll get_key() until an account frees up or *max_wait* elapses."""
    elapsed = 0.0
    while elapsed < max_wait:
        acc = await get_key(exclude)
        if acc:
            return acc
        await asyncio.sleep(interval)
        elapsed += interval
    return None


def is_rate_limited_status(status_code: int) -> bool:
    """Detect rate limiting from the HTTP status code only."""
    return status_code == 429


def is_rate_limited_error_body(text: str) -> bool:
    """Detect rate limiting from an HTTP *error* response body.

    ONLY used on non-200 HTTP response bodies or JSON error objects —
    NEVER on model output tokens (to avoid false positives when the
    model itself writes the words "rate limit").
    """
    t = text.lower()
    return "rate limit" in t or "too many requests" in t or "usage limit" in t


def parse_sse_chunk(raw: str):
    """Parse one SSE data chunk from CF (OpenAI-compatible format).

    Returns (token, is_cf_error, error_text):
    - token:       string content to stream to the client (may be ""
                   during thinking / empty deltas)
    - is_cf_error: True when the chunk is an error from the CF API,
                   not model output
    - error_text:  error text when is_cf_error=True
    """
    try:
        j = json.loads(raw)
    except json.JSONDecodeError:
        # Non-JSON → likely a plain-text error from CF.
        return None, True, raw
    # JSON with an "error" key and no "choices" → CF API error object.
    if "error" in j and "choices" not in j:
        return None, True, json.dumps(j)
    # Normal OpenAI delta chunk.
    choices = j.get("choices", [])
    if not choices:
        return "", False, ""
    delta = choices[0].get("delta", {})
    # Main content (None during the thinking phase on some models).
    content = delta.get("content") or ""
    # Some thinking models (Kimi K2, DeepSeek R1, ...) emit thinking
    # tokens via reasoning_content/reasoning — include them so the
    # thinking phase is streamed too.
    reasoning = delta.get("reasoning_content") or delta.get("reasoning") or ""
    return reasoning + content, False, ""


# =====================================================
# ROOT — health/status overview with masked account ids
# =====================================================
@app.get("/")
async def root():
    async with _key_lock:
        safe = {}
        for kid, v in key_status.items():
            # NOTE(review): for very short account ids the prefix and
            # suffix overlap, leaking most of the id — confirm ids are
            # long enough in practice.
            masked = kid[:6] + "****" + kid[-4:]
            safe[masked] = {
                "index": v["index"],
                "healthy": v["healthy"],
                "busy": v["busy"],
                "success": v["success"],
                "fail": v["fail"],
            }
    return {
        "status": "ok",
        "accounts": len(CF_ACCOUNTS),
        "default_model": DEFAULT_CF_MODEL,
        "detail": safe,
    }


# =====================================================
# /v1/models — live proxy straight to CF
# =====================================================
@app.get("/v1/models")
async def models(req: Request):
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    # Pick the first healthy account (no reservation needed for a GET).
    acc = None
    async with _key_lock:
        for a in CF_ACCOUNTS:
            if key_status[a["account_id"]]["healthy"]:
                acc = a
                break
    if not acc:
        return JSONResponse({"error": "No healthy accounts"}, status_code=503)
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            r = await client.get(
                f"{cf_base(acc['account_id'])}/models",
                headers={"Authorization": f"Bearer {acc['api_key']}"},
            )
            if r.status_code != 200:
                return JSONResponse(
                    {"error": f"CF returned {r.status_code}: {r.text}"},
                    status_code=r.status_code,
                )
            return Response(content=r.content, media_type="application/json")
    except Exception as e:
        log(f"[/v1/models] exception: {e}")
        return JSONResponse({"error": str(e)}, status_code=500)


# =====================================================
# /v1/chat/completions — OpenAI-compatible endpoint
# =====================================================
@app.post("/v1/chat/completions")
async def chat(req: Request):
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    try:
        body = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    is_stream = body.get("stream", False)
    model = body.get("model", DEFAULT_CF_MODEL)
    cf_body = {**body, "model": model}

    # -----------------------------------------
    # NON STREAM
    # -----------------------------------------
    if not is_stream:
        tried = set()
        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        },
                    )
                    # Rate limit only detected from HTTP status / error
                    # body — never from model output.
                    if is_rate_limited_status(r.status_code) or (
                        r.status_code != 200 and is_rate_limited_error_body(r.text)
                    ):
                        log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
                        await mark_fail(acc)
                        continue
                    if r.status_code != 200:
                        log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
                        await mark_fail(acc)
                        continue
                    await mark_ok(acc)
                    return Response(content=r.content, media_type="application/json")
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)
            finally:
                await release_key(acc)
        return JSONResponse({"error": "All accounts failed"}, status_code=500)

    # -----------------------------------------
    # STREAM — pipe the OpenAI SSE straight through to the client
    # -----------------------------------------
    async def gen():
        tried = set()
        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=None) as client:
                    async with client.stream(
                        "POST",
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        },
                    ) as r:
                        # Only the status code signals rate limiting here.
                        if is_rate_limited_status(r.status_code):
                            log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
                            await mark_fail(acc)
                            continue
                        if r.status_code != 200:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (stream), trying next")
                            await mark_fail(acc)
                            continue
                        hit_limit = False
                        async for line in r.aiter_lines():
                            if not line:
                                continue
                            if line.strip() == "data: [DONE]":
                                break
                            raw = line[6:] if line.startswith("data: ") else line
                            # Parse the chunk; only a CF *error object*
                            # is checked for rate-limit wording — never
                            # the model's own tokens.
                            _, is_cf_err, err_text = parse_sse_chunk(raw)
                            if is_cf_err and is_rate_limited_error_body(err_text):
                                log(f"Account {acc['account_id'][:8]}... mid-stream CF error, switching key")
                                hit_limit = True
                                break
                            yield line + "\n\n"
                        if hit_limit:
                            await mark_fail(acc)
                            continue
                        yield "data: [DONE]\n\n"
                        await mark_ok(acc)
                        return
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
                await mark_fail(acc)
            finally:
                await release_key(acc)
        yield sse({"error": "All accounts failed"})
        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")


# =====================================================
# /v1/messages — Anthropic-compatible endpoint
# Converts Anthropic format → CF OpenAI-compatible
# =====================================================
@app.post("/v1/messages")
async def anthropic(req: Request):
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    try:
        body = await req.json()
    except ClientDisconnect:
        return Response(status_code=499)
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    stream = body.get("stream", False)
    model = body.get("model", DEFAULT_CF_MODEL)
    max_tokens = body.get("max_tokens", 2048)

    # Convert Anthropic messages → OpenAI format.
    messages = []
    if body.get("system"):
        messages.append({"role": "system", "content": body["system"]})
    for m in body.get("messages", []):
        content = m.get("content", "")
        if isinstance(content, list):
            # Anthropic content blocks → concatenated text parts.
            txt = "".join(
                x.get("text", "") for x in content if x.get("type") == "text"
            )
            content = txt
        messages.append({"role": m["role"], "content": content})

    cf_body = {
        "model": model,
        "messages": messages,
        "max_tokens": max_tokens,
        "stream": stream,
    }

    # -----------------------------------------
    # NON STREAM
    # -----------------------------------------
    if not stream:
        tried = set()
        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        },
                    )
                    # Rate limit only from HTTP status / error body.
                    if is_rate_limited_status(r.status_code) or (
                        r.status_code != 200 and is_rate_limited_error_body(r.text)
                    ):
                        log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
                        await mark_fail(acc)
                        continue
                    if r.status_code != 200:
                        log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
                        await mark_fail(acc)
                        continue
                    data = r.json()
                    content_text = data["choices"][0]["message"]["content"] or ""
                    usage = data.get("usage", {})
                    # Wrap the OpenAI response in an Anthropic message.
                    out = {
                        "id": "msg_" + uuid.uuid4().hex[:10],
                        "type": "message",
                        "role": "assistant",
                        "model": model,
                        "content": [{"type": "text", "text": content_text}],
                        "stop_reason": "end_turn",
                        "stop_sequence": None,
                        "usage": {
                            "input_tokens": usage.get("prompt_tokens", 0),
                            "output_tokens": usage.get("completion_tokens", 0),
                        },
                    }
                    await mark_ok(acc)
                    return JSONResponse(out)
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)
            finally:
                await release_key(acc)
        return JSONResponse({"error": "All accounts failed"}, status_code=500)

    # -----------------------------------------
    # STREAM — CF sends OpenAI SSE, we convert to Anthropic SSE
    # -----------------------------------------
    async def agen():
        tried = set()
        msg_id = "msg_" + uuid.uuid4().hex[:10]
        envelope_sent = False
        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=None) as client:
                    async with client.stream(
                        "POST",
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        },
                    ) as r:
                        # Only the status code signals rate limiting.
                        if is_rate_limited_status(r.status_code):
                            log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
                            await mark_fail(acc)
                            continue
                        if r.status_code != 200:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (anthropic stream), trying next")
                            await mark_fail(acc)
                            continue
                        # Send the Anthropic envelope only once, even if
                        # we retry on another account afterwards.
                        if not envelope_sent:
                            envelope_sent = True
                            yield sse({
                                "type": "message_start",
                                "message": {
                                    "id": msg_id,
                                    "type": "message",
                                    "role": "assistant",
                                    "model": model,
                                    "content": [],
                                    "stop_reason": None,
                                    "stop_sequence": None,
                                    "usage": {"input_tokens": 0, "output_tokens": 0},
                                },
                            })
                            # "text": "" is required by the Anthropic spec.
                            yield sse({
                                "type": "content_block_start",
                                "index": 0,
                                "content_block": {"type": "text", "text": ""},
                            })
                        hit_limit = False
                        async for line in r.aiter_lines():
                            if not line:
                                continue
                            if line.strip() == "data: [DONE]":
                                break
                            raw = line[6:] if line.startswith("data: ") else line
                            # =============================================
                            # Parse the chunk first, THEN check for errors.
                            # NEVER run the rate-limit check on model text!
                            # Doing so truncated responses mid-stream when
                            # the model wrote "rate limit" / "too many
                            # requests" in its output or thinking.
                            # =============================================
                            token, is_cf_err, err_text = parse_sse_chunk(raw)
                            if is_cf_err:
                                if is_rate_limited_error_body(err_text):
                                    log(f"Account {acc['account_id'][:8]}... mid-stream CF rate limit, switching key")
                                    hit_limit = True
                                else:
                                    log(f"Account {acc['account_id'][:8]}... mid-stream CF error: {err_text[:120]}")
                                break
                            # token "" → thinking phase without content, skip.
                            if token:
                                yield sse({
                                    "type": "content_block_delta",
                                    "index": 0,
                                    "delta": {"type": "text_delta", "text": token},
                                })
                        if hit_limit:
                            await mark_fail(acc)
                            continue
                        # NOTE(review): a non-rate-limit mid-stream CF error
                        # also lands here and is counted as a success —
                        # confirm this best-effort behavior is intended.
                        await mark_ok(acc)
                        break
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... agen exception: {e}")
                await mark_fail(acc)
            finally:
                await release_key(acc)
        # Close the Anthropic SSE envelope.
        # Edge case: every account failed before the envelope was sent.
        if not envelope_sent:
            yield sse({
                "type": "message_start",
                "message": {
                    "id": msg_id,
                    "type": "message",
                    "role": "assistant",
                    "model": model,
                    "content": [],
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {"input_tokens": 0, "output_tokens": 0},
                },
            })
            yield sse({"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
        yield sse({"type": "content_block_stop", "index": 0})
        yield sse({
            "type": "message_delta",
            "delta": {"stop_reason": "end_turn", "stop_sequence": None},
            "usage": {"output_tokens": 0},
        })
        yield sse({"type": "message_stop"})

    return StreamingResponse(agen(), media_type="text/event-stream")