Spaces:
Sleeping
Sleeping
import os
import json
import time
import uuid
import asyncio
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect

app = FastAPI()

# =====================================================
# CONFIG
# =====================================================
# Shared bearer token every client must present; "olla" is a dev fallback.
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
# Default CF Workers AI model (can override via request body)
DEFAULT_CF_MODEL = os.getenv("DEFAULT_CF_MODEL", "@cf/meta/llama-3.3-70b-instruct-fp8-fast")
# =====================================================
# LOAD CF CREDENTIALS
# Format env: CF_1=account_id,api_key
# =====================================================
CF_ACCOUNTS = []  # list of {"account_id": ..., "api_key": ...}
for i in range(1, 101):
    raw = os.getenv(f"CF_{i}")
    if not raw:
        continue
    # Split on the first comma only; an api_key may itself contain commas.
    account_id, sep, api_key = raw.partition(",")
    if not sep:
        print(f"[WARN] CF_{i} format invalid, expected 'account_id,api_key' — skipped")
        continue
    account_id = account_id.strip()
    api_key = api_key.strip()
    if account_id and api_key:
        CF_ACCOUNTS.append({"account_id": account_id, "api_key": api_key})

# Keep the service bootable even with zero configured accounts.
if not CF_ACCOUNTS:
    print("[WARN] No CF credentials found, inserting dummy")
    CF_ACCOUNTS.append({"account_id": "dummy", "api_key": "dummy"})
# =====================================================
# KEY STATUS
# =====================================================
# Per-account bookkeeping, keyed by account_id.
key_status = {
    acc["account_id"]: {
        "index": idx,
        "healthy": True,
        "busy": False,
        "success": 0,
        "fail": 0,
    }
    for idx, acc in enumerate(CF_ACCOUNTS, 1)
}

rr_index = 0                # round-robin cursor into CF_ACCOUNTS
_key_lock = asyncio.Lock()  # guards key_status and rr_index mutations
| # ===================================================== | |
| # HELPERS | |
| # ===================================================== | |
def log(x):
    """Print *x* prefixed with an HH:MM:SS timestamp, flushing immediately."""
    stamp = time.strftime('%H:%M:%S')
    print(f"[{stamp}] {x}", flush=True)
def sse(obj):
    """Serialize *obj* as a single SSE data frame (non-ASCII kept verbatim)."""
    payload = json.dumps(obj, ensure_ascii=False)
    return f"data: {payload}\n\n"
def auth_ok(req: Request):
    """Validate the request's Authorization header against MASTER_API_KEY.

    Fix: the previous ``.replace("Bearer ", "")`` removed the substring
    anywhere inside the token, so a master key that itself contained
    "Bearer " could never match. Strip only a leading "Bearer " prefix.
    """
    header = req.headers.get("Authorization", "")
    token = header[len("Bearer "):] if header.startswith("Bearer ") else header
    return token == MASTER_API_KEY
# URL template for Cloudflare's OpenAI-compatible Workers AI API.
CF_AI_BASE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/v1"


def cf_base(account_id: str) -> str:
    """Return the Workers AI base URL for the given account id."""
    return CF_AI_BASE.format(account_id=account_id)
async def get_key(exclude=None):
    """Round-robin select the next healthy, idle account and mark it busy.

    Accounts whose account_id is in *exclude* are skipped. Returns the
    account dict, or None when no eligible account is currently free.
    """
    global rr_index
    excluded = exclude if exclude is not None else set()
    async with _key_lock:
        total = len(CF_ACCOUNTS)
        # At most one full lap over the account list.
        for _ in range(total):
            rr_index = (rr_index + 1) % total
            candidate = CF_ACCOUNTS[rr_index]
            state = key_status[candidate["account_id"]]
            if not state["healthy"] or state["busy"]:
                continue
            if candidate["account_id"] in excluded:
                continue
            state["busy"] = True
            return candidate
    return None
async def release_key(acc):
    """Mark *acc*'s slot idle again (no-op for unknown account ids)."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["busy"] = False
async def mark_fail(acc):
    """Record one failure for *acc*'s account (no-op for unknown ids)."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            # NOTE(review): failures accumulate but "healthy" is never set
            # False anywhere in this file — presumably intentional; confirm.
            state["fail"] += 1
async def mark_ok(acc):
    """Record a success for *acc* and reset its failure streak."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["success"] += 1
            state["fail"] = 0
async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
    """Poll get_key() until an account frees up or *max_wait* seconds pass.

    Args:
        exclude:  set of account_ids to skip (forwarded to get_key).
        max_wait: overall timeout in seconds.
        interval: sleep between polls in seconds.

    Returns the acquired account dict, or None on timeout.

    Fix: use a monotonic-clock deadline instead of summing *interval*;
    the old elapsed counter ignored time spent inside get_key() (lock
    contention), so the real wait could exceed the advertised max_wait.
    """
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        acc = await get_key(exclude)
        if acc:
            return acc
        await asyncio.sleep(interval)
    return None
def is_rate_limited_status(status_code: int) -> bool:
    """Rate-limit check based on the HTTP status code only (429)."""
    return status_code == 429
def is_rate_limited_error_body(text: str) -> bool:
    """Rate-limit check based on an HTTP error response body.

    ONLY used on non-200 HTTP response bodies or JSON error objects —
    NEVER on model output tokens (to avoid false positives when the
    model itself writes "rate limit").
    """
    lowered = text.lower()
    markers = ("rate limit", "too many requests", "usage limit")
    return any(marker in lowered for marker in markers)
def parse_sse_chunk(raw: str):
    """
    Parse a single SSE data chunk from CF (OpenAI-compatible format).

    Returns (token, is_cf_error, error_text):
      - token:       content string to stream to the client (may be ""
                     for empty/thinking-only chunks)
      - is_cf_error: True when the chunk is an error from the CF API,
                     not model output
      - error_text:  the error text when is_cf_error is True
    """
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Non-JSON payload -> most likely a plain-text error from CF.
        return None, True, raw

    # JSON carrying an "error" key and no "choices" -> CF API error.
    if "error" in parsed and "choices" not in parsed:
        return None, True, json.dumps(parsed)

    # Regular OpenAI delta chunk.
    choices = parsed.get("choices", [])
    if not choices:
        return "", False, ""
    delta = choices[0].get("delta", {})

    # Main content (may be None during a thinking phase on some models).
    text = delta.get("content") or ""
    # Thinking models (Kimi K2, DeepSeek R1, ...) put thinking tokens in
    # reasoning_content / reasoning — include them so thinking streams too.
    thinking = delta.get("reasoning_content") or delta.get("reasoning") or ""
    return thinking + text, False, ""
| # ===================================================== | |
| # ROOT | |
| # ===================================================== | |
@app.get("/")
async def root():
    """Status endpoint: account count plus per-account stats with masked ids.

    Fix: the handler was defined but never registered on the FastAPI app
    (no route decorator anywhere in this file despite the ROOT banner);
    register it at "/".
    """
    async with _key_lock:
        safe = {}
        for kid, v in key_status.items():
            # Mask the account id so it is not leaked verbatim.
            masked = kid[:6] + "****" + kid[-4:]
            safe[masked] = {
                "index": v["index"],
                "healthy": v["healthy"],
                "busy": v["busy"],
                "success": v["success"],
                "fail": v["fail"],
            }
    return {
        "status": "ok",
        "accounts": len(CF_ACCOUNTS),
        "default_model": DEFAULT_CF_MODEL,
        "detail": safe
    }
| # ===================================================== | |
| # /v1/models — live proxy langsung ke CF | |
| # ===================================================== | |
@app.get("/v1/models")
async def models(req: Request):
    """Live proxy of the model list from the first healthy CF account.

    Requires the master bearer token; returns CF's JSON verbatim.

    Fix: the handler was defined but never registered on the FastAPI app;
    register it at "/v1/models" per the section banner.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    acc = None
    async with _key_lock:
        # First healthy account wins; "busy" does not matter for a quick GET.
        for a in CF_ACCOUNTS:
            if key_status[a["account_id"]]["healthy"]:
                acc = a
                break
    if not acc:
        return JSONResponse({"error": "No healthy accounts"}, status_code=503)
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            r = await client.get(
                f"{cf_base(acc['account_id'])}/models",
                headers={"Authorization": f"Bearer {acc['api_key']}"}
            )
            if r.status_code != 200:
                return JSONResponse({"error": f"CF returned {r.status_code}: {r.text}"}, status_code=r.status_code)
            return Response(content=r.content, media_type="application/json")
    except Exception as e:
        log(f"[/v1/models] exception: {e}")
        return JSONResponse({"error": str(e)}, status_code=500)
| # ===================================================== | |
| # /v1/chat/completions — OpenAI-compatible endpoint | |
| # ===================================================== | |
@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat completions, proxied to Cloudflare Workers AI.

    Picks a free account via round-robin and retries on other accounts when
    one is rate limited or errors. Supports both plain JSON responses and
    SSE streaming (passed through unchanged).

    Fix: the handler was defined but never registered on the FastAPI app;
    register it at "/v1/chat/completions" per the section banner.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    try:
        body = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    is_stream = body.get("stream", False)
    model = body.get("model", DEFAULT_CF_MODEL)
    cf_body = {**body, "model": model}

    # -----------------------------------------
    # NON-STREAM
    # -----------------------------------------
    if not is_stream:
        tried = set()
        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        }
                    )
                    # Rate-limit detection only from the HTTP status / error
                    # body — never from the model's output text.
                    if is_rate_limited_status(r.status_code) or (
                        r.status_code != 200 and is_rate_limited_error_body(r.text)
                    ):
                        log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
                        await mark_fail(acc)
                        continue
                    if r.status_code != 200:
                        log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
                        await mark_fail(acc)
                        continue
                    await mark_ok(acc)
                    return Response(content=r.content, media_type="application/json")
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)
            finally:
                # Always free the key, success or failure.
                await release_key(acc)
        return JSONResponse({"error": "All accounts failed"}, status_code=500)

    # -----------------------------------------
    # STREAM — pipe the OpenAI SSE straight through to the client
    # -----------------------------------------
    async def gen():
        tried = set()
        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=None) as client:
                    async with client.stream(
                        "POST",
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        }
                    ) as r:
                        # Only the status code signals a rate limit here.
                        if is_rate_limited_status(r.status_code):
                            log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
                            await mark_fail(acc)
                            continue
                        if r.status_code != 200:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (stream), trying next")
                            await mark_fail(acc)
                            continue
                        hit_limit = False
                        async for line in r.aiter_lines():
                            if not line:
                                continue
                            if line.strip() == "data: [DONE]":
                                break
                            raw = line[6:] if line.startswith("data: ") else line
                            # Parse the chunk first and only treat genuine CF
                            # error objects as errors — never scan the model's
                            # own tokens for "rate limit" wording.
                            _, is_cf_err, err_text = parse_sse_chunk(raw)
                            if is_cf_err and is_rate_limited_error_body(err_text):
                                log(f"Account {acc['account_id'][:8]}... mid-stream CF error, switching key")
                                hit_limit = True
                                break
                            yield line + "\n\n"
                        if hit_limit:
                            await mark_fail(acc)
                            continue
                        yield "data: [DONE]\n\n"
                        await mark_ok(acc)
                        return
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
                await mark_fail(acc)
            finally:
                await release_key(acc)
        # Every account failed before a stream completed.
        yield sse({"error": "All accounts failed"})
        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")
| # ===================================================== | |
| # /v1/messages — Anthropic-compatible endpoint | |
| # Konversi Anthropic format → CF OpenAI-compatible | |
| # ===================================================== | |
@app.post("/v1/messages")
async def anthropic(req: Request):
    """Anthropic-compatible messages endpoint.

    Converts the Anthropic request format to CF's OpenAI-compatible format,
    proxies it with account rotation, and converts the response (including
    SSE streams) back into Anthropic's wire format.

    Fix: the handler was defined but never registered on the FastAPI app;
    register it at "/v1/messages" per the section banner.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    try:
        body = await req.json()
    except ClientDisconnect:
        # Client went away before the request body arrived.
        return Response(status_code=499)
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    stream = body.get("stream", False)
    model = body.get("model", DEFAULT_CF_MODEL)
    max_tokens = body.get("max_tokens", 2048)

    # Convert Anthropic messages -> OpenAI format.
    messages = []
    if body.get("system"):
        messages.append({"role": "system", "content": body["system"]})
    for m in body.get("messages", []):
        content = m.get("content", "")
        if isinstance(content, list):
            # Anthropic content blocks: keep only the text parts.
            txt = "".join(x.get("text", "") for x in content if x.get("type") == "text")
            content = txt
        messages.append({"role": m["role"], "content": content})

    cf_body = {
        "model": model,
        "messages": messages,
        "max_tokens": max_tokens,
        "stream": stream,
    }

    # -----------------------------------------
    # NON-STREAM
    # -----------------------------------------
    if not stream:
        tried = set()
        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        }
                    )
                    # Rate-limit detection only from HTTP status / error body.
                    if is_rate_limited_status(r.status_code) or (
                        r.status_code != 200 and is_rate_limited_error_body(r.text)
                    ):
                        log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
                        await mark_fail(acc)
                        continue
                    if r.status_code != 200:
                        log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
                        await mark_fail(acc)
                        continue
                    data = r.json()
                    content_text = data["choices"][0]["message"]["content"] or ""
                    usage = data.get("usage", {})
                    # Wrap the OpenAI answer in an Anthropic message object.
                    out = {
                        "id": "msg_" + uuid.uuid4().hex[:10],
                        "type": "message",
                        "role": "assistant",
                        "model": model,
                        "content": [{"type": "text", "text": content_text}],
                        "stop_reason": "end_turn",
                        "stop_sequence": None,
                        "usage": {
                            "input_tokens": usage.get("prompt_tokens", 0),
                            "output_tokens": usage.get("completion_tokens", 0),
                        }
                    }
                    await mark_ok(acc)
                    return JSONResponse(out)
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)
            finally:
                await release_key(acc)
        return JSONResponse({"error": "All accounts failed"}, status_code=500)

    # -----------------------------------------
    # STREAM — CF sends OpenAI SSE; convert it to Anthropic SSE
    # -----------------------------------------
    async def agen():
        tried = set()
        msg_id = "msg_" + uuid.uuid4().hex[:10]
        envelope_sent = False
        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=None) as client:
                    async with client.stream(
                        "POST",
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        }
                    ) as r:
                        # Only the status code signals a rate limit here.
                        if is_rate_limited_status(r.status_code):
                            log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
                            await mark_fail(acc)
                            continue
                        if r.status_code != 200:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (anthropic stream), trying next")
                            await mark_fail(acc)
                            continue
                        # Send the Anthropic envelope only once, even when we
                        # retry the upstream call on another account.
                        if not envelope_sent:
                            envelope_sent = True
                            yield sse({
                                "type": "message_start",
                                "message": {
                                    "id": msg_id,
                                    "type": "message",
                                    "role": "assistant",
                                    "model": model,
                                    "content": [],
                                    "stop_reason": None,
                                    "stop_sequence": None,
                                    "usage": {"input_tokens": 0, "output_tokens": 0}
                                }
                            })
                            # Anthropic spec requires "text": "" here.
                            yield sse({
                                "type": "content_block_start",
                                "index": 0,
                                "content_block": {"type": "text", "text": ""}
                            })
                        hit_limit = False
                        async for line in r.aiter_lines():
                            if not line:
                                continue
                            if line.strip() == "data: [DONE]":
                                break
                            raw = line[6:] if line.startswith("data: ") else line
                            # =============================================
                            # Parse the chunk FIRST, then check for errors.
                            # NEVER scan the model's tokens for "rate limit"
                            # wording — doing so truncated responses
                            # mid-stream whenever the model itself wrote
                            # "rate limit" / "too many requests" in its
                            # output or thinking.
                            # =============================================
                            token, is_cf_err, err_text = parse_sse_chunk(raw)
                            if is_cf_err:
                                if is_rate_limited_error_body(err_text):
                                    log(f"Account {acc['account_id'][:8]}... mid-stream CF rate limit, switching key")
                                    hit_limit = True
                                else:
                                    log(f"Account {acc['account_id'][:8]}... mid-stream CF error: {err_text[:120]}")
                                break
                            # token == "" -> thinking phase with no content; skip.
                            if token:
                                yield sse({
                                    "type": "content_block_delta",
                                    "index": 0,
                                    "delta": {"type": "text_delta", "text": token}
                                })
                        if hit_limit:
                            await mark_fail(acc)
                            continue
                        await mark_ok(acc)
                        break
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... agen exception: {e}")
                await mark_fail(acc)
            finally:
                await release_key(acc)
        # Close the Anthropic SSE envelope.
        # Edge case: every account failed before the envelope was ever sent.
        if not envelope_sent:
            yield sse({
                "type": "message_start",
                "message": {
                    "id": msg_id, "type": "message", "role": "assistant",
                    "model": model, "content": [], "stop_reason": None,
                    "stop_sequence": None, "usage": {"input_tokens": 0, "output_tokens": 0}
                }
            })
            yield sse({"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
        yield sse({"type": "content_block_stop", "index": 0})
        yield sse({
            "type": "message_delta",
            "delta": {"stop_reason": "end_turn", "stop_sequence": None},
            "usage": {"output_tokens": 0}
        })
        yield sse({"type": "message_stop"})

    return StreamingResponse(agen(), media_type="text/event-stream")