# =============================================================== # Rendered Frame Theory — Live Prediction Console (Open Method) # Domains: Atmospheric / Seismic / Magnetic / Solar # Adds: Verifiable "Forecast Receipt" export + Receipt Upload Verification # =============================================================== import math import os import sys import json import uuid import base64 import hashlib import platform from typing import Optional, Dict, Any, List, Tuple from datetime import datetime, timezone, timedelta import gradio as gr import httpx import numpy as np import pandas as pd APP_NAME = "Rendered Frame Theory — Live Prediction Console (Open Method)" APP_VERSION = "v1.1-receipts+verify" UA = {"User-Agent": "RFTSystems/LivePredictionConsole"} # ---------- Constants -------------------------------------------------------- T_EARTH = 365.2422 * 24 * 3600.0 OMEGA_OBS = 2.0 * math.pi / T_EARTH K_TAU = 1.38 ALPHA_R = 1.02 REGION_BBOX = { "Global": None, "EMEA": (-35.0, -20.0, 70.0, 60.0), "AMER": (-60.0, -170.0, 72.0, -30.0), "APAC": (-50.0, 60.0, 60.0, 180.0), } RING_OF_FIRE_BBOXES = [ (-60.0, 120.0, 60.0, 180.0), (-60.0, -180.0, 60.0, -100.0), (10.0, -90.0, 60.0, -60.0), ] # ---------- Core Helpers ----------------------------------------------------- def utc_now() -> datetime: return datetime.now(timezone.utc) def utc_now_iso() -> str: return utc_now().isoformat().replace("+00:00", "Z") def clamp(x: float, a: float, b: float) -> float: return max(a, min(b, x)) def tau_eff_from_z(z: float) -> float: z = max(0.0, float(z)) return K_TAU * math.log(1.0 + z) def stable_log_ratio(x: float, x0: float) -> float: x = max(float(x), 1e-30) x0 = max(float(x0), 1e-30) return math.log(x / x0) def index_from_tau(tau: float) -> float: return float(OMEGA_OBS * float(tau) * ALPHA_R) def sha256_hex(b: bytes) -> str: return hashlib.sha256(b).hexdigest() def safe_json_dumps(obj: Any) -> str: return json.dumps(obj, ensure_ascii=False, indent=2, sort_keys=True, default=str) def env_snapshot() -> Dict[str, Any]: return { "app_name": APP_NAME, "app_version": APP_VERSION, "python": sys.version, "platform": platform.platform(), "packages": { "gradio": getattr(gr, "__version__", "unknown"), "httpx": getattr(httpx, "__version__", "unknown"), "numpy": getattr(np, "__version__", "unknown"), "pandas": getattr(pd, "__version__", "unknown"), }, "constants": { "T_EARTH": T_EARTH, "OMEGA_OBS": OMEGA_OBS, "K_TAU": K_TAU, "ALPHA_R": ALPHA_R, }, # Optional: set as HF Space secret if you want deterministic provenance for code versioning "git_commit": os.environ.get("RFT_GIT_COMMIT", ""), } # ---------- Provenance / Fetch Logging -------------------------------------- def record_fetch( prov_list: List[Dict[str, Any]], name: str, url: str, params: Optional[Dict[str, Any]], status_code: Optional[int], content_type: Optional[str], body_bytes: Optional[bytes], include_raw_payloads: bool, fetched_at_utc: str, error: Optional[str] = None, request_url: Optional[str] = None, ) -> None: body_bytes = body_bytes or b"" item: Dict[str, Any] = { "name": name, "fetched_at_utc": fetched_at_utc, "url": url, "params": params or {}, "request_url": request_url or "", "status_code": status_code, "content_type": content_type or "", "bytes_len": int(len(body_bytes)), "sha256": sha256_hex(body_bytes) if body_bytes else "", "error": error or "", } if include_raw_payloads and body_bytes: item["raw_b64"] = base64.b64encode(body_bytes).decode("ascii") item["raw_encoding"] = "base64" prov_list.append(item) def http_get_json( name: str, url: str, params: Optional[Dict[str, Any]], prov_list: List[Dict[str, Any]], include_raw_payloads: bool, timeout: float, ) -> Any: fetched_at = utc_now_iso() try: r = httpx.get(url, params=params, headers=UA, timeout=timeout) body = r.content ct = r.headers.get("content-type", "") req_url = str(r.request.url) if r.request else "" record_fetch( prov_list=prov_list, name=name, url=url, params=params, status_code=r.status_code, content_type=ct, body_bytes=body, include_raw_payloads=include_raw_payloads, fetched_at_utc=fetched_at, error=None, request_url=req_url, ) r.raise_for_status() return r.json() except Exception as e: record_fetch( prov_list=prov_list, name=name, url=url, params=params, status_code=None, content_type=None, body_bytes=None, include_raw_payloads=include_raw_payloads, fetched_at_utc=fetched_at, error=str(e), request_url=None, ) raise # ---------- Data Adapters ---------------------------------------------------- def geocode_location(q: str, prov_list: List[Dict[str, Any]], include_raw_payloads: bool): q = (q or "").strip() if not q: return None, None, "Empty location" url = "https://geocoding-api.open-meteo.com/v1/search" params = {"name": q, "count": 1, "language": "en", "format": "json"} js = http_get_json("GEOCODE_OPENMETEO", url, params, prov_list, include_raw_payloads, timeout=12) results = js.get("results") or [] if not results: return None, None, f"Could not geocode '{q}'" top = results[0] lat = float(top["latitude"]) lon = float(top["longitude"]) display = f"{top.get('name','')}, {top.get('country_code','')}".strip().strip(",") return lat, lon, display def fetch_openmeteo_hourly(lat: float, lon: float, prov_list: List[Dict[str, Any]], include_raw_payloads: bool, past_days: int = 1): url = "https://api.open-meteo.com/v1/forecast" params = { "latitude": lat, "longitude": lon, "hourly": "temperature_2m,relative_humidity_2m,pressure_msl,wind_speed_10m", "past_days": past_days, "forecast_days": 1, "timezone": "UTC", } js = http_get_json("OPENMETEO_HOURLY", url, params, prov_list, include_raw_payloads, timeout=18) hourly = js.get("hourly") or {} return { "time": hourly.get("time") or [], "temp": hourly.get("temperature_2m") or [], "rh": hourly.get("relative_humidity_2m") or [], "p": hourly.get("pressure_msl") or [], "wind": hourly.get("wind_speed_10m") or [], "meta": {"source": "Open-Meteo", "url": url, "params": params}, } def fetch_kp_last_24h(prov_list: List[Dict[str, Any]], include_raw_payloads: bool): url = "https://services.swpc.noaa.gov/json/planetary_k_index_1m.json" js = http_get_json("NOAA_SWPC_KP_1M", url, None, prov_list, include_raw_payloads, timeout=15) if not isinstance(js, list) or not js: return [] vals = [] for row in js: kp = row.get("kp_index") if kp is None: continue try: vals.append(float(kp)) except Exception: pass return vals[-1440:] def fetch_goes_xray_1day(prov_list: List[Dict[str, Any]], include_raw_payloads: bool): url = "https://services.swpc.noaa.gov/json/goes/primary/xrays-1-day.json" js = http_get_json("NOAA_SWPC_GOES_XRAY_1D", url, None, prov_list, include_raw_payloads, timeout=15) if not isinstance(js, list) or not js: return [] out = [] for row in js: f = row.get("flux") if f is None: continue try: out.append(float(f)) except Exception: pass return out def fetch_usgs_quakes( hours: int, minmag: float, prov_list: List[Dict[str, Any]], include_raw_payloads: bool, bbox: Optional[Tuple[float, float, float, float]] = None, center: Optional[Tuple[float, float]] = None, radius_km: Optional[float] = None, ) -> Dict[str, Any]: url = "https://earthquake.usgs.gov/fdsnws/event/1/query" end = utc_now() start = end - timedelta(hours=int(hours)) start_iso = start.isoformat().replace("+00:00", "Z") end_iso = end.isoformat().replace("+00:00", "Z") params: Dict[str, Any] = { "format": "geojson", "starttime": start_iso, "endtime": end_iso, "minmagnitude": str(float(minmag)), "orderby": "time", } if bbox is not None: minlat, minlon, maxlat, maxlon = bbox params.update( { "minlatitude": str(minlat), "minlongitude": str(minlon), "maxlatitude": str(maxlat), "maxlongitude": str(maxlon), } ) if center is not None and radius_km is not None: lat, lon = center params.update( { "latitude": str(float(lat)), "longitude": str(float(lon)), "maxradiuskm": str(float(radius_km)), } ) js = http_get_json("USGS_FDSN_EVENTS", url, params, prov_list, include_raw_payloads, timeout=22) feats = js.get("features") if isinstance(js, dict) else None if not feats: return {"events": [], "start": start_iso, "end": end_iso, "url": url, "params": params} out = [] for f in feats: props = f.get("properties") or {} out.append( { "id": f.get("id"), "mag": props.get("mag"), "place": props.get("place"), "time": props.get("time"), } ) return {"events": out, "start": start_iso, "end": end_iso, "url": url, "params": params} # ---------- Verification Links (user-facing) -------------------------------- def build_verification_links( lat: float, lon: float, seismic_mode: str, seismic_region: str, radius_km: float, usgs_meta: Optional[Dict[str, Any]], ) -> str: swpc_kp_page = "https://www.swpc.noaa.gov/products/planetary-k-index" swpc_kp_json = "https://services.swpc.noaa.gov/json/planetary_k_index_1m.json" goes_plot_page = "https://www.swpc.noaa.gov/products/goes-x-ray-flux" goes_xray_json = "https://services.swpc.noaa.gov/json/goes/primary/xrays-1-day.json" open_meteo_link = ( "https://api.open-meteo.com/v1/forecast" f"?latitude={lat:.5f}&longitude={lon:.5f}" "&hourly=temperature_2m,pressure_msl,wind_speed_10m&past_days=1&forecast_days=1&timezone=UTC" ) usgs_map = "https://earthquake.usgs.gov/earthquakes/map/" scope = "Unknown" usgs_query = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson" def build_q(meta: Dict[str, Any]) -> str: base = meta.get("url", "https://earthquake.usgs.gov/fdsnws/event/1/query") params = meta.get("params", {}) pairs = [f"{k}={v}" for k, v in params.items()] return base + "?" + "&".join(pairs) if usgs_meta: # RingOfFire can be multi-request if "requests" in usgs_meta and isinstance(usgs_meta["requests"], list) and usgs_meta["requests"]: usgs_query = build_q(usgs_meta["requests"][0]) scope = f"Multi-request (RingOfFire): showing first of {len(usgs_meta['requests'])}" else: usgs_query = build_q(usgs_meta) if seismic_mode == "Local radius": scope = f"Local radius query ({int(radius_km)} km around your location)" else: scope = f"Region mode ({seismic_region})" return ( "### Verify instantly (official sources)\n" f"- **Magnetic (Kp):** {swpc_kp_page} \n" f" Live JSON: {swpc_kp_json}\n" f"- **Solar (GOES X-ray):** {goes_plot_page} \n" f" Live JSON: {goes_xray_json}\n" f"- **Atmospheric (Open-Meteo API for this location):** {open_meteo_link}\n" f"- **Seismic (USGS map):** {usgs_map} \n" f" **USGS query used:** {usgs_query} \n" f" Scope: {scope}\n" ) # ---------- Agents ----------------------------------------------------------- def magnetic_agent(prov_list: List[Dict[str, Any]], include_raw_payloads: bool) -> Dict[str, Any]: kp = fetch_kp_last_24h(prov_list, include_raw_payloads) if len(kp) < 30: return {"enabled": False, "reason": "NOAA Kp feed too short"} last = float(kp[-1]) tail = kp[-360:] if len(kp) >= 360 else kp drift = float(np.std(tail)) if len(tail) >= 10 else 0.0 slope = float((tail[-1] - tail[0]) / max(1, len(tail) - 1)) z = clamp((last / 9.0) + (drift / 2.0) + 2.0 * abs(slope), 0.0, 3.0) tau = tau_eff_from_z(z) idx = index_from_tau(tau) if last >= 7.0 or z >= 2.0: pred = "warning" rule = "Kp>=7 OR z>=2.0" elif last >= 5.0 or z >= 1.2: pred = "watch" rule = "Kp>=5 OR z>=1.2" elif last >= 4.0 or z >= 0.8: pred = "monitor" rule = "Kp>=4 OR z>=0.8" else: pred = "hold" rule = "else" live = f"Global Kp={last:.1f} | drift={drift:.2f} | slope={slope:.4f}" return { "enabled": True, "domain": "Magnetic", "prediction": pred, "rule_fired": rule, "z": float(z), "tau_eff": float(tau), "omega_obs": float(OMEGA_OBS), "alpha_r": float(ALPHA_R), "index": float(idx), "live_status": live, "truth_source": "NOAA SWPC planetary_k_index_1m (global)", "inputs_used": {"kp_last": last, "kp_drift": drift, "kp_slope": slope, "tail_len": len(tail)}, "location_effect": "Location does not change Magnetic. Kp is global.", "do": "Use to track global geomagnetic regime shifts.", "dont": "Do not treat as a city magnetometer.", } def solar_agent(prov_list: List[Dict[str, Any]], include_raw_payloads: bool) -> Dict[str, Any]: flux = fetch_goes_xray_1day(prov_list, include_raw_payloads) if len(flux) < 50: return {"enabled": False, "reason": "GOES X-ray feed too short"} tail = flux[-120:] if len(flux) >= 120 else flux[-60:] f_mean = float(np.mean(tail)) f_peak = float(np.max(tail)) lr = stable_log_ratio(f_mean, 1e-8) z = clamp(lr / 10.0, 0.0, 3.0) tau = tau_eff_from_z(z) idx = index_from_tau(tau) if f_peak >= 1e-4 or z >= 2.2: pred = "flare likely" rule = "peak>=1e-4 OR z>=2.2" elif f_peak >= 1e-5 or z >= 1.5: pred = "flare watch" rule = "peak>=1e-5 OR z>=1.5" elif f_mean >= 1e-6 or z >= 0.9: pred = "monitor" rule = "mean>=1e-6 OR z>=0.9" else: pred = "hold" rule = "else" live = f"Global GOES mean={f_mean:.2e} | peak={f_peak:.2e}" return { "enabled": True, "domain": "Solar", "prediction": pred, "rule_fired": rule, "z": float(z), "tau_eff": float(tau), "omega_obs": float(OMEGA_OBS), "alpha_r": float(ALPHA_R), "index": float(idx), "live_status": live, "truth_source": "NOAA SWPC GOES xrays-1-day (global)", "inputs_used": {"flux_mean": f_mean, "flux_peak": f_peak, "tail_len": len(tail)}, "location_effect": "Location does not change Solar. GOES flux is global.", "do": "Use to track global solar radiative regime shifts.", "dont": "Do not treat as flare timing or CME arrival prediction.", } def atmospheric_agent(lat: float, lon: float, display: str, prov_list: List[Dict[str, Any]], include_raw_payloads: bool) -> Dict[str, Any]: wx = fetch_openmeteo_hourly(lat, lon, prov_list, include_raw_payloads, past_days=1) temp = wx["temp"] p = wx["p"] wind = wx["wind"] if len(temp) < 13: return {"enabled": False, "reason": "Open-Meteo hourly series too short"} t12 = [float(x) for x in temp[-13:]] dT = float(max(t12) - min(t12)) dp = None if len(p) >= 13: p12 = [float(x) for x in p[-13:]] dp = float(p12[-1] - p12[0]) w_mean = None if len(wind) >= 13: w12 = [float(x) for x in wind[-13:]] w_mean = float(np.mean(w12)) z_dt = clamp(dT / 10.0, 0.0, 2.0) z_dp = clamp((abs(dp) / 12.0) if dp is not None else 0.0, 0.0, 1.5) z = clamp(z_dt + z_dp, 0.0, 3.0) tau = tau_eff_from_z(z) idx = index_from_tau(tau) if dT >= 10.0 or (dp is not None and dp <= -10.0): pred = "storm risk" rule = "ΔT>=10 OR ΔP<=-10" elif dT >= 7.0 or (dp is not None and dp <= -6.0): pred = "swing" rule = "ΔT>=7 OR ΔP<=-6" elif dT >= 4.0: pred = "mild swing" rule = "ΔT>=4" else: pred = "stable" rule = "else" parts = [f"{display} ΔT(12h)={dT:.1f}°C"] if dp is not None: parts.append(f"ΔP(12h)={dp:.1f} hPa") if w_mean is not None: parts.append(f"wind≈{w_mean:.1f} m/s") live = " | ".join(parts) return { "enabled": True, "domain": "Atmospheric", "prediction": pred, "rule_fired": rule, "z": float(z), "tau_eff": float(tau), "omega_obs": float(OMEGA_OBS), "alpha_r": float(ALPHA_R), "index": float(idx), "live_status": live, "truth_source": "Open-Meteo hourly (location-based)", "inputs_used": {"dT_12h": dT, "dP_12h": dp, "wind_mean": w_mean, "lat": lat, "lon": lon}, "location_effect": "Location changes Atmospheric.", "do": "Use as a short-term stability detector from ΔT and ΔP.", "dont": "Do not treat as precipitation probability or full NWP forecast.", } def seismic_agent_region(region: str, prov_list: List[Dict[str, Any]], include_raw_payloads: bool): if region == "RingOfFire": seen = set() eqs = [] metas = [] for bb in RING_OF_FIRE_BBOXES: res = fetch_usgs_quakes(hours=24, minmag=2.5, bbox=bb, prov_list=prov_list, include_raw_payloads=include_raw_payloads) metas.append({"url": res["url"], "params": res["params"], "start": res["start"], "end": res["end"]}) for e in res["events"]: eid = e.get("id") if eid and eid not in seen: seen.add(eid) eqs.append(e) meta = {"mode": "RingOfFireMultiBBox", "requests": metas} else: bbox = REGION_BBOX.get(region, None) res = fetch_usgs_quakes(hours=24, minmag=2.5, bbox=bbox, prov_list=prov_list, include_raw_payloads=include_raw_payloads) eqs = res["events"] meta = {"url": res["url"], "params": res["params"], "start": res["start"], "end": res["end"]} return eqs, f"Region={region}", meta def seismic_agent_local(lat: float, lon: float, radius_km: float, prov_list: List[Dict[str, Any]], include_raw_payloads: bool): res = fetch_usgs_quakes(hours=24, minmag=2.5, center=(lat, lon), radius_km=radius_km, prov_list=prov_list, include_raw_payloads=include_raw_payloads) return res["events"], f"Local radius={int(radius_km)}km", {"url": res["url"], "params": res["params"], "start": res["start"], "end": res["end"]} def seismic_score(eqs: List[Dict[str, Any]]): N = int(len(eqs)) mags = [] for e in eqs: m = e.get("mag") if m is None: continue try: mags.append(float(m)) except Exception: pass Mmax = float(max(mags)) if mags else 0.0 z_count = clamp(N / 60.0, 0.0, 1.5) z_mag = clamp(max(0.0, Mmax - 4.0) / 2.5, 0.0, 1.5) z = clamp(z_count + z_mag, 0.0, 3.0) tau = tau_eff_from_z(z) idx = index_from_tau(tau) if Mmax >= 6.5 or z >= 2.2: pred = "alert" rule = "Mmax>=6.5 OR z>=2.2" elif Mmax >= 5.5 or z >= 1.5: pred = "watch" rule = "Mmax>=5.5 OR z>=1.5" elif N >= 25 or z >= 1.0: pred = "monitor" rule = "N>=25 OR z>=1.0" else: pred = "quiet" rule = "else" return pred, rule, z, tau, idx, N, Mmax def seismic_agent(mode: str, region: str, lat: float, lon: float, radius_km: float, prov_list: List[Dict[str, Any]], include_raw_payloads: bool) -> Dict[str, Any]: if mode == "Local radius": eqs, scope, usgs_meta = seismic_agent_local(lat, lon, radius_km, prov_list, include_raw_payloads) location_effect = "Location changes Seismic in Local radius mode." do = "Use to monitor seismic activity within the selected radius around your typed location." dont = "Do not treat as time/epicenter prediction." truth_scope = f"USGS events within {int(radius_km)} km" else: eqs, scope, usgs_meta = seismic_agent_region(region, prov_list, include_raw_payloads) location_effect = "Location does not change Seismic in Region mode. Region selector does." do = "Use as a regional seismic stress monitor." dont = "Do not treat as time/epicenter prediction." truth_scope = f"USGS events filtered by region={region}" pred, rule, z, tau, idx, N, Mmax = seismic_score(eqs) live = f"{scope} | quakes(24h,M≥2.5)={N} | max M{Mmax:.1f}" return { "enabled": True, "domain": "Seismic", "prediction": pred, "rule_fired": rule, "z": float(z), "tau_eff": float(tau), "omega_obs": float(OMEGA_OBS), "alpha_r": float(ALPHA_R), "index": float(idx), "live_status": live, "truth_source": f"USGS FDSN event feed ({truth_scope})", "inputs_used": { "count_24h": N, "max_mag_24h": Mmax, "mode": mode, "region": region, "radius_km": float(radius_km), "lat": float(lat), "lon": float(lon), }, "location_effect": location_effect, "do": do, "dont": dont, "what_it_is_not": "Not an earthquake time predictor. Not a rupture location predictor.", "why": "z_seis compresses activity density and severity into a bounded stress coordinate; τ_eff rises as ln(1+z).", "how": "Fetch USGS → count + max magnitude → z_seis → τ_eff → Index → label via fixed thresholds.", "usgs_meta": usgs_meta, } # ---------- Receipt Build/Save ---------------------------------------------- def build_receipt( run_id: str, run_started_utc: str, run_finished_utc: str, location_text: str, lat: float, lon: float, display: str, seismic_mode: str, seismic_region: str, radius_km: float, df: pd.DataFrame, atm: Dict[str, Any], sei: Dict[str, Any], mag: Dict[str, Any], sol: Dict[str, Any], prov_list: List[Dict[str, Any]], include_raw_payloads: bool, ) -> Dict[str, Any]: return { "receipt_version": 1, "run_id": run_id, "run_started_utc": run_started_utc, "run_finished_utc": run_finished_utc, "settings": { "location_text": location_text, "geocode_result": {"display": display, "lat": lat, "lon": lon}, "seismic_mode": seismic_mode, "seismic_region": seismic_region, "radius_km": float(radius_km), "include_raw_payloads": bool(include_raw_payloads), }, "outputs": { "table_rows": df.to_dict(orient="records"), "agents": { "atmospheric": atm, "seismic": sei, "magnetic": mag, "solar": sol, }, }, "provenance": {"fetches": prov_list}, "environment": env_snapshot(), "verification_note": ( "Receipt is tamper-evident via sha256 for each upstream payload. " "If raw payloads are embedded (raw_b64), integrity + offline verification is strong. " "If not embedded, you can still compare provider payloads later, but providers may revise feeds." ), } def write_receipt_to_file(receipt: Dict[str, Any]) -> str: run_id = receipt.get("run_id", "run") path = f"/tmp/rft_forecast_receipt_{run_id}.json" with open(path, "w", encoding="utf-8") as f: f.write(safe_json_dumps(receipt)) return path # ---------- Forecast Runner -------------------------------------------------- def run_forecast(location_text: str, seismic_mode: str, seismic_region: str, radius_km: float, include_raw_payloads: bool): run_started = utc_now_iso() run_id = uuid.uuid4().hex[:12] prov: List[Dict[str, Any]] = [] # Geocode try: lat, lon, display = geocode_location(location_text, prov, include_raw_payloads) except Exception as e: df = pd.DataFrame([{"Domain": "Error", "RFT Prediction": "DISABLED", "Live Status": f"Geocode error: {e}"}]) empty = {"enabled": False, "reason": f"Geocode error: {e}"} receipt = build_receipt(run_id, run_started, utc_now_iso(), location_text, float("nan"), float("nan"), "", seismic_mode, seismic_region, radius_km, df, empty, empty, empty, empty, prov, include_raw_payloads) receipt_path = write_receipt_to_file(receipt) return f"❌ Geocode error: {e}", df, "", empty, empty, empty, empty, receipt, receipt_path if lat is None: df = pd.DataFrame([{"Domain": "Error", "RFT Prediction": "DISABLED", "Live Status": display}]) empty = {"enabled": False, "reason": display} receipt = build_receipt(run_id, run_started, utc_now_iso(), location_text, float("nan"), float("nan"), display, seismic_mode, seismic_region, radius_km, df, empty, empty, empty, empty, prov, include_raw_payloads) receipt_path = write_receipt_to_file(receipt) return f"❌ {display}", df, "", empty, empty, empty, empty, receipt, receipt_path # Agents try: atm = atmospheric_agent(lat, lon, display, prov, include_raw_payloads) except Exception as e: atm = {"enabled": False, "reason": f"atmos error: {e}"} try: sei = seismic_agent(seismic_mode, seismic_region, lat, lon, radius_km, prov, include_raw_payloads) except Exception as e: sei = {"enabled": False, "reason": f"seismic error: {e}"} try: mag = magnetic_agent(prov, include_raw_payloads) except Exception as e: mag = {"enabled": False, "reason": f"magnetic error: {e}"} try: sol = solar_agent(prov, include_raw_payloads) except Exception as e: sol = {"enabled": False, "reason": f"solar error: {e}"} def fmt_row(domain: str, out: Dict[str, Any]): if not out.get("enabled"): return {"Domain": domain, "RFT Prediction": "DISABLED", "Live Status": out.get("reason", "missing inputs")} idx = out.get("index", None) z = out.get("z", None) tau = out.get("tau_eff", None) idx_s = f"{float(idx):.3e}" if isinstance(idx, (int, float)) else "n/a" z_s = f"{float(z):.2f}" if isinstance(z, (int, float)) else "n/a" t_s = f"{float(tau):.3f}" if isinstance(tau, (int, float)) else "n/a" return { "Domain": domain, "RFT Prediction": f"{out.get('prediction','hold')} | idx={idx_s} | z={z_s} | τ={t_s}", "Live Status": out.get("live_status", ""), } df = pd.DataFrame( [ fmt_row("Atmospheric", atm), fmt_row("Seismic", sei), fmt_row("Magnetic", mag), fmt_row("Solar", sol), ] ) run_finished = utc_now_iso() header = f"**Location:** {display} (lat {lat:.3f}, lon {lon:.3f}) | **UTC:** {run_finished} | **Run ID:** `{run_id}`" usgs_meta = None if isinstance(sei, dict): usgs_meta = sei.get("usgs_meta", None) verify_md = build_verification_links(lat, lon, seismic_mode, seismic_region, radius_km, usgs_meta) receipt = build_receipt( run_id=run_id, run_started_utc=run_started, run_finished_utc=run_finished, location_text=location_text, lat=lat, lon=lon, display=display, seismic_mode=seismic_mode, seismic_region=seismic_region, radius_km=radius_km, df=df, atm=atm, sei=sei, mag=mag, sol=sol, prov_list=prov, include_raw_payloads=include_raw_payloads, ) receipt_path = write_receipt_to_file(receipt) return header, df, verify_md, atm, sei, mag, sol, receipt, receipt_path # ---------- Receipt Verification -------------------------------------------- def _safe_float(x): try: if x is None: return None return float(x) except Exception: return None def _close(a, b, tol=1e-9): if a is None or b is None: return False return abs(float(a) - float(b)) <= tol * max(1.0, abs(float(a)), abs(float(b))) def _verify_payload_hashes(fetches: List[Dict[str, Any]]): rows = [] ok_all = True for f in (fetches or []): name = f.get("name", "") h = f.get("sha256", "") raw_b64 = f.get("raw_b64", None) if not raw_b64: rows.append({"Check": f"payload:{name}", "Status": "SKIP", "Detail": "No raw_b64 embedded"}) continue try: raw = base64.b64decode(raw_b64.encode("ascii")) h2 = sha256_hex(raw) ok = (h2 == h) ok_all = ok_all and ok rows.append({"Check": f"payload:{name}", "Status": "PASS" if ok else "FAIL", "Detail": f"sha256(receipt)={h} sha256(decoded)={h2}"}) except Exception as e: ok_all = False rows.append({"Check": f"payload:{name}", "Status": "FAIL", "Detail": f"Decode/hash error: {e}"}) return ok_all, rows def _recompute_domain(domain: str, agent: Dict[str, Any]): if not agent or not agent.get("enabled"): return {"enabled": False} iu = agent.get("inputs_used") or {} dom = (domain or "").strip().lower() if dom == "atmospheric": dT = _safe_float(iu.get("dT_12h")) dp = _safe_float(iu.get("dP_12h")) if dT is None: return {"error": "Missing inputs_used.dT_12h"} z_dt = clamp(dT / 10.0, 0.0, 2.0) z_dp = clamp((abs(dp) / 12.0) if dp is not None else 0.0, 0.0, 1.5) z = clamp(z_dt + z_dp, 0.0, 3.0) if dT >= 10.0 or (dp is not None and dp <= -10.0): pred = "storm risk" elif dT >= 7.0 or (dp is not None and dp <= -6.0): pred = "swing" elif dT >= 4.0: pred = "mild swing" else: pred = "stable" elif dom == "seismic": N = _safe_float(iu.get("count_24h")) Mmax = _safe_float(iu.get("max_mag_24h")) if N is None or Mmax is None: return {"error": "Missing inputs_used.count_24h or inputs_used.max_mag_24h"} z_count = clamp(N / 60.0, 0.0, 1.5) z_mag = clamp(max(0.0, Mmax - 4.0) / 2.5, 0.0, 1.5) z = clamp(z_count + z_mag, 0.0, 3.0) if Mmax >= 6.5 or z >= 2.2: pred = "alert" elif Mmax >= 5.5 or z >= 1.5: pred = "watch" elif N >= 25 or z >= 1.0: pred = "monitor" else: pred = "quiet" elif dom == "magnetic": last = _safe_float(iu.get("kp_last")) drift = _safe_float(iu.get("kp_drift")) slope = _safe_float(iu.get("kp_slope")) if last is None or drift is None or slope is None: return {"error": "Missing inputs_used.kp_last/kp_drift/kp_slope"} z = clamp((last / 9.0) + (drift / 2.0) + 2.0 * abs(slope), 0.0, 3.0) if last >= 7.0 or z >= 2.0: pred = "warning" elif last >= 5.0 or z >= 1.2: pred = "watch" elif last >= 4.0 or z >= 0.8: pred = "monitor" else: pred = "hold" elif dom == "solar": f_mean = _safe_float(iu.get("flux_mean")) f_peak = _safe_float(iu.get("flux_peak")) if f_mean is None or f_peak is None: return {"error": "Missing inputs_used.flux_mean/flux_peak"} lr = stable_log_ratio(f_mean, 1e-8) z = clamp(lr / 10.0, 0.0, 3.0) if f_peak >= 1e-4 or z >= 2.2: pred = "flare likely" elif f_peak >= 1e-5 or z >= 1.5: pred = "flare watch" elif f_mean >= 1e-6 or z >= 0.9: pred = "monitor" else: pred = "hold" else: return {"error": f"Unknown domain '{domain}'"} tau = tau_eff_from_z(z) idx = index_from_tau(tau) return {"z": z, "tau_eff": tau, "index": idx, "prediction": pred} def verify_receipt(uploaded_file): if uploaded_file is None: return "❌ Upload a receipt JSON first.", pd.DataFrame([]) try: # Gradio on HF often passes a NamedString / filepath path = str(uploaded_file) with open(path, "rb") as f: content = f.read() receipt = json.loads(content.decode("utf-8")) except Exception as e: return f"❌ Could not read JSON: {e}", pd.DataFrame([]) checks: List[Dict[str, str]] = [] ok = True for key in ["run_id", "settings", "outputs", "provenance", "environment"]: if key not in receipt: ok = False checks.append({"Check": f"has:{key}", "Status": "FAIL", "Detail": "Missing key"}) else: checks.append({"Check": f"has:{key}", "Status": "PASS", "Detail": ""}) fetches = (receipt.get("provenance") or {}).get("fetches") or [] ok_payloads, rows = _verify_payload_hashes(fetches) checks.extend(rows) ok = ok and ok_payloads agents = ((receipt.get("outputs") or {}).get("agents") or {}) mapping = { "Atmospheric": agents.get("atmospheric"), "Seismic": agents.get("seismic"), "Magnetic": agents.get("magnetic"), "Solar": agents.get("solar"), } for dom, agent in mapping.items(): if not agent or not agent.get("enabled"): checks.append({"Check": f"recompute:{dom}", "Status": "SKIP", "Detail": "Agent disabled"}) continue rec = _recompute_domain(dom, agent) if rec.get("error"): ok = False checks.append({"Check": f"recompute:{dom}", "Status": "FAIL", "Detail": rec["error"]}) continue z_ok = _close(rec["z"], agent.get("z"), tol=1e-6) t_ok = _close(rec["tau_eff"], agent.get("tau_eff"), tol=1e-6) i_ok = _close(rec["index"], agent.get("index"), tol=1e-6) p_ok = (str(rec["prediction"]).strip().lower() == str(agent.get("prediction")).strip().lower()) ok = ok and z_ok and t_ok and i_ok and p_ok checks.append({"Check": f"{dom}:z", "Status": "PASS" if z_ok else "FAIL", "Detail": f"expected={agent.get('z')} recomputed={rec['z']}"}) checks.append({"Check": f"{dom}:tau", "Status": "PASS" if t_ok else "FAIL", "Detail": f"expected={agent.get('tau_eff')} recomputed={rec['tau_eff']}"}) checks.append({"Check": f"{dom}:index", "Status": "PASS" if i_ok else "FAIL", "Detail": f"expected={agent.get('index')} recomputed={rec['index']}"}) checks.append({"Check": f"{dom}:label", "Status": "PASS" if p_ok else "FAIL", "Detail": f"expected={agent.get('prediction')} recomputed={rec['prediction']}"}) status = "✅ Receipt verification PASS" if ok else "⚠️ Receipt verification FAIL (see checks)" return status, pd.DataFrame(checks) # ---------- Markdown Tabs ---------------------------------------------------- INSTRUCTIONS_MD = """ ## Use and interpretation **Location input** - Used for Atmospheric. - Used for Seismic only if Seismic Mode is set to Local radius. - Not used for Solar or Magnetic (global signals). **Seismic Mode** - Region mode: counts quakes in large region (EMEA/AMER/APAC/RingOfFire/Global). - Local radius mode: counts quakes within a radius (km) around your typed location. **Run Forecast** - Pulls live data and recomputes from scratch. - No auto-refresh. No memory. No smoothing. - No guessing when data is missing (DISABLED instead). **Forecast Receipts** - Each run generates a downloadable receipt JSON that includes: - source URLs + params + timestamps - sha256 hashes of upstream payloads - computed intermediates + label rule fired - environment snapshot (versions + constants) - Optional: embed raw upstream payloads for stronger offline verification. """ METHOD_MD = f""" ## Open method equations Shared core: - τ_eff = {K_TAU} · ln(1 + z) - Ω_obs = 2π / T_earth = {OMEGA_OBS:.6e} - α_R = {ALPHA_R} - Index = Ω_obs · τ_eff · α_R z definitions: - Atmospheric: z_atm = clamp( clamp(ΔT/10,0..2) + clamp(|ΔP|/12,0..1.5), 0..3 ) - Seismic: z_seis = clamp( clamp(N/60,0..1.5) + clamp(max(0,Mmax-4)/2.5,0..1.5), 0..3 ) - Magnetic: z_mag = clamp( (Kp_last/9) + (drift/2) + 2·|slope|, 0..3 ) - Solar: z_solar= clamp( ln(F_mean/1e-8)/10, 0..3 ) Decision thresholds are shown per-domain in the agent output as `rule_fired`. """ # ---------- UI --------------------------------------------------------------- with gr.Blocks(title=APP_NAME) as demo: gr.Markdown(f"# {APP_NAME}") gr.Markdown(f"**Build:** `{APP_VERSION}`") with gr.Tab("Live Forecast"): loc = gr.Textbox(label="Location", value="London") seismic_mode = gr.Radio( choices=["Region", "Local radius"], value="Local radius", label="Seismic Mode" ) with gr.Row(): region = gr.Dropdown(["Global", "EMEA", "AMER", "APAC", "RingOfFire"], value="EMEA", label="Seismic Region (used in Region mode)") radius = gr.Slider(50, 2000, value=500, step=50, label="Seismic Radius km (used in Local radius mode)") include_raw = gr.Checkbox( value=False, label="Receipt durability: embed raw upstream payloads (larger download, stronger verification)" ) btn = gr.Button("Run Forecast", variant="primary") header_md = gr.Markdown() table = gr.Dataframe(headers=["Domain", "RFT Prediction", "Live Status"], interactive=False) verify_md = gr.Markdown() with gr.Accordion("Atmospheric details", open=False): atm_json = gr.JSON(label="Atmospheric agent output") with gr.Accordion("Seismic details", open=False): sei_json = gr.JSON(label="Seismic agent output") with gr.Accordion("Magnetic details", open=False): mag_json = gr.JSON(label="Magnetic agent output") with gr.Accordion("Solar details", open=False): sol_json = gr.JSON(label="Solar agent output") with gr.Accordion("Forecast Receipt (verifiable history)", open=True): gr.Markdown( "- Download the receipt to freeze this run.\n" "- If you enabled raw payloads, payload-hash verification is offline.\n" "- If not enabled, you still get URLs/params/timestamps + sha256 for audit trails." ) receipt_json = gr.JSON(label="Receipt JSON") receipt_file = gr.File(label="Download receipt (.json)") btn.click( run_forecast, inputs=[loc, seismic_mode, region, radius, include_raw], outputs=[header_md, table, verify_md, atm_json, sei_json, mag_json, sol_json, receipt_json, receipt_file], ) with gr.Tab("Verify Receipt"): gr.Markdown( "Upload a previously downloaded Forecast Receipt JSON to verify:\n\n" "- Structural integrity\n" "- Embedded payload hash checks (if raw payloads were included)\n" "- Recomputed z / τ_eff / index and label against stored intermediates\n" ) up = gr.File(label="Upload receipt (.json)", file_types=[".json"]) vbtn = gr.Button("Verify", variant="primary") vstatus = gr.Markdown() vtable = gr.Dataframe(headers=["Check", "Status", "Detail"], interactive=False) vbtn.click(verify_receipt, inputs=[up], outputs=[vstatus, vtable]) with gr.Tab("Method (Open)"): gr.Markdown(INSTRUCTIONS_MD) gr.Markdown(METHOD_MD) if __name__ == "__main__": demo.launch()