import gymnasium as gym from gymnasium import spaces import numpy as np from .engine import Limbo, MarketEngine, PricingEngine from .lib.render import DashboardRenderer from .lib.coi import ( compute_uplift_coi, extract_purchases, compute_agent_probability, ) from .lib.behavior import get_transition_models, trajectory_to_events from .lib.wrappers import EconomicMetricsWrapper class _ActionPricingEngine(PricingEngine): def __init__(self, n_products: int, price_bounds: tuple): self._prices = np.full(n_products, price_bounds[0], dtype=float) def set_prices(self, prices: np.ndarray): self._prices = np.asarray(prices, dtype=float) def act(self, _): return self._prices class PHANTOM(gym.Env): """Gymnasium wrapper for Limbo pricing-market simulation implementing thesis COI framework reward = R(p,d) - λ·COI_leak(p,τ') per thesis Section on DR-RL COI_leak uses behavioral divergence to estimate agent probability f(τ') robust inner step: min over alpha in Wasserstein interval around nominal alpha actions are discrete global price-scale moves """ metadata = {"render_modes": ["human", "ansi"]} def __init__( self, n_products: int = 10, alpha: float = 0.3, N: int = 100, human_params: tuple = (50.0, 10.0), agent_params: tuple = (45.0, 15.0), noise_std: float = 1.0, price_bounds: tuple = (10.0, 150.0), lambda_coi: float = 0.1, coi_window: int = 10, robust_radius: float = 0.0, robust_points: int = 5, info_value: float = 1.0, action_levels: int = 9, action_scale_low: float = 0.9, action_scale_high: float = 1.1, max_steps: int = 100, margin_floor: float = 0.05, margin_floor_patience: int = 5, render_mode: str = None, ): super().__init__() self.n_products = n_products self.price_bounds = price_bounds self.lambda_coi = lambda_coi self.coi_window = coi_window self.max_steps = max(1, int(max_steps)) self.margin_floor = float( margin_floor ) # terminate if avg margin stays below this for patience steps self.margin_floor_patience = max(1, int(margin_floor_patience)) self.render_mode = render_mode self.alpha = float(alpha) self.nominal_alpha = float(alpha) self.N = N self.human_params = human_params self.agent_params = agent_params self.robust_radius = max(0.0, float(robust_radius)) self.robust_points = max(1, int(robust_points)) self.info_value = float(info_value) self.action_levels = max(2, int(action_levels)) self._action_scales = np.linspace( float(action_scale_low), float(action_scale_high), self.action_levels ) self.market = MarketEngine( alpha=alpha, N=N, human_params=human_params, agent_params=agent_params, noise_std=noise_std, ) self._platform_stub = _ActionPricingEngine(n_products, price_bounds) self._limbo = Limbo(self._platform_stub, self.market) self._set_market_mix(self.nominal_alpha) self.action_space = spaces.Discrete(self.action_levels) self.observation_space = spaces.Dict( { "demand": spaces.Box( low=0.0, high=100.0, shape=(n_products,), dtype=np.float32 ), "prices": spaces.Box( low=price_bounds[0], high=price_bounds[1], shape=(n_products,), dtype=np.float32, ), } ) self._prices = None self._demand = None self._step_count = 0 self._demand_history = [] self._price_history = [] self._revenue_history = [] self._renderer = None self._initial_episode_prices = None self._trajectories = [] # session trajectories for agent prob calculation self.baseline_prices = np.full(self.n_products, self.price_bounds[0]) self._low_margin_streak = 0 # consecutive steps below margin_floor # load behavioral models for agent probability estimation try: self._human_trans, self._agent_trans = get_transition_models() except Exception: # fallback if behavioral data unavailable self._human_trans, self._agent_trans = None, None def _get_obs(self) -> dict: demand_arr = np.array( [self._demand.get(i, 0.0) for i in range(self.n_products)], dtype=np.float32 ) return {"demand": demand_arr, "prices": self._prices.astype(np.float32)} def _set_market_mix(self, alpha: float): alpha = float(np.clip(alpha, 0.0, 1.0)) n_agents = int(self.N * alpha) self.alpha = alpha self.market.alpha = alpha self.market.Nagents = n_agents self.market.Nhumans = self.N - n_agents def _decode_action(self, action) -> np.ndarray: base = ( self._prices if self._prices is not None else np.full(self.n_products, self.price_bounds[0], dtype=float) ) if np.isscalar(action): idx = int(np.clip(int(action), 0, self.action_levels - 1)) return np.clip(base * self._action_scales[idx], *self.price_bounds) a = np.asarray(action) if a.size == 1: idx = int(np.clip(int(a.reshape(-1)[0]), 0, self.action_levels - 1)) return np.clip(base * self._action_scales[idx], *self.price_bounds) return np.clip(a.astype(float), *self.price_bounds) def _compute_agent_prob(self, trajectories=None) -> float: trajectories = ( self.market.last_trajectories if trajectories is None else trajectories ) if not trajectories or self._human_trans is None or self._agent_trans is None: return float(self.market.alpha) probs = [] for traj in trajectories: events = trajectory_to_events(traj) if len(events) < 2: continue probs.append( compute_agent_probability(events, self._human_trans, self._agent_trans) ) return float(np.mean(probs)) if probs else float(self.market.alpha) def _compute_reward( self, prices: np.ndarray, demand: dict, agent_prob: float, trajectories: list ) -> tuple[float, dict]: demand_arr = np.array( [demand.get(i, 0.0) for i in range(self.n_products)], dtype=float ) revenue = float(np.dot(prices, demand_arr)) purchases = extract_purchases(trajectories) coi_mix = compute_uplift_coi(prices, purchases, self.baseline_prices) # multiplicative penalty so COI term scales with revenue magnitude coi_leakage = float(agent_prob * self.info_value) discount = float(np.clip(1.0 - self.lambda_coi * coi_leakage, 0.0, 1.0)) coi_penalty = revenue * (1.0 - discount) # absolute penalty in revenue units reward = revenue * discount return reward, { "revenue": revenue, "coi_mix": float(coi_mix), "coi_base": 0.0, "coi_leakage": coi_leakage, "coi_penalty": coi_penalty, "coi_discount": discount, } def _alpha_candidates(self) -> np.ndarray: if self.robust_radius <= 0.0 or self.robust_points == 1: return np.array([self.nominal_alpha], dtype=float) lo = max(0.0, self.nominal_alpha - self.robust_radius) hi = min(1.0, self.nominal_alpha + self.robust_radius) return np.linspace(lo, hi, self.robust_points) def _select_adversarial_alpha( self, prices: np.ndarray ) -> tuple[float, dict, list, float]: """inner robust step: pick worst-case alpha and return its outcome directly to avoid double-sampling""" candidates = self._alpha_candidates() best_alpha, worst_reward = float(candidates[0]), np.inf best_demand, best_trajectories, best_agent_prob = None, [], 0.0 for alpha in candidates: self._set_market_mix(float(alpha)) demand = self.market.act(prices) trajectories = list(self.market.last_trajectories) agent_prob = self._compute_agent_prob(trajectories) reward, _ = self._compute_reward(prices, demand, agent_prob, trajectories) if reward < worst_reward: worst_reward = reward best_alpha, best_demand, best_trajectories, best_agent_prob = ( float(alpha), demand, trajectories, agent_prob, ) return best_alpha, best_demand, best_trajectories, best_agent_prob def _record_history(self): demand_arr = np.array( [self._demand.get(i, 0.0) for i in range(self.n_products)] ) self._demand_history.append(demand_arr) self._price_history.append(self._prices.copy()) self._revenue_history.append(np.sum(self._prices * demand_arr)) def reset(self, seed=None, options=None): super().reset(seed=seed) self._set_market_mix(self.nominal_alpha) self._limbo.reset() self._prices = np.random.uniform(*self.price_bounds, size=self.n_products) self._platform_stub.set_prices(self._prices) self._limbo.step() self._demand = self._limbo.step() self._initial_episode_prices = self._prices.copy() self._step_count = 0 self._low_margin_streak = 0 self._demand_history, self._price_history, self._revenue_history = [], [], [] self._trajectories = list(getattr(self.market, "last_trajectories", [])) self._record_history() return self._get_obs(), {} def step(self, action): self._prices = self._decode_action(action) # inner robust step returns worst-case outcome directly, no re-sampling alpha_adv, self._demand, trajectories, agent_prob = ( self._select_adversarial_alpha(self._prices) ) self._set_market_mix(alpha_adv) self._platform_stub.set_prices(self._prices) self._step_count += 1 self._trajectories.extend(trajectories) reward, metrics = self._compute_reward( self._prices, self._demand, agent_prob, trajectories ) self._record_history() # soft early termination when margin collapses for too long avg_margin = float(np.mean(self._prices) - self.price_bounds[0]) / max( float(np.mean(self._prices)), 1e-6 ) if avg_margin < self.margin_floor: self._low_margin_streak += 1 else: self._low_margin_streak = 0 margin_collapsed = self._low_margin_streak >= self.margin_floor_patience terminated = self._step_count >= self.max_steps or margin_collapsed info = { "step": self._step_count, "agent_prob": agent_prob, "alpha_adv": float(alpha_adv), "wasserstein_radius": float(self.robust_radius), **metrics, "raw_revenue": np.sum( self._prices * np.array([self._demand.get(i, 0.0) for i in range(self.n_products)]) ), } return self._get_obs(), reward, terminated, False, info def _compute_elasticity(self) -> np.ndarray: """point elasticity: e = (dQ/dP) * (P/Q) via finite differences, clipped to [-5, 5]""" if len(self._price_history) < 2: return np.zeros(self.n_products) p, q = np.array(self._price_history), np.array(self._demand_history) dp, dq = np.diff(p, axis=0), np.diff(q, axis=0) valid = np.abs(dp) > 0.5 with np.errstate(divide="ignore", invalid="ignore"): elasticity = np.where( valid, (dq / dp) * (p[:-1] / np.maximum(q[:-1], 1.0)), 0.0 ) elasticity = np.nan_to_num(np.clip(elasticity, -5.0, 5.0), nan=0.0) return ( np.mean(elasticity, axis=0) if len(elasticity) > 0 else np.zeros(self.n_products) ) def render(self): if self.render_mode == "human": if self._renderer is None: self._renderer = DashboardRenderer() self._renderer.render(self) elif self.render_mode == "ansi": return ( f"step={self._step_count}, prices={self._prices}, demand={self._demand}" ) return None def close(self): if self._renderer: self._renderer.close() self._renderer = None if __name__ == "__main__": import wandb from .lib import MetricsCallback class RandomPolicy: """Minimal SB3-compatible random policy for baseline testing.""" def __init__(self, env): self.env = env self.num_timesteps = 0 def learn(self, total_timesteps, callback=None): callback.model = self callback.num_timesteps = 0 callback.locals = {} callback.on_training_start({}, {}) obs, _ = self.env.reset() for step in range(total_timesteps): action = self.env.action_space.sample() obs, reward, term, trunc, info = self.env.step(action) self.num_timesteps = step + 1 callback.num_timesteps = self.num_timesteps callback.locals = {"infos": [info]} callback.on_step() if term or trunc: callback.on_rollout_end() obs, _ = self.env.reset() return self def predict(self, obs, **kwargs): return self.env.action_space.sample(), None wandb.init(project="phantom-pricing", config={"policy": "random", "alpha": 0.3}) env = EconomicMetricsWrapper(PHANTOM(n_products=15, alpha=0.3, render_mode=None)) model = RandomPolicy(env) model.learn(total_timesteps=1000, callback=MetricsCallback()) print(f"Episode revenue: {env.episode_revenue:.1f}") wandb.finish() env.close()