import os import math import json import random import numpy as np import pandas as pd import matplotlib.pyplot as plt import gradio as gr # =============================================================== # Rendered Frame Theory (RFT) — Agent Console (All-in-One Space) # Author: Liam Grinstead # Purpose: Transparent, reproducible, benchmarkable agent demos # Dependencies: numpy, pandas, matplotlib, gradio (NO scipy) # =============================================================== OUTDIR = "outputs" os.makedirs(OUTDIR, exist_ok=True) # ----------------------------- # Shared utilities # ----------------------------- def set_seed(seed: int): seed = int(seed) % (2**32 - 1) np.random.seed(seed) random.seed(seed) def clamp(x, lo, hi): return max(lo, min(hi, x)) def save_plot(fig, name: str): path = os.path.join(OUTDIR, name) fig.savefig(path, dpi=150, bbox_inches="tight") plt.close(fig) return path def df_to_csv_file(df: pd.DataFrame, name: str): path = os.path.join(OUTDIR, name) df.to_csv(path, index=False) return path # ----------------------------- # RFT Core: τ_eff + gating # ----------------------------- def tau_eff_adaptive( uncertainty: float, base: float = 1.0, slow_by: float = 1.0, gain: float = 1.2, cap: float = 4.0 ): u = clamp(float(uncertainty), 0.0, 1.0) tau = base + slow_by + gain * u return clamp(tau, base, cap) def rft_confidence(uncertainty: float): return clamp(1.0 - float(uncertainty), 0.0, 1.0) def rft_gate(conf: float, tau_eff: float, threshold: float): conf = float(conf) tau_eff = float(tau_eff) effective = threshold + 0.08 * (tau_eff - 1.0) return conf >= clamp(effective, 0.0, 0.999) # ----------------------------- # NEO Simulation # ----------------------------- def simulate_neo( seed: int, steps: int, dt: float, alert_km: float, noise_km: float, rft_conf_threshold: float, tau_gain: float, show_debug: bool ): set_seed(seed) pos = np.array([9000.0, 2500.0, 1000.0], dtype=float) # km vel = np.array([-55.0, -8.0, -3.0], dtype=float) # km/step (scaled) rows = [] alerts_baseline = 0 alerts_rft_raw = 0 alerts_rft_filtered = 0 ops_proxy = 0 for t in range(int(steps)): drift = 0.05 * np.array([math.sin(0.03*t), math.cos(0.02*t), math.sin(0.015*t)]) pos_true = pos + vel * dt + drift meas = pos_true + np.random.normal(0.0, noise_km, size=3) dist = float(np.linalg.norm(meas)) speed = float(np.linalg.norm(vel)) uncertainty = clamp((noise_km / max(alert_km, 1.0)) * 2.0 + (speed / 200.0) * 0.2, 0.0, 1.0) baseline_alert = dist <= alert_km if baseline_alert: alerts_baseline += 1 tau = tau_eff_adaptive(uncertainty=uncertainty, base=1.0, slow_by=1.0, gain=tau_gain, cap=4.0) conf = rft_confidence(uncertainty) rft_candidate = dist <= alert_km rft_alert = bool(rft_candidate and rft_gate(conf, tau, rft_conf_threshold)) if rft_candidate: alerts_rft_raw += 1 if rft_alert: alerts_rft_filtered += 1 ops_proxy += 12 rows.append({ "t": t, "dt": dt, "x_km": meas[0], "y_km": meas[1], "z_km": meas[2], "dist_km": dist, "noise_km": noise_km, "uncertainty": uncertainty, "tau_eff": tau, "confidence": conf, "baseline_alert": int(baseline_alert), "rft_candidate": int(rft_candidate), "rft_alert": int(rft_alert), }) pos = pos_true df = pd.DataFrame(rows) fig1 = plt.figure(figsize=(10, 4)) ax = fig1.add_subplot(111) ax.plot(df["t"], df["dist_km"]) ax.axhline(alert_km, linestyle="--") ax.set_title("NEO: Distance to target vs time") ax.set_xlabel("t (step)") ax.set_ylabel("distance (km)") p_dist = save_plot(fig1, f"neo_distance_seed{seed}.png") fig2 = plt.figure(figsize=(10, 4)) ax = fig2.add_subplot(111) ax.plot(df["t"], df["confidence"]) ax.plot(df["t"], df["tau_eff"]) ax.set_title("NEO: Confidence and τ_eff (Adaptive)") ax.set_xlabel("t (step)") ax.set_ylabel("value") p_conf = save_plot(fig2, f"neo_conf_tau_seed{seed}.png") fig3 = plt.figure(figsize=(10, 3)) ax = fig3.add_subplot(111) ax.step(df["t"], df["baseline_alert"], where="post") ax.step(df["t"], df["rft_alert"], where="post") ax.set_title("NEO: Alerts (Baseline vs RFT)") ax.set_xlabel("t (step)") ax.set_ylabel("alert (0/1)") p_alerts = save_plot(fig3, f"neo_alerts_seed{seed}.png") csv_path = df_to_csv_file(df, f"neo_log_seed{seed}.csv") summary = { "seed": int(seed), "steps": int(steps), "alert_km": float(alert_km), "baseline_alerts": int(alerts_baseline), "rft_candidates": int(alerts_rft_raw), "rft_alerts_filtered": int(alerts_rft_filtered), "false_positive_proxy_reduction_%": float( 100.0 * (1.0 - (alerts_rft_filtered / max(alerts_rft_raw, 1))) ), "ops_proxy": int(ops_proxy), } debug_lines = "" if show_debug: debug_lines = "Debug view (first 12 rows):\n" + df.head(12).to_string(index=False) return summary, debug_lines, [p_dist, p_conf, p_alerts], csv_path # ----------------------------- # Satellite Jitter Simulation # ----------------------------- def simulate_jitter( seed: int, steps: int, dt: float, noise: float, baseline_kp: float, rft_kp: float, gate_threshold: float, tau_gain: float ): set_seed(seed) jitter = 0.0 jitter_rate = 0.0 rows = [] duty_baseline = 0 duty_rft = 0 ops_proxy = 0 for t in range(int(steps)): micro = 0.25 * math.sin(0.05 * t) + 0.12 * math.sin(0.13 * t) jitter_rate += np.random.normal(0.0, noise) * 0.08 jitter += jitter_rate * dt + micro + np.random.normal(0.0, noise) u_base = -baseline_kp * jitter jitter_base_next = jitter + u_base * 0.35 duty_baseline += int(abs(u_base) > 0.01) uncertainty = clamp(noise * 3.0, 0.0, 1.0) tau = tau_eff_adaptive(uncertainty, base=1.0, slow_by=1.0, gain=tau_gain, cap=4.0) conf = rft_confidence(uncertainty) should_act = rft_gate(conf, tau, gate_threshold) and (abs(jitter) > 0.35) u_rft = (-rft_kp * jitter) if should_act else 0.0 jitter_rft_next = jitter + u_rft * 0.35 duty_rft += int(abs(u_rft) > 0.01) ops_proxy += 10 rows.append({ "t": t, "jitter": jitter, "u_baseline": u_base, "u_rft": u_rft, "baseline_active": int(abs(u_base) > 0.01), "rft_active": int(abs(u_rft) > 0.01), "tau_eff": tau, "confidence": conf, "noise": noise, "jitter_baseline_next": jitter_base_next, "jitter_rft_next": jitter_rft_next, }) jitter = jitter_rft_next jitter_rate *= 0.92 df = pd.DataFrame(rows) def rms(x): return float(np.sqrt(np.mean(np.square(np.asarray(x))))) jitter_rms = rms(df["jitter"].values) duty_b = duty_baseline / max(steps, 1) duty_r = duty_rft / max(steps, 1) fig1 = plt.figure(figsize=(10, 4)) ax = fig1.add_subplot(111) ax.plot(df["t"], df["jitter"]) ax.set_title("Jitter: residual vs time (running RFT plant)") ax.set_xlabel("t (step)") ax.set_ylabel("jitter (arb)") p_jit = save_plot(fig1, f"jitter_residual_seed{seed}.png") fig2 = plt.figure(figsize=(10, 3)) ax = fig2.add_subplot(111) ax.step(df["t"], df["baseline_active"], where="post") ax.step(df["t"], df["rft_active"], where="post") ax.set_title("Jitter: Actuation duty (Baseline vs RFT gating)") ax.set_xlabel("t (step)") ax.set_ylabel("active (0/1)") p_duty = save_plot(fig2, f"jitter_duty_seed{seed}.png") fig3 = plt.figure(figsize=(10, 4)) ax = fig3.add_subplot(111) ax.plot(df["t"], df["tau_eff"]) ax.plot(df["t"], df["confidence"]) ax.set_title("Jitter: τ_eff and confidence") ax.set_xlabel("t (step)") ax.set_ylabel("value") p_tau = save_plot(fig3, f"jitter_tau_seed{seed}.png") csv_path = df_to_csv_file(df, f"jitter_log_seed{seed}.csv") summary = { "seed": int(seed), "steps": int(steps), "jitter_rms": jitter_rms, "baseline_duty_ratio": float(duty_b), "rft_duty_ratio": float(duty_r), "duty_reduction_%": float(100.0 * (1.0 - (duty_r / max(duty_b, 1e-9)))), "ops_proxy": int(ops_proxy), } return summary, [p_jit, p_duty, p_tau], csv_path # ----------------------------- # Starship-style Landing Harness (2D) # ----------------------------- def simulate_landing( seed: int, steps: int, dt: float, wind_max: float, thrust_noise: float, kp_baseline: float, kp_rft: float, gate_threshold: float, tau_gain: float, goal_m: float ): set_seed(seed) alt = 1000.0 vv = -45.0 x = 60.0 xv = 0.0 ix = 0.0 anomalies = 0 actions = 0 ops_proxy = 0 rows = [] g = -9.81 LAT_CTRL = 0.95 WIND_PUSH = 0.28 VERT_CTRL = 0.22 for t in range(int(steps)): gust = math.sin(0.08 * t) + 0.55 * math.sin(0.21 * t + 0.7) wind = (wind_max * 0.75) * gust + np.random.normal(0.0, 0.65) wind = clamp(wind, -wind_max, wind_max) thrust_dev = np.random.normal(0.0, thrust_noise) meas_alt = alt + np.random.normal(0, 0.6) meas_vv = vv + np.random.normal(0, 0.35) meas_x = x + np.random.normal(0, 0.8) meas_xv = xv + np.random.normal(0, 0.25) uncertainty = clamp((abs(thrust_dev) / 5.0) * 0.18 + (abs(wind) / max(wind_max, 1e-9)) * 0.30, 0.0, 1.0) tau = tau_eff_adaptive(uncertainty, base=1.0, slow_by=1.0, gain=tau_gain, cap=4.0) conf = rft_confidence(uncertainty) anomaly_types = [] if abs(wind) > (0.85 * wind_max): anomaly_types.append("High wind") if meas_alt < 200 and abs(meas_x) > 20: anomaly_types.append("High lateral error near ground") if meas_alt < 150 and abs(meas_vv) > 15: anomaly_types.append("High descent rate near ground") is_anomaly = len(anomaly_types) > 0 if is_anomaly: anomalies += 1 u_base_x = -kp_baseline * meas_x - 0.30 * meas_xv u_base_v = -kp_baseline * (meas_vv + 5.0) phase = 1.0 - clamp(meas_alt / 1000.0, 0.0, 1.0) lookahead = 1.0 + 1.6 * phase wind_ff = (WIND_PUSH * wind) / max(LAT_CTRL, 1e-9) if meas_alt < 600: ix = clamp(ix + (meas_x * dt) * 0.0025, -40.0, 40.0) do_action = bool(rft_gate(conf, tau, gate_threshold)) u_rft_x = 0.0 u_rft_v = 0.0 if do_action: u_rft_x = (-kp_rft * lookahead * meas_x) - (0.42 * meas_xv) - (0.20 * ix) - wind_ff u_rft_v = (-kp_rft * lookahead * (meas_vv + 5.0)) actions += 1 u_rft_x = clamp(u_rft_x, -20.0, 20.0) u_rft_v = clamp(u_rft_v, -18.0, 18.0) vv = vv + (g + VERT_CTRL * u_rft_v + 0.09 * thrust_dev) * dt alt = max(0.0, alt + vv * dt) xv = xv + (WIND_PUSH * wind - LAT_CTRL * u_rft_x) * dt x = x + xv * dt ops_proxy += 16 rows.append({ "t": t, "alt_m": alt, "vv_m_s": vv, "x_m": x, "xv_m_s": xv, "wind_m_s": wind, "thrust_dev": thrust_dev, "uncertainty": uncertainty, "tau_eff": tau, "confidence": conf, "anomaly": int(is_anomaly), "anomaly_types": "|".join(anomaly_types) if anomaly_types else "", "action_taken": int(do_action), "u_baseline_x": u_base_x, "u_baseline_v": u_base_v, "u_rft_x": u_rft_x, "u_rft_v": u_rft_v, "ix": ix, "wind_ff": wind_ff, }) if alt <= 0.0: break df = pd.DataFrame(rows) landing_offset = float(abs(df["x_m"].iloc[-1])) if len(df) else 9999.0 fig1 = plt.figure(figsize=(10, 4)) ax = fig1.add_subplot(111) ax.plot(df["t"], df["alt_m"]) ax.set_title("Landing: altitude vs time") ax.set_xlabel("t (step)") ax.set_ylabel("altitude (m)") p_alt = save_plot(fig1, f"landing_alt_seed{seed}.png") fig2 = plt.figure(figsize=(10, 4)) ax = fig2.add_subplot(111) ax.plot(df["t"], df["x_m"]) ax.axhline(goal_m, linestyle="--") ax.axhline(-goal_m, linestyle="--") ax.set_title("Landing: lateral offset vs time (goal band)") ax.set_xlabel("t (step)") ax.set_ylabel("offset (m)") p_x = save_plot(fig2, f"landing_offset_seed{seed}.png") fig3 = plt.figure(figsize=(10, 4)) ax = fig3.add_subplot(111) ax.plot(df["t"], df["wind_m_s"]) ax.set_title("Landing: wind profile (signed gusts)") ax.set_xlabel("t (step)") ax.set_ylabel("wind (m/s)") p_w = save_plot(fig3, f"landing_wind_seed{seed}.png") fig4 = plt.figure(figsize=(10, 3)) ax = fig4.add_subplot(111) ax.step(df["t"], df["anomaly"], where="post") ax.step(df["t"], df["action_taken"], where="post") ax.set_title("Landing: anomaly vs action timeline") ax.set_xlabel("t (step)") ax.set_ylabel("0/1") p_a = save_plot(fig4, f"landing_anomaly_action_seed{seed}.png") csv_path = df_to_csv_file(df, f"landing_log_seed{seed}.csv") summary = { "seed": int(seed), "steps_ran": int(len(df)), "final_landing_offset_m": float(landing_offset), "goal_m": float(goal_m), "hit_goal": bool(landing_offset <= goal_m), "total_anomalies_detected": int(anomalies), "total_control_actions": int(actions), "ops_proxy": int(ops_proxy), } return summary, [p_alt, p_x, p_w, p_a], csv_path # =============================================================== # Predator Avoidance (Reflex vs RFT-style Adaptive Agents) # =============================================================== def numpy_convolve2d_toroidal(array: np.ndarray, kernel: np.ndarray) -> np.ndarray: out = np.zeros_like(array, dtype=float) kcx = kernel.shape[0] // 2 kcy = kernel.shape[1] // 2 rows, cols = array.shape for i in range(rows): for j in range(cols): val = 0.0 for m in range(kernel.shape[0]): for n in range(kernel.shape[1]): x = (i + m - kcx) % rows y = (j + n - kcy) % cols val += array[x, y] * kernel[m, n] out[i, j] = val return out class Predator: def __init__(self, grid_size: int): self.grid_size = grid_size self.x = random.randint(0, grid_size - 1) self.y = random.randint(0, grid_size - 1) def move(self): dx, dy = random.choice([(0,1), (0,-1), (1,0), (-1,0)]) self.x = (self.x + dx) % self.grid_size self.y = (self.y + dy) % self.grid_size class ReflexAgent: def __init__(self, grid_size: int): self.grid_size = grid_size self.x = random.randint(0, grid_size - 1) self.y = random.randint(0, grid_size - 1) self.collisions = 0 def move(self): dx, dy = random.choice([(0,1), (0,-1), (1,0), (-1,0)]) self.x = (self.x + dx) % self.grid_size self.y = (self.y + dy) % self.grid_size class RFTAdaptiveAgent: def __init__( self, grid_size: int, move_kernel: np.ndarray, energy_max: float, energy_regen: float, base_collapse_cost: float, boost_prob: float, boost_amount: float, sense_noise_prob: float, alpha: float, beta: float, dt_internal: float, collapse_threshold: float ): self.grid_size = grid_size self.move_kernel = move_kernel.astype(float) self.pos_prob = np.zeros((grid_size, grid_size), dtype=float) x, y = np.random.randint(grid_size), np.random.randint(grid_size) self.pos_prob[x, y] = 1.0 self.x, self.y = int(x), int(y) self.energy_max = float(energy_max) self.energy = float(energy_max) self.energy_regen = float(energy_regen) self.base_collapse_cost = float(base_collapse_cost) self.boost_prob = float(boost_prob) self.boost_amount = float(boost_amount) self.sense_noise_prob = float(sense_noise_prob) self.alpha = float(alpha) self.beta = float(beta) self.dt_internal = float(dt_internal) self.collapse_threshold = float(collapse_threshold) self.psi_action = (0.08 + 0j) # |psi|^2 = 0.0064 baseline self.collapse_actions = 0 self.collisions = 0 def move(self): dx, dy = random.choice([(0,1), (0,-1), (1,0), (-1,0)]) self.x = (self.x + dx) % self.grid_size self.y = (self.y + dy) % self.grid_size self.pos_prob.fill(0.0) self.pos_prob[self.x, self.y] = 1.0 def sense_predators(self, predators): perceived = [] for p in predators: if random.random() < self.sense_noise_prob: continue perceived.append((p.x, p.y)) return perceived def compute_threat(self, perceived): threat = 0.0 radius = 2 for (px, py) in perceived: xs = [(px + dx) % self.grid_size for dx in range(-radius, radius + 1)] ys = [(py + dy) % self.grid_size for dy in range(-radius, radius + 1)] sub = self.pos_prob[np.ix_(xs, ys)] threat += float(sub.sum()) return threat def update_action_state(self, perceived): T = self.compute_threat(perceived) E = self.energy / max(self.energy_max, 1e-9) drive = (self.alpha * T) - (self.beta * E) exp_term = clamp(drive, -6.0, 6.0) * 0.22 * self.dt_internal amp = math.exp(exp_term) amp = clamp(amp, 0.75, 1.35) H = drive + 0.01 * (abs(self.psi_action) ** 2) self.psi_action *= amp * np.exp(-1j * H * self.dt_internal) mag = abs(self.psi_action) if mag > 1.0: self.psi_action /= mag def get_action_probability(self): return float(min(abs(self.psi_action) ** 2, 1.0)) def apply_collapse_action(self, perceived): field = numpy_convolve2d_toroidal(self.pos_prob, self.move_kernel) field = np.maximum(field, 0.0) for (px, py) in perceived: for dx in range(-2, 3): for dy in range(-2, 3): nx = (px + dx) % self.grid_size ny = (py + dy) % self.grid_size dist = abs(dx) + abs(dy) field[nx, ny] *= (1.0 - 0.30 / (dist + 1.0)) s = float(field.sum()) if s <= 0: field[:] = 1.0 / (self.grid_size * self.grid_size) else: field /= s self.pos_prob = field flat = self.pos_prob.flatten().copy() for (px, py) in perceived: flat[px * self.grid_size + py] = 0.0 tot = float(flat.sum()) if tot <= 0: self.move() return flat /= tot idx = np.random.choice(self.grid_size * self.grid_size, p=flat) self.x, self.y = divmod(int(idx), self.grid_size) def energy_boost(self): if random.random() < self.boost_prob: return float(self.boost_amount) return 0.0 def regen_energy(self): boost = self.energy_boost() self.energy = clamp(self.energy + self.energy_regen + boost, 0.0, self.energy_max) if self.energy < self.energy_max and random.random() < 0.05: self.energy = self.energy_max def step_rft(self, predators, group_coherence: float): if self.energy <= 0: self.move() return 0, 0.0, 0.0 perceived = self.sense_predators(predators) self.update_action_state(perceived) P_act = self.get_action_probability() threat = self.compute_threat(perceived) acted = 0 if (P_act >= self.collapse_threshold) and (self.energy > 0): effective_cost = self.base_collapse_cost * (1.0 - float(group_coherence)) if self.energy >= effective_cost: self.collapse_actions += 1 self.energy -= effective_cost self.apply_collapse_action(perceived) self.psi_action = (0.08 + 0j) acted = 1 else: self.move() else: self.move() return acted, P_act, threat def simulate_predator( seed: int, grid_size: int, steps: int, num_reflex: int, num_rft: int, num_predators: int, group_coherence: float, sense_noise_prob: float, collapse_threshold: float, alpha: float, beta: float, dt_internal: float, energy_max: float, base_collapse_cost: float, energy_regen: float, boost_prob: float, boost_amount: float, show_heatmap: bool ): set_seed(seed) move_kernel = np.array([[0, 0.2, 0], [0.2, 0.2, 0.2], [0, 0.2, 0]], dtype=float) reflex_agents = [ReflexAgent(grid_size) for _ in range(int(num_reflex))] rft_agents = [ RFTAdaptiveAgent( grid_size=grid_size, move_kernel=move_kernel, energy_max=energy_max, energy_regen=energy_regen, base_collapse_cost=base_collapse_cost, boost_prob=boost_prob, boost_amount=boost_amount, sense_noise_prob=sense_noise_prob, alpha=alpha, beta=beta, dt_internal=dt_internal, collapse_threshold=collapse_threshold ) for _ in range(int(num_rft)) ] predators = [Predator(grid_size) for _ in range(int(num_predators))] rows = [] ops_proxy = 0 for t in range(int(steps)): for p in predators: p.move() for a in reflex_agents: a.move() for p in predators: if a.x == p.x and a.y == p.y: a.collisions += 1 actions = [] probs = [] threats = [] for a in rft_agents: acted, P_act, threat = a.step_rft(predators, group_coherence) a.regen_energy() actions.append(acted) probs.append(P_act) threats.append(threat) for p in predators: if a.x == p.x and a.y == p.y: a.collisions += 1 ops_proxy += 18 reflex_collisions = int(sum(a.collisions for a in reflex_agents)) rft_collisions = int(sum(a.collisions for a in rft_agents)) avg_actions = float(np.mean([a.collapse_actions for a in rft_agents])) if rft_agents else 0.0 avg_energy = float(np.mean([a.energy for a in rft_agents])) if rft_agents else 0.0 avg_threat = float(np.mean(threats)) if threats else 0.0 avg_prob = float(np.mean(probs)) if probs else 0.0 avg_act_rate = float(np.mean(actions)) if actions else 0.0 rows.append({ "t": t, "reflex_collisions_cum": reflex_collisions, "rft_collisions_cum": rft_collisions, "avg_rft_actions": avg_actions, "avg_rft_energy": avg_energy, "avg_rft_threat": avg_threat, "avg_rft_P_action": avg_prob, "avg_rft_action_rate": avg_act_rate, "predators_positions": "|".join([f"{p.x},{p.y}" for p in predators]), }) df = pd.DataFrame(rows) csv_path = df_to_csv_file(df, f"predator_log_seed{seed}.csv") fig1 = plt.figure(figsize=(10, 4)) ax = fig1.add_subplot(111) ax.plot(df["t"], df["reflex_collisions_cum"], label="Reflex collisions (cum)") ax.plot(df["t"], df["rft_collisions_cum"], label="RFT collisions (cum)") ax.set_title("Predator Avoidance: Collisions (Reflex vs RFT)") ax.set_xlabel("t (step)") ax.set_ylabel("collisions (cum)") ax.legend() p_col = save_plot(fig1, f"predator_collisions_seed{seed}.png") fig2 = plt.figure(figsize=(10, 4)) ax = fig2.add_subplot(111) ax.plot(df["t"], df["avg_rft_actions"], label="Avg actions (RFT)") ax.plot(df["t"], df["avg_rft_energy"], label="Avg energy (RFT)") ax.set_title("Predator Avoidance: Actions + Energy (RFT)") ax.set_xlabel("t (step)") ax.set_ylabel("value") ax.legend() p_act = save_plot(fig2, f"predator_actions_energy_seed{seed}.png") fig3 = plt.figure(figsize=(10, 4)) ax = fig3.add_subplot(111) ax.plot(df["t"], df["avg_rft_threat"], label="Avg threat") ax.plot(df["t"], df["avg_rft_P_action"], label="Avg P_action") ax.plot(df["t"], df["avg_rft_action_rate"], label="Avg action rate") ax.set_title("Predator Avoidance: Threat vs P_action vs Action Rate") ax.set_xlabel("t (step)") ax.set_ylabel("value") ax.legend() p_thr = save_plot(fig3, f"predator_threat_seed{seed}.png") heatmap_path = None if show_heatmap and len(rft_agents) > 0: field = rft_agents[0].pos_prob fig4 = plt.figure(figsize=(6, 5)) ax = fig4.add_subplot(111) im = ax.imshow(field, aspect="auto") ax.set_title("RFT Agent[0]: Final probability field (pos_prob)") ax.set_xlabel("y") ax.set_ylabel("x") fig4.colorbar(im, ax=ax, fraction=0.046, pad=0.04) heatmap_path = save_plot(fig4, f"predator_probfield_seed{seed}.png") summary = { "seed": int(seed), "grid_size": int(grid_size), "steps": int(steps), "num_reflex": int(num_reflex), "num_rft": int(num_rft), "num_predators": int(num_predators), "final_reflex_collisions": int(df["reflex_collisions_cum"].iloc[-1]) if len(df) else 0, "final_rft_collisions": int(df["rft_collisions_cum"].iloc[-1]) if len(df) else 0, "final_avg_rft_actions": float(df["avg_rft_actions"].iloc[-1]) if len(df) else 0.0, "final_avg_rft_energy": float(df["avg_rft_energy"].iloc[-1]) if len(df) else 0.0, "ops_proxy": int(ops_proxy), } imgs = [p_col, p_act, p_thr] if heatmap_path is not None: imgs.append(heatmap_path) return summary, imgs, csv_path # ----------------------------- # Benchmarks # ----------------------------- def run_benchmarks( seed: int, neo_steps: int, neo_dt: float, neo_alert_km: float, neo_noise_km: float, jit_steps: int, jit_dt: float, jit_noise: float, land_steps: int, land_dt: float, land_wind: float, land_thrust_noise: float, tau_gain: float ): seed = int(seed) s_rft, _, neo_imgs, neo_csv = simulate_neo( seed=seed, steps=neo_steps, dt=neo_dt, alert_km=neo_alert_km, noise_km=neo_noise_km, rft_conf_threshold=0.55, tau_gain=tau_gain, show_debug=False ) neo_df = pd.read_csv(neo_csv) neo_base = int(neo_df["baseline_alert"].sum()) neo_rft = int(neo_df["rft_alert"].sum()) neo_candidates = int(neo_df["rft_candidate"].sum()) j_sum, jit_imgs, jit_csv = simulate_jitter( seed=seed, steps=jit_steps, dt=jit_dt, noise=jit_noise, baseline_kp=0.35, rft_kp=0.55, gate_threshold=0.55, tau_gain=tau_gain ) l_sum, land_imgs, land_csv = simulate_landing( seed=seed, steps=land_steps, dt=land_dt, wind_max=land_wind, thrust_noise=land_thrust_noise, kp_baseline=0.06, kp_rft=0.10, gate_threshold=0.55, tau_gain=tau_gain, goal_m=10.0 ) score = pd.DataFrame([ { "Module": "NEO", "Baseline alerts": neo_base, "RFT candidates": neo_candidates, "RFT filtered alerts": neo_rft, "False-positive proxy reduction %": 100.0 * (1.0 - (neo_rft / max(neo_candidates, 1))), "Energy/compute proxy": int(s_rft["ops_proxy"]) }, { "Module": "Satellite Jitter", "Baseline alerts": "", "RFT candidates": "", "RFT filtered alerts": "", "False-positive proxy reduction %": "", "Energy/compute proxy": int(j_sum["ops_proxy"]) }, { "Module": "Landing", "Baseline alerts": "", "RFT candidates": "", "RFT filtered alerts": "", "False-positive proxy reduction %": "", "Energy/compute proxy": int(l_sum["ops_proxy"]) }, ]) score_path = df_to_csv_file(score, f"bench_score_seed{seed}.csv") txt = ( f"Benchmarks (seed={seed})\n" f"- NEO: baseline alerts={neo_base}, RFT candidates={neo_candidates}, RFT filtered={neo_rft}\n" f"- Jitter: jitter RMS={j_sum['jitter_rms']:.4f}, duty reduction={j_sum['duty_reduction_%']:.1f}%\n" f"- Landing: final offset={l_sum['final_landing_offset_m']:.2f} m (goal 10 m), anomalies={l_sum['total_anomalies_detected']}, actions={l_sum['total_control_actions']}\n" ) all_imgs = neo_imgs + jit_imgs + land_imgs return txt, score, score_path, all_imgs, [neo_csv, jit_csv, land_csv] # ----------------------------- # UI text blocks # ----------------------------- HOME_MD = """ # Rendered Frame Theory (RFT) — Agent Console This Space is meant to be transparent, reproducible, and benchmarkable. Run it. Change parameters. Break it. Compare baseline vs RFT. Core idea: **Decision timing matters.** RFT treats timing (τ_eff), uncertainty, and action “collapse” as first-class controls. This Space contains: - **NEO alerting** - **Satellite jitter reduction** - **Starship-style landing harness** - **Predator avoidance** (Reflex vs RFT-style adaptive agents) No SciPy. No hidden dependencies. No model weights. """ LIVE_MD = """ # Live Console Run everything quickly and export logs. - deterministic runs (seeded) - plots saved - CSV logs exported - baseline vs RFT comparisons available """ THEORY_PRACTICE_MD = """ # Theory → Practice (how I implement RFT here) ## 1) Uncertainty Explicit uncertainty proxy from noise + disturbance scale. ## 2) Confidence confidence = 1 − uncertainty (clipped 0..1). ## 3) Adaptive τ_eff Higher uncertainty → higher τ_eff. ## 4) Collapse gate Act only when the gate passes: - confidence exceeds a threshold - τ_eff increases strictness under uncertainty ## 5) Why it matters Baseline controllers often act constantly. RFT tries to act less often, but more decisively. """ MATH_MD = r""" # Mathematics (minimal and implementation-linked) u ∈ [0,1] : uncertainty proxy C ∈ [0,1] : confidence proxy τ_eff ≥ 1 : effective decision timing factor Confidence: \[ C = \text{clip}(1 - u, 0, 1) \] Adaptive τ_eff: \[ \tau_{\text{eff}} = \text{clip}(1 + 1.0 + g\cdot u,\; 1,\; \tau_{\max}) \] Collapse gate (concept): \[ \text{Gate} = \left[C \ge \theta + k(\tau_{\text{eff}}-1)\right] \] """ INVESTOR_MD = """ # Investor / Agency Walkthrough What this Space demonstrates: - alert filtering (NEO) - stabilisation (jitter reduction) - anomaly-aware control (landing harness) - threat-aware avoidance (predator demo) What it is not: - not flight-certified - not a production pipeline - not a claim that anyone is using it What makes it production-grade: - real sensor ingestion + timing constraints - hardware-in-loop testing - dataset validation """ REPRO_MD = """ # Reproducibility & Logs Everything is reproducible: - set seed - run - export CSV - verify plots + metrics CSV schema is explicit in the exports. """ # ----------------------------- # Gradio UI helpers # ----------------------------- def ui_run_neo(seed, steps, dt, alert_km, noise_km, rft_conf_th, tau_gain, show_debug): summary, debug_lines, imgs, csv_path = simulate_neo( seed=int(seed), steps=int(steps), dt=float(dt), alert_km=float(alert_km), noise_km=float(noise_km), rft_conf_threshold=float(rft_conf_th), tau_gain=float(tau_gain), show_debug=bool(show_debug), ) summary_txt = json.dumps(summary, indent=2) return summary_txt, debug_lines, imgs[0], imgs[1], imgs[2], csv_path def ui_run_jitter(seed, steps, dt, noise, baseline_kp, rft_kp, gate_th, tau_gain): summary, imgs, csv_path = simulate_jitter( seed=int(seed), steps=int(steps), dt=float(dt), noise=float(noise), baseline_kp=float(baseline_kp), rft_kp=float(rft_kp), gate_threshold=float(gate_th), tau_gain=float(tau_gain), ) summary_txt = json.dumps(summary, indent=2) return summary_txt, imgs[0], imgs[1], imgs[2], csv_path def ui_run_landing(seed, steps, dt, wind_max, thrust_noise, kp_base, kp_rft, gate_th, tau_gain, goal_m): summary, imgs, csv_path = simulate_landing( seed=int(seed), steps=int(steps), dt=float(dt), wind_max=float(wind_max), thrust_noise=float(thrust_noise), kp_baseline=float(kp_base), kp_rft=float(kp_rft), gate_threshold=float(gate_th), tau_gain=float(tau_gain), goal_m=float(goal_m), ) summary_txt = json.dumps(summary, indent=2) return summary_txt, imgs[0], imgs[1], imgs[2], imgs[3], csv_path def ui_run_predator(seed, grid_size, steps, num_reflex, num_rft, num_predators, group_coherence, sense_noise_prob, collapse_threshold, alpha, beta, dt_internal, energy_max, base_collapse_cost, energy_regen, boost_prob, boost_amount, show_heatmap): summary, imgs, csv_path = simulate_predator( seed=int(seed), grid_size=int(grid_size), steps=int(steps), num_reflex=int(num_reflex), num_rft=int(num_rft), num_predators=int(num_predators), group_coherence=float(group_coherence), sense_noise_prob=float(sense_noise_prob), collapse_threshold=float(collapse_threshold), alpha=float(alpha), beta=float(beta), dt_internal=float(dt_internal), energy_max=float(energy_max), base_collapse_cost=float(base_collapse_cost), energy_regen=float(energy_regen), boost_prob=float(boost_prob), boost_amount=float(boost_amount), show_heatmap=bool(show_heatmap) ) summary_txt = json.dumps(summary, indent=2) img1 = imgs[0] if len(imgs) > 0 else None img2 = imgs[1] if len(imgs) > 1 else None img3 = imgs[2] if len(imgs) > 2 else None img4 = imgs[3] if len(imgs) > 3 else None return summary_txt, img1, img2, img3, img4, csv_path def ui_run_bench(seed, neo_steps, neo_dt, neo_alert_km, neo_noise_km, jit_steps, jit_dt, jit_noise, land_steps, land_dt, land_wind, land_thrust_noise, tau_gain): txt, score_df, score_csv, imgs, logs = run_benchmarks( seed=int(seed), neo_steps=int(neo_steps), neo_dt=float(neo_dt), neo_alert_km=float(neo_alert_km), neo_noise_km=float(neo_noise_km), jit_steps=int(jit_steps), jit_dt=float(jit_dt), jit_noise=float(jit_noise), land_steps=int(land_steps), land_dt=float(land_dt), land_wind=float(land_wind), land_thrust_noise=float(land_thrust_noise), tau_gain=float(tau_gain) ) return ( txt, score_df, score_csv, imgs[0], imgs[1], imgs[2], imgs[3], imgs[4], imgs[5], imgs[6], imgs[7], imgs[8], imgs[9], logs[0], logs[1], logs[2] ) # ----------------------------- # Gradio UI # ----------------------------- with gr.Blocks(title="RFT — Agent Console (NEO / Jitter / Landing / Predator)") as demo: gr.Markdown(HOME_MD) with gr.Tabs(): with gr.Tab("Live Console"): gr.Markdown(LIVE_MD) with gr.Row(): seed_live = gr.Number(value=42, precision=0, label="Seed (reproducible)") tau_gain_live = gr.Slider(0.0, 3.0, value=1.2, step=0.05, label="τ_eff gain (global)") with gr.Accordion("Benchmark settings", open=True): with gr.Row(): neo_steps = gr.Slider(50, 400, value=120, step=1, label="NEO steps") neo_dt = gr.Slider(0.5, 3.0, value=1.0, step=0.1, label="NEO dt") neo_alert = gr.Slider(1000, 20000, value=5000, step=50, label="NEO alert radius (km)") neo_noise = gr.Slider(0.0, 200.0, value=35.0, step=1.0, label="NEO measurement noise (km)") with gr.Row(): jit_steps = gr.Slider(100, 1200, value=500, step=1, label="Jitter steps") jit_dt = gr.Slider(0.5, 2.0, value=1.0, step=0.1, label="Jitter dt") jit_noise = gr.Slider(0.0, 0.5, value=0.08, step=0.01, label="Jitter noise") with gr.Row(): land_steps = gr.Slider(40, 400, value=120, step=1, label="Landing steps") land_dt = gr.Slider(0.2, 2.0, value=1.0, step=0.1, label="Landing dt") land_wind = gr.Slider(0.0, 25.0, value=15.0, step=0.5, label="Landing wind max (m/s)") land_thrust_noise = gr.Slider(0.0, 10.0, value=3.0, step=0.1, label="Landing thrust noise") run_b = gr.Button("Run Full Benchmarks (Baseline vs RFT)") bench_txt = gr.Textbox(label="Benchmark summary", lines=6) bench_table = gr.Dataframe(label="Scorecard (CSV also exported)") bench_score_csv = gr.File(label="Download: benchmark scorecard CSV") with gr.Row(): img1 = gr.Image(label="NEO: Distance") img2 = gr.Image(label="NEO: Confidence & τ_eff") img3 = gr.Image(label="NEO: Alerts") with gr.Row(): img4 = gr.Image(label="Jitter: Residual") img5 = gr.Image(label="Jitter: Duty") img6 = gr.Image(label="Jitter: τ_eff & confidence") with gr.Row(): img7 = gr.Image(label="Landing: Altitude") img8 = gr.Image(label="Landing: Offset") img9 = gr.Image(label="Landing: Wind") img10 = gr.Image(label="Landing: anomaly vs action timeline") neo_log = gr.File(label="Download: NEO log CSV") jit_log = gr.File(label="Download: Jitter log CSV") land_log = gr.File(label="Download: Landing log CSV") run_b.click( ui_run_bench, inputs=[seed_live, neo_steps, neo_dt, neo_alert, neo_noise, jit_steps, jit_dt, jit_noise, land_steps, land_dt, land_wind, land_thrust_noise, tau_gain_live], outputs=[ bench_txt, bench_table, bench_score_csv, img1, img2, img3, img4, img5, img6, img7, img8, img9, img10, neo_log, jit_log, land_log ] ) with gr.Tab("NEO Agent"): gr.Markdown( "# Near-Earth Object (NEO) Alerting Agent\n" "Baseline: distance threshold only.\n" "RFT: distance threshold + confidence + τ_eff collapse gate.\n" ) with gr.Row(): seed_neo = gr.Number(value=42, precision=0, label="Seed") steps_neo = gr.Slider(50, 400, value=120, step=1, label="Steps") dt_neo = gr.Slider(0.5, 3.0, value=1.0, step=0.1, label="dt") with gr.Row(): alert_km = gr.Slider(1000, 20000, value=5000, step=50, label="Alert threshold (km)") noise_km = gr.Slider(0.0, 200.0, value=35.0, step=1.0, label="Measurement noise (km)") rft_conf_th = gr.Slider(0.1, 0.95, value=0.55, step=0.01, label="RFT confidence threshold") tau_gain = gr.Slider(0.0, 3.0, value=1.2, step=0.05, label="τ_eff gain") show_debug = gr.Checkbox(value=False, label="Show debug table (first rows)") run_neo = gr.Button("Run NEO Simulation") out_neo_summary = gr.Textbox(label="Summary JSON", lines=12) out_neo_debug = gr.Textbox(label="Debug", lines=10) with gr.Row(): out_neo_img1 = gr.Image(label="Distance vs time") out_neo_img2 = gr.Image(label="Confidence and τ_eff") out_neo_img3 = gr.Image(label="Alerts timeline") out_neo_csv = gr.File(label="Download NEO CSV log") run_neo.click( ui_run_neo, inputs=[seed_neo, steps_neo, dt_neo, alert_km, noise_km, rft_conf_th, tau_gain, show_debug], outputs=[out_neo_summary, out_neo_debug, out_neo_img1, out_neo_img2, out_neo_img3, out_neo_csv] ) with gr.Tab("Satellite Jitter Agent"): gr.Markdown( "# Satellite Jitter Reduction\n" "Baseline: continuous correction.\n" "RFT: gated correction using confidence + τ_eff.\n" ) with gr.Row(): seed_j = gr.Number(value=42, precision=0, label="Seed") steps_j = gr.Slider(100, 1200, value=500, step=1, label="Steps") dt_j = gr.Slider(0.5, 2.0, value=1.0, step=0.1, label="dt") noise_j = gr.Slider(0.0, 0.5, value=0.08, step=0.01, label="Noise") with gr.Row(): base_kp = gr.Slider(0.0, 1.0, value=0.35, step=0.01, label="Baseline kp") rft_kp = gr.Slider(0.0, 1.5, value=0.55, step=0.01, label="RFT kp") gate_th = gr.Slider(0.1, 0.95, value=0.55, step=0.01, label="Gate threshold") tau_gain_j = gr.Slider(0.0, 3.0, value=1.2, step=0.05, label="τ_eff gain") run_j = gr.Button("Run Jitter Simulation") out_j_summary = gr.Textbox(label="Summary JSON", lines=10) with gr.Row(): out_j_img1 = gr.Image(label="Residual jitter") out_j_img2 = gr.Image(label="Duty timeline") out_j_img3 = gr.Image(label="τ_eff & confidence") out_j_csv = gr.File(label="Download Jitter CSV log") run_j.click( ui_run_jitter, inputs=[seed_j, steps_j, dt_j, noise_j, base_kp, rft_kp, gate_th, tau_gain_j], outputs=[out_j_summary, out_j_img1, out_j_img2, out_j_img3, out_j_csv] ) with gr.Tab("Starship Landing Harness"): gr.Markdown( "# Starship-style Landing Harness (Simplified)\n" "This is not a flight model. It’s a timing-control harness.\n" ) with gr.Row(): seed_l = gr.Number(value=42, precision=0, label="Seed") steps_l = gr.Slider(40, 400, value=120, step=1, label="Steps") dt_l = gr.Slider(0.2, 2.0, value=1.0, step=0.1, label="dt") with gr.Row(): wind_max = gr.Slider(0.0, 25.0, value=15.0, step=0.5, label="Wind max (m/s)") thrust_noise = gr.Slider(0.0, 10.0, value=3.0, step=0.1, label="Thrust noise") kp_base_l = gr.Slider(0.0, 0.2, value=0.06, step=0.005, label="Baseline kp") kp_rft_l = gr.Slider(0.0, 0.25, value=0.10, step=0.005, label="RFT kp") with gr.Row(): gate_th_l = gr.Slider(0.1, 0.95, value=0.55, step=0.01, label="Gate threshold") tau_gain_l = gr.Slider(0.0, 3.0, value=1.2, step=0.05, label="τ_eff gain") goal_m = gr.Slider(1.0, 50.0, value=10.0, step=0.5, label="Landing goal (m)") run_l = gr.Button("Run Landing Simulation") out_l_summary = gr.Textbox(label="Summary JSON", lines=10) with gr.Row(): out_l_img1 = gr.Image(label="Altitude") out_l_img2 = gr.Image(label="Offset") out_l_img3 = gr.Image(label="Wind") out_l_img4 = gr.Image(label="Anomaly vs Action timeline") out_l_csv = gr.File(label="Download Landing CSV log") run_l.click( ui_run_landing, inputs=[seed_l, steps_l, dt_l, wind_max, thrust_noise, kp_base_l, kp_rft_l, gate_th_l, tau_gain_l, goal_m], outputs=[out_l_summary, out_l_img1, out_l_img2, out_l_img3, out_l_img4, out_l_csv] ) with gr.Tab("Predator Avoidance"): gr.Markdown( "# Predator Avoidance (Reflex vs RFT)\n" "Grid world with roaming predators.\n" "Reflex agents: random walk.\n" "RFT agents: probability field + threat-weighted collapse action.\n" ) with gr.Row(): seed_p = gr.Number(value=42, precision=0, label="Seed") grid_size = gr.Slider(10, 60, value=20, step=1, label="Grid size") steps_p = gr.Slider(50, 1500, value=200, step=1, label="Steps") with gr.Row(): num_reflex = gr.Slider(0, 50, value=10, step=1, label="Reflex agents") num_rft = gr.Slider(0, 20, value=3, step=1, label="RFT agents") num_predators = gr.Slider(1, 20, value=3, step=1, label="Predators") with gr.Accordion("RFT / Agent parameters", open=True): with gr.Row(): group_coherence = gr.Slider(0.0, 0.95, value=0.30, step=0.01, label="Group coherence") sense_noise_prob = gr.Slider(0.0, 0.9, value=0.10, step=0.01, label="Sense noise probability") collapse_threshold = gr.Slider(0.0, 1.0, value=0.02, step=0.005, label="Collapse threshold (P_action)") with gr.Row(): alpha = gr.Slider(0.0, 50.0, value=15.0, step=0.5, label="alpha (threat gain)") beta = gr.Slider(0.0, 10.0, value=0.5, step=0.05, label="beta (energy term)") dt_internal = gr.Slider(0.01, 1.0, value=0.2, step=0.01, label="internal dt") with gr.Row(): energy_max = gr.Slider(1.0, 300.0, value=100.0, step=1.0, label="Energy max") base_collapse_cost = gr.Slider(0.0, 10.0, value=1.0, step=0.1, label="Base action cost") energy_regen = gr.Slider(0.0, 1.0, value=0.05, step=0.01, label="Energy regen") with gr.Row(): boost_prob = gr.Slider(0.0, 1.0, value=0.10, step=0.01, label="Boost probability") boost_amount = gr.Slider(0.0, 50.0, value=5.0, step=0.5, label="Boost amount") show_heatmap = gr.Checkbox(value=True, label="Show probability field heatmap (agent[0])") run_p = gr.Button("Run Predator Simulation") out_p_summary = gr.Textbox(label="Summary JSON", lines=12) with gr.Row(): out_p_img1 = gr.Image(label="Collisions (cumulative)") out_p_img2 = gr.Image(label="Actions + Energy") with gr.Row(): out_p_img3 = gr.Image(label="Threat / P_action / Action rate") out_p_img4 = gr.Image(label="Final probability field (optional)") out_p_csv = gr.File(label="Download Predator CSV log") run_p.click( ui_run_predator, inputs=[seed_p, grid_size, steps_p, num_reflex, num_rft, num_predators, group_coherence, sense_noise_prob, collapse_threshold, alpha, beta, dt_internal, energy_max, base_collapse_cost, energy_regen, boost_prob, boost_amount, show_heatmap], outputs=[out_p_summary, out_p_img1, out_p_img2, out_p_img3, out_p_img4, out_p_csv] ) with gr.Tab("Theory → Practice"): gr.Markdown(THEORY_PRACTICE_MD) with gr.Tab("Mathematics"): gr.Markdown(MATH_MD) with gr.Tab("Investor / Agency Walkthrough"): gr.Markdown(INVESTOR_MD) with gr.Tab("Reproducibility & Logs"): gr.Markdown(REPRO_MD) demo.queue().launch()