# proxy_cf.py — Cloudflare Workers AI proxy exposing OpenAI-compatible
# (/v1/chat/completions) and Anthropic-compatible (/v1/messages) endpoints
# with multi-account round-robin and rate-limit failover.
import os
import json
import time
import uuid
import asyncio
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect
app = FastAPI()
# =====================================================
# CONFIG
# =====================================================
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
# Default CF Workers AI model (can override via request body)
DEFAULT_CF_MODEL = os.getenv("DEFAULT_CF_MODEL", "@cf/meta/llama-3.3-70b-instruct-fp8-fast")
# =====================================================
# LOAD CF CREDENTIALS
# Format env: CF_1=account_id,api_key
# =====================================================
CF_ACCOUNTS = [] # list of {"account_id": ..., "api_key": ...}
for i in range(1, 101):
raw = os.getenv(f"CF_{i}")
if not raw:
continue
parts = raw.split(",", 1)
if len(parts) != 2:
print(f"[WARN] CF_{i} format invalid, expected 'account_id,api_key' — skipped")
continue
account_id, api_key = parts[0].strip(), parts[1].strip()
if account_id and api_key:
CF_ACCOUNTS.append({"account_id": account_id, "api_key": api_key})
if not CF_ACCOUNTS:
print("[WARN] No CF credentials found, inserting dummy")
CF_ACCOUNTS.append({"account_id": "dummy", "api_key": "dummy"})
# =====================================================
# KEY STATUS
# =====================================================
key_status = {}
for idx, acc in enumerate(CF_ACCOUNTS, 1):
kid = acc["account_id"]
key_status[kid] = {
"index": idx,
"healthy": True,
"busy": False,
"success": 0,
"fail": 0,
}
rr_index = 0
_key_lock = asyncio.Lock()
# =====================================================
# HELPERS
# =====================================================
def log(x):
print(f"[{time.strftime('%H:%M:%S')}] {x}", flush=True)
def sse(obj):
return "data: " + json.dumps(obj, ensure_ascii=False) + "\n\n"
def auth_ok(req: Request):
token = req.headers.get("Authorization", "").replace("Bearer ", "")
return token == MASTER_API_KEY
CF_AI_BASE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/v1"
def cf_base(account_id: str) -> str:
return CF_AI_BASE.format(account_id=account_id)
async def get_key(exclude=None):
global rr_index
if exclude is None:
exclude = set()
async with _key_lock:
for _ in range(len(CF_ACCOUNTS)):
rr_index = (rr_index + 1) % len(CF_ACCOUNTS)
acc = CF_ACCOUNTS[rr_index]
kid = acc["account_id"]
st = key_status[kid]
if st["healthy"] and not st["busy"] and kid not in exclude:
st["busy"] = True
return acc
return None
async def release_key(acc):
async with _key_lock:
kid = acc["account_id"]
if kid in key_status:
key_status[kid]["busy"] = False
async def mark_fail(acc):
async with _key_lock:
kid = acc["account_id"]
if kid in key_status:
key_status[kid]["fail"] += 1
async def mark_ok(acc):
async with _key_lock:
kid = acc["account_id"]
if kid in key_status:
key_status[kid]["success"] += 1
key_status[kid]["fail"] = 0
async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
elapsed = 0.0
while elapsed < max_wait:
acc = await get_key(exclude)
if acc:
return acc
await asyncio.sleep(interval)
elapsed += interval
return None
def is_rate_limited_status(status_code: int) -> bool:
"""Cek rate limit hanya dari HTTP status code."""
return status_code == 429
def is_rate_limited_error_body(text: str) -> bool:
"""
Cek rate limit dari body HTTP error response.
HANYA dipakai pada non-200 HTTP response body atau JSON error object
— BUKAN pada token output model (supaya tidak false positive).
"""
t = text.lower()
return "rate limit" in t or "too many requests" in t or "usage limit" in t
def parse_sse_chunk(raw: str):
"""
Parse satu SSE data chunk dari CF (OpenAI-compatible format).
Return: (token, is_cf_error, error_text)
- token : string content untuk di-stream ke client (bisa "" kalau thinking/kosong)
- is_cf_error: True kalau chunk ini adalah error dari CF API, bukan output model
- error_text : teks error kalau is_cf_error=True
"""
try:
j = json.loads(raw)
except json.JSONDecodeError:
# Non-JSON → kemungkinan error text plain dari CF
return None, True, raw
# JSON dengan "error" key dan tanpa "choices" → error dari CF API
if "error" in j and "choices" not in j:
return None, True, json.dumps(j)
# Normal OpenAI delta chunk
choices = j.get("choices", [])
if not choices:
return "", False, ""
delta = choices[0].get("delta", {})
# content utama (None selama thinking phase di beberapa model)
content = delta.get("content") or ""
# Beberapa model thinking (Kimi K2, DeepSeek R1, dll) pakai reasoning_content
# untuk thinking tokens — ikutkan supaya thinking juga ke-stream
reasoning = delta.get("reasoning_content") or delta.get("reasoning") or ""
return reasoning + content, False, ""
# =====================================================
# ROOT
# =====================================================
@app.get("/")
async def root():
async with _key_lock:
safe = {}
for kid, v in key_status.items():
masked = kid[:6] + "****" + kid[-4:]
safe[masked] = {
"index": v["index"],
"healthy": v["healthy"],
"busy": v["busy"],
"success": v["success"],
"fail": v["fail"],
}
return {
"status": "ok",
"accounts": len(CF_ACCOUNTS),
"default_model": DEFAULT_CF_MODEL,
"detail": safe
}
# =====================================================
# /v1/models — live proxy langsung ke CF
# =====================================================
@app.get("/v1/models")
async def models(req: Request):
if not auth_ok(req):
return JSONResponse({"error": "Unauthorized"}, status_code=401)
acc = None
async with _key_lock:
for a in CF_ACCOUNTS:
if key_status[a["account_id"]]["healthy"]:
acc = a
break
if not acc:
return JSONResponse({"error": "No healthy accounts"}, status_code=503)
try:
async with httpx.AsyncClient(timeout=30) as client:
r = await client.get(
f"{cf_base(acc['account_id'])}/models",
headers={"Authorization": f"Bearer {acc['api_key']}"}
)
if r.status_code != 200:
return JSONResponse({"error": f"CF returned {r.status_code}: {r.text}"}, status_code=r.status_code)
return Response(content=r.content, media_type="application/json")
except Exception as e:
log(f"[/v1/models] exception: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
# =====================================================
# /v1/chat/completions — OpenAI-compatible endpoint
# =====================================================
@app.post("/v1/chat/completions")
async def chat(req: Request):
if not auth_ok(req):
return JSONResponse({"error": "Unauthorized"}, status_code=401)
try:
body = await req.json()
except Exception:
return JSONResponse({"error": "Bad JSON"}, status_code=400)
is_stream = body.get("stream", False)
model = body.get("model", DEFAULT_CF_MODEL)
cf_body = {**body, "model": model}
# -----------------------------------------
# NON STREAM
# -----------------------------------------
if not is_stream:
tried = set()
for _ in range(len(CF_ACCOUNTS)):
acc = await wait_for_free_key(exclude=tried)
if not acc:
break
tried.add(acc["account_id"])
try:
async with httpx.AsyncClient(timeout=180) as client:
r = await client.post(
f"{cf_base(acc['account_id'])}/chat/completions",
json=cf_body,
headers={
"Authorization": f"Bearer {acc['api_key']}",
"Content-Type": "application/json",
}
)
# FIX: cek rate limit hanya dari HTTP status/error body, bukan dari model output
if is_rate_limited_status(r.status_code) or (
r.status_code != 200 and is_rate_limited_error_body(r.text)
):
log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
await mark_fail(acc)
continue
if r.status_code != 200:
log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
await mark_fail(acc)
continue
await mark_ok(acc)
return Response(content=r.content, media_type="application/json")
except Exception as e:
log(f"Account {acc['account_id'][:8]}... exception: {e}")
await mark_fail(acc)
finally:
await release_key(acc)
return JSONResponse({"error": "All accounts failed"}, status_code=500)
# -----------------------------------------
# STREAM — pipe OpenAI SSE langsung ke client
# -----------------------------------------
async def gen():
tried = set()
for _ in range(len(CF_ACCOUNTS)):
acc = await wait_for_free_key(exclude=tried)
if not acc:
break
tried.add(acc["account_id"])
try:
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream(
"POST",
f"{cf_base(acc['account_id'])}/chat/completions",
json=cf_body,
headers={
"Authorization": f"Bearer {acc['api_key']}",
"Content-Type": "application/json",
}
) as r:
# FIX: hanya cek status code untuk rate limit di sini
if is_rate_limited_status(r.status_code):
log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
await mark_fail(acc)
continue
if r.status_code != 200:
log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (stream), trying next")
await mark_fail(acc)
continue
hit_limit = False
async for line in r.aiter_lines():
if not line:
continue
if line.strip() == "data: [DONE]":
break
raw = line[6:] if line.startswith("data: ") else line
# FIX: gunakan parse_sse_chunk, cek error hanya pada CF error object
# — jangan cek kata "rate limit" pada konten model
_, is_cf_err, err_text = parse_sse_chunk(raw)
if is_cf_err and is_rate_limited_error_body(err_text):
log(f"Account {acc['account_id'][:8]}... mid-stream CF error, switching key")
hit_limit = True
break
yield line + "\n\n"
if hit_limit:
await mark_fail(acc)
continue
yield "data: [DONE]\n\n"
await mark_ok(acc)
return
except Exception as e:
log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
await mark_fail(acc)
finally:
await release_key(acc)
yield sse({"error": "All accounts failed"})
yield "data: [DONE]\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
# =====================================================
# /v1/messages — Anthropic-compatible endpoint
# Konversi Anthropic format → CF OpenAI-compatible
# =====================================================
@app.post("/v1/messages")
async def anthropic(req: Request):
if not auth_ok(req):
return JSONResponse({"error": "Unauthorized"}, status_code=401)
try:
body = await req.json()
except ClientDisconnect:
return Response(status_code=499)
except Exception:
return JSONResponse({"error": "Bad JSON"}, status_code=400)
stream = body.get("stream", False)
model = body.get("model", DEFAULT_CF_MODEL)
max_tokens = body.get("max_tokens", 2048)
# Konversi Anthropic messages → OpenAI format
messages = []
if body.get("system"):
messages.append({"role": "system", "content": body["system"]})
for m in body.get("messages", []):
content = m.get("content", "")
if isinstance(content, list):
txt = "".join(x.get("text", "") for x in content if x.get("type") == "text")
content = txt
messages.append({"role": m["role"], "content": content})
cf_body = {
"model": model,
"messages": messages,
"max_tokens": max_tokens,
"stream": stream,
}
# -----------------------------------------
# NON STREAM
# -----------------------------------------
if not stream:
tried = set()
for _ in range(len(CF_ACCOUNTS)):
acc = await wait_for_free_key(exclude=tried)
if not acc:
break
tried.add(acc["account_id"])
try:
async with httpx.AsyncClient(timeout=180) as client:
r = await client.post(
f"{cf_base(acc['account_id'])}/chat/completions",
json=cf_body,
headers={
"Authorization": f"Bearer {acc['api_key']}",
"Content-Type": "application/json",
}
)
# FIX: cek rate limit hanya dari HTTP status/error body
if is_rate_limited_status(r.status_code) or (
r.status_code != 200 and is_rate_limited_error_body(r.text)
):
log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
await mark_fail(acc)
continue
if r.status_code != 200:
log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
await mark_fail(acc)
continue
data = r.json()
content_text = data["choices"][0]["message"]["content"] or ""
usage = data.get("usage", {})
out = {
"id": "msg_" + uuid.uuid4().hex[:10],
"type": "message",
"role": "assistant",
"model": model,
"content": [{"type": "text", "text": content_text}],
"stop_reason": "end_turn",
"stop_sequence": None,
"usage": {
"input_tokens": usage.get("prompt_tokens", 0),
"output_tokens": usage.get("completion_tokens", 0),
}
}
await mark_ok(acc)
return JSONResponse(out)
except Exception as e:
log(f"Account {acc['account_id'][:8]}... exception: {e}")
await mark_fail(acc)
finally:
await release_key(acc)
return JSONResponse({"error": "All accounts failed"}, status_code=500)
# -----------------------------------------
# STREAM — CF kirim OpenAI SSE, kita konversi ke Anthropic SSE
# -----------------------------------------
async def agen():
tried = set()
msg_id = "msg_" + uuid.uuid4().hex[:10]
envelope_sent = False
for _ in range(len(CF_ACCOUNTS)):
acc = await wait_for_free_key(exclude=tried)
if not acc:
break
tried.add(acc["account_id"])
try:
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream(
"POST",
f"{cf_base(acc['account_id'])}/chat/completions",
json=cf_body,
headers={
"Authorization": f"Bearer {acc['api_key']}",
"Content-Type": "application/json",
}
) as r:
# FIX: hanya cek status code untuk rate limit
if is_rate_limited_status(r.status_code):
log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
await mark_fail(acc)
continue
if r.status_code != 200:
log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (anthropic stream), trying next")
await mark_fail(acc)
continue
# Kirim Anthropic envelope hanya sekali
if not envelope_sent:
envelope_sent = True
yield sse({
"type": "message_start",
"message": {
"id": msg_id,
"type": "message",
"role": "assistant",
"model": model,
"content": [],
"stop_reason": None,
"stop_sequence": None,
"usage": {"input_tokens": 0, "output_tokens": 0}
}
})
# FIX: tambah "text": "" sesuai spec Anthropic
yield sse({
"type": "content_block_start",
"index": 0,
"content_block": {"type": "text", "text": ""}
})
hit_limit = False
async for line in r.aiter_lines():
if not line:
continue
if line.strip() == "data: [DONE]":
break
raw = line[6:] if line.startswith("data: ") else line
# =============================================
# FIX UTAMA: parse chunk dulu, baru cek error
# JANGAN cek is_rate_limited pada teks model!
# Ini penyebab response berhenti di tengah karena
# model nulis kata "rate limit" / "too many requests"
# dalam output / thinking-nya.
# =============================================
token, is_cf_err, err_text = parse_sse_chunk(raw)
if is_cf_err:
if is_rate_limited_error_body(err_text):
log(f"Account {acc['account_id'][:8]}... mid-stream CF rate limit, switching key")
hit_limit = True
else:
log(f"Account {acc['account_id'][:8]}... mid-stream CF error: {err_text[:120]}")
break
# token "" → thinking phase tanpa content, skip saja
if token:
yield sse({
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": token}
})
if hit_limit:
await mark_fail(acc)
continue
await mark_ok(acc)
break
except Exception as e:
log(f"Account {acc['account_id'][:8]}... agen exception: {e}")
await mark_fail(acc)
finally:
await release_key(acc)
# Tutup Anthropic SSE envelope
# Edge case: semua account gagal sebelum sempat kirim envelope
if not envelope_sent:
yield sse({
"type": "message_start",
"message": {
"id": msg_id, "type": "message", "role": "assistant",
"model": model, "content": [], "stop_reason": None,
"stop_sequence": None, "usage": {"input_tokens": 0, "output_tokens": 0}
}
})
yield sse({"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
yield sse({"type": "content_block_stop", "index": 0})
yield sse({
"type": "message_delta",
"delta": {"stop_reason": "end_turn", "stop_sequence": None},
"usage": {"output_tokens": 0}
})
yield sse({"type": "message_stop"})
return StreamingResponse(agen(), media_type="text/event-stream")