From 4c7d9110436b253d253baa3db39b571bfda1a98c Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Tue, 10 Mar 2026 14:23:17 +0100 Subject: [PATCH] feature: telemetry logging --- engine/backends/common.py | 73 +++++++++++- engine/backends/qtable.py | 35 +++--- engine/backends/sb3.py | 5 +- engine/benchmark.py | 222 ++++++++++++++++++++++++++++-------- engine/lib/callbacks.py | 26 ++++- engine/lib/wrappers.py | 16 ++- engine/logging_utils.py | 33 ++++++ engine/project.json | 10 ++ engine/spec.py | 13 +++ engine/telemetry/metrics.py | 11 +- engine/train.py | 6 + engine/wrapper.py | 96 ++++++++++------ package.json | 1 + scripts/nx_research.sh | 11 ++ 14 files changed, 454 insertions(+), 104 deletions(-) create mode 100644 engine/logging_utils.py diff --git a/engine/backends/common.py b/engine/backends/common.py index 9b916ab..9e50d48 100644 --- a/engine/backends/common.py +++ b/engine/backends/common.py @@ -19,7 +19,10 @@ def make_env(cfg: Mapping[str, Any]): lambda_coi=float(cfg["lambda_coi"]), robust_radius=float(cfg["robust_radius"]), robust_points=int(cfg["robust_points"]), + robust_rollouts=int(cfg.get("robust_rollouts", 1)), info_value=float(cfg["info_value"]), + eta_ux=float(cfg.get("eta_ux", 0.5)), + reward_profit_weight=float(cfg.get("reward_profit_weight", 1.0)), action_levels=int(cfg["action_levels"]), action_scale_low=float(cfg["action_scale_low"]), action_scale_high=float(cfg["action_scale_high"]), @@ -40,11 +43,14 @@ def _action(agent: Any, obs: Any, deterministic: bool = True): return action -def evaluate(agent: Any, env: Any, episodes: int) -> dict[str, float]: +def _evaluate_env(agent: Any, env: Any, episodes: int) -> dict[str, float]: rewards: list[float] = [] revenues: list[float] = [] margins: list[float] = [] coi_levels: list[float] = [] + coi_leakages: list[float] = [] + volatilities: list[float] = [] + agent_probs: list[float] = [] for _ in range(int(episodes)): obs, _ = env.reset() @@ -53,6 +59,9 @@ def evaluate(agent: Any, env: Any, episodes: int) -> dict[str, float]: ep_revenue = 0.0 ep_margin = 0.0 ep_coi = 0.0 + ep_coi_leakage = 0.0 + ep_volatility = 0.0 + ep_agent_prob = 0.0 steps = 0 while not done: @@ -63,6 +72,9 @@ def evaluate(agent: Any, env: Any, episodes: int) -> dict[str, float]: ep_revenue += float(econ.get("revenue", info.get("revenue", 0.0))) ep_margin += float(econ.get("margin", 0.0)) ep_coi += float(econ.get("coi_level", 0.0)) + ep_coi_leakage += float(econ.get("coi_leakage", 0.0)) + ep_volatility += float(econ.get("volatility", 0.0)) + ep_agent_prob += float(econ.get("agent_prob", info.get("agent_prob", 0.0))) steps += 1 rewards.append(ep_reward) @@ -70,6 +82,9 @@ def evaluate(agent: Any, env: Any, episodes: int) -> dict[str, float]: denom = max(steps, 1) margins.append(ep_margin / denom) coi_levels.append(ep_coi / denom) + coi_leakages.append(ep_coi_leakage / denom) + volatilities.append(ep_volatility / denom) + agent_probs.append(ep_agent_prob / denom) return { "eval/reward_mean": float(np.mean(rewards)) if rewards else 0.0, @@ -78,4 +93,60 @@ def evaluate(agent: Any, env: Any, episodes: int) -> dict[str, float]: "eval/revenue_std": float(np.std(revenues)) if revenues else 0.0, "eval/margin_mean": float(np.mean(margins)) if margins else 0.0, "eval/coi_level_mean": float(np.mean(coi_levels)) if coi_levels else 0.0, + "eval/coi_leakage_mean": float(np.mean(coi_leakages)) if coi_leakages else 0.0, + "eval/volatility_mean": float(np.mean(volatilities)) if volatilities else 0.0, + "eval/agent_prob_mean": float(np.mean(agent_probs)) if agent_probs else 0.0, } + + +def evaluate( + agent: Any, + env: Any, + episodes: int, + cfg: Mapping[str, Any] | None = None, +) -> dict[str, float]: + metrics = _evaluate_env(agent, env, episodes) + if cfg is None or not bool(cfg.get("robust_eval_enabled", True)): + return metrics + + nominal_alpha = float(cfg.get("alpha", 0.0)) + eval_radius = max(float(cfg.get("robust_radius", 0.0)), 0.15) + low_alpha = float(np.clip(nominal_alpha - eval_radius, 0.0, 1.0)) + high_alpha = float(np.clip(nominal_alpha + eval_radius, 0.0, 1.0)) + shifted_episodes = max(1, int(np.ceil(int(episodes) / 2))) + + shifted_rows = [] + for tag, alpha in ( + ("low", low_alpha), + ("nominal", nominal_alpha), + ("high", high_alpha), + ): + eval_cfg = dict(cfg) + eval_cfg["alpha"] = float(alpha) + shifted_env = make_env(eval_cfg) + shifted_metrics = _evaluate_env(agent, shifted_env, shifted_episodes) + shifted_env.close() + shifted_rows.append((tag, alpha, shifted_metrics)) + + metrics["eval/robust_alpha_low"] = low_alpha + metrics["eval/robust_alpha_high"] = high_alpha + metrics["eval/robust_reward_worst"] = float( + min(row[2]["eval/reward_mean"] for row in shifted_rows) + ) + metrics["eval/robust_revenue_worst"] = float( + min(row[2]["eval/revenue_mean"] for row in shifted_rows) + ) + metrics["eval/robust_coi_leakage_worst"] = float( + max(row[2]["eval/coi_leakage_mean"] for row in shifted_rows) + ) + for tag, alpha, shifted_metrics in shifted_rows: + metrics[f"eval/{tag}_alpha"] = float(alpha) + metrics[f"eval/{tag}_reward_mean"] = float(shifted_metrics["eval/reward_mean"]) + metrics[f"eval/{tag}_revenue_mean"] = float( + shifted_metrics["eval/revenue_mean"] + ) + metrics[f"eval/{tag}_coi_leakage_mean"] = float( + shifted_metrics["eval/coi_leakage_mean"] + ) + + return metrics diff --git a/engine/backends/qtable.py b/engine/backends/qtable.py index 6468d43..b314fdb 100644 --- a/engine/backends/qtable.py +++ b/engine/backends/qtable.py @@ -7,6 +7,7 @@ from typing import Any, Mapping import numpy as np from .common import evaluate, make_env +from ..telemetry.wandb import get_wandb_module logger = logging.getLogger(__name__) @@ -36,6 +37,9 @@ def train_qtable( console_progress = bool(cfg.get("console_progress", False)) obs, _ = env.reset(seed=int(cfg["seed"])) started_at = time.perf_counter() + wandb = get_wandb_module() + wandb_live = bool(wandb is not None and wandb.run is not None) + step_offset = max(0, int(cfg.get("wandb_step_offset", 0))) interval_sums = { "reward": 0.0, @@ -75,7 +79,10 @@ def train_qtable( "train/epsilon": float(epsilon), "train/global_step": int(steps), } - train_events.append(event) + if wandb_live: + wandb.log(dict(event), step=step_offset + int(steps)) + else: + train_events.append(event) if console_progress: elapsed = max(time.perf_counter() - started_at, 1e-6) speed = steps / elapsed @@ -96,17 +103,19 @@ def train_qtable( if interval_count > 0: denom = float(interval_count) - train_events.append( - { - "train/reward_mean": interval_sums["reward"] / denom, - "train/revenue_mean": interval_sums["revenue"] / denom, - "train/agent_prob": interval_sums["agent_prob"] / denom, - "train/alpha_adv": interval_sums["alpha_adv"] / denom, - "train/coi_leakage": interval_sums["coi_leakage"] / denom, - "train/epsilon": float(epsilon), - "train/global_step": int(steps), - } - ) + tail_event = { + "train/reward_mean": interval_sums["reward"] / denom, + "train/revenue_mean": interval_sums["revenue"] / denom, + "train/agent_prob": interval_sums["agent_prob"] / denom, + "train/alpha_adv": interval_sums["alpha_adv"] / denom, + "train/coi_leakage": interval_sums["coi_leakage"] / denom, + "train/epsilon": float(epsilon), + "train/global_step": int(steps), + } + if wandb_live: + wandb.log(dict(tail_event), step=step_offset + int(steps)) + else: + train_events.append(tail_event) metrics: dict[str, Any] = { "train/reward_mean": total_reward / max(steps, 1), @@ -114,7 +123,7 @@ def train_qtable( "train/epsilon": float(epsilon), "train/global_step": int(cfg["total_timesteps"]), } - metrics.update(evaluate(agent, eval_env, int(cfg["eval_episodes"]))) + metrics.update(evaluate(agent, eval_env, int(cfg["eval_episodes"]), cfg=cfg)) metrics["_train_events"] = train_events env.close() diff --git a/engine/backends/sb3.py b/engine/backends/sb3.py index 52dbd87..37f23c5 100644 --- a/engine/backends/sb3.py +++ b/engine/backends/sb3.py @@ -144,7 +144,9 @@ def train_sb3(cfg: Mapping[str, Any]) -> tuple[object, dict[str, Any]]: pass metrics_callback = MetricsCallback( - log_histograms=False, log_freq=int(cfg["log_freq"]) + log_histograms=False, + log_freq=int(cfg["log_freq"]), + step_offset=int(cfg.get("wandb_step_offset", 0)), ) callbacks = [metrics_callback] callbacks.append( @@ -175,6 +177,7 @@ def train_sb3(cfg: Mapping[str, Any]) -> tuple[object, dict[str, Any]]: model, eval_env, int(cfg["eval_episodes"]), + cfg=cfg, ) metrics["train/global_step"] = int(model.num_timesteps) metrics["model/path"] = str(model_path.with_suffix(".zip")) diff --git a/engine/benchmark.py b/engine/benchmark.py index 41d4ca5..0e2da26 100644 --- a/engine/benchmark.py +++ b/engine/benchmark.py @@ -83,20 +83,24 @@ def _run_eval_episode(env, policy) -> dict: } -def _build_tier(name: str, cfg: dict, alpha: float): +def _build_tier(name: str, cfg: dict, alpha: float, *, step_offset: int = 0): from .backends.common import make_env tier = name.lower().strip() run_cfg = dict(cfg) run_cfg["alpha"] = float(alpha) + run_cfg["wandb_step_offset"] = int(step_offset) if tier == "static": - return StaticPolicy(int(run_cfg["action_levels"])) + return StaticPolicy(int(run_cfg["action_levels"])), [] if tier == "surge": - return SurgePolicy( - n_actions=int(run_cfg["action_levels"]), - n_products=int(run_cfg["n_products"]), + return ( + SurgePolicy( + n_actions=int(run_cfg["action_levels"]), + n_products=int(run_cfg["n_products"]), + ), + [], ) if tier == "linear": @@ -113,27 +117,72 @@ def _build_tier(name: str, cfg: dict, alpha: float): seed=int(run_cfg["seed"]), ) warmup_env.close() - return policy + return policy, [] if tier == "qtable": from .backends.qtable import train_qtable run_cfg["console_progress"] = True - agent, _ = train_qtable(run_cfg) - return agent + agent, metrics = train_qtable(run_cfg) + events = metrics.get("_train_events", []) + return agent, events if isinstance(events, list) else [] if tier in {"ppo", "a2c", "dqn"}: from .backends.sb3 import train_sb3 run_cfg["algo"] = tier - agent, _ = train_sb3(run_cfg) - return agent + agent, metrics = train_sb3(run_cfg) + events = metrics.get("_train_events", []) + return agent, events if isinstance(events, list) else [] raise ValueError(f"unsupported tier '{name}'") +def _log_train_events( + events: list[dict], + *, + tier_name: str, + mode_label: str, + alpha: float, + step_offset: int, +) -> int: + if not (HAS_WANDB and wandb.run is not None): + return int(step_offset) + if not events: + return int(step_offset) + + ordered = sorted( + [evt for evt in events if isinstance(evt, dict)], + key=lambda evt: int(evt.get("train/global_step", 0)), + ) + if not ordered: + return int(step_offset) + + cursor = int(step_offset) + for evt in ordered: + rel_step = max(1, int(evt.get("train/global_step", 0))) + payload = dict(evt) + payload.update( + { + "run.kind": "benchmark", + "runtime/backend": tier_name, + "study/mode": mode_label, + "study/no_robust": float(mode_label == "no_robust"), + "study/alpha": float(alpha), + } + ) + wandb.log(payload, step=cursor + rel_step) + max_rel = max(max(1, int(evt.get("train/global_step", 0))) for evt in ordered) + return cursor + max_rel + 1 + + def run_benchmark( - cfg: dict, tiers: list[str], alpha_values: list[float], n_episodes: int + cfg: dict, + tiers: list[str], + alpha_values: list[float], + n_episodes: int, + mode_label: str, + step_cursor_start: int = 0, ): from .backends.common import make_env @@ -141,6 +190,7 @@ def run_benchmark( traces: list[dict] = [] total_runs = max(1, len(alpha_values) * len(tiers)) run_index = 0 + wandb_step_cursor = int(step_cursor_start) for alpha in alpha_values: for tier_name in tiers: @@ -148,13 +198,34 @@ def run_benchmark( _log( f"[{run_index}/{total_runs}] alpha={float(alpha):.2f} tier={tier_name}: training" ) - policy = _build_tier(tier_name, cfg, alpha) + policy, train_events = _build_tier( + tier_name, + cfg, + alpha, + step_offset=wandb_step_cursor, + ) + prev_cursor = int(wandb_step_cursor) + wandb_step_cursor = _log_train_events( + train_events, + tier_name=tier_name, + mode_label=mode_label, + alpha=float(alpha), + step_offset=wandb_step_cursor, + ) + if wandb_step_cursor == prev_cursor and tier_name in { + "qtable", + "ppo", + "a2c", + "dqn", + }: + wandb_step_cursor += max(1, int(cfg.get("total_timesteps", 1))) + 1 env = make_env({**cfg, "alpha": float(alpha)}) eps = [_run_eval_episode(env, policy) for _ in range(int(n_episodes))] env.close() row = { "tier": tier_name, + "mode": mode_label, "alpha": float(alpha), "episodes": int(n_episodes), "mean_reward": float(np.mean([e["reward"] for e in eps])), @@ -163,10 +234,7 @@ def run_benchmark( "mean_coi": float(np.mean([e["mean_coi"] for e in eps])), "std_revenue": float(np.std([e["revenue"] for e in eps])), } - row["objective_score"] = ( - row["mean_reward"] - + float(cfg.get("revenue_weight", 0.01)) * row["mean_revenue"] - ) + row["objective_score"] = row["mean_reward"] rows.append(row) _log( f"[{run_index}/{total_runs}] alpha={float(alpha):.2f} tier={tier_name}: " @@ -192,16 +260,23 @@ def run_benchmark( if HAS_WANDB and wandb.run is not None: wandb.log( { + "run.kind": "benchmark", + "runtime/backend": tier_name, + "study/mode": mode_label, + "study/no_robust": float(mode_label == "no_robust"), "study/alpha": float(alpha), "eval/reward_mean": row["mean_reward"], "eval/revenue_mean": row["mean_revenue"], "eval/margin_mean": row["mean_margin"], + "eval/coi_level_mean": row["mean_coi"], "objective/score": row["objective_score"], "objective/coi_preserved": row["mean_coi"], - } + }, + step=wandb_step_cursor, ) + wandb_step_cursor += 1 - return pd.DataFrame(rows), traces + return pd.DataFrame(rows), traces, int(wandb_step_cursor) def _plot_outputs(df: pd.DataFrame, traces: list[dict], out_dir: Path, stamp: str): @@ -277,8 +352,12 @@ def _plot_outputs(df: pd.DataFrame, traces: list[dict], out_dir: Path, stamp: st return rev_path, coi_path, price_path -def _run_with_args(args): - compare_robust = _truthy(os.environ.get("PHANTOM_BENCHMARK_COMPARE_ROBUST")) +def _run_with_args(args, compare_robust_override: bool | None = None): + compare_robust = ( + bool(compare_robust_override) + if compare_robust_override is not None + else _truthy(os.environ.get("PHANTOM_BENCHMARK_COMPARE_ROBUST")) + ) robust_modes = [False, True] if compare_robust else [bool(args.no_robust)] base_overrides = { @@ -289,6 +368,9 @@ def _run_with_args(args): "lambda_coi": args.lambda_coi, "robust_radius": args.robust_radius, "robust_points": args.robust_points, + "robust_rollouts": args.robust_rollouts, + "eta_ux": args.eta_ux, + "reward_profit_weight": args.reward_profit_weight, "price_low": args.price_low, "price_high": args.price_high, "action_levels": args.action_levels, @@ -318,6 +400,7 @@ def _run_with_args(args): all_frames: list[pd.DataFrame] = [] all_traces: list[dict] = [] + wandb_step_cursor = 0 for no_robust in robust_modes: overrides = dict(base_overrides) overrides["no_robust"] = bool(no_robust) @@ -327,9 +410,15 @@ def _run_with_args(args): cfg["linear_warmup_steps"] = int(args.linear_warmup_steps) mode_label = "no_robust" if no_robust else "robust" _log(f"mode={mode_label}: begin") - df_mode, traces_mode = run_benchmark(cfg, tiers, alpha_values, args.episodes) + df_mode, traces_mode, wandb_step_cursor = run_benchmark( + cfg, + tiers, + alpha_values, + args.episodes, + mode_label=mode_label, + step_cursor_start=wandb_step_cursor, + ) _log(f"mode={mode_label}: complete ({len(df_mode)} rows)") - df_mode["mode"] = mode_label for trace in traces_mode: trace["mode"] = mode_label all_frames.append(df_mode) @@ -349,7 +438,7 @@ def _run_with_args(args): _log(f"artifacts written in {out_dir}") if not df.empty: - best_idx = int(df["mean_revenue"].idxmax()) + best_idx = int(df["objective_score"].idxmax()) best = df.iloc[best_idx] _log( "BEST_TIER=" @@ -358,6 +447,7 @@ def _run_with_args(args): "tier": best["tier"], "mode": best.get("mode", "robust"), "alpha": float(best["alpha"]), + "objective_score": float(best["objective_score"]), "mean_revenue": float(best["mean_revenue"]), "mean_coi": float(best["mean_coi"]), } @@ -385,6 +475,9 @@ def run_cli(raw_args: list[str] | None = None): parser.add_argument("--lambda-coi", type=float, default=0.2) parser.add_argument("--robust-radius", type=float, default=0.15) parser.add_argument("--robust-points", type=int, default=5) + parser.add_argument("--robust-rollouts", type=int, default=1) + parser.add_argument("--eta-ux", type=float, default=0.5) + parser.add_argument("--reward-profit-weight", type=float, default=1.0) parser.add_argument("--price-low", type=float, default=10.0) parser.add_argument("--price-high", type=float, default=150.0) parser.add_argument("--action-levels", type=int, default=9) @@ -421,6 +514,9 @@ def run_cli(raw_args: list[str] | None = None): "lambda_coi": "lambda_coi", "robust_radius": "robust_radius", "robust_points": "robust_points", + "robust_rollouts": "robust_rollouts", + "eta_ux": "eta_ux", + "reward_profit_weight": "reward_profit_weight", "learning_rate": "learning_rate", "batch_size": "batch_size", "n_steps": "n_steps", @@ -435,6 +531,9 @@ def run_cli(raw_args: list[str] | None = None): "lambda_coi", "robust_radius", "robust_points", + "robust_rollouts", + "eta_ux", + "reward_profit_weight", "learning_rate", "batch_size", "n_steps", @@ -459,34 +558,57 @@ def run_cli(raw_args: list[str] | None = None): _run_with_args(args) return - run = wandb.init( - project=args.project, - name=f"benchmark-{datetime.now(UTC).strftime('%m%d-%H%M%S')}", - tags=[ - "benchmark", - "robust-compare" - if _truthy(os.environ.get("PHANTOM_BENCHMARK_COMPARE_ROBUST")) - else "single-mode", - ], - config={ - "run.kind": "benchmark", - "tiers": args.tiers, - "alpha_values": args.alpha_values, - "episodes": args.episodes, - "total_timesteps": args.total_timesteps, - "lambda_coi": args.lambda_coi, - "robust_radius": args.robust_radius, - "robust_points": args.robust_points, - "learning_rate": args.learning_rate, - "device": args.device, - }, - mode="offline" if args.offline else "online", + tiers = _parse_list(args.tiers) + run_stamp = datetime.now(UTC).strftime("%m%d-%H%M%S") + compare_enabled = _truthy(os.environ.get("PHANTOM_BENCHMARK_COMPARE_ROBUST")) + compare_tag = "robust-compare" if compare_enabled else "single-mode" + modes = ( + [("no_robust", True), ("robust", False)] + if compare_enabled + else [("no_robust" if bool(args.no_robust) else "robust", bool(args.no_robust))] ) - try: - _run_with_args(args) - finally: - if run is not None: - wandb.finish() + + run_idx = 0 + for tier in tiers: + for mode_label, no_robust in modes: + run_idx += 1 + tier_args = argparse.Namespace(**vars(args)) + tier_args.tiers = tier + tier_args.no_robust = bool(no_robust) + run = wandb.init( + project=args.project, + name=f"benchmark-{tier}-{mode_label}-{run_stamp}-{run_idx}", + tags=[ + "benchmark", + compare_tag, + f"backend:{tier}", + f"mode:{mode_label}", + ], + config={ + "run.kind": "benchmark", + "runtime/backend": tier, + "study/mode": mode_label, + "study/no_robust": float(no_robust), + "tiers": tier, + "alpha_values": args.alpha_values, + "episodes": args.episodes, + "total_timesteps": args.total_timesteps, + "lambda_coi": args.lambda_coi, + "robust_radius": args.robust_radius, + "robust_points": args.robust_points, + "robust_rollouts": args.robust_rollouts, + "eta_ux": args.eta_ux, + "reward_profit_weight": args.reward_profit_weight, + "learning_rate": args.learning_rate, + "device": args.device, + }, + mode="offline" if args.offline else "online", + ) + try: + _run_with_args(tier_args, compare_robust_override=False) + finally: + if run is not None: + wandb.finish() if __name__ == "__main__": diff --git a/engine/lib/callbacks.py b/engine/lib/callbacks.py index 8377d80..2193894 100644 --- a/engine/lib/callbacks.py +++ b/engine/lib/callbacks.py @@ -5,6 +5,8 @@ from typing import Any from stable_baselines3.common.callbacks import BaseCallback, EvalCallback import numpy as np +from ..telemetry.wandb import get_wandb_module + class MetricsCallback(BaseCallback): """Collects interval train metrics from env info dictionaries.""" @@ -13,16 +15,25 @@ class MetricsCallback(BaseCallback): self, log_histograms: bool = False, log_freq: int = 100, + step_offset: int = 0, verbose: int = 0, ): super().__init__(verbose) self.log_histograms = log_histograms self.log_freq = max(1, int(log_freq)) + self.step_offset = max(0, int(step_offset)) + self._wandb = get_wandb_module() + self._wandb_live = bool(self._wandb is not None and self._wandb.run is not None) self._window_sums = { "train/revenue_mean": 0.0, "train/margin_mean": 0.0, "train/coi_level_mean": 0.0, "train/regret_mean": 0.0, + "train/profit_mean": 0.0, + "train/agent_prob": 0.0, + "train/alpha_adv": 0.0, + "train/ux_penalty": 0.0, + "train/volatility": 0.0, "train/coi_mix": 0.0, "train/coi_base": 0.0, "train/coi_leakage": 0.0, @@ -39,6 +50,16 @@ class MetricsCallback(BaseCallback): self._window_sums["train/margin_mean"] += float(econ.get("margin", 0.0)) self._window_sums["train/coi_level_mean"] += float(econ.get("coi_level", 0.0)) self._window_sums["train/regret_mean"] += float(econ.get("regret", 0.0)) + if "profit" in econ: + self._window_sums["train/profit_mean"] += float(econ.get("profit", 0.0)) + if "agent_prob" in econ: + self._window_sums["train/agent_prob"] += float(econ.get("agent_prob", 0.0)) + if "alpha_adv" in econ: + self._window_sums["train/alpha_adv"] += float(econ.get("alpha_adv", 0.0)) + if "ux_penalty" in econ: + self._window_sums["train/ux_penalty"] += float(econ.get("ux_penalty", 0.0)) + if "volatility" in econ: + self._window_sums["train/volatility"] += float(econ.get("volatility", 0.0)) if "coi_mix" in econ: self._window_sums["train/coi_mix"] += float(econ.get("coi_mix", 0.0)) if "coi_base" in econ: @@ -70,7 +91,10 @@ class MetricsCallback(BaseCallback): } } payload["train/global_step"] = int(step) - self.events.append(payload) + if self._wandb_live: + self._wandb.log(dict(payload), step=self.step_offset + int(step)) + else: + self.events.append(payload) for key in self._window_sums: self._window_sums[key] = 0.0 self._window_count = 0 diff --git a/engine/lib/wrappers.py b/engine/lib/wrappers.py index a1b464b..f68a27c 100644 --- a/engine/lib/wrappers.py +++ b/engine/lib/wrappers.py @@ -57,7 +57,21 @@ class EconomicMetricsWrapper(gym.Wrapper): "coi_level": coi_level, "regret": regret, } - for key in ("coi_mix", "coi_base", "coi_leakage", "coi_penalty"): + for key in ( + "coi_mix", + "coi_base", + "coi_leakage", + "coi_penalty", + "ux_penalty", + "volatility", + "profit", + "cost_floor", + "reward_revenue", + "reward_total", + "agent_prob", + "alpha_adv", + "alpha_nominal", + ): if key in info: info["economics"][key] = info[key] info["prices"] = prices.copy() diff --git a/engine/logging_utils.py b/engine/logging_utils.py new file mode 100644 index 0000000..ac92bfb --- /dev/null +++ b/engine/logging_utils.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import logging +import os +import sys + +_CONFIGURED = False + + +def _resolve_level(raw: str | None) -> int: + name = str(raw or os.environ.get("PHANTOM_LOG_LEVEL", "INFO")).upper().strip() + return int(getattr(logging, name, logging.INFO)) + + +def configure_logging(level: str | None = None) -> None: + global _CONFIGURED + if _CONFIGURED: + return + + logger = logging.getLogger("engine") + logger.setLevel(_resolve_level(level)) + logger.propagate = False + + if logger.handlers: + _CONFIGURED = True + return + + handler = logging.StreamHandler(stream=sys.stdout) + handler.setFormatter( + logging.Formatter("%(asctime)s %(levelname)s [%(name)s] %(message)s") + ) + logger.addHandler(handler) + _CONFIGURED = True diff --git a/engine/project.json b/engine/project.json index 2a78f1d..4d5d041 100644 --- a/engine/project.json +++ b/engine/project.json @@ -41,6 +41,16 @@ "cwd": "." } }, + "benchmark-simple": { + "executor": "nx:run-commands", + "dependsOn": [ + "install" + ], + "options": { + "command": "bash scripts/nx_research.sh benchmark-simple", + "cwd": "." + } + }, "benchmark-agent": { "executor": "nx:run-commands", "dependsOn": [ diff --git a/engine/spec.py b/engine/spec.py index 7c9f059..818d59f 100644 --- a/engine/spec.py +++ b/engine/spec.py @@ -31,7 +31,10 @@ def _normalize_keys(raw: Mapping[str, Any]) -> dict[str, Any]: "study.lambda_coi": "lambda_coi", "study.robust_radius": "robust_radius", "study.robust_points": "robust_points", + "study.robust_rollouts": "robust_rollouts", "study.info_value": "info_value", + "study.eta_ux": "eta_ux", + "study.reward_profit_weight": "reward_profit_weight", "study.revenue_weight": "revenue_weight", "optimizer.learning_rate": "learning_rate", "optimizer.gamma": "gamma", @@ -77,7 +80,10 @@ class StudySpec: lambda_coi: float = 0.2 robust_radius: float = 0.15 robust_points: int = 5 + robust_rollouts: int = 1 info_value: float = 1.0 + eta_ux: float = 0.5 + reward_profit_weight: float = 1.0 revenue_weight: float = 0.01 no_robust: bool = False @@ -165,7 +171,10 @@ class TrainSpec: "lambda_coi": self.study.lambda_coi, "robust_radius": self.study.robust_radius, "robust_points": self.study.robust_points, + "robust_rollouts": self.study.robust_rollouts, "info_value": self.study.info_value, + "eta_ux": self.study.eta_ux, + "reward_profit_weight": self.study.reward_profit_weight, "revenue_weight": self.study.revenue_weight, "no_robust": self.study.no_robust, "learning_rate": self.optimizer.learning_rate, @@ -222,6 +231,7 @@ class TrainSpec: base["lambda_coi"] = 0.0 base["robust_radius"] = 0.0 base["robust_points"] = 1 + base["robust_rollouts"] = 1 return cls( algorithm=AlgorithmSpec(name=str(base["algo"]).lower().strip()), @@ -242,7 +252,10 @@ class TrainSpec: lambda_coi=float(base["lambda_coi"]), robust_radius=float(base["robust_radius"]), robust_points=int(base["robust_points"]), + robust_rollouts=int(base["robust_rollouts"]), info_value=float(base["info_value"]), + eta_ux=float(base["eta_ux"]), + reward_profit_weight=float(base["reward_profit_weight"]), revenue_weight=float(base["revenue_weight"]), no_robust=no_robust, ), diff --git a/engine/telemetry/metrics.py b/engine/telemetry/metrics.py index 43ee0a4..aa080d8 100644 --- a/engine/telemetry/metrics.py +++ b/engine/telemetry/metrics.py @@ -34,9 +34,14 @@ def canonicalize_metrics(raw: Mapping[str, Any], spec: TrainSpec) -> dict[str, A metrics.setdefault("train/global_step", spec.runtime.total_timesteps) - eval_reward = _as_float(metrics.get("eval/reward_mean"), 0.0) or 0.0 - eval_revenue = _as_float(metrics.get("eval/revenue_mean"), 0.0) or 0.0 - metrics["objective/score"] = eval_reward + spec.study.revenue_weight * eval_revenue + eval_reward = ( + _as_float( + metrics.get("eval/robust_reward_worst", metrics.get("eval/reward_mean")), + 0.0, + ) + or 0.0 + ) + metrics["objective/score"] = eval_reward margin_mean = _as_float(metrics.get("eval/margin_mean"), None) if margin_mean is not None: diff --git a/engine/train.py b/engine/train.py index 0133db7..2828db3 100644 --- a/engine/train.py +++ b/engine/train.py @@ -64,7 +64,10 @@ def _build_parser() -> argparse.ArgumentParser: parser.add_argument("--info-value", type=float) parser.add_argument("--robust-radius", type=float) parser.add_argument("--robust-points", type=int) + parser.add_argument("--robust-rollouts", type=int) parser.add_argument("--no-robust", action="store_true") + parser.add_argument("--eta-ux", type=float) + parser.add_argument("--reward-profit-weight", type=float) parser.add_argument("--revenue-weight", type=float) parser.add_argument("--price-low", type=float) @@ -132,7 +135,10 @@ def _overrides_from_args(args: argparse.Namespace) -> dict[str, Any]: "info_value": args.info_value, "robust_radius": args.robust_radius, "robust_points": args.robust_points, + "robust_rollouts": args.robust_rollouts, "no_robust": args.no_robust, + "eta_ux": args.eta_ux, + "reward_profit_weight": args.reward_profit_weight, "revenue_weight": args.revenue_weight, "price_low": args.price_low, "price_high": args.price_high, diff --git a/engine/wrapper.py b/engine/wrapper.py index dc97cb1..d2ac2cd 100644 --- a/engine/wrapper.py +++ b/engine/wrapper.py @@ -47,8 +47,10 @@ class PHANTOM(gym.Env): coi_window: int = 10, robust_radius: float = 0.0, robust_points: int = 5, + robust_rollouts: int = 1, info_value: float = 1.0, eta_ux: float = 0.5, + reward_profit_weight: float = 1.0, action_levels: int = 9, action_scale_low: float = 0.9, action_scale_high: float = 1.1, @@ -75,8 +77,10 @@ class PHANTOM(gym.Env): self.agent_params = agent_params self.robust_radius = max(0.0, float(robust_radius)) self.robust_points = max(1, int(robust_points)) + self.robust_rollouts = max(1, int(robust_rollouts)) self.info_value = float(info_value) self.eta_ux = float(eta_ux) + self.reward_profit_weight = float(reward_profit_weight) self.action_levels = max(2, int(action_levels)) self._action_scales = np.linspace( float(action_scale_low), float(action_scale_high), self.action_levels @@ -105,6 +109,12 @@ class PHANTOM(gym.Env): shape=(n_products,), dtype=np.float32, ), + "signals": spaces.Box( + low=np.array([0.0, 0.0, 0.0, 0.0], dtype=np.float32), + high=np.array([1.0, 1.0, 1.0, 1.0], dtype=np.float32), + shape=(4,), + dtype=np.float32, + ), } ) @@ -119,6 +129,8 @@ class PHANTOM(gym.Env): self._trajectories = [] # session trajectories for agent prob calculation self.baseline_prices = np.full(self.n_products, self.price_bounds[0]) self._low_margin_streak = 0 # consecutive steps below margin_floor + self._last_agent_prob = float(self.alpha) + self._last_alpha_adv = float(self.alpha) # load behavioral models for agent probability estimation try: @@ -131,7 +143,20 @@ class PHANTOM(gym.Env): demand_arr = np.array( [self._demand.get(i, 0.0) for i in range(self.n_products)], dtype=np.float32 ) - return {"demand": demand_arr, "prices": self._prices.astype(np.float32)} + signals = np.array( + [ + float(np.clip(self._last_agent_prob, 0.0, 1.0)), + float(np.clip(self._last_alpha_adv, 0.0, 1.0)), + float(np.clip(self.nominal_alpha, 0.0, 1.0)), + float(np.clip(self.robust_radius, 0.0, 1.0)), + ], + dtype=np.float32, + ) + return { + "demand": demand_arr, + "prices": self._prices.astype(np.float32), + "signals": signals, + } def _set_market_mix(self, alpha: float): alpha = float(np.clip(alpha, 0.0, 1.0)) @@ -179,15 +204,15 @@ class PHANTOM(gym.Env): [demand.get(i, 0.0) for i in range(self.n_products)], dtype=float ) revenue = float(np.dot(prices, demand_arr)) + floor_cost = float(np.dot(self.baseline_prices, demand_arr)) + profit = revenue - floor_cost purchases = extract_purchases(trajectories) coi_mix = compute_uplift_coi(prices, purchases, self.baseline_prices) - # multiplicative penalty so COI term scales with revenue magnitude coi_leakage = float(agent_prob * self.info_value) - discount = float(np.clip(1.0 - self.lambda_coi * coi_leakage, 0.0, 1.0)) - coi_penalty = revenue * (1.0 - discount) # absolute penalty in revenue units + info_budget = max(floor_cost, 1.0) + coi_penalty = self.lambda_coi * coi_leakage * info_budget - # calculate UX penalty based on price volatility if len(self._price_history) > 0: volatility = float( np.mean( @@ -197,19 +222,24 @@ class PHANTOM(gym.Env): ) else: volatility = 0.0 - ux_penalty = self.eta_ux * revenue * volatility + ux_penalty = self.eta_ux * info_budget * volatility - reward = revenue * discount - ux_penalty + reward_revenue = self.reward_profit_weight * profit + reward = reward_revenue - coi_penalty - ux_penalty return reward, { "revenue": revenue, + "cost_floor": floor_cost, + "profit": profit, "coi_mix": float(coi_mix), "coi_base": 0.0, "coi_leakage": coi_leakage, "coi_penalty": coi_penalty, - "coi_discount": discount, + "coi_info_budget": info_budget, "ux_penalty": ux_penalty, "volatility": volatility, + "reward_revenue": reward_revenue, + "reward_total": reward, } def _alpha_candidates(self) -> np.ndarray: @@ -219,35 +249,26 @@ class PHANTOM(gym.Env): hi = min(1.0, self.nominal_alpha + self.robust_radius) return np.linspace(lo, hi, self.robust_points) - def _evaluate_candidate( - self, alpha: float, prices: np.ndarray - ) -> tuple[float, dict, list, float]: + def _evaluate_candidate(self, alpha: float, prices: np.ndarray) -> float: self._set_market_mix(alpha) - demand = self.market.act(prices) - trajectories = list(self.market.last_trajectories) - agent_prob = self._compute_agent_prob(trajectories) - reward, _ = self._compute_reward(prices, demand, agent_prob, trajectories) - return reward, demand, trajectories, agent_prob + rewards = [] + for _ in range(self.robust_rollouts): + demand = self.market.act(prices) + trajectories = list(self.market.last_trajectories) + agent_prob = self._compute_agent_prob(trajectories) + reward, _ = self._compute_reward(prices, demand, agent_prob, trajectories) + rewards.append(float(reward)) + return float(np.mean(rewards)) if rewards else 0.0 - def _select_adversarial_alpha( - self, prices: np.ndarray - ) -> tuple[float, dict, list, float]: + def _select_adversarial_alpha(self, prices: np.ndarray) -> float: """inner robust step: evaluate candidates and pick worst-case alpha""" candidates = self._alpha_candidates() evaluations = [ - (alpha, *self._evaluate_candidate(float(alpha), prices)) + (float(alpha), self._evaluate_candidate(float(alpha), prices)) for alpha in candidates ] - - # min over alpha in Wasserstein interval - best_eval = min(evaluations, key=lambda x: x[1]) # index 1 is reward - - best_alpha = best_eval[0] - best_demand = best_eval[2] - best_trajectories = best_eval[3] - best_agent_prob = best_eval[4] - - return best_alpha, best_demand, best_trajectories, best_agent_prob + best_alpha, _ = min(evaluations, key=lambda x: x[1]) + return best_alpha def _record_history(self): demand_arr = np.array( @@ -270,19 +291,24 @@ class PHANTOM(gym.Env): self._low_margin_streak = 0 self._demand_history, self._price_history, self._revenue_history = [], [], [] self._trajectories = list(getattr(self.market, "last_trajectories", [])) + self._last_agent_prob = float(self.nominal_alpha) + self._last_alpha_adv = float(self.nominal_alpha) self._record_history() return self._get_obs(), {} def step(self, action): self._prices = self._decode_action(action) - # inner robust step returns worst-case outcome directly, no re-sampling - alpha_adv, self._demand, trajectories, agent_prob = ( - self._select_adversarial_alpha(self._prices) - ) + alpha_adv = self._select_adversarial_alpha(self._prices) self._set_market_mix(alpha_adv) self._platform_stub.set_prices(self._prices) self._step_count += 1 + + self._demand = self.market.act(self._prices) + trajectories = list(self.market.last_trajectories) + agent_prob = self._compute_agent_prob(trajectories) self._trajectories.extend(trajectories) + self._last_agent_prob = float(agent_prob) + self._last_alpha_adv = float(alpha_adv) reward, metrics = self._compute_reward( self._prices, self._demand, agent_prob, trajectories @@ -304,7 +330,9 @@ class PHANTOM(gym.Env): "step": self._step_count, "agent_prob": agent_prob, "alpha_adv": float(alpha_adv), + "alpha_nominal": float(self.nominal_alpha), "wasserstein_radius": float(self.robust_radius), + "robust_rollouts": int(self.robust_rollouts), **metrics, "raw_revenue": np.sum( self._prices diff --git a/package.json b/package.json index 1ee68c6..8590f3c 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "platform:logs": "nx run platform:logs", "research:test": "nx run research:test", "research:benchmark": "nx run research:benchmark", + "research:benchmark:simple": "nx run research:benchmark-simple", "e2e:test": "nx run e2e:test" }, "devDependencies": { diff --git a/scripts/nx_research.sh b/scripts/nx_research.sh index 78117e3..434a312 100644 --- a/scripts/nx_research.sh +++ b/scripts/nx_research.sh @@ -44,6 +44,17 @@ case "$cmd" in WANDB_API_KEY="${WANDB_API_KEY:-}" \ .venv/bin/python -m engine.train --run-kind benchmark ${LOCAL_BENCHMARK_ARGS:---tiers static,surge,linear,qtable,ppo --alpha-values 0.0,0.3 --episodes 3 --total-timesteps 3000 --max-steps 40 --device cpu} ;; + benchmark-simple) + load_sweep_env + if [[ " ${SIMPLE_BENCHMARK_ARGS:-} " != *" --no-wandb "* ]]; then + require_var WANDB_API_KEY "WANDB_API_KEY required - set it in $env_file" + fi + WANDB_ENTITY="${WANDB_ENTITY:-}" \ + WANDB_PROJECT="${WANDB_PROJECT:-capstone}" \ + WANDB_API_KEY="${WANDB_API_KEY:-}" \ + PHANTOM_BENCHMARK_COMPARE_ROBUST="${PHANTOM_BENCHMARK_COMPARE_ROBUST:-1}" \ + .venv/bin/python -m engine.train --run-kind benchmark ${SIMPLE_BENCHMARK_ARGS:---tiers qtable,ppo,dqn,a2c --alpha-values 0.0,0.15,0.3,0.45,0.6 --episodes 8 --total-timesteps 8000 --max-steps 40 --device cpu} + ;; train-agent) load_sweep_env require_var WANDB_API_KEY "WANDB_API_KEY required - set it in $env_file"