# RFTs_Forecasts / app.py — RFTSystems
# Commit: 28358f3 (verified) — "Update app.py"
# ===============================================================
# Rendered Frame Theory — Live Prediction Console (Open Method)
# Domains: Atmospheric / Seismic / Magnetic / Solar
# Adds: Verifiable "Forecast Receipt" export + Receipt Upload Verification
# ===============================================================
import math
import os
import sys
import json
import uuid
import base64
import hashlib
import platform
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime, timezone, timedelta
import gradio as gr
import httpx
import numpy as np
import pandas as pd
# App identity, embedded into receipts and shown in the UI header.
APP_NAME = "Rendered Frame Theory — Live Prediction Console (Open Method)"
APP_VERSION = "v1.1-receipts+verify"
# Sent with every upstream HTTP request so data providers can identify this client.
UA = {"User-Agent": "RFTSystems/LivePredictionConsole"}
# ---------- Constants --------------------------------------------------------
# Tropical year length in seconds (365.2422 days); basis for the observation frequency.
T_EARTH = 365.2422 * 24 * 3600.0
# Angular frequency of one Earth orbit, rad/s.
OMEGA_OBS = 2.0 * math.pi / T_EARTH
# RFT model coefficients: tau_eff = K_TAU * ln(1 + z); Index = OMEGA_OBS * tau_eff * ALPHA_R.
K_TAU = 1.38
ALPHA_R = 1.02
# Region bounding boxes as (minlat, minlon, maxlat, maxlon); None means no bbox filter.
REGION_BBOX = {
    "Global": None,
    "EMEA": (-35.0, -20.0, 70.0, 60.0),
    "AMER": (-60.0, -170.0, 72.0, -30.0),
    "APAC": (-50.0, 60.0, 60.0, 180.0),
}
# The Ring of Fire crosses the antimeridian, so it is approximated by three bboxes
# queried separately and merged (see seismic_agent_region).
RING_OF_FIRE_BBOXES = [
    (-60.0, 120.0, 60.0, 180.0),
    (-60.0, -180.0, 60.0, -100.0),
    (10.0, -90.0, 60.0, -60.0),
]
# ---------- Core Helpers -----------------------------------------------------
def utc_now() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
def utc_now_iso() -> str:
    """Current UTC time in ISO-8601 form with a trailing 'Z' instead of '+00:00'."""
    stamp = utc_now().isoformat()
    return stamp.replace("+00:00", "Z")
def clamp(x: float, a: float, b: float) -> float:
    """Constrain x to the closed interval [a, b] (upper bound applied first)."""
    capped = min(b, x)
    return a if capped < a else capped
def tau_eff_from_z(z: float) -> float:
    """Effective tau for a stress coordinate z: K_TAU * ln(1 + z), z floored at 0."""
    z_nonneg = float(z)
    if z_nonneg < 0.0:
        z_nonneg = 0.0
    return K_TAU * math.log(1.0 + z_nonneg)
def stable_log_ratio(x: float, x0: float) -> float:
    """ln(x / x0) with both operands floored at 1e-30 to avoid log(0) / div-by-zero."""
    numer = max(float(x), 1e-30)
    denom = max(float(x0), 1e-30)
    return math.log(numer / denom)
def index_from_tau(tau: float) -> float:
    """RFT index: Omega_obs * tau_eff * alpha_R."""
    product = OMEGA_OBS * float(tau) * ALPHA_R
    return float(product)
def sha256_hex(b: bytes) -> str:
    """Return the lowercase hex SHA-256 digest of b."""
    digest = hashlib.sha256()
    digest.update(b)
    return digest.hexdigest()
def safe_json_dumps(obj: Any) -> str:
    """Pretty, deterministic JSON: sorted keys, UTF-8 passthrough, str() fallback
    for non-serializable values (so receipts never fail to serialize)."""
    return json.dumps(
        obj,
        ensure_ascii=False,
        indent=2,
        sort_keys=True,
        default=str,
    )
def env_snapshot() -> Dict[str, Any]:
    """Snapshot of app identity, runtime, package versions, and model constants.

    Embedded into every receipt so a reader can reproduce the computation
    environment that produced it.
    """
    versions = {
        "gradio": getattr(gr, "__version__", "unknown"),
        "httpx": getattr(httpx, "__version__", "unknown"),
        "numpy": getattr(np, "__version__", "unknown"),
        "pandas": getattr(pd, "__version__", "unknown"),
    }
    constants = {
        "T_EARTH": T_EARTH,
        "OMEGA_OBS": OMEGA_OBS,
        "K_TAU": K_TAU,
        "ALPHA_R": ALPHA_R,
    }
    return {
        "app_name": APP_NAME,
        "app_version": APP_VERSION,
        "python": sys.version,
        "platform": platform.platform(),
        "packages": versions,
        "constants": constants,
        # Optional HF Space secret for deterministic code-version provenance.
        "git_commit": os.environ.get("RFT_GIT_COMMIT", ""),
    }
# ---------- Provenance / Fetch Logging --------------------------------------
def record_fetch(
    prov_list: List[Dict[str, Any]],
    name: str,
    url: str,
    params: Optional[Dict[str, Any]],
    status_code: Optional[int],
    content_type: Optional[str],
    body_bytes: Optional[bytes],
    include_raw_payloads: bool,
    fetched_at_utc: str,
    error: Optional[str] = None,
    request_url: Optional[str] = None,
) -> None:
    """Append one provenance entry describing an upstream fetch to prov_list.

    Always records url/params/status/size/hash metadata; the raw payload is
    embedded (base64) only when include_raw_payloads is set and a body exists.
    """
    payload = body_bytes or b""
    entry: Dict[str, Any] = {
        "name": name,
        "fetched_at_utc": fetched_at_utc,
        "url": url,
        "params": params or {},
        "request_url": request_url or "",
        "status_code": status_code,
        "content_type": content_type or "",
        "bytes_len": int(len(payload)),
        # Empty string (not a hash of b"") when there was no body at all.
        "sha256": sha256_hex(payload) if payload else "",
        "error": error or "",
    }
    if include_raw_payloads and payload:
        entry["raw_b64"] = base64.b64encode(payload).decode("ascii")
        entry["raw_encoding"] = "base64"
    prov_list.append(entry)
def http_get_json(
    name: str,
    url: str,
    params: Optional[Dict[str, Any]],
    prov_list: List[Dict[str, Any]],
    include_raw_payloads: bool,
    timeout: float,
) -> Any:
    """GET `url` and return the parsed JSON body.

    Every attempt — success or failure — is appended to `prov_list` via
    record_fetch so the receipt captures a full provenance trail. Raises on
    transport errors, non-2xx status, or JSON decode failure (the failure is
    recorded first, then re-raised for the caller to handle).
    """
    fetched_at = utc_now_iso()
    try:
        r = httpx.get(url, params=params, headers=UA, timeout=timeout)
        body = r.content
        ct = r.headers.get("content-type", "")
        req_url = str(r.request.url) if r.request else ""
        # Record the response even if it carries an HTTP error status; the
        # raise_for_status() below still surfaces the failure to the caller
        # (which then also records an error entry in the except branch).
        record_fetch(
            prov_list=prov_list,
            name=name,
            url=url,
            params=params,
            status_code=r.status_code,
            content_type=ct,
            body_bytes=body,
            include_raw_payloads=include_raw_payloads,
            fetched_at_utc=fetched_at,
            error=None,
            request_url=req_url,
        )
        r.raise_for_status()
        return r.json()
    except Exception as e:
        # Transport/HTTP/JSON failure: log a provenance entry carrying the
        # error text, then re-raise so the caller decides how to degrade.
        record_fetch(
            prov_list=prov_list,
            name=name,
            url=url,
            params=params,
            status_code=None,
            content_type=None,
            body_bytes=None,
            include_raw_payloads=include_raw_payloads,
            fetched_at_utc=fetched_at,
            error=str(e),
            request_url=None,
        )
        raise
# ---------- Data Adapters ----------------------------------------------------
def geocode_location(q: str, prov_list: List[Dict[str, Any]], include_raw_payloads: bool):
    """Resolve a free-text place name via the Open-Meteo geocoding API.

    Returns (lat, lon, display_name) on success, or (None, None, message)
    when the query is empty or yields no results. Network/JSON errors
    propagate from http_get_json.
    """
    q = (q or "").strip()
    if not q:
        return None, None, "Empty location"
    url = "https://geocoding-api.open-meteo.com/v1/search"
    params = {"name": q, "count": 1, "language": "en", "format": "json"}
    js = http_get_json("GEOCODE_OPENMETEO", url, params, prov_list, include_raw_payloads, timeout=12)
    results = js.get("results") or []
    if not results:
        return None, None, f"Could not geocode '{q}'"
    top = results[0]
    lat = float(top["latitude"])
    lon = float(top["longitude"])
    # e.g. "London, GB"; the trailing strip handles a missing country code.
    display = f"{top.get('name','')}, {top.get('country_code','')}".strip().strip(",")
    return lat, lon, display
def fetch_openmeteo_hourly(lat: float, lon: float, prov_list: List[Dict[str, Any]], include_raw_payloads: bool, past_days: int = 1):
    """Fetch hourly temperature/humidity/pressure/wind for (lat, lon).

    Pulls `past_days` of history plus one forecast day from Open-Meteo (UTC
    timestamps). Returns a dict of parallel lists keyed time/temp/rh/p/wind,
    plus a "meta" dict describing the request; missing series come back as [].
    """
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": "temperature_2m,relative_humidity_2m,pressure_msl,wind_speed_10m",
        "past_days": past_days,
        "forecast_days": 1,
        "timezone": "UTC",
    }
    js = http_get_json("OPENMETEO_HOURLY", url, params, prov_list, include_raw_payloads, timeout=18)
    hourly = js.get("hourly") or {}
    return {
        "time": hourly.get("time") or [],
        "temp": hourly.get("temperature_2m") or [],
        "rh": hourly.get("relative_humidity_2m") or [],
        "p": hourly.get("pressure_msl") or [],
        "wind": hourly.get("wind_speed_10m") or [],
        "meta": {"source": "Open-Meteo", "url": url, "params": params},
    }
def fetch_kp_last_24h(prov_list: List[Dict[str, Any]], include_raw_payloads: bool):
    """Fetch NOAA SWPC 1-minute planetary Kp values.

    Returns up to the last 1440 samples (~24h at one per minute) as floats.
    Rows with missing or non-numeric kp_index are skipped; an empty or
    non-list feed yields [].
    """
    url = "https://services.swpc.noaa.gov/json/planetary_k_index_1m.json"
    js = http_get_json("NOAA_SWPC_KP_1M", url, None, prov_list, include_raw_payloads, timeout=15)
    if not isinstance(js, list) or not js:
        return []
    vals = []
    for row in js:
        kp = row.get("kp_index")
        if kp is None:
            continue
        try:
            vals.append(float(kp))
        except Exception:
            # Best-effort parse: drop malformed rows rather than failing the run.
            pass
    return vals[-1440:]
def fetch_goes_xray_1day(prov_list: List[Dict[str, Any]], include_raw_payloads: bool):
    """Fetch one day of GOES primary X-ray flux samples from NOAA SWPC.

    Returns a list of float flux values in feed order; rows with missing or
    non-numeric flux are skipped, and an empty or non-list feed yields [].
    """
    url = "https://services.swpc.noaa.gov/json/goes/primary/xrays-1-day.json"
    js = http_get_json("NOAA_SWPC_GOES_XRAY_1D", url, None, prov_list, include_raw_payloads, timeout=15)
    if not isinstance(js, list) or not js:
        return []
    out = []
    for row in js:
        f = row.get("flux")
        if f is None:
            continue
        try:
            out.append(float(f))
        except Exception:
            # Best-effort parse: drop malformed rows rather than failing the run.
            pass
    return out
def fetch_usgs_quakes(
    hours: int,
    minmag: float,
    prov_list: List[Dict[str, Any]],
    include_raw_payloads: bool,
    bbox: Optional[Tuple[float, float, float, float]] = None,
    center: Optional[Tuple[float, float]] = None,
    radius_km: Optional[float] = None,
) -> Dict[str, Any]:
    """Query the USGS FDSN event service for quakes in the last `hours`.

    Spatial filters are optional: `bbox` is (minlat, minlon, maxlat, maxlon);
    `center` + `radius_km` adds a circular filter (both filters are forwarded
    if both are supplied). Returns {"events", "start", "end", "url", "params"}
    where each event carries id/mag/place/time as provided by USGS.
    """
    url = "https://earthquake.usgs.gov/fdsnws/event/1/query"
    end = utc_now()
    start = end - timedelta(hours=int(hours))
    start_iso = start.isoformat().replace("+00:00", "Z")
    end_iso = end.isoformat().replace("+00:00", "Z")
    params: Dict[str, Any] = {
        "format": "geojson",
        "starttime": start_iso,
        "endtime": end_iso,
        "minmagnitude": str(float(minmag)),
        "orderby": "time",
    }
    if bbox is not None:
        minlat, minlon, maxlat, maxlon = bbox
        params.update(
            {
                "minlatitude": str(minlat),
                "minlongitude": str(minlon),
                "maxlatitude": str(maxlat),
                "maxlongitude": str(maxlon),
            }
        )
    if center is not None and radius_km is not None:
        lat, lon = center
        params.update(
            {
                "latitude": str(float(lat)),
                "longitude": str(float(lon)),
                "maxradiuskm": str(float(radius_km)),
            }
        )
    js = http_get_json("USGS_FDSN_EVENTS", url, params, prov_list, include_raw_payloads, timeout=22)
    feats = js.get("features") if isinstance(js, dict) else None
    if not feats:
        # No features (or malformed response): empty event list, same shape.
        return {"events": [], "start": start_iso, "end": end_iso, "url": url, "params": params}
    out = []
    for f in feats:
        props = f.get("properties") or {}
        out.append(
            {
                "id": f.get("id"),
                "mag": props.get("mag"),
                "place": props.get("place"),
                "time": props.get("time"),
            }
        )
    return {"events": out, "start": start_iso, "end": end_iso, "url": url, "params": params}
# ---------- Verification Links (user-facing) --------------------------------
def build_verification_links(
    lat: float,
    lon: float,
    seismic_mode: str,
    seismic_region: str,
    radius_km: float,
    usgs_meta: Optional[Dict[str, Any]],
) -> str:
    """Build the "verify instantly" Markdown block of official source links.

    Reconstructs the exact USGS query URL from the recorded request metadata
    so a reader can re-run it. For RingOfFire (multi-request) metadata only
    the first request is shown, with a note about the total count.
    """
    from urllib.parse import urlencode  # local: only needed to rebuild query URLs

    swpc_kp_page = "https://www.swpc.noaa.gov/products/planetary-k-index"
    swpc_kp_json = "https://services.swpc.noaa.gov/json/planetary_k_index_1m.json"
    goes_plot_page = "https://www.swpc.noaa.gov/products/goes-x-ray-flux"
    goes_xray_json = "https://services.swpc.noaa.gov/json/goes/primary/xrays-1-day.json"
    open_meteo_link = (
        "https://api.open-meteo.com/v1/forecast"
        f"?latitude={lat:.5f}&longitude={lon:.5f}"
        "&hourly=temperature_2m,pressure_msl,wind_speed_10m&past_days=1&forecast_days=1&timezone=UTC"
    )
    usgs_map = "https://earthquake.usgs.gov/earthquakes/map/"
    scope = "Unknown"
    usgs_query = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson"

    def build_q(meta: Dict[str, Any]) -> str:
        # urlencode percent-escapes reserved characters (e.g. ':' in the ISO
        # timestamps), so the rebuilt URL is safe to paste into a browser —
        # the previous raw string join produced technically invalid URLs.
        base = meta.get("url", "https://earthquake.usgs.gov/fdsnws/event/1/query")
        params = meta.get("params", {})
        return base + "?" + urlencode(params)

    if usgs_meta:
        # RingOfFire mode issues several bbox requests; show the first one.
        if "requests" in usgs_meta and isinstance(usgs_meta["requests"], list) and usgs_meta["requests"]:
            usgs_query = build_q(usgs_meta["requests"][0])
            scope = f"Multi-request (RingOfFire): showing first of {len(usgs_meta['requests'])}"
        else:
            usgs_query = build_q(usgs_meta)
            if seismic_mode == "Local radius":
                scope = f"Local radius query ({int(radius_km)} km around your location)"
            else:
                scope = f"Region mode ({seismic_region})"
    return (
        "### Verify instantly (official sources)\n"
        f"- **Magnetic (Kp):** {swpc_kp_page} \n"
        f" Live JSON: {swpc_kp_json}\n"
        f"- **Solar (GOES X-ray):** {goes_plot_page} \n"
        f" Live JSON: {goes_xray_json}\n"
        f"- **Atmospheric (Open-Meteo API for this location):** {open_meteo_link}\n"
        f"- **Seismic (USGS map):** {usgs_map} \n"
        f" **USGS query used:** {usgs_query} \n"
        f" Scope: {scope}\n"
    )
# ---------- Agents -----------------------------------------------------------
def magnetic_agent(prov_list: List[Dict[str, Any]], include_raw_payloads: bool) -> Dict[str, Any]:
    """Magnetic-domain agent driven by the global NOAA 1-minute Kp feed.

    z combines the latest Kp level, recent variability (std of the tail) and
    trend (mean per-sample slope over the tail); the label comes from fixed
    thresholds recorded in `rule_fired`. Returns a disabled record when the
    feed is too short to score.
    """
    kp = fetch_kp_last_24h(prov_list, include_raw_payloads)
    if len(kp) < 30:
        return {"enabled": False, "reason": "NOAA Kp feed too short"}
    last = float(kp[-1])
    # Up to the last 360 samples (~6h at one per minute) when available.
    tail = kp[-360:] if len(kp) >= 360 else kp
    drift = float(np.std(tail)) if len(tail) >= 10 else 0.0
    slope = float((tail[-1] - tail[0]) / max(1, len(tail) - 1))
    # Kp is a 0-9 scale, hence the /9 normalization of the level term.
    z = clamp((last / 9.0) + (drift / 2.0) + 2.0 * abs(slope), 0.0, 3.0)
    tau = tau_eff_from_z(z)
    idx = index_from_tau(tau)
    if last >= 7.0 or z >= 2.0:
        pred = "warning"
        rule = "Kp>=7 OR z>=2.0"
    elif last >= 5.0 or z >= 1.2:
        pred = "watch"
        rule = "Kp>=5 OR z>=1.2"
    elif last >= 4.0 or z >= 0.8:
        pred = "monitor"
        rule = "Kp>=4 OR z>=0.8"
    else:
        pred = "hold"
        rule = "else"
    live = f"Global Kp={last:.1f} | drift={drift:.2f} | slope={slope:.4f}"
    return {
        "enabled": True,
        "domain": "Magnetic",
        "prediction": pred,
        "rule_fired": rule,
        "z": float(z),
        "tau_eff": float(tau),
        "omega_obs": float(OMEGA_OBS),
        "alpha_r": float(ALPHA_R),
        "index": float(idx),
        "live_status": live,
        "truth_source": "NOAA SWPC planetary_k_index_1m (global)",
        # inputs_used must carry everything _recompute_domain needs to re-derive z.
        "inputs_used": {"kp_last": last, "kp_drift": drift, "kp_slope": slope, "tail_len": len(tail)},
        "location_effect": "Location does not change Magnetic. Kp is global.",
        "do": "Use to track global geomagnetic regime shifts.",
        "dont": "Do not treat as a city magnetometer.",
    }
def solar_agent(prov_list: List[Dict[str, Any]], include_raw_payloads: bool) -> Dict[str, Any]:
    """Solar-domain agent driven by the global GOES 1-day X-ray flux feed.

    z is the log-ratio of the recent mean flux to a 1e-8 floor, compressed
    to 0..3; labels use fixed peak/mean flux thresholds recorded in
    `rule_fired`. Returns a disabled record when the feed is too short.
    """
    flux = fetch_goes_xray_1day(prov_list, include_raw_payloads)
    if len(flux) < 50:
        return {"enabled": False, "reason": "GOES X-ray feed too short"}
    # Prefer the last 120 samples; otherwise fall back to the last 60.
    tail = flux[-120:] if len(flux) >= 120 else flux[-60:]
    f_mean = float(np.mean(tail))
    f_peak = float(np.max(tail))
    lr = stable_log_ratio(f_mean, 1e-8)
    z = clamp(lr / 10.0, 0.0, 3.0)
    tau = tau_eff_from_z(z)
    idx = index_from_tau(tau)
    if f_peak >= 1e-4 or z >= 2.2:
        pred = "flare likely"
        rule = "peak>=1e-4 OR z>=2.2"
    elif f_peak >= 1e-5 or z >= 1.5:
        pred = "flare watch"
        rule = "peak>=1e-5 OR z>=1.5"
    elif f_mean >= 1e-6 or z >= 0.9:
        pred = "monitor"
        rule = "mean>=1e-6 OR z>=0.9"
    else:
        pred = "hold"
        rule = "else"
    live = f"Global GOES mean={f_mean:.2e} | peak={f_peak:.2e}"
    return {
        "enabled": True,
        "domain": "Solar",
        "prediction": pred,
        "rule_fired": rule,
        "z": float(z),
        "tau_eff": float(tau),
        "omega_obs": float(OMEGA_OBS),
        "alpha_r": float(ALPHA_R),
        "index": float(idx),
        "live_status": live,
        "truth_source": "NOAA SWPC GOES xrays-1-day (global)",
        # inputs_used must carry everything _recompute_domain needs to re-derive z.
        "inputs_used": {"flux_mean": f_mean, "flux_peak": f_peak, "tail_len": len(tail)},
        "location_effect": "Location does not change Solar. GOES flux is global.",
        "do": "Use to track global solar radiative regime shifts.",
        "dont": "Do not treat as flare timing or CME arrival prediction.",
    }
def atmospheric_agent(lat: float, lon: float, display: str, prov_list: List[Dict[str, Any]], include_raw_payloads: bool) -> Dict[str, Any]:
    """Atmospheric-domain agent from Open-Meteo hourlies at (lat, lon).

    Looks at the last 13 hourly samples (a 12-hour window) to compute the
    temperature swing dT and net pressure change dP, normalizes them into a
    bounded z, and labels stability via fixed thresholds. Pressure and wind
    are optional (skipped when their series are too short). Returns a
    disabled record when the temperature series is too short.
    """
    wx = fetch_openmeteo_hourly(lat, lon, prov_list, include_raw_payloads, past_days=1)
    temp = wx["temp"]
    p = wx["p"]
    wind = wx["wind"]
    if len(temp) < 13:
        return {"enabled": False, "reason": "Open-Meteo hourly series too short"}
    # 13 hourly samples span 12 hours of deltas.
    t12 = [float(x) for x in temp[-13:]]
    dT = float(max(t12) - min(t12))
    dp = None
    if len(p) >= 13:
        p12 = [float(x) for x in p[-13:]]
        dp = float(p12[-1] - p12[0])
    w_mean = None
    if len(wind) >= 13:
        w12 = [float(x) for x in wind[-13:]]
        w_mean = float(np.mean(w12))
    # Normalized dT and |dP| terms are summed into a bounded z (0..3).
    z_dt = clamp(dT / 10.0, 0.0, 2.0)
    z_dp = clamp((abs(dp) / 12.0) if dp is not None else 0.0, 0.0, 1.5)
    z = clamp(z_dt + z_dp, 0.0, 3.0)
    tau = tau_eff_from_z(z)
    idx = index_from_tau(tau)
    # Labels react to large swings or sharp pressure drops (falling dP only).
    if dT >= 10.0 or (dp is not None and dp <= -10.0):
        pred = "storm risk"
        rule = "ΔT>=10 OR ΔP<=-10"
    elif dT >= 7.0 or (dp is not None and dp <= -6.0):
        pred = "swing"
        rule = "ΔT>=7 OR ΔP<=-6"
    elif dT >= 4.0:
        pred = "mild swing"
        rule = "ΔT>=4"
    else:
        pred = "stable"
        rule = "else"
    parts = [f"{display} ΔT(12h)={dT:.1f}°C"]
    if dp is not None:
        parts.append(f"ΔP(12h)={dp:.1f} hPa")
    if w_mean is not None:
        parts.append(f"wind≈{w_mean:.1f} m/s")
    live = " | ".join(parts)
    return {
        "enabled": True,
        "domain": "Atmospheric",
        "prediction": pred,
        "rule_fired": rule,
        "z": float(z),
        "tau_eff": float(tau),
        "omega_obs": float(OMEGA_OBS),
        "alpha_r": float(ALPHA_R),
        "index": float(idx),
        "live_status": live,
        "truth_source": "Open-Meteo hourly (location-based)",
        # inputs_used must carry everything _recompute_domain needs to re-derive z.
        "inputs_used": {"dT_12h": dT, "dP_12h": dp, "wind_mean": w_mean, "lat": lat, "lon": lon},
        "location_effect": "Location changes Atmospheric.",
        "do": "Use as a short-term stability detector from ΔT and ΔP.",
        "dont": "Do not treat as precipitation probability or full NWP forecast.",
    }
def seismic_agent_region(region: str, prov_list: List[Dict[str, Any]], include_raw_payloads: bool):
    """Fetch the last 24h of M>=2.5 USGS events for a named region.

    RingOfFire is covered by three bounding boxes (it crosses the
    antimeridian); their results are merged with de-duplication by event id.
    Returns (events, scope_label, usgs_meta) where usgs_meta preserves the
    request(s) for the receipt / verification links.
    """
    if region == "RingOfFire":
        seen = set()
        eqs = []
        metas = []
        for bb in RING_OF_FIRE_BBOXES:
            res = fetch_usgs_quakes(hours=24, minmag=2.5, bbox=bb, prov_list=prov_list, include_raw_payloads=include_raw_payloads)
            metas.append({"url": res["url"], "params": res["params"], "start": res["start"], "end": res["end"]})
            for e in res["events"]:
                eid = e.get("id")
                # Overlapping bboxes can return the same event twice.
                if eid and eid not in seen:
                    seen.add(eid)
                    eqs.append(e)
        meta = {"mode": "RingOfFireMultiBBox", "requests": metas}
    else:
        # Unknown region names (and "Global") fall through to no bbox filter.
        bbox = REGION_BBOX.get(region, None)
        res = fetch_usgs_quakes(hours=24, minmag=2.5, bbox=bbox, prov_list=prov_list, include_raw_payloads=include_raw_payloads)
        eqs = res["events"]
        meta = {"url": res["url"], "params": res["params"], "start": res["start"], "end": res["end"]}
    return eqs, f"Region={region}", meta
def seismic_agent_local(lat: float, lon: float, radius_km: float, prov_list: List[Dict[str, Any]], include_raw_payloads: bool):
    """Fetch the last 24h of M>=2.5 USGS events within radius_km of (lat, lon).

    Returns (events, scope_label, usgs_meta) with the request preserved for
    the receipt / verification links.
    """
    res = fetch_usgs_quakes(hours=24, minmag=2.5, center=(lat, lon), radius_km=radius_km, prov_list=prov_list, include_raw_payloads=include_raw_payloads)
    return res["events"], f"Local radius={int(radius_km)}km", {"url": res["url"], "params": res["params"], "start": res["start"], "end": res["end"]}
def seismic_score(eqs: List[Dict[str, Any]]):
    """Score a 24h quake list.

    Returns (prediction, rule, z, tau, index, N, Mmax): event count N and the
    maximum magnitude Mmax are compressed into a bounded z, mapped through
    tau_eff and the RFT index, then labeled via fixed thresholds.
    """
    N = int(len(eqs))
    mags: List[float] = []
    for event in eqs:
        raw = event.get("mag")
        if raw is None:
            continue
        try:
            mags.append(float(raw))
        except Exception:
            # Skip events whose magnitude is missing or malformed.
            pass
    Mmax = float(max(mags)) if mags else 0.0
    z_count = clamp(N / 60.0, 0.0, 1.5)
    z_mag = clamp(max(0.0, Mmax - 4.0) / 2.5, 0.0, 1.5)
    z = clamp(z_count + z_mag, 0.0, 3.0)
    tau = tau_eff_from_z(z)
    idx = index_from_tau(tau)
    if Mmax >= 6.5 or z >= 2.2:
        pred, rule = "alert", "Mmax>=6.5 OR z>=2.2"
    elif Mmax >= 5.5 or z >= 1.5:
        pred, rule = "watch", "Mmax>=5.5 OR z>=1.5"
    elif N >= 25 or z >= 1.0:
        pred, rule = "monitor", "N>=25 OR z>=1.0"
    else:
        pred, rule = "quiet", "else"
    return pred, rule, z, tau, idx, N, Mmax
def seismic_agent(mode: str, region: str, lat: float, lon: float, radius_km: float, prov_list: List[Dict[str, Any]], include_raw_payloads: bool) -> Dict[str, Any]:
    """Seismic-domain agent: fetch USGS events (local radius or region mode)
    and score them via seismic_score.

    The returned record includes `usgs_meta` (the exact upstream request(s))
    so run_forecast can render a reproducible verification link.
    """
    if mode == "Local radius":
        eqs, scope, usgs_meta = seismic_agent_local(lat, lon, radius_km, prov_list, include_raw_payloads)
        location_effect = "Location changes Seismic in Local radius mode."
        do = "Use to monitor seismic activity within the selected radius around your typed location."
        dont = "Do not treat as time/epicenter prediction."
        truth_scope = f"USGS events within {int(radius_km)} km"
    else:
        eqs, scope, usgs_meta = seismic_agent_region(region, prov_list, include_raw_payloads)
        location_effect = "Location does not change Seismic in Region mode. Region selector does."
        do = "Use as a regional seismic stress monitor."
        dont = "Do not treat as time/epicenter prediction."
        truth_scope = f"USGS events filtered by region={region}"
    pred, rule, z, tau, idx, N, Mmax = seismic_score(eqs)
    live = f"{scope} | quakes(24h,M≥2.5)={N} | max M{Mmax:.1f}"
    return {
        "enabled": True,
        "domain": "Seismic",
        "prediction": pred,
        "rule_fired": rule,
        "z": float(z),
        "tau_eff": float(tau),
        "omega_obs": float(OMEGA_OBS),
        "alpha_r": float(ALPHA_R),
        "index": float(idx),
        "live_status": live,
        "truth_source": f"USGS FDSN event feed ({truth_scope})",
        # inputs_used must carry everything _recompute_domain needs to re-derive z.
        "inputs_used": {
            "count_24h": N,
            "max_mag_24h": Mmax,
            "mode": mode,
            "region": region,
            "radius_km": float(radius_km),
            "lat": float(lat),
            "lon": float(lon),
        },
        "location_effect": location_effect,
        "do": do,
        "dont": dont,
        "what_it_is_not": "Not an earthquake time predictor. Not a rupture location predictor.",
        "why": "z_seis compresses activity density and severity into a bounded stress coordinate; τ_eff rises as ln(1+z).",
        "how": "Fetch USGS → count + max magnitude → z_seis → τ_eff → Index → label via fixed thresholds.",
        "usgs_meta": usgs_meta,
    }
# ---------- Receipt Build/Save ----------------------------------------------
def build_receipt(
    run_id: str,
    run_started_utc: str,
    run_finished_utc: str,
    location_text: str,
    lat: float,
    lon: float,
    display: str,
    seismic_mode: str,
    seismic_region: str,
    radius_km: float,
    df: pd.DataFrame,
    atm: Dict[str, Any],
    sei: Dict[str, Any],
    mag: Dict[str, Any],
    sol: Dict[str, Any],
    prov_list: List[Dict[str, Any]],
    include_raw_payloads: bool,
) -> Dict[str, Any]:
    """Assemble the verifiable Forecast Receipt for one run.

    The receipt bundles run settings, the rendered table rows, the full
    agent outputs, the provenance trail of every upstream fetch, and an
    environment snapshot — everything verify_receipt needs to re-check the
    run offline.
    """
    return {
        "receipt_version": 1,
        "run_id": run_id,
        "run_started_utc": run_started_utc,
        "run_finished_utc": run_finished_utc,
        "settings": {
            "location_text": location_text,
            "geocode_result": {"display": display, "lat": lat, "lon": lon},
            "seismic_mode": seismic_mode,
            "seismic_region": seismic_region,
            "radius_km": float(radius_km),
            "include_raw_payloads": bool(include_raw_payloads),
        },
        "outputs": {
            "table_rows": df.to_dict(orient="records"),
            "agents": {
                "atmospheric": atm,
                "seismic": sei,
                "magnetic": mag,
                "solar": sol,
            },
        },
        "provenance": {"fetches": prov_list},
        "environment": env_snapshot(),
        "verification_note": (
            "Receipt is tamper-evident via sha256 for each upstream payload. "
            "If raw payloads are embedded (raw_b64), integrity + offline verification is strong. "
            "If not embedded, you can still compare provider payloads later, but providers may revise feeds."
        ),
    }
def write_receipt_to_file(receipt: Dict[str, Any]) -> str:
    """Serialize the receipt to a JSON file in the system temp dir; return its path.

    Uses tempfile.gettempdir() instead of a hard-coded "/tmp" so the app also
    works on platforms without /tmp (e.g. Windows) and honors TMPDIR overrides.
    """
    import tempfile

    run_id = receipt.get("run_id", "run")
    path = os.path.join(tempfile.gettempdir(), f"rft_forecast_receipt_{run_id}.json")
    with open(path, "w", encoding="utf-8") as f:
        f.write(safe_json_dumps(receipt))
    return path
# ---------- Forecast Runner --------------------------------------------------
def run_forecast(location_text: str, seismic_mode: str, seismic_region: str, radius_km: float, include_raw_payloads: bool):
    """Top-level handler for the "Run Forecast" button.

    Geocodes the location, runs the four domain agents (each one fails
    softly into a disabled record), renders the summary table and the
    verification links, and always writes a downloadable receipt — even for
    failed runs, so failures stay auditable.

    Returns (header_md, table_df, verify_md, atm, sei, mag, sol, receipt,
    receipt_path), matching the Gradio outputs list.
    """
    run_started = utc_now_iso()
    run_id = uuid.uuid4().hex[:12]
    # Provenance list shared by all fetches in this run; ends up in the receipt.
    prov: List[Dict[str, Any]] = []
    # Geocode
    try:
        lat, lon, display = geocode_location(location_text, prov, include_raw_payloads)
    except Exception as e:
        # Geocoder transport failure: emit an error row + receipt and bail.
        df = pd.DataFrame([{"Domain": "Error", "RFT Prediction": "DISABLED", "Live Status": f"Geocode error: {e}"}])
        empty = {"enabled": False, "reason": f"Geocode error: {e}"}
        receipt = build_receipt(run_id, run_started, utc_now_iso(), location_text, float("nan"), float("nan"), "", seismic_mode, seismic_region, radius_km, df, empty, empty, empty, empty, prov, include_raw_payloads)
        receipt_path = write_receipt_to_file(receipt)
        return f"❌ Geocode error: {e}", df, "", empty, empty, empty, empty, receipt, receipt_path
    if lat is None:
        # Geocoder answered but found nothing; `display` carries the message.
        df = pd.DataFrame([{"Domain": "Error", "RFT Prediction": "DISABLED", "Live Status": display}])
        empty = {"enabled": False, "reason": display}
        receipt = build_receipt(run_id, run_started, utc_now_iso(), location_text, float("nan"), float("nan"), display, seismic_mode, seismic_region, radius_km, df, empty, empty, empty, empty, prov, include_raw_payloads)
        receipt_path = write_receipt_to_file(receipt)
        return f"❌ {display}", df, "", empty, empty, empty, empty, receipt, receipt_path
    # Agents: each runs independently; one failure never blocks the others.
    try:
        atm = atmospheric_agent(lat, lon, display, prov, include_raw_payloads)
    except Exception as e:
        atm = {"enabled": False, "reason": f"atmos error: {e}"}
    try:
        sei = seismic_agent(seismic_mode, seismic_region, lat, lon, radius_km, prov, include_raw_payloads)
    except Exception as e:
        sei = {"enabled": False, "reason": f"seismic error: {e}"}
    try:
        mag = magnetic_agent(prov, include_raw_payloads)
    except Exception as e:
        mag = {"enabled": False, "reason": f"magnetic error: {e}"}
    try:
        sol = solar_agent(prov, include_raw_payloads)
    except Exception as e:
        sol = {"enabled": False, "reason": f"solar error: {e}"}

    def fmt_row(domain: str, out: Dict[str, Any]):
        # One display-table row per domain; disabled agents show their reason.
        if not out.get("enabled"):
            return {"Domain": domain, "RFT Prediction": "DISABLED", "Live Status": out.get("reason", "missing inputs")}
        idx = out.get("index", None)
        z = out.get("z", None)
        tau = out.get("tau_eff", None)
        idx_s = f"{float(idx):.3e}" if isinstance(idx, (int, float)) else "n/a"
        z_s = f"{float(z):.2f}" if isinstance(z, (int, float)) else "n/a"
        t_s = f"{float(tau):.3f}" if isinstance(tau, (int, float)) else "n/a"
        return {
            "Domain": domain,
            "RFT Prediction": f"{out.get('prediction','hold')} | idx={idx_s} | z={z_s} | τ={t_s}",
            "Live Status": out.get("live_status", ""),
        }

    df = pd.DataFrame(
        [
            fmt_row("Atmospheric", atm),
            fmt_row("Seismic", sei),
            fmt_row("Magnetic", mag),
            fmt_row("Solar", sol),
        ]
    )
    run_finished = utc_now_iso()
    header = f"**Location:** {display} (lat {lat:.3f}, lon {lon:.3f}) | **UTC:** {run_finished} | **Run ID:** `{run_id}`"
    # The seismic agent exposes its exact USGS request(s) for the verify links.
    usgs_meta = None
    if isinstance(sei, dict):
        usgs_meta = sei.get("usgs_meta", None)
    verify_md = build_verification_links(lat, lon, seismic_mode, seismic_region, radius_km, usgs_meta)
    receipt = build_receipt(
        run_id=run_id,
        run_started_utc=run_started,
        run_finished_utc=run_finished,
        location_text=location_text,
        lat=lat,
        lon=lon,
        display=display,
        seismic_mode=seismic_mode,
        seismic_region=seismic_region,
        radius_km=radius_km,
        df=df,
        atm=atm,
        sei=sei,
        mag=mag,
        sol=sol,
        prov_list=prov,
        include_raw_payloads=include_raw_payloads,
    )
    receipt_path = write_receipt_to_file(receipt)
    return header, df, verify_md, atm, sei, mag, sol, receipt, receipt_path
# ---------- Receipt Verification --------------------------------------------
def _safe_float(x):
try:
if x is None:
return None
return float(x)
except Exception:
return None
def _close(a, b, tol=1e-9):
if a is None or b is None:
return False
return abs(float(a) - float(b)) <= tol * max(1.0, abs(float(a)), abs(float(b)))
def _verify_payload_hashes(fetches: List[Dict[str, Any]]):
rows = []
ok_all = True
for f in (fetches or []):
name = f.get("name", "")
h = f.get("sha256", "")
raw_b64 = f.get("raw_b64", None)
if not raw_b64:
rows.append({"Check": f"payload:{name}", "Status": "SKIP", "Detail": "No raw_b64 embedded"})
continue
try:
raw = base64.b64decode(raw_b64.encode("ascii"))
h2 = sha256_hex(raw)
ok = (h2 == h)
ok_all = ok_all and ok
rows.append({"Check": f"payload:{name}", "Status": "PASS" if ok else "FAIL", "Detail": f"sha256(receipt)={h} sha256(decoded)={h2}"})
except Exception as e:
ok_all = False
rows.append({"Check": f"payload:{name}", "Status": "FAIL", "Detail": f"Decode/hash error: {e}"})
return ok_all, rows
def _recompute_domain(domain: str, agent: Dict[str, Any]):
    """Recompute z / tau_eff / index and the label for one domain.

    Mirrors the live agent formulas exactly, but starting from the
    `inputs_used` values stored in the receipt — any drift between this
    function and the agents will show up as verification FAILs. Returns
    {"enabled": False} for disabled agents, {"error": ...} when inputs are
    missing or the domain is unknown, else {"z", "tau_eff", "index",
    "prediction"}.
    """
    if not agent or not agent.get("enabled"):
        return {"enabled": False}
    iu = agent.get("inputs_used") or {}
    dom = (domain or "").strip().lower()
    if dom == "atmospheric":
        # Mirrors atmospheric_agent: z from dT and |dP|, label from fixed rules.
        dT = _safe_float(iu.get("dT_12h"))
        dp = _safe_float(iu.get("dP_12h"))
        if dT is None:
            return {"error": "Missing inputs_used.dT_12h"}
        z_dt = clamp(dT / 10.0, 0.0, 2.0)
        z_dp = clamp((abs(dp) / 12.0) if dp is not None else 0.0, 0.0, 1.5)
        z = clamp(z_dt + z_dp, 0.0, 3.0)
        if dT >= 10.0 or (dp is not None and dp <= -10.0):
            pred = "storm risk"
        elif dT >= 7.0 or (dp is not None and dp <= -6.0):
            pred = "swing"
        elif dT >= 4.0:
            pred = "mild swing"
        else:
            pred = "stable"
    elif dom == "seismic":
        # Mirrors seismic_score: z from 24h count and max magnitude.
        N = _safe_float(iu.get("count_24h"))
        Mmax = _safe_float(iu.get("max_mag_24h"))
        if N is None or Mmax is None:
            return {"error": "Missing inputs_used.count_24h or inputs_used.max_mag_24h"}
        z_count = clamp(N / 60.0, 0.0, 1.5)
        z_mag = clamp(max(0.0, Mmax - 4.0) / 2.5, 0.0, 1.5)
        z = clamp(z_count + z_mag, 0.0, 3.0)
        if Mmax >= 6.5 or z >= 2.2:
            pred = "alert"
        elif Mmax >= 5.5 or z >= 1.5:
            pred = "watch"
        elif N >= 25 or z >= 1.0:
            pred = "monitor"
        else:
            pred = "quiet"
    elif dom == "magnetic":
        # Mirrors magnetic_agent: z from Kp level, drift (std) and slope.
        last = _safe_float(iu.get("kp_last"))
        drift = _safe_float(iu.get("kp_drift"))
        slope = _safe_float(iu.get("kp_slope"))
        if last is None or drift is None or slope is None:
            return {"error": "Missing inputs_used.kp_last/kp_drift/kp_slope"}
        z = clamp((last / 9.0) + (drift / 2.0) + 2.0 * abs(slope), 0.0, 3.0)
        if last >= 7.0 or z >= 2.0:
            pred = "warning"
        elif last >= 5.0 or z >= 1.2:
            pred = "watch"
        elif last >= 4.0 or z >= 0.8:
            pred = "monitor"
        else:
            pred = "hold"
    elif dom == "solar":
        # Mirrors solar_agent: z from log-ratio of mean flux to the 1e-8 floor.
        f_mean = _safe_float(iu.get("flux_mean"))
        f_peak = _safe_float(iu.get("flux_peak"))
        if f_mean is None or f_peak is None:
            return {"error": "Missing inputs_used.flux_mean/flux_peak"}
        lr = stable_log_ratio(f_mean, 1e-8)
        z = clamp(lr / 10.0, 0.0, 3.0)
        if f_peak >= 1e-4 or z >= 2.2:
            pred = "flare likely"
        elif f_peak >= 1e-5 or z >= 1.5:
            pred = "flare watch"
        elif f_mean >= 1e-6 or z >= 0.9:
            pred = "monitor"
        else:
            pred = "hold"
    else:
        return {"error": f"Unknown domain '{domain}'"}
    tau = tau_eff_from_z(z)
    idx = index_from_tau(tau)
    return {"z": z, "tau_eff": tau, "index": idx, "prediction": pred}
def verify_receipt(uploaded_file):
    """Validate an uploaded Forecast Receipt JSON.

    Checks, in order: required top-level keys; embedded payload hashes (when
    raw payloads were included); and recomputed z / tau_eff / index / label
    per domain against the stored intermediates. Returns
    (status_markdown, checks_dataframe).
    """
    if uploaded_file is None:
        return "❌ Upload a receipt JSON first.", pd.DataFrame([])
    try:
        # gr.File may deliver a filepath string, a tempfile wrapper exposing
        # .name, or a file-like object depending on Gradio version/config —
        # handle all three instead of assuming .read() exists.
        if isinstance(uploaded_file, (str, os.PathLike)):
            with open(uploaded_file, "rb") as fh:
                content = fh.read()
        elif hasattr(uploaded_file, "read"):
            content = uploaded_file.read()
        else:
            with open(uploaded_file.name, "rb") as fh:
                content = fh.read()
        if isinstance(content, bytes):
            content = content.decode("utf-8")
        receipt = json.loads(content)
    except Exception as e:
        return f"❌ Could not read JSON: {e}", pd.DataFrame([])
    checks: List[Dict[str, str]] = []
    ok = True
    # Structural check: required top-level receipt keys.
    for key in ["run_id", "settings", "outputs", "provenance", "environment"]:
        if key not in receipt:
            ok = False
            checks.append({"Check": f"has:{key}", "Status": "FAIL", "Detail": "Missing key"})
        else:
            checks.append({"Check": f"has:{key}", "Status": "PASS", "Detail": ""})
    # Payload integrity: re-hash any embedded raw payloads.
    fetches = (receipt.get("provenance") or {}).get("fetches") or []
    ok_payloads, rows = _verify_payload_hashes(fetches)
    checks.extend(rows)
    ok = ok and ok_payloads
    # Recompute intermediates per domain and compare against stored values.
    agents = ((receipt.get("outputs") or {}).get("agents") or {})
    mapping = {
        "Atmospheric": agents.get("atmospheric"),
        "Seismic": agents.get("seismic"),
        "Magnetic": agents.get("magnetic"),
        "Solar": agents.get("solar"),
    }
    for dom, agent in mapping.items():
        if not agent or not agent.get("enabled"):
            checks.append({"Check": f"recompute:{dom}", "Status": "SKIP", "Detail": "Agent disabled"})
            continue
        rec = _recompute_domain(dom, agent)
        if rec.get("error"):
            ok = False
            checks.append({"Check": f"recompute:{dom}", "Status": "FAIL", "Detail": rec["error"]})
            continue
        z_ok = _close(rec["z"], agent.get("z"), tol=1e-6)
        t_ok = _close(rec["tau_eff"], agent.get("tau_eff"), tol=1e-6)
        i_ok = _close(rec["index"], agent.get("index"), tol=1e-6)
        p_ok = (str(rec["prediction"]).strip().lower() == str(agent.get("prediction")).strip().lower())
        ok = ok and z_ok and t_ok and i_ok and p_ok
        checks.append({"Check": f"{dom}:z", "Status": "PASS" if z_ok else "FAIL", "Detail": f"expected={agent.get('z')} recomputed={rec['z']}"})
        checks.append({"Check": f"{dom}:tau", "Status": "PASS" if t_ok else "FAIL", "Detail": f"expected={agent.get('tau_eff')} recomputed={rec['tau_eff']}"})
        checks.append({"Check": f"{dom}:index", "Status": "PASS" if i_ok else "FAIL", "Detail": f"expected={agent.get('index')} recomputed={rec['index']}"})
        checks.append({"Check": f"{dom}:label", "Status": "PASS" if p_ok else "FAIL", "Detail": f"expected={agent.get('prediction')} recomputed={rec['prediction']}"})
    status = "✅ Receipt verification PASS" if ok else "⚠️ Receipt verification FAIL (see checks)"
    return status, pd.DataFrame(checks)
# ---------- Markdown Tabs ----------------------------------------------------
# Markdown shown on the "Method (Open)" tab: usage and interpretation notes.
INSTRUCTIONS_MD = """
## Use and interpretation
**Location input**
- Used for Atmospheric.
- Used for Seismic only if Seismic Mode is set to Local radius.
- Not used for Solar or Magnetic (global signals).
**Seismic Mode**
- Region mode: counts quakes in large region (EMEA/AMER/APAC/RingOfFire/Global).
- Local radius mode: counts quakes within a radius (km) around your typed location.
**Run Forecast**
- Pulls live data and recomputes from scratch.
- No auto-refresh. No memory. No smoothing.
- No guessing when data is missing (DISABLED instead).
**Forecast Receipts**
- Each run generates a downloadable receipt JSON that includes:
- source URLs + params + timestamps
- sha256 hashes of upstream payloads
- computed intermediates + label rule fired
- environment snapshot (versions + constants)
- Optional: embed raw upstream payloads for stronger offline verification.
"""
# Markdown for the same tab with the open equations; an f-string so the live
# constant values (K_TAU, OMEGA_OBS, ALPHA_R) are interpolated at import time.
METHOD_MD = f"""
## Open method equations
Shared core:
- τ_eff = {K_TAU} · ln(1 + z)
- Ω_obs = 2π / T_earth = {OMEGA_OBS:.6e}
- α_R = {ALPHA_R}
- Index = Ω_obs · τ_eff · α_R
z definitions:
- Atmospheric: z_atm = clamp( clamp(ΔT/10,0..2) + clamp(|ΔP|/12,0..1.5), 0..3 )
- Seismic: z_seis = clamp( clamp(N/60,0..1.5) + clamp(max(0,Mmax-4)/2.5,0..1.5), 0..3 )
- Magnetic: z_mag = clamp( (Kp_last/9) + (drift/2) + 2·|slope|, 0..3 )
- Solar: z_solar= clamp( ln(F_mean/1e-8)/10, 0..3 )
Decision thresholds are shown per-domain in the agent output as `rule_fired`.
"""
# ---------- UI ---------------------------------------------------------------
# ---------- UI ---------------------------------------------------------------
# Gradio layout: three tabs — live forecast, receipt verification, open method.
with gr.Blocks(title=APP_NAME) as demo:
    gr.Markdown(f"# {APP_NAME}")
    gr.Markdown(f"**Build:** `{APP_VERSION}`")
    with gr.Tab("Live Forecast"):
        # Inputs feeding run_forecast (order must match the btn.click inputs list).
        loc = gr.Textbox(label="Location", value="London")
        seismic_mode = gr.Radio(
            choices=["Region", "Local radius"],
            value="Local radius",
            label="Seismic Mode"
        )
        with gr.Row():
            region = gr.Dropdown(["Global", "EMEA", "AMER", "APAC", "RingOfFire"], value="EMEA", label="Seismic Region (used in Region mode)")
            radius = gr.Slider(50, 2000, value=500, step=50, label="Seismic Radius km (used in Local radius mode)")
        include_raw = gr.Checkbox(
            value=False,
            label="Receipt durability: embed raw upstream payloads (larger download, stronger verification)"
        )
        btn = gr.Button("Run Forecast", variant="primary")
        # Outputs filled by run_forecast (order must match the btn.click outputs list).
        header_md = gr.Markdown()
        table = gr.Dataframe(headers=["Domain", "RFT Prediction", "Live Status"], interactive=False)
        verify_md = gr.Markdown()
        with gr.Accordion("Atmospheric details", open=False):
            atm_json = gr.JSON(label="Atmospheric agent output")
        with gr.Accordion("Seismic details", open=False):
            sei_json = gr.JSON(label="Seismic agent output")
        with gr.Accordion("Magnetic details", open=False):
            mag_json = gr.JSON(label="Magnetic agent output")
        with gr.Accordion("Solar details", open=False):
            sol_json = gr.JSON(label="Solar agent output")
        with gr.Accordion("Forecast Receipt (verifiable history)", open=True):
            gr.Markdown(
                "- Download the receipt to freeze this run.\n"
                "- If you enabled raw payloads, payload-hash verification is offline.\n"
                "- If not enabled, you still get URLs/params/timestamps + sha256 for audit trails."
            )
            receipt_json = gr.JSON(label="Receipt JSON")
            receipt_file = gr.File(label="Download receipt (.json)")
        btn.click(
            run_forecast,
            inputs=[loc, seismic_mode, region, radius, include_raw],
            outputs=[header_md, table, verify_md, atm_json, sei_json, mag_json, sol_json, receipt_json, receipt_file],
        )
    with gr.Tab("Verify Receipt"):
        gr.Markdown(
            "Upload a previously downloaded Forecast Receipt JSON to verify:\n\n"
            "- Structural integrity\n"
            "- Embedded payload hash checks (if raw payloads were included)\n"
            "- Recomputed z / τ_eff / index and label against stored intermediates\n"
        )
        up = gr.File(label="Upload receipt (.json)", file_types=[".json"])
        vbtn = gr.Button("Verify", variant="primary")
        vstatus = gr.Markdown()
        vtable = gr.Dataframe(headers=["Check", "Status", "Detail"], interactive=False)
        vbtn.click(verify_receipt, inputs=[up], outputs=[vstatus, vtable])
    with gr.Tab("Method (Open)"):
        gr.Markdown(INSTRUCTIONS_MD)
        gr.Markdown(METHOD_MD)

# Local entry point; on HF Spaces the platform may also launch `demo` itself.
if __name__ == "__main__":
    demo.launch()