From 2b47c3499aec11d4a513e107708dd57237d384ed Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Sun, 15 Feb 2026 15:45:46 +0100 Subject: [PATCH] chore: fixing discretization of actions --- engine/lib/__init__.py | 1 + engine/lib/behavior.py | 17 +- engine/train.py | 437 +++++++++++++++++++++++--- engine/wrapper.py | 34 +- paper/src/chapters/03-methodology.tex | 2 + 5 files changed, 436 insertions(+), 55 deletions(-) diff --git a/engine/lib/__init__.py b/engine/lib/__init__.py index 04a06c2..2a56747 100644 --- a/engine/lib/__init__.py +++ b/engine/lib/__init__.py @@ -5,3 +5,4 @@ from .wrappers import EconomicMetricsWrapper from .callbacks import MetricsCallback, EvalMetricsCallback from .providers import ProviderBenchmark, ProviderResult, BenchmarkConfig from .coi import compute_uplift_coi, extract_purchases, compute_agent_probability +from .discrete import EventQTable diff --git a/engine/lib/behavior.py b/engine/lib/behavior.py index 34faad2..e8fe2be 100644 --- a/engine/lib/behavior.py +++ b/engine/lib/behavior.py @@ -70,7 +70,14 @@ def trajectory_to_events(trajectory: list) -> list: def adjust_behavior_to_condition(condition, transition_matrix): # expand NxN transition matrix to (N*P)x(N*P) weighted by demand condition - cond_norm = condition / np.sum(condition) + condition = np.asarray(condition, dtype=float) + condition = np.nan_to_num(condition, nan=0.0, posinf=0.0, neginf=0.0) + condition = np.clip(condition, 0.0, None) + s = float(np.sum(condition)) + if not np.isfinite(s) or s <= 0: + cond_norm = np.full(len(condition), 1.0 / max(len(condition), 1), dtype=float) + else: + cond_norm = condition / s n_products = len(condition) base_vals = transition_matrix.values base_cols, base_rows = ( @@ -91,10 +98,12 @@ def sample_behavior(condition, human=True, max_len=40): trajectory = [np.random.choice(adjusted_transitions.index)] while len(trajectory) < max_len and "checkout" not in trajectory[-1]: - probs = adjusted_transitions.loc[trajectory[-1]].values + probs = np.asarray(adjusted_transitions.loc[trajectory[-1]].values, dtype=float) + probs = np.nan_to_num(probs, nan=0.0, posinf=0.0, neginf=0.0) + probs = np.clip(probs, 0.0, None) + s = float(np.sum(probs)) sample = np.random.choice( - adjusted_transitions.columns, - p=probs / np.sum(probs) if np.sum(probs) > 0 else None, + adjusted_transitions.columns, p=(probs / s) if s > 0 else None ) trajectory.append(sample) return trajectory diff --git a/engine/train.py b/engine/train.py index 00aa274..e059593 100644 --- a/engine/train.py +++ b/engine/train.py @@ -1,57 +1,408 @@ -import wandb -from stable_baselines3 import SAC -from stable_baselines3.common.callbacks import EvalCallback +import argparse +import json +from pathlib import Path +import numpy as np +from gymnasium.wrappers import FlattenObservation + +try: + import wandb + + HAS_WANDB = True +except ImportError: + HAS_WANDB = False + +try: + from stable_baselines3 import PPO, A2C, DQN + from stable_baselines3.common.callbacks import EvalCallback + from stable_baselines3.common.monitor import Monitor + + HAS_SB3 = True +except ImportError: + HAS_SB3 = False + from .wrapper import PHANTOM from .lib import EconomicMetricsWrapper, MetricsCallback +from .lib.discrete import EventQTable -wandb.init( - project="phantom-pricing", - config={ - "alpha": 0.3, - "n_products": 10, - "total_timesteps": 50000, - "robust_radius": 0.15, - "robust_points": 5, - "lambda_coi": 0.2, - }, -) -env_kwargs = { +DEFAULT_CFG = { + "project": "phantom-pricing", + "algo": "ppo", + "seed": 42, + "total_timesteps": 50_000, + "eval_episodes": 5, + "eval_freq": 1_000, + "log_freq": 100, + "revenue_weight": 0.01, "n_products": 10, + "N": 100, "alpha": 0.3, "lambda_coi": 0.2, "robust_radius": 0.15, "robust_points": 5, - "render_mode": None, + "info_value": 1.0, + "price_low": 10.0, + "price_high": 150.0, + "action_levels": 9, + "action_scale_low": 0.8, + "action_scale_high": 1.2, + "learning_rate": 3e-4, + "gamma": 0.99, + "buffer_size": 50_000, + "batch_size": 256, + "tau": 0.005, + "train_freq": 1, + "learning_starts": 1_000, + "target_update_interval": 1_000, + "exploration_fraction": 0.2, + "exploration_final_eps": 0.05, + "n_steps": 2_048, + "n_epochs": 10, + "gae_lambda": 0.95, + "clip_range": 0.2, + "ent_coef": 0.0, + "q_lr": 0.1, + "eps_start": 1.0, + "eps_end": 0.05, + "eps_decay": 0.9995, + "model_dir": "engine/models", + "arch": "small", + "activation": "relu", + "q_bins": 6, } -env = EconomicMetricsWrapper(PHANTOM(**env_kwargs)) -eval_env = EconomicMetricsWrapper(PHANTOM(**env_kwargs)) -model = SAC( - "MultiInputPolicy", - env, - verbose=1, - learning_rate=3e-4, - buffer_size=50000, - batch_size=256, - tau=0.005, - gamma=0.99, -) -metrics_cb = MetricsCallback(log_histograms=True, log_freq=100) -eval_cb = EvalCallback(eval_env, eval_freq=1000, n_eval_episodes=5, verbose=1) +def _cfg(raw: dict | None = None) -> dict: + cfg = dict(DEFAULT_CFG) + if raw: + cfg.update({k: v for k, v in raw.items() if v is not None}) + cfg["algo"] = str(cfg["algo"]).lower() + return cfg -model.learn(total_timesteps=50000, callback=[metrics_cb, eval_cb]) -model.save("phantom_sac") -wandb.finish() -# test trained policy -env = PHANTOM(**env_kwargs) -obs, _ = env.reset() -for _ in range(100): - action, _ = model.predict(obs, deterministic=True) - obs, reward, term, trunc, _ = env.step(action) - env.render() - if term or trunc: - break -env.close() +def _wandb_cfg_dict() -> dict: + return ( + {k: wandb.config[k] for k in wandb.config.keys()} + if HAS_WANDB and wandb.run + else {} + ) + + +def make_env(cfg: dict): + env = PHANTOM( + n_products=int(cfg["n_products"]), + alpha=float(cfg["alpha"]), + N=int(cfg["N"]), + price_bounds=(float(cfg["price_low"]), float(cfg["price_high"])), + lambda_coi=float(cfg["lambda_coi"]), + robust_radius=float(cfg["robust_radius"]), + robust_points=int(cfg["robust_points"]), + info_value=float(cfg["info_value"]), + action_levels=int(cfg["action_levels"]), + action_scale_low=float(cfg["action_scale_low"]), + action_scale_high=float(cfg["action_scale_high"]), + render_mode=None, + ) + env = EconomicMetricsWrapper(env) + env = FlattenObservation(env) + return env + + +def _net_arch(name) -> list[int]: + presets = { + "tiny": [32, 32], + "small": [64, 64], + "medium": [128, 128], + "large": [256, 256], + } + if isinstance(name, (list, tuple)): + return [int(v) for v in name] + s = str(name).lower().strip() + if s in presets: + return presets[s] + if "x" in s: + try: + vals = [int(v) for v in s.split("x") if v] + return vals if vals else presets["small"] + except ValueError: + return presets["small"] + return presets["small"] + + +def _activation(name): + try: + import torch.nn as nn + except ImportError: + return None + return { + "relu": nn.ReLU, + "tanh": nn.Tanh, + "elu": nn.ELU, + "leaky_relu": nn.LeakyReLU, + }.get(str(name).lower().strip(), nn.ReLU) + + +def _policy_kwargs(cfg: dict) -> dict: + kw = {"net_arch": _net_arch(cfg.get("arch", "small"))} + act = _activation(cfg.get("activation", "relu")) + if act is not None: + kw["activation_fn"] = act + return kw + + +def _action(agent, obs, deterministic: bool = True): + out = agent.predict(obs, deterministic=deterministic) + a = out[0] if isinstance(out, tuple) else out + if isinstance(a, np.ndarray) and a.size == 1: + return int(a.reshape(-1)[0]) + return a + + +def evaluate(agent, env, episodes: int) -> dict: + rewards, revenues = [], [] + for _ in range(int(episodes)): + obs, _ = env.reset() + done, ep_r, ep_rev = False, 0.0, 0.0 + while not done: + obs, reward, term, trunc, info = env.step(_action(agent, obs, True)) + done = term or trunc + ep_r += float(reward) + ep_rev += float( + info.get("economics", {}).get("revenue", info.get("revenue", 0.0)) + ) + rewards.append(ep_r) + revenues.append(ep_rev) + return { + "eval/reward": float(np.mean(rewards)), + "eval/revenue": float(np.mean(revenues)), + "eval/reward_std": float(np.std(rewards)), + "eval/revenue_std": float(np.std(revenues)), + } + + +def build_model(cfg: dict, env): + algo = cfg["algo"] + policy_kwargs = _policy_kwargs(cfg) + if algo == "sac": + raise ValueError("sac is not supported with the discrete core env") + if algo == "ppo": + return PPO( + "MlpPolicy", + env, + verbose=1, + policy_kwargs=policy_kwargs, + seed=int(cfg["seed"]), + learning_rate=float(cfg["learning_rate"]), + n_steps=int(cfg["n_steps"]), + batch_size=int(cfg["batch_size"]), + n_epochs=int(cfg["n_epochs"]), + gamma=float(cfg["gamma"]), + gae_lambda=float(cfg["gae_lambda"]), + clip_range=float(cfg["clip_range"]), + ent_coef=float(cfg["ent_coef"]), + ) + if algo == "a2c": + return A2C( + "MlpPolicy", + env, + verbose=1, + policy_kwargs=policy_kwargs, + seed=int(cfg["seed"]), + learning_rate=float(cfg["learning_rate"]), + n_steps=max(5, int(cfg["n_steps"]) // 32), + gamma=float(cfg["gamma"]), + gae_lambda=float(cfg["gae_lambda"]), + ent_coef=float(cfg["ent_coef"]), + ) + if algo == "dqn": + return DQN( + "MlpPolicy", + env, + verbose=1, + policy_kwargs=policy_kwargs, + seed=int(cfg["seed"]), + learning_rate=float(cfg["learning_rate"]), + buffer_size=int(cfg["buffer_size"]), + batch_size=int(cfg["batch_size"]), + gamma=float(cfg["gamma"]), + train_freq=int(cfg["train_freq"]), + learning_starts=int(cfg["learning_starts"]), + target_update_interval=int(cfg["target_update_interval"]), + exploration_fraction=float(cfg["exploration_fraction"]), + exploration_final_eps=float(cfg["exploration_final_eps"]), + ) + raise ValueError(f"unsupported algo '{algo}'") + + +def train_qtable(cfg: dict) -> tuple[EventQTable, dict]: + np.random.seed(int(cfg["seed"])) + env = make_env(cfg) + eval_env = make_env(cfg) + agent = EventQTable( + env.action_space.n, + int(cfg["n_products"]), + (float(cfg["price_low"]), float(cfg["price_high"])), + lr=float(cfg["q_lr"]), + gamma=float(cfg["gamma"]), + n_bins=int(cfg["q_bins"]), + ) + eps = float(cfg["eps_start"]) + obs, _ = env.reset(seed=int(cfg["seed"])) + for t in range(int(cfg["total_timesteps"])): + a, s = agent.act(obs, eps) + nxt, reward, term, trunc, info = env.step(a) + done = term or trunc + agent.update(s, a, float(reward), agent.encode(nxt), done) + eps = max(float(cfg["eps_end"]), eps * float(cfg["eps_decay"])) + if HAS_WANDB and wandb.run and (t + 1) % int(cfg["log_freq"]) == 0: + econ = info.get("economics", {}) + wandb.log( + { + "train/reward": float(reward), + "train/revenue": float(econ.get("revenue", 0.0)), + "train/epsilon": float(eps), + }, + step=t + 1, + ) + obs = env.reset()[0] if done else nxt + metrics = evaluate(agent, eval_env, int(cfg["eval_episodes"])) + metrics["train/global_step"] = int(cfg["total_timesteps"]) + env.close() + eval_env.close() + return agent, metrics + + +def train_sb3(cfg: dict) -> tuple[object, dict]: + if not HAS_SB3: + raise ImportError("stable-baselines3 is required for SB3 models") + env = make_env(cfg) + eval_env = make_env(cfg) + env = Monitor(env) + eval_env = Monitor(eval_env) + model = build_model(cfg, env) + cbs = [MetricsCallback(log_histograms=True, log_freq=int(cfg["log_freq"]))] + cbs.append( + EvalCallback( + eval_env, + eval_freq=int(cfg["eval_freq"]), + n_eval_episodes=int(cfg["eval_episodes"]), + deterministic=True, + verbose=0, + ) + ) + model.learn(total_timesteps=int(cfg["total_timesteps"]), callback=cbs) + model_path = Path(cfg["model_dir"]) + model_path.mkdir(parents=True, exist_ok=True) + model.save(str(model_path / f"phantom_{cfg['algo']}")) + metrics = evaluate(model, eval_env, int(cfg["eval_episodes"])) + metrics["train/global_step"] = int(model.num_timesteps) + env.close() + eval_env.close() + return model, metrics + + +def train_once(cfg: dict) -> dict: + algo = cfg["algo"] + if algo == "qtable": + _, metrics = train_qtable(cfg) + else: + _, metrics = train_sb3(cfg) + metrics["sweep/score"] = float( + metrics["eval/reward"] + float(cfg["revenue_weight"]) * metrics["eval/revenue"] + ) + return metrics + + +def run_wandb( + project: str, overrides: dict, mode: str = "online", sweep_mode: bool = False +) -> dict: + if not HAS_WANDB: + raise ImportError("wandb is required for sweep runs") + init_kwargs = {"mode": mode} + if sweep_mode: + run = wandb.init(**init_kwargs) + cfg = _cfg(_wandb_cfg_dict()) + for k, v in overrides.items(): + if k not in wandb.config: + cfg[k] = v + else: + run = wandb.init(project=project, config=overrides, **init_kwargs) + cfg = _cfg(_wandb_cfg_dict()) + metrics = train_once(cfg) + step = int(metrics.get("train/global_step", cfg["total_timesteps"])) + wandb.log(metrics, step=step) + for k, v in metrics.items(): + run.summary[k] = v + wandb.finish() + return metrics + + +def run_local(overrides: dict) -> dict: + cfg = _cfg(overrides) + metrics = train_once(cfg) + print(json.dumps(metrics, indent=2)) + return metrics + + +def main(): + p = argparse.ArgumentParser(description="PHANTOM training and W&B sweeps") + p.add_argument("--project", default=DEFAULT_CFG["project"]) + p.add_argument("--algo", choices=["ppo", "a2c", "dqn", "qtable"]) + p.add_argument("--total-timesteps", type=int) + p.add_argument("--alpha", type=float) + p.add_argument("--n-products", type=int) + p.add_argument("--lambda-coi", type=float) + p.add_argument("--robust-radius", type=float) + p.add_argument("--robust-points", type=int) + p.add_argument("--learning-rate", type=float) + p.add_argument("--gamma", type=float) + p.add_argument("--revenue-weight", type=float) + p.add_argument("--arch", type=str) + p.add_argument("--activation", type=str) + p.add_argument("--sweep-agent", action="store_true") + p.add_argument("--sweep-id", type=str) + p.add_argument("--count", type=int, default=0) + p.add_argument("--offline", action="store_true") + p.add_argument("--no-wandb", action="store_true") + args = p.parse_args() + + overrides = { + "algo": args.algo, + "total_timesteps": args.total_timesteps, + "alpha": args.alpha, + "n_products": args.n_products, + "lambda_coi": args.lambda_coi, + "robust_radius": args.robust_radius, + "robust_points": args.robust_points, + "learning_rate": args.learning_rate, + "gamma": args.gamma, + "revenue_weight": args.revenue_weight, + "arch": args.arch, + "activation": args.activation, + } + overrides = {k: v for k, v in overrides.items() if v is not None} + + if args.sweep_agent: + if args.no_wandb: + raise ValueError("sweep agent requires wandb") + if not args.sweep_id: + raise ValueError("--sweep-id is required with --sweep-agent") + mode = "offline" if args.offline else "online" + wandb.agent( + args.sweep_id, + function=lambda: run_wandb( + args.project, overrides, mode=mode, sweep_mode=True + ), + count=args.count if args.count > 0 else None, + ) + return + + if args.no_wandb or not HAS_WANDB: + run_local(overrides) + return + + run_wandb(args.project, overrides, mode="offline" if args.offline else "online") + + +if __name__ == "__main__": + main() diff --git a/engine/wrapper.py b/engine/wrapper.py index e1ea79b..22e958b 100644 --- a/engine/wrapper.py +++ b/engine/wrapper.py @@ -29,6 +29,7 @@ class PHANTOM(gym.Env): reward = R(p,d) - λ·COI_leak(p,τ') per thesis Section on DR-RL COI_leak uses behavioral divergence to estimate agent probability f(τ') robust inner step: min over alpha in Wasserstein interval around nominal alpha + actions are discrete global price-scale moves """ metadata = {"render_modes": ["human", "ansi"]} @@ -47,6 +48,9 @@ class PHANTOM(gym.Env): robust_radius: float = 0.0, robust_points: int = 5, info_value: float = 1.0, + action_levels: int = 9, + action_scale_low: float = 0.9, + action_scale_high: float = 1.1, render_mode: str = None, ): super().__init__() @@ -63,6 +67,10 @@ class PHANTOM(gym.Env): self.robust_radius = max(0.0, float(robust_radius)) self.robust_points = max(1, int(robust_points)) self.info_value = float(info_value) + self.action_levels = max(2, int(action_levels)) + self._action_scales = np.linspace( + float(action_scale_low), float(action_scale_high), self.action_levels + ) self.market = MarketEngine( alpha=alpha, @@ -75,12 +83,7 @@ class PHANTOM(gym.Env): self._limbo = Limbo(self._platform_stub, self.market) self._set_market_mix(self.nominal_alpha) - self.action_space = spaces.Box( - low=price_bounds[0], - high=price_bounds[1], - shape=(n_products,), - dtype=np.float32, - ) + self.action_space = spaces.Discrete(self.action_levels) self.observation_space = spaces.Dict( { "demand": spaces.Box( @@ -127,6 +130,21 @@ class PHANTOM(gym.Env): self.market.Nagents = n_agents self.market.Nhumans = self.N - n_agents + def _decode_action(self, action) -> np.ndarray: + base = ( + self._prices + if self._prices is not None + else np.full(self.n_products, self.price_bounds[0], dtype=float) + ) + if np.isscalar(action): + idx = int(np.clip(int(action), 0, self.action_levels - 1)) + return np.clip(base * self._action_scales[idx], *self.price_bounds) + a = np.asarray(action) + if a.size == 1: + idx = int(np.clip(int(a.reshape(-1)[0]), 0, self.action_levels - 1)) + return np.clip(base * self._action_scales[idx], *self.price_bounds) + return np.clip(a.astype(float), *self.price_bounds) + def _compute_agent_prob(self, trajectories=None) -> float: trajectories = ( self.market.last_trajectories if trajectories is None else trajectories @@ -208,8 +226,8 @@ class PHANTOM(gym.Env): self._record_history() return self._get_obs(), {} - def step(self, action: np.ndarray): - self._prices = np.clip(action, *self.price_bounds) + def step(self, action): + self._prices = self._decode_action(action) alpha_adv = self._select_adversarial_alpha(self._prices) self._set_market_mix(alpha_adv) self._platform_stub.set_prices(self._prices) diff --git a/paper/src/chapters/03-methodology.tex b/paper/src/chapters/03-methodology.tex index 9258a80..79e5ca7 100644 --- a/paper/src/chapters/03-methodology.tex +++ b/paper/src/chapters/03-methodology.tex @@ -315,6 +315,8 @@ This yields two centroid-like heuristics that guide contamination estimation at In implementation, we maintain an alternating game-history stack (our \textit{Limbo} stack) and execute it explicitly every epoch with exactly two transitions: first the platform publishes a price vector (leader move), then the market responds with trajectory-derived demand (follower move). +% Mention discretized action space and the clipping and over shotting in continuous action spaces + \subsubsection{Ambiguity Set Construction} We define an ambiguity set $\mathcal{U}_\epsilon(\hat{P}_N)$ centered around our empirical reference distribution $\hat{P}_N$ (derived from the generator $\mathcal{G}$). We utilize the Wasserstein distance metric to define the set of plausible demand distributions the agent might face: \begin{equation}