Spaces:
Running
Running
| import os | |
| import math | |
| import json | |
| import random | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| # =============================================================== | |
| # Rendered Frame Theory (RFT) — Agent Console (All-in-One Space) | |
| # Author: Liam Grinstead | |
| # Purpose: Transparent, reproducible, benchmarkable agent demos | |
| # Dependencies: numpy, pandas, matplotlib, gradio (NO scipy) | |
| # =============================================================== | |
| OUTDIR = "outputs" | |
| os.makedirs(OUTDIR, exist_ok=True) | |
| # ----------------------------- | |
| # Shared utilities | |
| # ----------------------------- | |
| def set_seed(seed: int): | |
| seed = int(seed) % (2**32 - 1) | |
| np.random.seed(seed) | |
| random.seed(seed) | |
| def clamp(x, lo, hi): | |
| return max(lo, min(hi, x)) | |
| def save_plot(fig, name: str): | |
| path = os.path.join(OUTDIR, name) | |
| fig.savefig(path, dpi=150, bbox_inches="tight") | |
| plt.close(fig) | |
| return path | |
| def df_to_csv_file(df: pd.DataFrame, name: str): | |
| path = os.path.join(OUTDIR, name) | |
| df.to_csv(path, index=False) | |
| return path | |
| # ----------------------------- | |
| # RFT Core: τ_eff + gating | |
| # ----------------------------- | |
| def tau_eff_adaptive( | |
| uncertainty: float, | |
| base: float = 1.0, | |
| slow_by: float = 1.0, | |
| gain: float = 1.2, | |
| cap: float = 4.0 | |
| ): | |
| u = clamp(float(uncertainty), 0.0, 1.0) | |
| tau = base + slow_by + gain * u | |
| return clamp(tau, base, cap) | |
| def rft_confidence(uncertainty: float): | |
| return clamp(1.0 - float(uncertainty), 0.0, 1.0) | |
| def rft_gate(conf: float, tau_eff: float, threshold: float): | |
| conf = float(conf) | |
| tau_eff = float(tau_eff) | |
| effective = threshold + 0.08 * (tau_eff - 1.0) | |
| return conf >= clamp(effective, 0.0, 0.999) | |
| # ----------------------------- | |
| # NEO Simulation | |
| # ----------------------------- | |
| def simulate_neo( | |
| seed: int, | |
| steps: int, | |
| dt: float, | |
| alert_km: float, | |
| noise_km: float, | |
| rft_conf_threshold: float, | |
| tau_gain: float, | |
| show_debug: bool | |
| ): | |
| set_seed(seed) | |
| pos = np.array([9000.0, 2500.0, 1000.0], dtype=float) # km | |
| vel = np.array([-55.0, -8.0, -3.0], dtype=float) # km/step (scaled) | |
| rows = [] | |
| alerts_baseline = 0 | |
| alerts_rft_raw = 0 | |
| alerts_rft_filtered = 0 | |
| ops_proxy = 0 | |
| for t in range(int(steps)): | |
| drift = 0.05 * np.array([math.sin(0.03*t), math.cos(0.02*t), math.sin(0.015*t)]) | |
| pos_true = pos + vel * dt + drift | |
| meas = pos_true + np.random.normal(0.0, noise_km, size=3) | |
| dist = float(np.linalg.norm(meas)) | |
| speed = float(np.linalg.norm(vel)) | |
| uncertainty = clamp((noise_km / max(alert_km, 1.0)) * 2.0 + (speed / 200.0) * 0.2, 0.0, 1.0) | |
| baseline_alert = dist <= alert_km | |
| if baseline_alert: | |
| alerts_baseline += 1 | |
| tau = tau_eff_adaptive(uncertainty=uncertainty, base=1.0, slow_by=1.0, gain=tau_gain, cap=4.0) | |
| conf = rft_confidence(uncertainty) | |
| rft_candidate = dist <= alert_km | |
| rft_alert = bool(rft_candidate and rft_gate(conf, tau, rft_conf_threshold)) | |
| if rft_candidate: | |
| alerts_rft_raw += 1 | |
| if rft_alert: | |
| alerts_rft_filtered += 1 | |
| ops_proxy += 12 | |
| rows.append({ | |
| "t": t, | |
| "dt": dt, | |
| "x_km": meas[0], | |
| "y_km": meas[1], | |
| "z_km": meas[2], | |
| "dist_km": dist, | |
| "noise_km": noise_km, | |
| "uncertainty": uncertainty, | |
| "tau_eff": tau, | |
| "confidence": conf, | |
| "baseline_alert": int(baseline_alert), | |
| "rft_candidate": int(rft_candidate), | |
| "rft_alert": int(rft_alert), | |
| }) | |
| pos = pos_true | |
| df = pd.DataFrame(rows) | |
| fig1 = plt.figure(figsize=(10, 4)) | |
| ax = fig1.add_subplot(111) | |
| ax.plot(df["t"], df["dist_km"]) | |
| ax.axhline(alert_km, linestyle="--") | |
| ax.set_title("NEO: Distance to target vs time") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("distance (km)") | |
| p_dist = save_plot(fig1, f"neo_distance_seed{seed}.png") | |
| fig2 = plt.figure(figsize=(10, 4)) | |
| ax = fig2.add_subplot(111) | |
| ax.plot(df["t"], df["confidence"]) | |
| ax.plot(df["t"], df["tau_eff"]) | |
| ax.set_title("NEO: Confidence and τ_eff (Adaptive)") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("value") | |
| p_conf = save_plot(fig2, f"neo_conf_tau_seed{seed}.png") | |
| fig3 = plt.figure(figsize=(10, 3)) | |
| ax = fig3.add_subplot(111) | |
| ax.step(df["t"], df["baseline_alert"], where="post") | |
| ax.step(df["t"], df["rft_alert"], where="post") | |
| ax.set_title("NEO: Alerts (Baseline vs RFT)") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("alert (0/1)") | |
| p_alerts = save_plot(fig3, f"neo_alerts_seed{seed}.png") | |
| csv_path = df_to_csv_file(df, f"neo_log_seed{seed}.csv") | |
| summary = { | |
| "seed": int(seed), | |
| "steps": int(steps), | |
| "alert_km": float(alert_km), | |
| "baseline_alerts": int(alerts_baseline), | |
| "rft_candidates": int(alerts_rft_raw), | |
| "rft_alerts_filtered": int(alerts_rft_filtered), | |
| "false_positive_proxy_reduction_%": float( | |
| 100.0 * (1.0 - (alerts_rft_filtered / max(alerts_rft_raw, 1))) | |
| ), | |
| "ops_proxy": int(ops_proxy), | |
| } | |
| debug_lines = "" | |
| if show_debug: | |
| debug_lines = "Debug view (first 12 rows):\n" + df.head(12).to_string(index=False) | |
| return summary, debug_lines, [p_dist, p_conf, p_alerts], csv_path | |
| # ----------------------------- | |
| # Satellite Jitter Simulation | |
| # ----------------------------- | |
| def simulate_jitter( | |
| seed: int, | |
| steps: int, | |
| dt: float, | |
| noise: float, | |
| baseline_kp: float, | |
| rft_kp: float, | |
| gate_threshold: float, | |
| tau_gain: float | |
| ): | |
| set_seed(seed) | |
| jitter = 0.0 | |
| jitter_rate = 0.0 | |
| rows = [] | |
| duty_baseline = 0 | |
| duty_rft = 0 | |
| ops_proxy = 0 | |
| for t in range(int(steps)): | |
| micro = 0.25 * math.sin(0.05 * t) + 0.12 * math.sin(0.13 * t) | |
| jitter_rate += np.random.normal(0.0, noise) * 0.08 | |
| jitter += jitter_rate * dt + micro + np.random.normal(0.0, noise) | |
| u_base = -baseline_kp * jitter | |
| jitter_base_next = jitter + u_base * 0.35 | |
| duty_baseline += int(abs(u_base) > 0.01) | |
| uncertainty = clamp(noise * 3.0, 0.0, 1.0) | |
| tau = tau_eff_adaptive(uncertainty, base=1.0, slow_by=1.0, gain=tau_gain, cap=4.0) | |
| conf = rft_confidence(uncertainty) | |
| should_act = rft_gate(conf, tau, gate_threshold) and (abs(jitter) > 0.35) | |
| u_rft = (-rft_kp * jitter) if should_act else 0.0 | |
| jitter_rft_next = jitter + u_rft * 0.35 | |
| duty_rft += int(abs(u_rft) > 0.01) | |
| ops_proxy += 10 | |
| rows.append({ | |
| "t": t, | |
| "jitter": jitter, | |
| "u_baseline": u_base, | |
| "u_rft": u_rft, | |
| "baseline_active": int(abs(u_base) > 0.01), | |
| "rft_active": int(abs(u_rft) > 0.01), | |
| "tau_eff": tau, | |
| "confidence": conf, | |
| "noise": noise, | |
| "jitter_baseline_next": jitter_base_next, | |
| "jitter_rft_next": jitter_rft_next, | |
| }) | |
| jitter = jitter_rft_next | |
| jitter_rate *= 0.92 | |
| df = pd.DataFrame(rows) | |
| def rms(x): | |
| return float(np.sqrt(np.mean(np.square(np.asarray(x))))) | |
| jitter_rms = rms(df["jitter"].values) | |
| duty_b = duty_baseline / max(steps, 1) | |
| duty_r = duty_rft / max(steps, 1) | |
| fig1 = plt.figure(figsize=(10, 4)) | |
| ax = fig1.add_subplot(111) | |
| ax.plot(df["t"], df["jitter"]) | |
| ax.set_title("Jitter: residual vs time (running RFT plant)") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("jitter (arb)") | |
| p_jit = save_plot(fig1, f"jitter_residual_seed{seed}.png") | |
| fig2 = plt.figure(figsize=(10, 3)) | |
| ax = fig2.add_subplot(111) | |
| ax.step(df["t"], df["baseline_active"], where="post") | |
| ax.step(df["t"], df["rft_active"], where="post") | |
| ax.set_title("Jitter: Actuation duty (Baseline vs RFT gating)") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("active (0/1)") | |
| p_duty = save_plot(fig2, f"jitter_duty_seed{seed}.png") | |
| fig3 = plt.figure(figsize=(10, 4)) | |
| ax = fig3.add_subplot(111) | |
| ax.plot(df["t"], df["tau_eff"]) | |
| ax.plot(df["t"], df["confidence"]) | |
| ax.set_title("Jitter: τ_eff and confidence") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("value") | |
| p_tau = save_plot(fig3, f"jitter_tau_seed{seed}.png") | |
| csv_path = df_to_csv_file(df, f"jitter_log_seed{seed}.csv") | |
| summary = { | |
| "seed": int(seed), | |
| "steps": int(steps), | |
| "jitter_rms": jitter_rms, | |
| "baseline_duty_ratio": float(duty_b), | |
| "rft_duty_ratio": float(duty_r), | |
| "duty_reduction_%": float(100.0 * (1.0 - (duty_r / max(duty_b, 1e-9)))), | |
| "ops_proxy": int(ops_proxy), | |
| } | |
| return summary, [p_jit, p_duty, p_tau], csv_path | |
| # ----------------------------- | |
| # Starship-style Landing Harness (2D) | |
| # ----------------------------- | |
| def simulate_landing( | |
| seed: int, | |
| steps: int, | |
| dt: float, | |
| wind_max: float, | |
| thrust_noise: float, | |
| kp_baseline: float, | |
| kp_rft: float, | |
| gate_threshold: float, | |
| tau_gain: float, | |
| goal_m: float | |
| ): | |
| set_seed(seed) | |
| alt = 1000.0 | |
| vv = -45.0 | |
| x = 60.0 | |
| xv = 0.0 | |
| ix = 0.0 | |
| anomalies = 0 | |
| actions = 0 | |
| ops_proxy = 0 | |
| rows = [] | |
| g = -9.81 | |
| LAT_CTRL = 0.95 | |
| WIND_PUSH = 0.28 | |
| VERT_CTRL = 0.22 | |
| for t in range(int(steps)): | |
| gust = math.sin(0.08 * t) + 0.55 * math.sin(0.21 * t + 0.7) | |
| wind = (wind_max * 0.75) * gust + np.random.normal(0.0, 0.65) | |
| wind = clamp(wind, -wind_max, wind_max) | |
| thrust_dev = np.random.normal(0.0, thrust_noise) | |
| meas_alt = alt + np.random.normal(0, 0.6) | |
| meas_vv = vv + np.random.normal(0, 0.35) | |
| meas_x = x + np.random.normal(0, 0.8) | |
| meas_xv = xv + np.random.normal(0, 0.25) | |
| uncertainty = clamp((abs(thrust_dev) / 5.0) * 0.18 + (abs(wind) / max(wind_max, 1e-9)) * 0.30, 0.0, 1.0) | |
| tau = tau_eff_adaptive(uncertainty, base=1.0, slow_by=1.0, gain=tau_gain, cap=4.0) | |
| conf = rft_confidence(uncertainty) | |
| anomaly_types = [] | |
| if abs(wind) > (0.85 * wind_max): | |
| anomaly_types.append("High wind") | |
| if meas_alt < 200 and abs(meas_x) > 20: | |
| anomaly_types.append("High lateral error near ground") | |
| if meas_alt < 150 and abs(meas_vv) > 15: | |
| anomaly_types.append("High descent rate near ground") | |
| is_anomaly = len(anomaly_types) > 0 | |
| if is_anomaly: | |
| anomalies += 1 | |
| u_base_x = -kp_baseline * meas_x - 0.30 * meas_xv | |
| u_base_v = -kp_baseline * (meas_vv + 5.0) | |
| phase = 1.0 - clamp(meas_alt / 1000.0, 0.0, 1.0) | |
| lookahead = 1.0 + 1.6 * phase | |
| wind_ff = (WIND_PUSH * wind) / max(LAT_CTRL, 1e-9) | |
| if meas_alt < 600: | |
| ix = clamp(ix + (meas_x * dt) * 0.0025, -40.0, 40.0) | |
| do_action = bool(rft_gate(conf, tau, gate_threshold)) | |
| u_rft_x = 0.0 | |
| u_rft_v = 0.0 | |
| if do_action: | |
| u_rft_x = (-kp_rft * lookahead * meas_x) - (0.42 * meas_xv) - (0.20 * ix) - wind_ff | |
| u_rft_v = (-kp_rft * lookahead * (meas_vv + 5.0)) | |
| actions += 1 | |
| u_rft_x = clamp(u_rft_x, -20.0, 20.0) | |
| u_rft_v = clamp(u_rft_v, -18.0, 18.0) | |
| vv = vv + (g + VERT_CTRL * u_rft_v + 0.09 * thrust_dev) * dt | |
| alt = max(0.0, alt + vv * dt) | |
| xv = xv + (WIND_PUSH * wind - LAT_CTRL * u_rft_x) * dt | |
| x = x + xv * dt | |
| ops_proxy += 16 | |
| rows.append({ | |
| "t": t, | |
| "alt_m": alt, | |
| "vv_m_s": vv, | |
| "x_m": x, | |
| "xv_m_s": xv, | |
| "wind_m_s": wind, | |
| "thrust_dev": thrust_dev, | |
| "uncertainty": uncertainty, | |
| "tau_eff": tau, | |
| "confidence": conf, | |
| "anomaly": int(is_anomaly), | |
| "anomaly_types": "|".join(anomaly_types) if anomaly_types else "", | |
| "action_taken": int(do_action), | |
| "u_baseline_x": u_base_x, | |
| "u_baseline_v": u_base_v, | |
| "u_rft_x": u_rft_x, | |
| "u_rft_v": u_rft_v, | |
| "ix": ix, | |
| "wind_ff": wind_ff, | |
| }) | |
| if alt <= 0.0: | |
| break | |
| df = pd.DataFrame(rows) | |
| landing_offset = float(abs(df["x_m"].iloc[-1])) if len(df) else 9999.0 | |
| fig1 = plt.figure(figsize=(10, 4)) | |
| ax = fig1.add_subplot(111) | |
| ax.plot(df["t"], df["alt_m"]) | |
| ax.set_title("Landing: altitude vs time") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("altitude (m)") | |
| p_alt = save_plot(fig1, f"landing_alt_seed{seed}.png") | |
| fig2 = plt.figure(figsize=(10, 4)) | |
| ax = fig2.add_subplot(111) | |
| ax.plot(df["t"], df["x_m"]) | |
| ax.axhline(goal_m, linestyle="--") | |
| ax.axhline(-goal_m, linestyle="--") | |
| ax.set_title("Landing: lateral offset vs time (goal band)") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("offset (m)") | |
| p_x = save_plot(fig2, f"landing_offset_seed{seed}.png") | |
| fig3 = plt.figure(figsize=(10, 4)) | |
| ax = fig3.add_subplot(111) | |
| ax.plot(df["t"], df["wind_m_s"]) | |
| ax.set_title("Landing: wind profile (signed gusts)") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("wind (m/s)") | |
| p_w = save_plot(fig3, f"landing_wind_seed{seed}.png") | |
| fig4 = plt.figure(figsize=(10, 3)) | |
| ax = fig4.add_subplot(111) | |
| ax.step(df["t"], df["anomaly"], where="post") | |
| ax.step(df["t"], df["action_taken"], where="post") | |
| ax.set_title("Landing: anomaly vs action timeline") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("0/1") | |
| p_a = save_plot(fig4, f"landing_anomaly_action_seed{seed}.png") | |
| csv_path = df_to_csv_file(df, f"landing_log_seed{seed}.csv") | |
| summary = { | |
| "seed": int(seed), | |
| "steps_ran": int(len(df)), | |
| "final_landing_offset_m": float(landing_offset), | |
| "goal_m": float(goal_m), | |
| "hit_goal": bool(landing_offset <= goal_m), | |
| "total_anomalies_detected": int(anomalies), | |
| "total_control_actions": int(actions), | |
| "ops_proxy": int(ops_proxy), | |
| } | |
| return summary, [p_alt, p_x, p_w, p_a], csv_path | |
| # =============================================================== | |
| # Predator Avoidance (Reflex vs RFT-style Adaptive Agents) | |
| # =============================================================== | |
| def numpy_convolve2d_toroidal(array: np.ndarray, kernel: np.ndarray) -> np.ndarray: | |
| out = np.zeros_like(array, dtype=float) | |
| kcx = kernel.shape[0] // 2 | |
| kcy = kernel.shape[1] // 2 | |
| rows, cols = array.shape | |
| for i in range(rows): | |
| for j in range(cols): | |
| val = 0.0 | |
| for m in range(kernel.shape[0]): | |
| for n in range(kernel.shape[1]): | |
| x = (i + m - kcx) % rows | |
| y = (j + n - kcy) % cols | |
| val += array[x, y] * kernel[m, n] | |
| out[i, j] = val | |
| return out | |
| class Predator: | |
| def __init__(self, grid_size: int): | |
| self.grid_size = grid_size | |
| self.x = random.randint(0, grid_size - 1) | |
| self.y = random.randint(0, grid_size - 1) | |
| def move(self): | |
| dx, dy = random.choice([(0,1), (0,-1), (1,0), (-1,0)]) | |
| self.x = (self.x + dx) % self.grid_size | |
| self.y = (self.y + dy) % self.grid_size | |
| class ReflexAgent: | |
| def __init__(self, grid_size: int): | |
| self.grid_size = grid_size | |
| self.x = random.randint(0, grid_size - 1) | |
| self.y = random.randint(0, grid_size - 1) | |
| self.collisions = 0 | |
| def move(self): | |
| dx, dy = random.choice([(0,1), (0,-1), (1,0), (-1,0)]) | |
| self.x = (self.x + dx) % self.grid_size | |
| self.y = (self.y + dy) % self.grid_size | |
| class RFTAdaptiveAgent: | |
| def __init__( | |
| self, | |
| grid_size: int, | |
| move_kernel: np.ndarray, | |
| energy_max: float, | |
| energy_regen: float, | |
| base_collapse_cost: float, | |
| boost_prob: float, | |
| boost_amount: float, | |
| sense_noise_prob: float, | |
| alpha: float, | |
| beta: float, | |
| dt_internal: float, | |
| collapse_threshold: float | |
| ): | |
| self.grid_size = grid_size | |
| self.move_kernel = move_kernel.astype(float) | |
| self.pos_prob = np.zeros((grid_size, grid_size), dtype=float) | |
| x, y = np.random.randint(grid_size), np.random.randint(grid_size) | |
| self.pos_prob[x, y] = 1.0 | |
| self.x, self.y = int(x), int(y) | |
| self.energy_max = float(energy_max) | |
| self.energy = float(energy_max) | |
| self.energy_regen = float(energy_regen) | |
| self.base_collapse_cost = float(base_collapse_cost) | |
| self.boost_prob = float(boost_prob) | |
| self.boost_amount = float(boost_amount) | |
| self.sense_noise_prob = float(sense_noise_prob) | |
| self.alpha = float(alpha) | |
| self.beta = float(beta) | |
| self.dt_internal = float(dt_internal) | |
| self.collapse_threshold = float(collapse_threshold) | |
| self.psi_action = (0.08 + 0j) # |psi|^2 = 0.0064 baseline | |
| self.collapse_actions = 0 | |
| self.collisions = 0 | |
| def move(self): | |
| dx, dy = random.choice([(0,1), (0,-1), (1,0), (-1,0)]) | |
| self.x = (self.x + dx) % self.grid_size | |
| self.y = (self.y + dy) % self.grid_size | |
| self.pos_prob.fill(0.0) | |
| self.pos_prob[self.x, self.y] = 1.0 | |
| def sense_predators(self, predators): | |
| perceived = [] | |
| for p in predators: | |
| if random.random() < self.sense_noise_prob: | |
| continue | |
| perceived.append((p.x, p.y)) | |
| return perceived | |
| def compute_threat(self, perceived): | |
| threat = 0.0 | |
| radius = 2 | |
| for (px, py) in perceived: | |
| xs = [(px + dx) % self.grid_size for dx in range(-radius, radius + 1)] | |
| ys = [(py + dy) % self.grid_size for dy in range(-radius, radius + 1)] | |
| sub = self.pos_prob[np.ix_(xs, ys)] | |
| threat += float(sub.sum()) | |
| return threat | |
| def update_action_state(self, perceived): | |
| T = self.compute_threat(perceived) | |
| E = self.energy / max(self.energy_max, 1e-9) | |
| drive = (self.alpha * T) - (self.beta * E) | |
| exp_term = clamp(drive, -6.0, 6.0) * 0.22 * self.dt_internal | |
| amp = math.exp(exp_term) | |
| amp = clamp(amp, 0.75, 1.35) | |
| H = drive + 0.01 * (abs(self.psi_action) ** 2) | |
| self.psi_action *= amp * np.exp(-1j * H * self.dt_internal) | |
| mag = abs(self.psi_action) | |
| if mag > 1.0: | |
| self.psi_action /= mag | |
| def get_action_probability(self): | |
| return float(min(abs(self.psi_action) ** 2, 1.0)) | |
| def apply_collapse_action(self, perceived): | |
| field = numpy_convolve2d_toroidal(self.pos_prob, self.move_kernel) | |
| field = np.maximum(field, 0.0) | |
| for (px, py) in perceived: | |
| for dx in range(-2, 3): | |
| for dy in range(-2, 3): | |
| nx = (px + dx) % self.grid_size | |
| ny = (py + dy) % self.grid_size | |
| dist = abs(dx) + abs(dy) | |
| field[nx, ny] *= (1.0 - 0.30 / (dist + 1.0)) | |
| s = float(field.sum()) | |
| if s <= 0: | |
| field[:] = 1.0 / (self.grid_size * self.grid_size) | |
| else: | |
| field /= s | |
| self.pos_prob = field | |
| flat = self.pos_prob.flatten().copy() | |
| for (px, py) in perceived: | |
| flat[px * self.grid_size + py] = 0.0 | |
| tot = float(flat.sum()) | |
| if tot <= 0: | |
| self.move() | |
| return | |
| flat /= tot | |
| idx = np.random.choice(self.grid_size * self.grid_size, p=flat) | |
| self.x, self.y = divmod(int(idx), self.grid_size) | |
| def energy_boost(self): | |
| if random.random() < self.boost_prob: | |
| return float(self.boost_amount) | |
| return 0.0 | |
| def regen_energy(self): | |
| boost = self.energy_boost() | |
| self.energy = clamp(self.energy + self.energy_regen + boost, 0.0, self.energy_max) | |
| if self.energy < self.energy_max and random.random() < 0.05: | |
| self.energy = self.energy_max | |
| def step_rft(self, predators, group_coherence: float): | |
| if self.energy <= 0: | |
| self.move() | |
| return 0, 0.0, 0.0 | |
| perceived = self.sense_predators(predators) | |
| self.update_action_state(perceived) | |
| P_act = self.get_action_probability() | |
| threat = self.compute_threat(perceived) | |
| acted = 0 | |
| if (P_act >= self.collapse_threshold) and (self.energy > 0): | |
| effective_cost = self.base_collapse_cost * (1.0 - float(group_coherence)) | |
| if self.energy >= effective_cost: | |
| self.collapse_actions += 1 | |
| self.energy -= effective_cost | |
| self.apply_collapse_action(perceived) | |
| self.psi_action = (0.08 + 0j) | |
| acted = 1 | |
| else: | |
| self.move() | |
| else: | |
| self.move() | |
| return acted, P_act, threat | |
| def simulate_predator( | |
| seed: int, | |
| grid_size: int, | |
| steps: int, | |
| num_reflex: int, | |
| num_rft: int, | |
| num_predators: int, | |
| group_coherence: float, | |
| sense_noise_prob: float, | |
| collapse_threshold: float, | |
| alpha: float, | |
| beta: float, | |
| dt_internal: float, | |
| energy_max: float, | |
| base_collapse_cost: float, | |
| energy_regen: float, | |
| boost_prob: float, | |
| boost_amount: float, | |
| show_heatmap: bool | |
| ): | |
| set_seed(seed) | |
| move_kernel = np.array([[0, 0.2, 0], | |
| [0.2, 0.2, 0.2], | |
| [0, 0.2, 0]], dtype=float) | |
| reflex_agents = [ReflexAgent(grid_size) for _ in range(int(num_reflex))] | |
| rft_agents = [ | |
| RFTAdaptiveAgent( | |
| grid_size=grid_size, | |
| move_kernel=move_kernel, | |
| energy_max=energy_max, | |
| energy_regen=energy_regen, | |
| base_collapse_cost=base_collapse_cost, | |
| boost_prob=boost_prob, | |
| boost_amount=boost_amount, | |
| sense_noise_prob=sense_noise_prob, | |
| alpha=alpha, | |
| beta=beta, | |
| dt_internal=dt_internal, | |
| collapse_threshold=collapse_threshold | |
| ) | |
| for _ in range(int(num_rft)) | |
| ] | |
| predators = [Predator(grid_size) for _ in range(int(num_predators))] | |
| rows = [] | |
| ops_proxy = 0 | |
| for t in range(int(steps)): | |
| for p in predators: | |
| p.move() | |
| for a in reflex_agents: | |
| a.move() | |
| for p in predators: | |
| if a.x == p.x and a.y == p.y: | |
| a.collisions += 1 | |
| actions = [] | |
| probs = [] | |
| threats = [] | |
| for a in rft_agents: | |
| acted, P_act, threat = a.step_rft(predators, group_coherence) | |
| a.regen_energy() | |
| actions.append(acted) | |
| probs.append(P_act) | |
| threats.append(threat) | |
| for p in predators: | |
| if a.x == p.x and a.y == p.y: | |
| a.collisions += 1 | |
| ops_proxy += 18 | |
| reflex_collisions = int(sum(a.collisions for a in reflex_agents)) | |
| rft_collisions = int(sum(a.collisions for a in rft_agents)) | |
| avg_actions = float(np.mean([a.collapse_actions for a in rft_agents])) if rft_agents else 0.0 | |
| avg_energy = float(np.mean([a.energy for a in rft_agents])) if rft_agents else 0.0 | |
| avg_threat = float(np.mean(threats)) if threats else 0.0 | |
| avg_prob = float(np.mean(probs)) if probs else 0.0 | |
| avg_act_rate = float(np.mean(actions)) if actions else 0.0 | |
| rows.append({ | |
| "t": t, | |
| "reflex_collisions_cum": reflex_collisions, | |
| "rft_collisions_cum": rft_collisions, | |
| "avg_rft_actions": avg_actions, | |
| "avg_rft_energy": avg_energy, | |
| "avg_rft_threat": avg_threat, | |
| "avg_rft_P_action": avg_prob, | |
| "avg_rft_action_rate": avg_act_rate, | |
| "predators_positions": "|".join([f"{p.x},{p.y}" for p in predators]), | |
| }) | |
| df = pd.DataFrame(rows) | |
| csv_path = df_to_csv_file(df, f"predator_log_seed{seed}.csv") | |
| fig1 = plt.figure(figsize=(10, 4)) | |
| ax = fig1.add_subplot(111) | |
| ax.plot(df["t"], df["reflex_collisions_cum"], label="Reflex collisions (cum)") | |
| ax.plot(df["t"], df["rft_collisions_cum"], label="RFT collisions (cum)") | |
| ax.set_title("Predator Avoidance: Collisions (Reflex vs RFT)") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("collisions (cum)") | |
| ax.legend() | |
| p_col = save_plot(fig1, f"predator_collisions_seed{seed}.png") | |
| fig2 = plt.figure(figsize=(10, 4)) | |
| ax = fig2.add_subplot(111) | |
| ax.plot(df["t"], df["avg_rft_actions"], label="Avg actions (RFT)") | |
| ax.plot(df["t"], df["avg_rft_energy"], label="Avg energy (RFT)") | |
| ax.set_title("Predator Avoidance: Actions + Energy (RFT)") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("value") | |
| ax.legend() | |
| p_act = save_plot(fig2, f"predator_actions_energy_seed{seed}.png") | |
| fig3 = plt.figure(figsize=(10, 4)) | |
| ax = fig3.add_subplot(111) | |
| ax.plot(df["t"], df["avg_rft_threat"], label="Avg threat") | |
| ax.plot(df["t"], df["avg_rft_P_action"], label="Avg P_action") | |
| ax.plot(df["t"], df["avg_rft_action_rate"], label="Avg action rate") | |
| ax.set_title("Predator Avoidance: Threat vs P_action vs Action Rate") | |
| ax.set_xlabel("t (step)") | |
| ax.set_ylabel("value") | |
| ax.legend() | |
| p_thr = save_plot(fig3, f"predator_threat_seed{seed}.png") | |
| heatmap_path = None | |
| if show_heatmap and len(rft_agents) > 0: | |
| field = rft_agents[0].pos_prob | |
| fig4 = plt.figure(figsize=(6, 5)) | |
| ax = fig4.add_subplot(111) | |
| im = ax.imshow(field, aspect="auto") | |
| ax.set_title("RFT Agent[0]: Final probability field (pos_prob)") | |
| ax.set_xlabel("y") | |
| ax.set_ylabel("x") | |
| fig4.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
| heatmap_path = save_plot(fig4, f"predator_probfield_seed{seed}.png") | |
| summary = { | |
| "seed": int(seed), | |
| "grid_size": int(grid_size), | |
| "steps": int(steps), | |
| "num_reflex": int(num_reflex), | |
| "num_rft": int(num_rft), | |
| "num_predators": int(num_predators), | |
| "final_reflex_collisions": int(df["reflex_collisions_cum"].iloc[-1]) if len(df) else 0, | |
| "final_rft_collisions": int(df["rft_collisions_cum"].iloc[-1]) if len(df) else 0, | |
| "final_avg_rft_actions": float(df["avg_rft_actions"].iloc[-1]) if len(df) else 0.0, | |
| "final_avg_rft_energy": float(df["avg_rft_energy"].iloc[-1]) if len(df) else 0.0, | |
| "ops_proxy": int(ops_proxy), | |
| } | |
| imgs = [p_col, p_act, p_thr] | |
| if heatmap_path is not None: | |
| imgs.append(heatmap_path) | |
| return summary, imgs, csv_path | |
| # ----------------------------- | |
| # Benchmarks | |
| # ----------------------------- | |
| def run_benchmarks( | |
| seed: int, | |
| neo_steps: int, neo_dt: float, neo_alert_km: float, neo_noise_km: float, | |
| jit_steps: int, jit_dt: float, jit_noise: float, | |
| land_steps: int, land_dt: float, land_wind: float, land_thrust_noise: float, | |
| tau_gain: float | |
| ): | |
| seed = int(seed) | |
| s_rft, _, neo_imgs, neo_csv = simulate_neo( | |
| seed=seed, | |
| steps=neo_steps, | |
| dt=neo_dt, | |
| alert_km=neo_alert_km, | |
| noise_km=neo_noise_km, | |
| rft_conf_threshold=0.55, | |
| tau_gain=tau_gain, | |
| show_debug=False | |
| ) | |
| neo_df = pd.read_csv(neo_csv) | |
| neo_base = int(neo_df["baseline_alert"].sum()) | |
| neo_rft = int(neo_df["rft_alert"].sum()) | |
| neo_candidates = int(neo_df["rft_candidate"].sum()) | |
| j_sum, jit_imgs, jit_csv = simulate_jitter( | |
| seed=seed, | |
| steps=jit_steps, | |
| dt=jit_dt, | |
| noise=jit_noise, | |
| baseline_kp=0.35, | |
| rft_kp=0.55, | |
| gate_threshold=0.55, | |
| tau_gain=tau_gain | |
| ) | |
| l_sum, land_imgs, land_csv = simulate_landing( | |
| seed=seed, | |
| steps=land_steps, | |
| dt=land_dt, | |
| wind_max=land_wind, | |
| thrust_noise=land_thrust_noise, | |
| kp_baseline=0.06, | |
| kp_rft=0.10, | |
| gate_threshold=0.55, | |
| tau_gain=tau_gain, | |
| goal_m=10.0 | |
| ) | |
| score = pd.DataFrame([ | |
| { | |
| "Module": "NEO", | |
| "Baseline alerts": neo_base, | |
| "RFT candidates": neo_candidates, | |
| "RFT filtered alerts": neo_rft, | |
| "False-positive proxy reduction %": 100.0 * (1.0 - (neo_rft / max(neo_candidates, 1))), | |
| "Energy/compute proxy": int(s_rft["ops_proxy"]) | |
| }, | |
| { | |
| "Module": "Satellite Jitter", | |
| "Baseline alerts": "", | |
| "RFT candidates": "", | |
| "RFT filtered alerts": "", | |
| "False-positive proxy reduction %": "", | |
| "Energy/compute proxy": int(j_sum["ops_proxy"]) | |
| }, | |
| { | |
| "Module": "Landing", | |
| "Baseline alerts": "", | |
| "RFT candidates": "", | |
| "RFT filtered alerts": "", | |
| "False-positive proxy reduction %": "", | |
| "Energy/compute proxy": int(l_sum["ops_proxy"]) | |
| }, | |
| ]) | |
| score_path = df_to_csv_file(score, f"bench_score_seed{seed}.csv") | |
| txt = ( | |
| f"Benchmarks (seed={seed})\n" | |
| f"- NEO: baseline alerts={neo_base}, RFT candidates={neo_candidates}, RFT filtered={neo_rft}\n" | |
| f"- Jitter: jitter RMS={j_sum['jitter_rms']:.4f}, duty reduction={j_sum['duty_reduction_%']:.1f}%\n" | |
| f"- Landing: final offset={l_sum['final_landing_offset_m']:.2f} m (goal 10 m), anomalies={l_sum['total_anomalies_detected']}, actions={l_sum['total_control_actions']}\n" | |
| ) | |
| all_imgs = neo_imgs + jit_imgs + land_imgs | |
| return txt, score, score_path, all_imgs, [neo_csv, jit_csv, land_csv] | |
| # ----------------------------- | |
| # UI text blocks | |
| # ----------------------------- | |
| HOME_MD = """ | |
| # Rendered Frame Theory (RFT) — Agent Console | |
| This Space is meant to be transparent, reproducible, and benchmarkable. | |
| Run it. Change parameters. Break it. Compare baseline vs RFT. | |
| Core idea: | |
| **Decision timing matters.** | |
| RFT treats timing (τ_eff), uncertainty, and action “collapse” as first-class controls. | |
| This Space contains: | |
| - **NEO alerting** | |
| - **Satellite jitter reduction** | |
| - **Starship-style landing harness** | |
| - **Predator avoidance** (Reflex vs RFT-style adaptive agents) | |
| No SciPy. No hidden dependencies. No model weights. | |
| """ | |
| LIVE_MD = """ | |
| # Live Console | |
| Run everything quickly and export logs. | |
| - deterministic runs (seeded) | |
| - plots saved | |
| - CSV logs exported | |
| - baseline vs RFT comparisons available | |
| """ | |
| THEORY_PRACTICE_MD = """ | |
| # Theory → Practice (how I implement RFT here) | |
| ## 1) Uncertainty | |
| Explicit uncertainty proxy from noise + disturbance scale. | |
| ## 2) Confidence | |
| confidence = 1 − uncertainty (clipped 0..1). | |
| ## 3) Adaptive τ_eff | |
| Higher uncertainty → higher τ_eff. | |
| ## 4) Collapse gate | |
| Act only when the gate passes: | |
| - confidence exceeds a threshold | |
| - τ_eff increases strictness under uncertainty | |
| ## 5) Why it matters | |
| Baseline controllers often act constantly. | |
| RFT tries to act less often, but more decisively. | |
| """ | |
| MATH_MD = r""" | |
| # Mathematics (minimal and implementation-linked) | |
| u ∈ [0,1] : uncertainty proxy | |
| C ∈ [0,1] : confidence proxy | |
| τ_eff ≥ 1 : effective decision timing factor | |
| Confidence: | |
| \[ | |
| C = \text{clip}(1 - u, 0, 1) | |
| \] | |
| Adaptive τ_eff: | |
| \[ | |
| \tau_{\text{eff}} = \text{clip}(1 + 1.0 + g\cdot u,\; 1,\; \tau_{\max}) | |
| \] | |
| Collapse gate (concept): | |
| \[ | |
| \text{Gate} = \left[C \ge \theta + k(\tau_{\text{eff}}-1)\right] | |
| \] | |
| """ | |
| INVESTOR_MD = """ | |
| # Investor / Agency Walkthrough | |
| What this Space demonstrates: | |
| - alert filtering (NEO) | |
| - stabilisation (jitter reduction) | |
| - anomaly-aware control (landing harness) | |
| - threat-aware avoidance (predator demo) | |
| What it is not: | |
| - not flight-certified | |
| - not a production pipeline | |
| - not a claim that anyone is using it | |
| What makes it production-grade: | |
| - real sensor ingestion + timing constraints | |
| - hardware-in-loop testing | |
| - dataset validation | |
| """ | |
| REPRO_MD = """ | |
| # Reproducibility & Logs | |
| Everything is reproducible: | |
| - set seed | |
| - run | |
| - export CSV | |
| - verify plots + metrics | |
| CSV schema is explicit in the exports. | |
| """ | |
| # ----------------------------- | |
| # Gradio UI helpers | |
| # ----------------------------- | |
| def ui_run_neo(seed, steps, dt, alert_km, noise_km, rft_conf_th, tau_gain, show_debug): | |
| summary, debug_lines, imgs, csv_path = simulate_neo( | |
| seed=int(seed), | |
| steps=int(steps), | |
| dt=float(dt), | |
| alert_km=float(alert_km), | |
| noise_km=float(noise_km), | |
| rft_conf_threshold=float(rft_conf_th), | |
| tau_gain=float(tau_gain), | |
| show_debug=bool(show_debug), | |
| ) | |
| summary_txt = json.dumps(summary, indent=2) | |
| return summary_txt, debug_lines, imgs[0], imgs[1], imgs[2], csv_path | |
| def ui_run_jitter(seed, steps, dt, noise, baseline_kp, rft_kp, gate_th, tau_gain): | |
| summary, imgs, csv_path = simulate_jitter( | |
| seed=int(seed), | |
| steps=int(steps), | |
| dt=float(dt), | |
| noise=float(noise), | |
| baseline_kp=float(baseline_kp), | |
| rft_kp=float(rft_kp), | |
| gate_threshold=float(gate_th), | |
| tau_gain=float(tau_gain), | |
| ) | |
| summary_txt = json.dumps(summary, indent=2) | |
| return summary_txt, imgs[0], imgs[1], imgs[2], csv_path | |
| def ui_run_landing(seed, steps, dt, wind_max, thrust_noise, kp_base, kp_rft, gate_th, tau_gain, goal_m): | |
| summary, imgs, csv_path = simulate_landing( | |
| seed=int(seed), | |
| steps=int(steps), | |
| dt=float(dt), | |
| wind_max=float(wind_max), | |
| thrust_noise=float(thrust_noise), | |
| kp_baseline=float(kp_base), | |
| kp_rft=float(kp_rft), | |
| gate_threshold=float(gate_th), | |
| tau_gain=float(tau_gain), | |
| goal_m=float(goal_m), | |
| ) | |
| summary_txt = json.dumps(summary, indent=2) | |
| return summary_txt, imgs[0], imgs[1], imgs[2], imgs[3], csv_path | |
| def ui_run_predator(seed, grid_size, steps, num_reflex, num_rft, num_predators, | |
| group_coherence, sense_noise_prob, collapse_threshold, | |
| alpha, beta, dt_internal, | |
| energy_max, base_collapse_cost, energy_regen, | |
| boost_prob, boost_amount, | |
| show_heatmap): | |
| summary, imgs, csv_path = simulate_predator( | |
| seed=int(seed), | |
| grid_size=int(grid_size), | |
| steps=int(steps), | |
| num_reflex=int(num_reflex), | |
| num_rft=int(num_rft), | |
| num_predators=int(num_predators), | |
| group_coherence=float(group_coherence), | |
| sense_noise_prob=float(sense_noise_prob), | |
| collapse_threshold=float(collapse_threshold), | |
| alpha=float(alpha), | |
| beta=float(beta), | |
| dt_internal=float(dt_internal), | |
| energy_max=float(energy_max), | |
| base_collapse_cost=float(base_collapse_cost), | |
| energy_regen=float(energy_regen), | |
| boost_prob=float(boost_prob), | |
| boost_amount=float(boost_amount), | |
| show_heatmap=bool(show_heatmap) | |
| ) | |
| summary_txt = json.dumps(summary, indent=2) | |
| img1 = imgs[0] if len(imgs) > 0 else None | |
| img2 = imgs[1] if len(imgs) > 1 else None | |
| img3 = imgs[2] if len(imgs) > 2 else None | |
| img4 = imgs[3] if len(imgs) > 3 else None | |
| return summary_txt, img1, img2, img3, img4, csv_path | |
| def ui_run_bench(seed, neo_steps, neo_dt, neo_alert_km, neo_noise_km, jit_steps, jit_dt, jit_noise, land_steps, land_dt, land_wind, land_thrust_noise, tau_gain): | |
| txt, score_df, score_csv, imgs, logs = run_benchmarks( | |
| seed=int(seed), | |
| neo_steps=int(neo_steps), neo_dt=float(neo_dt), neo_alert_km=float(neo_alert_km), neo_noise_km=float(neo_noise_km), | |
| jit_steps=int(jit_steps), jit_dt=float(jit_dt), jit_noise=float(jit_noise), | |
| land_steps=int(land_steps), land_dt=float(land_dt), land_wind=float(land_wind), land_thrust_noise=float(land_thrust_noise), | |
| tau_gain=float(tau_gain) | |
| ) | |
| return ( | |
| txt, score_df, score_csv, | |
| imgs[0], imgs[1], imgs[2], imgs[3], imgs[4], imgs[5], imgs[6], imgs[7], imgs[8], imgs[9], | |
| logs[0], logs[1], logs[2] | |
| ) | |
| # ----------------------------- | |
| # Gradio UI | |
| # ----------------------------- | |
| with gr.Blocks(title="RFT — Agent Console (NEO / Jitter / Landing / Predator)") as demo: | |
| gr.Markdown(HOME_MD) | |
| with gr.Tabs(): | |
| with gr.Tab("Live Console"): | |
| gr.Markdown(LIVE_MD) | |
| with gr.Row(): | |
| seed_live = gr.Number(value=42, precision=0, label="Seed (reproducible)") | |
| tau_gain_live = gr.Slider(0.0, 3.0, value=1.2, step=0.05, label="τ_eff gain (global)") | |
| with gr.Accordion("Benchmark settings", open=True): | |
| with gr.Row(): | |
| neo_steps = gr.Slider(50, 400, value=120, step=1, label="NEO steps") | |
| neo_dt = gr.Slider(0.5, 3.0, value=1.0, step=0.1, label="NEO dt") | |
| neo_alert = gr.Slider(1000, 20000, value=5000, step=50, label="NEO alert radius (km)") | |
| neo_noise = gr.Slider(0.0, 200.0, value=35.0, step=1.0, label="NEO measurement noise (km)") | |
| with gr.Row(): | |
| jit_steps = gr.Slider(100, 1200, value=500, step=1, label="Jitter steps") | |
| jit_dt = gr.Slider(0.5, 2.0, value=1.0, step=0.1, label="Jitter dt") | |
| jit_noise = gr.Slider(0.0, 0.5, value=0.08, step=0.01, label="Jitter noise") | |
| with gr.Row(): | |
| land_steps = gr.Slider(40, 400, value=120, step=1, label="Landing steps") | |
| land_dt = gr.Slider(0.2, 2.0, value=1.0, step=0.1, label="Landing dt") | |
| land_wind = gr.Slider(0.0, 25.0, value=15.0, step=0.5, label="Landing wind max (m/s)") | |
| land_thrust_noise = gr.Slider(0.0, 10.0, value=3.0, step=0.1, label="Landing thrust noise") | |
| run_b = gr.Button("Run Full Benchmarks (Baseline vs RFT)") | |
| bench_txt = gr.Textbox(label="Benchmark summary", lines=6) | |
| bench_table = gr.Dataframe(label="Scorecard (CSV also exported)") | |
| bench_score_csv = gr.File(label="Download: benchmark scorecard CSV") | |
| with gr.Row(): | |
| img1 = gr.Image(label="NEO: Distance") | |
| img2 = gr.Image(label="NEO: Confidence & τ_eff") | |
| img3 = gr.Image(label="NEO: Alerts") | |
| with gr.Row(): | |
| img4 = gr.Image(label="Jitter: Residual") | |
| img5 = gr.Image(label="Jitter: Duty") | |
| img6 = gr.Image(label="Jitter: τ_eff & confidence") | |
| with gr.Row(): | |
| img7 = gr.Image(label="Landing: Altitude") | |
| img8 = gr.Image(label="Landing: Offset") | |
| img9 = gr.Image(label="Landing: Wind") | |
| img10 = gr.Image(label="Landing: anomaly vs action timeline") | |
| neo_log = gr.File(label="Download: NEO log CSV") | |
| jit_log = gr.File(label="Download: Jitter log CSV") | |
| land_log = gr.File(label="Download: Landing log CSV") | |
| run_b.click( | |
| ui_run_bench, | |
| inputs=[seed_live, neo_steps, neo_dt, neo_alert, neo_noise, jit_steps, jit_dt, jit_noise, land_steps, land_dt, land_wind, land_thrust_noise, tau_gain_live], | |
| outputs=[ | |
| bench_txt, bench_table, bench_score_csv, | |
| img1, img2, img3, img4, img5, img6, img7, img8, img9, img10, | |
| neo_log, jit_log, land_log | |
| ] | |
| ) | |
| with gr.Tab("NEO Agent"): | |
| gr.Markdown( | |
| "# Near-Earth Object (NEO) Alerting Agent\n" | |
| "Baseline: distance threshold only.\n" | |
| "RFT: distance threshold + confidence + τ_eff collapse gate.\n" | |
| ) | |
| with gr.Row(): | |
| seed_neo = gr.Number(value=42, precision=0, label="Seed") | |
| steps_neo = gr.Slider(50, 400, value=120, step=1, label="Steps") | |
| dt_neo = gr.Slider(0.5, 3.0, value=1.0, step=0.1, label="dt") | |
| with gr.Row(): | |
| alert_km = gr.Slider(1000, 20000, value=5000, step=50, label="Alert threshold (km)") | |
| noise_km = gr.Slider(0.0, 200.0, value=35.0, step=1.0, label="Measurement noise (km)") | |
| rft_conf_th = gr.Slider(0.1, 0.95, value=0.55, step=0.01, label="RFT confidence threshold") | |
| tau_gain = gr.Slider(0.0, 3.0, value=1.2, step=0.05, label="τ_eff gain") | |
| show_debug = gr.Checkbox(value=False, label="Show debug table (first rows)") | |
| run_neo = gr.Button("Run NEO Simulation") | |
| out_neo_summary = gr.Textbox(label="Summary JSON", lines=12) | |
| out_neo_debug = gr.Textbox(label="Debug", lines=10) | |
| with gr.Row(): | |
| out_neo_img1 = gr.Image(label="Distance vs time") | |
| out_neo_img2 = gr.Image(label="Confidence and τ_eff") | |
| out_neo_img3 = gr.Image(label="Alerts timeline") | |
| out_neo_csv = gr.File(label="Download NEO CSV log") | |
| run_neo.click( | |
| ui_run_neo, | |
| inputs=[seed_neo, steps_neo, dt_neo, alert_km, noise_km, rft_conf_th, tau_gain, show_debug], | |
| outputs=[out_neo_summary, out_neo_debug, out_neo_img1, out_neo_img2, out_neo_img3, out_neo_csv] | |
| ) | |
| with gr.Tab("Satellite Jitter Agent"): | |
| gr.Markdown( | |
| "# Satellite Jitter Reduction\n" | |
| "Baseline: continuous correction.\n" | |
| "RFT: gated correction using confidence + τ_eff.\n" | |
| ) | |
| with gr.Row(): | |
| seed_j = gr.Number(value=42, precision=0, label="Seed") | |
| steps_j = gr.Slider(100, 1200, value=500, step=1, label="Steps") | |
| dt_j = gr.Slider(0.5, 2.0, value=1.0, step=0.1, label="dt") | |
| noise_j = gr.Slider(0.0, 0.5, value=0.08, step=0.01, label="Noise") | |
| with gr.Row(): | |
| base_kp = gr.Slider(0.0, 1.0, value=0.35, step=0.01, label="Baseline kp") | |
| rft_kp = gr.Slider(0.0, 1.5, value=0.55, step=0.01, label="RFT kp") | |
| gate_th = gr.Slider(0.1, 0.95, value=0.55, step=0.01, label="Gate threshold") | |
| tau_gain_j = gr.Slider(0.0, 3.0, value=1.2, step=0.05, label="τ_eff gain") | |
| run_j = gr.Button("Run Jitter Simulation") | |
| out_j_summary = gr.Textbox(label="Summary JSON", lines=10) | |
| with gr.Row(): | |
| out_j_img1 = gr.Image(label="Residual jitter") | |
| out_j_img2 = gr.Image(label="Duty timeline") | |
| out_j_img3 = gr.Image(label="τ_eff & confidence") | |
| out_j_csv = gr.File(label="Download Jitter CSV log") | |
| run_j.click( | |
| ui_run_jitter, | |
| inputs=[seed_j, steps_j, dt_j, noise_j, base_kp, rft_kp, gate_th, tau_gain_j], | |
| outputs=[out_j_summary, out_j_img1, out_j_img2, out_j_img3, out_j_csv] | |
| ) | |
| with gr.Tab("Starship Landing Harness"): | |
| gr.Markdown( | |
| "# Starship-style Landing Harness (Simplified)\n" | |
| "This is not a flight model. It’s a timing-control harness.\n" | |
| ) | |
| with gr.Row(): | |
| seed_l = gr.Number(value=42, precision=0, label="Seed") | |
| steps_l = gr.Slider(40, 400, value=120, step=1, label="Steps") | |
| dt_l = gr.Slider(0.2, 2.0, value=1.0, step=0.1, label="dt") | |
| with gr.Row(): | |
| wind_max = gr.Slider(0.0, 25.0, value=15.0, step=0.5, label="Wind max (m/s)") | |
| thrust_noise = gr.Slider(0.0, 10.0, value=3.0, step=0.1, label="Thrust noise") | |
| kp_base_l = gr.Slider(0.0, 0.2, value=0.06, step=0.005, label="Baseline kp") | |
| kp_rft_l = gr.Slider(0.0, 0.25, value=0.10, step=0.005, label="RFT kp") | |
| with gr.Row(): | |
| gate_th_l = gr.Slider(0.1, 0.95, value=0.55, step=0.01, label="Gate threshold") | |
| tau_gain_l = gr.Slider(0.0, 3.0, value=1.2, step=0.05, label="τ_eff gain") | |
| goal_m = gr.Slider(1.0, 50.0, value=10.0, step=0.5, label="Landing goal (m)") | |
| run_l = gr.Button("Run Landing Simulation") | |
| out_l_summary = gr.Textbox(label="Summary JSON", lines=10) | |
| with gr.Row(): | |
| out_l_img1 = gr.Image(label="Altitude") | |
| out_l_img2 = gr.Image(label="Offset") | |
| out_l_img3 = gr.Image(label="Wind") | |
| out_l_img4 = gr.Image(label="Anomaly vs Action timeline") | |
| out_l_csv = gr.File(label="Download Landing CSV log") | |
| run_l.click( | |
| ui_run_landing, | |
| inputs=[seed_l, steps_l, dt_l, wind_max, thrust_noise, kp_base_l, kp_rft_l, gate_th_l, tau_gain_l, goal_m], | |
| outputs=[out_l_summary, out_l_img1, out_l_img2, out_l_img3, out_l_img4, out_l_csv] | |
| ) | |
| with gr.Tab("Predator Avoidance"): | |
| gr.Markdown( | |
| "# Predator Avoidance (Reflex vs RFT)\n" | |
| "Grid world with roaming predators.\n" | |
| "Reflex agents: random walk.\n" | |
| "RFT agents: probability field + threat-weighted collapse action.\n" | |
| ) | |
| with gr.Row(): | |
| seed_p = gr.Number(value=42, precision=0, label="Seed") | |
| grid_size = gr.Slider(10, 60, value=20, step=1, label="Grid size") | |
| steps_p = gr.Slider(50, 1500, value=200, step=1, label="Steps") | |
| with gr.Row(): | |
| num_reflex = gr.Slider(0, 50, value=10, step=1, label="Reflex agents") | |
| num_rft = gr.Slider(0, 20, value=3, step=1, label="RFT agents") | |
| num_predators = gr.Slider(1, 20, value=3, step=1, label="Predators") | |
| with gr.Accordion("RFT / Agent parameters", open=True): | |
| with gr.Row(): | |
| group_coherence = gr.Slider(0.0, 0.95, value=0.30, step=0.01, label="Group coherence") | |
| sense_noise_prob = gr.Slider(0.0, 0.9, value=0.10, step=0.01, label="Sense noise probability") | |
| collapse_threshold = gr.Slider(0.0, 1.0, value=0.02, step=0.005, label="Collapse threshold (P_action)") | |
| with gr.Row(): | |
| alpha = gr.Slider(0.0, 50.0, value=15.0, step=0.5, label="alpha (threat gain)") | |
| beta = gr.Slider(0.0, 10.0, value=0.5, step=0.05, label="beta (energy term)") | |
| dt_internal = gr.Slider(0.01, 1.0, value=0.2, step=0.01, label="internal dt") | |
| with gr.Row(): | |
| energy_max = gr.Slider(1.0, 300.0, value=100.0, step=1.0, label="Energy max") | |
| base_collapse_cost = gr.Slider(0.0, 10.0, value=1.0, step=0.1, label="Base action cost") | |
| energy_regen = gr.Slider(0.0, 1.0, value=0.05, step=0.01, label="Energy regen") | |
| with gr.Row(): | |
| boost_prob = gr.Slider(0.0, 1.0, value=0.10, step=0.01, label="Boost probability") | |
| boost_amount = gr.Slider(0.0, 50.0, value=5.0, step=0.5, label="Boost amount") | |
| show_heatmap = gr.Checkbox(value=True, label="Show probability field heatmap (agent[0])") | |
| run_p = gr.Button("Run Predator Simulation") | |
| out_p_summary = gr.Textbox(label="Summary JSON", lines=12) | |
| with gr.Row(): | |
| out_p_img1 = gr.Image(label="Collisions (cumulative)") | |
| out_p_img2 = gr.Image(label="Actions + Energy") | |
| with gr.Row(): | |
| out_p_img3 = gr.Image(label="Threat / P_action / Action rate") | |
| out_p_img4 = gr.Image(label="Final probability field (optional)") | |
| out_p_csv = gr.File(label="Download Predator CSV log") | |
| run_p.click( | |
| ui_run_predator, | |
| inputs=[seed_p, grid_size, steps_p, num_reflex, num_rft, num_predators, | |
| group_coherence, sense_noise_prob, collapse_threshold, | |
| alpha, beta, dt_internal, | |
| energy_max, base_collapse_cost, energy_regen, | |
| boost_prob, boost_amount, | |
| show_heatmap], | |
| outputs=[out_p_summary, out_p_img1, out_p_img2, out_p_img3, out_p_img4, out_p_csv] | |
| ) | |
| with gr.Tab("Theory → Practice"): | |
| gr.Markdown(THEORY_PRACTICE_MD) | |
| with gr.Tab("Mathematics"): | |
| gr.Markdown(MATH_MD) | |
| with gr.Tab("Investor / Agency Walkthrough"): | |
| gr.Markdown(INVESTOR_MD) | |
| with gr.Tab("Reproducibility & Logs"): | |
| gr.Markdown(REPRO_MD) | |
| demo.queue().launch() |