From cd6c3d600685bb51618226471fbb196d1355eec2 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Mon, 26 Jan 2026 13:19:55 +0100 Subject: [PATCH] chore: migrating thesis case definition --- .gitignore | 4 +- sim/case/__init__.py | 2 + sim/case/thesis_simplified/__init__.py | 2 + sim/case/thesis_simplified/coi.py | 125 +++++++ sim/case/thesis_simplified/experiments.py | 325 ++++++++++++++++++ sim/case/thesis_simplified/separability.py | 72 ++++ .../case/thesis_simplified}/simplified.py | 0 .../case/thesis_simplified}/simplified_env.py | 0 sim/case/thesis_simplified/summarize.py | 168 +++++++++ .../case/thesis_simplified}/train.py | 4 +- sim/rl/environment.py | 51 ++- 11 files changed, 741 insertions(+), 12 deletions(-) create mode 100644 sim/case/__init__.py create mode 100644 sim/case/thesis_simplified/__init__.py create mode 100644 sim/case/thesis_simplified/coi.py create mode 100644 sim/case/thesis_simplified/experiments.py create mode 100644 sim/case/thesis_simplified/separability.py rename {lab/case/thesis => sim/case/thesis_simplified}/simplified.py (100%) rename {lab/case/thesis => sim/case/thesis_simplified}/simplified_env.py (100%) create mode 100644 sim/case/thesis_simplified/summarize.py rename {lab/case/thesis => sim/case/thesis_simplified}/train.py (99%) diff --git a/.gitignore b/.gitignore index e06db65..90077a7 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,6 @@ sim/rl/behavior_loader/*.dot sim/rl/behavior_loader/*.png sim/rl/behavior_loader/*.svg sim/rl/behavior_loader/*.pdf -tests/e2e/node_modules/** \ No newline at end of file +tests/e2e/node_modules/** +lab/case/thesis/runs*/ +sim/case/thesis_simplified/runs*/ diff --git a/sim/case/__init__.py b/sim/case/__init__.py new file mode 100644 index 0000000..cb6c13c --- /dev/null +++ b/sim/case/__init__.py @@ -0,0 +1,2 @@ +"""Case-specific simulations and experiments.""" + diff --git a/sim/case/thesis_simplified/__init__.py b/sim/case/thesis_simplified/__init__.py new file mode 100644 index 0000000..6259958 --- /dev/null +++ b/sim/case/thesis_simplified/__init__.py @@ -0,0 +1,2 @@ +"""Minimal thesis-aligned pricing simulation (self-contained).""" + diff --git a/sim/case/thesis_simplified/coi.py b/sim/case/thesis_simplified/coi.py new file mode 100644 index 0000000..1657f65 --- /dev/null +++ b/sim/case/thesis_simplified/coi.py @@ -0,0 +1,125 @@ +"""Cost of Information (COI) computation for thesis pricing system. + +Core KPI: COI = E[p_shown] - p_min measures pricing power from information asymmetry. +Theorem 1 shows COI erodes as agent queries increase: as N->inf, p^(1)->p_min. +""" +from __future__ import annotations +from dataclasses import dataclass +from typing import Dict, List, TYPE_CHECKING +import numpy as np + +if TYPE_CHECKING: + from .simplified import Session + + +@dataclass(frozen=True) +class COIWindow: + """Windowed COI metrics computed from realized price exposures. + + policy: E[p_shown] - cost, the definition-level KPI + agent: E[p^(1)] - cost where p^(1) is min price under agent querying + leak: max(policy - agent, 0), observable gap from reconnaissance + survival_ratio: agent/policy, fraction of pricing power retained + """ + policy: float + agent: float + leak: float + survival_ratio: float + policy_by_product: np.ndarray + agent_by_product: np.ndarray + demand_weights: np.ndarray + + +def aggregate_prices(sessions: List["Session"], mode: str = "all") -> Dict[int, List[float] | float]: + """Unified price aggregation across sessions. + + mode: "all" returns all prices per product, "min_per_session" returns min price per session per product, + "min_across" returns single min price per product + """ + if mode == "min_across": + mins: Dict[int, float] = {} + for s in sessions: + for e in s.events: + pidx, price = int(e.product_idx), float(e.price_seen) + mins[pidx] = min(mins.get(pidx, price), price) + return mins + elif mode == "min_per_session": + result: Dict[int, List[float]] = {} + for s in sessions: + by_p: Dict[int, float] = {} + for e in s.events: + pidx, price = int(e.product_idx), float(e.price_seen) + by_p[pidx] = min(by_p.get(pidx, price), price) + for pidx, pmin in by_p.items(): + result.setdefault(pidx, []).append(pmin) + return result + else: # "all" + prices: Dict[int, List[float]] = {} + for s in sessions: + for e in s.events: + prices.setdefault(e.product_idx, []).append(float(e.price_seen)) + return prices + + +def demand_weights_by_product(sessions: List["Session"], demand_mapping: Dict[str, float], n_products: int) -> np.ndarray: + """Compute demand-weighted importance per product.""" + w = np.zeros(n_products, dtype=float) + sessions_by_id = {s.sid: s for s in sessions} + for sid, q in demand_mapping.items(): + sess = sessions_by_id.get(sid) + if sess and sess.events: + w[int(sess.events[0].product_idx)] += float(q) + total = float(np.sum(w)) + return (w / total) if total > 0 else w + + +def compute_coi_window(sessions: List["Session"], costs: np.ndarray, demand_mapping: Dict[str, float] | None = None) -> COIWindow: + """Compute COI metrics over session window. + + Aggregates price exposures and computes policy-level vs agent-realized COI. + """ + n = int(len(costs)) + prices = aggregate_prices(sessions, mode="all") + agent_sessions = [s for s in sessions if s.actor == "A"] + agent_min = aggregate_prices(agent_sessions, mode="min_across") if agent_sessions else {} + + policy_by = np.zeros(n, dtype=float) + agent_by = np.zeros(n, dtype=float) + seen = np.array([(i in prices) for i in range(n)], dtype=bool) + agent_seen = np.array([(i in agent_min) for i in range(n)], dtype=bool) + + for pidx, ps in prices.items(): + if 0 <= pidx < n and ps: + policy_by[pidx] = float(np.mean(ps) - float(costs[pidx])) + for pidx, pmin in agent_min.items(): + if 0 <= pidx < n: + agent_by[pidx] = float(pmin - float(costs[pidx])) + + agent_by[seen & ~agent_seen] = policy_by[seen & ~agent_seen] # no erosion if no agent exposure + + demand_w = demand_weights_by_product(sessions, demand_mapping, n) if demand_mapping else np.zeros(n, dtype=float) + has_weights = float(np.sum(demand_w)) > 0 + + if has_weights: + policy, agent = float(np.dot(demand_w, policy_by)), float(np.dot(demand_w, agent_by)) + elif np.any(seen): + policy, agent = float(np.mean(policy_by[seen])), float(np.mean(agent_by[seen])) + else: + policy, agent = 0.0, 0.0 + + leak = float(max(policy - agent, 0.0)) + survival = float(np.clip(agent / policy, 0.0, 1.0)) if policy > 0 else 0.0 + + return COIWindow(policy=policy, agent=agent, leak=leak, survival_ratio=survival, + policy_by_product=policy_by, agent_by_product=agent_by, demand_weights=demand_w) + + +def coi_erosion(coi_policy: float, coi_agent: float, eps: float = 1e-9) -> float: + """Thesis-consistent COI erosion: fraction of pricing power destroyed by agent queries. + + erosion = 1 - (COI_agent / COI_policy) + When agents find low prices, COI_agent -> 0, erosion -> 1. + """ + if coi_policy <= eps: + return 0.0 + return float(np.clip(1.0 - (coi_agent / (coi_policy + eps)), 0.0, 1.0)) diff --git a/sim/case/thesis_simplified/experiments.py b/sim/case/thesis_simplified/experiments.py new file mode 100644 index 0000000..74458d7 --- /dev/null +++ b/sim/case/thesis_simplified/experiments.py @@ -0,0 +1,325 @@ +"""COI leakage experiments and policy comparisons. + +Demonstrates the core thesis contribution: COI erosion under agent contamination +and recovery via robust pricing policies. + +Generates TensorBoard logs for: +- COI erosion curves across contamination levels +- Policy comparison (fixed vs adaptive vs RL) +- Revenue/margin trade-offs +""" +from __future__ import annotations +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Tuple +import json +import numpy as np + +try: + from torch.utils.tensorboard import SummaryWriter + HAS_TB = True +except ImportError: + HAS_TB = False + +from .simplified_env import PricingEnv, EnvConfig, make_env +from .simplified import System + + +@dataclass +class ExperimentResult: + """Container for experiment metrics.""" + name: str + alpha: float + reward_mean: float + reward_std: float + coi_erosion: float + alpha_error: float + revenue: float + margin: float + + def to_dict(self) -> dict: + return {k: getattr(self, k) for k in self.__dataclass_fields__} + + +def theoretical_coi_erosion_curve(alphas: np.ndarray, n_sessions: int = 1000) -> np.ndarray: + """Theoretical COI erosion from Theorem 1 using order statistic model. + + For N i.i.d. uniform queries on [p_min, p_max]: + E[p^(1)] = p_min + (p_max - p_min)/(N+1), so erosion = 1 - 2/(N+1) + """ + erosions = [] + for a in alphas: + n_agents = max(1, int(a * n_sessions)) + erosions.append(1.0 - 2.0 / (n_agents + 1)) + return np.array(erosions) + + +def run_policy_episode( + env: PricingEnv, + policy_fn, + n_episodes: int = 10 +) -> Tuple[List[float], List[float], List[float], List[float]]: + """Run policy and collect per-step metrics.""" + rewards, coi_erosions, alpha_errors, revenues = [], [], [], [] + + for _ in range(n_episodes): + obs, info = env.reset() + done = False + while not done: + action = policy_fn(obs, env.n) + obs, reward, terminated, truncated, info = env.step(action) + done = terminated or truncated + rewards.append(reward) + if 'coi_erosion' in info: + coi_erosions.append(info['coi_erosion']) + if 'alpha_true' in info and 'alpha_est' in info: + alpha_errors.append(abs(info['alpha_true'] - info['alpha_est'])) + if 'revenue' in info: + revenues.append(info['revenue']) + + return rewards, coi_erosions, alpha_errors, revenues + + +class PolicyRegistry: + """Registry of baseline policies.""" + + @staticmethod + def fixed(obs: np.ndarray, n: int, margin: float = 0.15) -> np.ndarray: + return np.ones(n, dtype=np.float32) * (1.0 + margin) + + @staticmethod + def random(obs: np.ndarray, n: int, rng: np.random.Generator = None) -> np.ndarray: + rng = rng or np.random.default_rng() + return rng.uniform(0.7, 1.3, n).astype(np.float32) + + @staticmethod + def adaptive(obs: np.ndarray, n: int, base_margin: float = 0.15) -> np.ndarray: + """Reduce margins when alpha estimate is high.""" + alpha_est = obs[2 * n] if len(obs) > 2 * n else 0.2 + margin_scale = 1.0 - 0.4 * alpha_est + return np.ones(n, dtype=np.float32) * (1.0 + base_margin * margin_scale) + + @staticmethod + def aggressive(obs: np.ndarray, n: int) -> np.ndarray: + """High margins, ignores contamination.""" + return np.ones(n, dtype=np.float32) * 1.4 + + @staticmethod + def defensive(obs: np.ndarray, n: int) -> np.ndarray: + """Low margins, always cautious.""" + return np.ones(n, dtype=np.float32) * 1.05 + + @staticmethod + def alpha_proportional(obs: np.ndarray, n: int, max_margin: float = 0.3) -> np.ndarray: + """Margin inversely proportional to estimated alpha.""" + alpha_est = obs[2 * n] if len(obs) > 2 * n else 0.2 + margin = max_margin * (1.0 - alpha_est) + return np.ones(n, dtype=np.float32) * (1.0 + margin) + + +def run_contamination_sweep( + alphas: List[float], + policies: Dict[str, callable], + n_products: int = 10, + max_steps: int = 200, + n_episodes: int = 10, + seed: int = 42, + log_dir: str = None +) -> Dict[str, List[ExperimentResult]]: + """Run policies across contamination levels.""" + + results = {name: [] for name in policies} + writer = SummaryWriter(Path(log_dir) / "sweep") if log_dir and HAS_TB else None + + for alpha in alphas: + print(f" alpha={alpha:.2f}", end=" ") + env_cfg = EnvConfig( + n_products=n_products, max_steps=max_steps, + alpha_true=alpha, reward_mode="robust", seed=seed) + env = make_env(env_cfg) + + for name, policy_fn in policies.items(): + rewards, coi_vals, alpha_errs, revenues = run_policy_episode(env, policy_fn, n_episodes) + + result = ExperimentResult( + name=name, alpha=alpha, + reward_mean=float(np.mean(rewards)), + reward_std=float(np.std(rewards)), + coi_erosion=float(np.mean(coi_vals)) if coi_vals else 0.0, + alpha_error=float(np.mean(alpha_errs)) if alpha_errs else 0.0, + revenue=float(np.mean(revenues)) if revenues else 0.0, + margin=float(np.mean([policy_fn(np.zeros(3 * n_products + 3), n_products)]) - 1.0)) + + results[name].append(result) + + if writer: + step = int(alpha * 100) + writer.add_scalar(f'{name}/reward', result.reward_mean, step) + writer.add_scalar(f'{name}/coi_erosion', result.coi_erosion, step) + writer.add_scalar(f'{name}/alpha_error', result.alpha_error, step) + writer.add_scalar(f'{name}/revenue', result.revenue, step) + + print(f"done") + + # add theoretical curve + if writer: + theo = theoretical_coi_erosion_curve(np.array(alphas)) + for i, (a, e) in enumerate(zip(alphas, theo)): + writer.add_scalar('theoretical/coi_erosion', e, int(a * 100)) + writer.close() + + return results + + +def run_coi_demonstration(log_dir: str = "sim/case/thesis_simplified/runs", seed: int = 42) -> Dict: + """Main COI demonstration experiment.""" + print("=== COI Leakage Demonstration ===\n") + + Path(log_dir).mkdir(parents=True, exist_ok=True) + writer = SummaryWriter(Path(log_dir) / "coi_demo") if HAS_TB else None + + # theoretical erosion curve + print("1. Theoretical COI erosion (Theorem 1)") + alphas = np.linspace(0.0, 0.6, 13) + theo_erosion = theoretical_coi_erosion_curve(alphas, n_sessions=1000) + + for a, e in zip(alphas, theo_erosion): + print(f" alpha={a:.2f} -> erosion={e:.3f}") + if writer: + writer.add_scalar('theory/coi_erosion', e, int(a * 100)) + + # policy comparison + print("\n2. Policy comparison across contamination levels") + policies = { + 'fixed': lambda obs, n: PolicyRegistry.fixed(obs, n), + 'aggressive': PolicyRegistry.aggressive, + 'defensive': PolicyRegistry.defensive, + 'adaptive': PolicyRegistry.adaptive, + 'alpha_proportional': PolicyRegistry.alpha_proportional, + } + + sweep_alphas = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5] + results = run_contamination_sweep( + sweep_alphas, policies, n_products=10, max_steps=100, + n_episodes=5, seed=seed, log_dir=log_dir) + + # summarize + print("\n3. Summary by policy") + for name, res_list in results.items(): + avg_reward = np.mean([r.reward_mean for r in res_list]) + avg_coi = np.mean([r.coi_erosion for r in res_list]) + print(f" {name:20s}: avg_reward={avg_reward:.2f}, avg_coi={avg_coi:.3f}") + + # save results + output = { + 'theoretical': {'alphas': alphas.tolist(), 'erosion': theo_erosion.tolist()}, + 'empirical': {name: [r.to_dict() for r in res_list] for name, res_list in results.items()}} + + with open(Path(log_dir) / "coi_demo_results.json", 'w') as f: + json.dump(output, f, indent=2) + + if writer: + writer.close() + + print(f"\nResults saved to {log_dir}/coi_demo_results.json") + print(f"TensorBoard: tensorboard --logdir {log_dir}") + + return output + + +def run_reward_mode_comparison(log_dir: str = "sim/case/thesis_simplified/runs", seed: int = 42) -> Dict: + """Compare different reward modes.""" + print("=== Reward Mode Comparison ===\n") + + Path(log_dir).mkdir(parents=True, exist_ok=True) + writer = SummaryWriter(Path(log_dir) / "reward_modes") if HAS_TB else None + + reward_modes = ["revenue", "profit", "robust", "coi_aware"] + alpha = 0.3 # moderate contamination + + results = {} + for mode in reward_modes: + print(f" mode={mode}", end=" ") + env_cfg = EnvConfig( + n_products=10, max_steps=200, alpha_true=alpha, + reward_mode=mode, seed=seed) + env = make_env(env_cfg) + + rewards, coi_vals, _, revenues = run_policy_episode( + env, PolicyRegistry.adaptive, n_episodes=10) + + results[mode] = { + 'reward_mean': float(np.mean(rewards)), + 'reward_std': float(np.std(rewards)), + 'coi_erosion': float(np.mean(coi_vals)) if coi_vals else 0.0, + 'revenue': float(np.mean(revenues)) if revenues else 0.0} + + if writer: + for k, v in results[mode].items(): + writer.add_scalar(f'{mode}/{k}', v, 0) + + print(f"reward={results[mode]['reward_mean']:.2f}, coi={results[mode]['coi_erosion']:.3f}") + + if writer: + writer.close() + + with open(Path(log_dir) / "reward_mode_results.json", 'w') as f: + json.dump(results, f, indent=2) + + return results + + +def run_alpha_drift_experiment(log_dir: str = "sim/case/thesis_simplified/runs", seed: int = 42) -> Dict: + """Test policy robustness under non-stationary contamination.""" + print("=== Alpha Drift Experiment ===\n") + + Path(log_dir).mkdir(parents=True, exist_ok=True) + writer = SummaryWriter(Path(log_dir) / "alpha_drift") if HAS_TB else None + + drift_rates = [0.0, 0.01, 0.02, 0.05] + results = {} + + for drift in drift_rates: + print(f" drift={drift:.2f}", end=" ") + env_cfg = EnvConfig( + n_products=10, max_steps=200, alpha_true=0.2, + alpha_drift=drift, reward_mode="robust", seed=seed) + env = make_env(env_cfg) + + rewards, coi_vals, alpha_errs, _ = run_policy_episode( + env, PolicyRegistry.adaptive, n_episodes=10) + + results[f'drift_{drift}'] = { + 'reward_mean': float(np.mean(rewards)), + 'coi_erosion': float(np.mean(coi_vals)) if coi_vals else 0.0, + 'alpha_tracking_error': float(np.mean(alpha_errs)) if alpha_errs else 0.0} + + if writer: + for k, v in results[f'drift_{drift}'].items(): + writer.add_scalar(f'drift_{drift}/{k}', v, 0) + + print(f"reward={results[f'drift_{drift}']['reward_mean']:.2f}, " + f"alpha_err={results[f'drift_{drift}']['alpha_tracking_error']:.3f}") + + if writer: + writer.close() + + return results + + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser(description="Run COI experiments") + parser.add_argument("--exp", type=str, default="coi", choices=["coi", "reward", "drift", "all"]) + parser.add_argument("--log-dir", type=str, default="sim/case/thesis_simplified/runs") + parser.add_argument("--seed", type=int, default=42) + args = parser.parse_args() + + if args.exp == "coi" or args.exp == "all": + run_coi_demonstration(args.log_dir, args.seed) + + if args.exp == "reward" or args.exp == "all": + run_reward_mode_comparison(args.log_dir, args.seed) + + if args.exp == "drift" or args.exp == "all": + run_alpha_drift_experiment(args.log_dir, args.seed) diff --git a/sim/case/thesis_simplified/separability.py b/sim/case/thesis_simplified/separability.py new file mode 100644 index 0000000..eaabaa3 --- /dev/null +++ b/sim/case/thesis_simplified/separability.py @@ -0,0 +1,72 @@ +"""Behavioral separability for human/agent detection. + +Computes divergence signals delta_H, delta_A from session trajectories using +transition kernel estimation and KL divergence to prototype behavioral profiles. +""" +from __future__ import annotations +from typing import Dict, List, Tuple, TYPE_CHECKING +import numpy as np + +if TYPE_CHECKING: + from .simplified import Event, Session + + +# prototype behavioral kernels for human vs agent sessions +TRANS_H = { + "start": {"view": 0.85, "end": 0.15}, + "view": {"detail": 0.4, "cart": 0.3, "view": 0.2, "end": 0.1}, + "detail": {"cart": 0.5, "view": 0.3, "end": 0.2}, + "cart": {"purchase": 0.6, "view": 0.25, "end": 0.15}, + "purchase": {"end": 1.0}, +} + +TRANS_A = { + "start": {"view": 0.95, "end": 0.05}, + "view": {"detail": 0.6, "view": 0.25, "cart": 0.1, "end": 0.05}, + "detail": {"view": 0.5, "cart": 0.15, "detail": 0.3, "end": 0.05}, + "cart": {"view": 0.4, "purchase": 0.2, "end": 0.4}, + "purchase": {"end": 1.0}, +} + + +def kl_div(p: Dict[str, float], q: Dict[str, float], eps: float = 1e-10) -> float: + """KL divergence D_KL(p || q) for discrete distributions.""" + keys = set(p.keys()) | set(q.keys()) + return sum(p.get(k, eps) * np.log((p.get(k, eps) + eps) / (q.get(k, eps) + eps)) for k in keys) + + +def build_kernel(events: List["Event"]) -> Dict[str, Dict[str, float]]: + """Build empirical transition kernel T' from trajectory events.""" + trans: Dict[str, Dict[str, int]] = {} + prev = "start" + for e in events: + curr = e.action + trans.setdefault(prev, {}) + trans[prev][curr] = trans[prev].get(curr, 0) + 1 + prev = curr + return {s: {d: c / sum(dsts.values()) for d, c in dsts.items()} for s, dsts in trans.items() if sum(dsts.values()) > 0} + + +def compute_divergence(session: "Session") -> Tuple[float, float]: + """Compute divergence signals delta_H, delta_A for session. + + delta_H = mean KL(T' || T_H) across states, measures distance to human prototype + delta_A = mean KL(T' || T_A) across states, measures distance to agent prototype + """ + kernel = build_kernel(session.events) + if not kernel: + return 0.5, 0.5 + delta_h = sum(kl_div(kernel.get(s, {}), TRANS_H.get(s, {})) for s in kernel) / len(kernel) + delta_a = sum(kl_div(kernel.get(s, {}), TRANS_A.get(s, {})) for s in kernel) / len(kernel) + return delta_h, delta_a + + +def estimate_alpha(session: "Session", beta: float = 2.0) -> float: + """Per-session contamination estimate alpha_hat = sigma(beta*(delta_H - delta_A)). + + Returns probability session is agent-generated based on behavioral divergence. + """ + dh, da = compute_divergence(session) + if (dh + da) <= 0: + return 0.5 + return 1.0 / (1.0 + np.exp(-beta * (dh - da))) diff --git a/lab/case/thesis/simplified.py b/sim/case/thesis_simplified/simplified.py similarity index 100% rename from lab/case/thesis/simplified.py rename to sim/case/thesis_simplified/simplified.py diff --git a/lab/case/thesis/simplified_env.py b/sim/case/thesis_simplified/simplified_env.py similarity index 100% rename from lab/case/thesis/simplified_env.py rename to sim/case/thesis_simplified/simplified_env.py diff --git a/sim/case/thesis_simplified/summarize.py b/sim/case/thesis_simplified/summarize.py new file mode 100644 index 0000000..10406aa --- /dev/null +++ b/sim/case/thesis_simplified/summarize.py @@ -0,0 +1,168 @@ +"""Summarize TensorBoard logs into comparison tables.""" +from __future__ import annotations +import json +import re +from pathlib import Path +from collections import defaultdict +from dataclasses import dataclass +import pandas as pd + +try: + from tensorboard.backend.event_processing.event_accumulator import EventAccumulator + HAS_TB = True +except ImportError: + HAS_TB = False + + +@dataclass +class RunInfo: + algo: str + alpha: float + reward_mode: str + path: Path + + +def parse_run_name(name: str) -> RunInfo | None: + """Extract algo, alpha, reward_mode from run directory name.""" + # patterns: ppo_a0.20_robust, cmp_fixed_a0.20, sac_a0.90_robust + m = re.match(r'(cmp_)?(\w+)_a([\d.]+)_?(\w+)?', name) + if not m: + return None + prefix, algo, alpha, mode = m.groups() + return RunInfo(algo=algo, alpha=float(alpha), reward_mode=mode or 'robust', path=Path()) + + +def load_tb_scalars(log_dir: Path, tags: list[str], reduce: str = 'last') -> dict[str, float]: + """Load scalar values from TensorBoard event files.""" + if not HAS_TB: + return {} + ea = EventAccumulator(str(log_dir)) + ea.Reload() + results = {} + for tag in tags: + if tag in ea.Tags().get('scalars', []): + events = ea.Scalars(tag) + if not events: + continue + vals = [e.value for e in events] + if reduce == 'last': + results[tag] = vals[-1] + elif reduce == 'mean': + results[tag] = sum(vals) / len(vals) + elif reduce == 'max': + results[tag] = max(vals) + elif reduce == 'min': + results[tag] = min(vals) + return results + + +def load_json_results(log_dir: Path) -> dict[str, float]: + """Load metrics from results.json if available.""" + results_file = log_dir / 'results.json' + if results_file.exists(): + with open(results_file) as f: + return json.load(f) + return {} + + +def discover_runs(base_dir: Path) -> list[RunInfo]: + """Find all experiment runs in base directory.""" + runs = [] + for d in base_dir.iterdir(): + if not d.is_dir(): + continue + info = parse_run_name(d.name) + if info: + info.path = d + runs.append(info) + return runs + + +def build_tables(runs: list[RunInfo], metrics: list[str], reduce: str = 'last') -> dict[str, dict[str, pd.DataFrame]]: + """Build pivot tables: reward_mode -> metric -> DataFrame[alpha x algo].""" + # collect data: {reward_mode: {metric: {(alpha, algo): value}}} + data = defaultdict(lambda: defaultdict(dict)) + + tb_tags = [f'economics/{m}' if m in ['revenue', 'profit', 'margin'] else f'coi/{m}' if m in ['erosion', 'leakage'] else f'alpha/{m}' for m in metrics] + tag_map = dict(zip(tb_tags, metrics)) + + for run in runs: + # try json first (final eval metrics) + jm = load_json_results(run.path) + tb = load_tb_scalars(run.path, tb_tags, reduce) + + for tag, metric in tag_map.items(): + val = None + json_key = f'{metric}_mean' if metric != 'reward' else 'reward_mean' + if json_key in jm: + val = jm[json_key] + elif tag in tb: + val = tb[tag] + if val is not None: + data[run.reward_mode][metric][(run.alpha, run.algo)] = val + + # convert to DataFrames + tables = {} + for mode, metrics_data in data.items(): + tables[mode] = {} + for metric, vals in metrics_data.items(): + if not vals: + continue + alphas = sorted(set(a for a, _ in vals.keys())) + algos = sorted(set(al for _, al in vals.keys())) + df = pd.DataFrame(index=alphas, columns=algos, dtype=float) + for (a, al), v in vals.items(): + df.loc[a, al] = v + df.index.name = 'alpha' + tables[mode][metric] = df + return tables + + +def format_table(df: pd.DataFrame, fmt: str = '.3f') -> str: + """Format DataFrame as markdown table.""" + return df.to_markdown(floatfmt=fmt) + + +def summarize(base_dir: str = 'sim/case/thesis_simplified/runs', + metrics: list[str] | None = None, + reduce: str = 'last', + output: str | None = None) -> dict: + """Generate summary tables from experiment runs.""" + base = Path(base_dir) + metrics = metrics or ['revenue', 'profit', 'margin', 'erosion', 'leakage'] + + runs = discover_runs(base) + if not runs: + print(f"No runs found in {base}") + return {} + + print(f"Found {len(runs)} runs") + tables = build_tables(runs, metrics, reduce) + + lines = [] + for mode, metric_tables in sorted(tables.items()): + lines.append(f"\n# Reward Mode: {mode}\n") + for metric, df in sorted(metric_tables.items()): + lines.append(f"\n## {metric}\n") + lines.append(format_table(df)) + lines.append("") + + report = '\n'.join(lines) + print(report) + + if output: + Path(output).write_text(report) + print(f"\nSaved to {output}") + + return tables + + +if __name__ == '__main__': + import argparse + p = argparse.ArgumentParser() + p.add_argument('--dir', default='sim/case/thesis_simplified/runs') + p.add_argument('--metrics', nargs='+', default=['revenue', 'profit', 'margin', 'erosion', 'leakage']) + p.add_argument('--reduce', default='last', choices=['last', 'mean', 'max', 'min']) + p.add_argument('--output', '-o', help='save markdown to file') + args = p.parse_args() + summarize(args.dir, args.metrics, args.reduce, args.output) diff --git a/lab/case/thesis/train.py b/sim/case/thesis_simplified/train.py similarity index 99% rename from lab/case/thesis/train.py rename to sim/case/thesis_simplified/train.py index c1273eb..a405c44 100644 --- a/lab/case/thesis/train.py +++ b/sim/case/thesis_simplified/train.py @@ -65,7 +65,7 @@ class ExperimentConfig: n_envs: int = 4 eval_freq: int = 5000 n_eval_episodes: int = 10 - log_dir: str = "lab/case/thesis/runs" + log_dir: str = "sim/case/thesis_simplified/runs" seed: int = 42 n_products: int = 10 max_steps: int = 200 @@ -312,7 +312,7 @@ def main(): parser.add_argument("--n-products", type=int, default=10) parser.add_argument("--n-envs", type=int, default=4) parser.add_argument("--seed", type=int, default=42) - parser.add_argument("--log-dir", default="lab/case/thesis/runs") + parser.add_argument("--log-dir", default="sim/case/thesis_simplified/runs") parser.add_argument("--sweep", action="store_true", help="run contamination sweep") parser.add_argument("--compare", action="store_true", help="compare all baselines") parser.add_argument("--workers", type=int, default=None, help="max parallel workers for sweep (None=auto, 1=sequential)") diff --git a/sim/rl/environment.py b/sim/rl/environment.py index 597359f..a4cf7c9 100644 --- a/sim/rl/environment.py +++ b/sim/rl/environment.py @@ -2,6 +2,7 @@ import gymnasium as gym from gymnasium import spaces import numpy as np from dataclasses import dataclass +from pathlib import Path import pandas as pd from types import SimpleNamespace from typing import Optional, Dict, Any, List, Tuple @@ -19,8 +20,6 @@ except ImportError: # "learner" agent learning to optimize pricing # "agent" part of environment creating demand signals that learner processes -base_dir = "/home/velocitatem/Documents/Projects/PHANTOM/experiments" -human_dir, agent_dir = f"{base_dir}/collected_data/", f"{base_dir}/agents/collected_data/" @dataclass class BusinessLogicConstraints(): max_price_adjustment: float = 0.30 @@ -43,6 +42,17 @@ class BusinessLogicConstraints(): w_volatility: float = 5.0 w_estimation_error: float = 0.25 seed: int = 7 + human_data_dir: str | None = None + agent_data_dir: str | None = None + + +def _resolve_behavior_data_dirs(constraints: BusinessLogicConstraints) -> tuple[str, str]: + base = Path(__file__).resolve().parents[2] / "experiments" + human_default = str(base / "collected_data") + agent_default = str(base / "agents" / "collected_data") + human = constraints.human_data_dir or human_default + agent = constraints.agent_data_dir or agent_default + return human, agent def _sigmoid(x: np.ndarray) -> np.ndarray: @@ -94,7 +104,7 @@ class BehavioralProfile: """Synthetic Markov profile used to generate interaction sessions. Uses aggregate_event_transitions from models.py to build transition kernels from real data.""" - def __init__(self, actor: str, purchase_probs: np.ndarray): + def __init__(self, actor: str, purchase_probs: np.ndarray, *, human_data_dir: str, agent_data_dir: str): self.actor = actor self.purchase_probs = np.clip(purchase_probs, 0.0, 0.95) self.states = [ @@ -105,7 +115,7 @@ class BehavioralProfile: "purchase_complete", "session_end", ] - model = AgentBehaviorModel(agent_dir) if actor == "agents" else BehaviorModel(human_dir) + model = AgentBehaviorModel(agent_data_dir) if actor == "agents" else BehaviorModel(human_data_dir) mdp = model.build_MDP() raw_trans = aggregate_event_transitions(mdp) if mdp.get("transitions") else {} self.transitions = _canonicalize_transitions(raw_trans) if raw_trans else self._fallback_transitions() @@ -227,12 +237,18 @@ class BehavioralProfile: return events, feature_events -def _load_behavioral_profile(actor: str, demand_forcing: np.ndarray) -> BehavioralProfile: +def _load_behavioral_profile( + actor: str, + demand_forcing: np.ndarray, + *, + human_data_dir: str, + agent_data_dir: str, +) -> BehavioralProfile: """returns a behavioral profile for generating synthetic sessions actor: 'humans' or 'agents' demand_forcing: per-product purchase probabilities used to weight interactions """ - return BehavioralProfile(actor, demand_forcing) + return BehavioralProfile(actor, demand_forcing, human_data_dir=human_data_dir, agent_data_dir=agent_data_dir) class CommercePlatform: @@ -248,6 +264,7 @@ class CommercePlatform: self.unit_cost = np.random.uniform(low=15.0, high=60.0, size=(self.product_catalogue_size,)).astype(np.float32) self.base_price = np.random.uniform(low=60.0, high=140.0, size=(self.product_catalogue_size,)).astype(np.float32) self.alpha_hat = constraints.agent_share + self._human_data_dir, self._agent_data_dir = _resolve_behavior_data_dirs(constraints) try: self.separability_artifacts = load_artifacts() except FileNotFoundError: @@ -287,7 +304,12 @@ class CommercePlatform: demand_agent = np.zeros_like(prices, dtype=np.float32) for actor, n_sessions in session_map.items(): - profile = _load_behavioral_profile(actor, pprob_map[actor]) + profile = _load_behavioral_profile( + actor, + pprob_map[actor], + human_data_dir=self._human_data_dir, + agent_data_dir=self._agent_data_dir, + ) for idx in range(n_sessions): session_id = f"{actor}_{idx:06d}" session_rows, feature_events = profile.sample_session( @@ -474,8 +496,19 @@ class PHANTOMEnv(gym.Env): def _init_jax_transitions(self): try: - human_profile = _load_behavioral_profile("humans", np.ones(self.constraints.product_catalogue_size) * 0.1) - agent_profile = _load_behavioral_profile("agents", np.ones(self.constraints.product_catalogue_size) * 0.1) + human_dir, agent_dir = _resolve_behavior_data_dirs(self.constraints) + human_profile = _load_behavioral_profile( + "humans", + np.ones(self.constraints.product_catalogue_size) * 0.1, + human_data_dir=human_dir, + agent_data_dir=agent_dir, + ) + agent_profile = _load_behavioral_profile( + "agents", + np.ones(self.constraints.product_catalogue_size) * 0.1, + human_data_dir=human_dir, + agent_data_dir=agent_dir, + ) self._jax_trans = compile_transitions(human_profile, agent_profile).to_jax() except Exception: self._jax_trans = fallback_transitions().to_jax()