import gymnasium as gym from gymnasium import spaces import numpy as np from .engine import Limbo, MarketEngine, PricingEngine from .lib.render import DashboardRenderer from .lib.coi import ( compute_uplift_coi, extract_purchases, compute_agent_probability, ) from .lib.behavior import get_transition_models, trajectory_to_events from .lib.wrappers import EconomicMetricsWrapper class _ActionPricingEngine(PricingEngine): def __init__(self, n_products: int, price_bounds: tuple): self._prices = np.full(n_products, price_bounds[0], dtype=float) def set_prices(self, prices: np.ndarray): self._prices = np.asarray(prices, dtype=float) def act(self, _): return self._prices class PHANTOM(gym.Env): """Gymnasium wrapper for Limbo pricing-market simulation implementing thesis COI framework reward = R(p,d) - λ·COI_leak(p,τ') per thesis Section on DR-RL COI_leak uses behavioral divergence to estimate agent probability f(τ') robust inner step: min over alpha in Wasserstein interval around nominal alpha """ metadata = {"render_modes": ["human", "ansi"]} def __init__( self, n_products: int = 10, alpha: float = 0.3, N: int = 100, human_params: tuple = (50.0, 10.0), agent_params: tuple = (45.0, 15.0), noise_std: float = 1.0, price_bounds: tuple = (10.0, 150.0), lambda_coi: float = 0.1, coi_window: int = 10, robust_radius: float = 0.0, robust_points: int = 5, info_value: float = 1.0, render_mode: str = None, ): super().__init__() self.n_products = n_products self.price_bounds = price_bounds self.lambda_coi = lambda_coi self.coi_window = coi_window self.render_mode = render_mode self.alpha = float(alpha) self.nominal_alpha = float(alpha) self.N = N self.human_params = human_params self.agent_params = agent_params self.robust_radius = max(0.0, float(robust_radius)) self.robust_points = max(1, int(robust_points)) self.info_value = float(info_value) self.market = MarketEngine( alpha=alpha, N=N, human_params=human_params, agent_params=agent_params, noise_std=noise_std, ) self._platform_stub = _ActionPricingEngine(n_products, price_bounds) self._limbo = Limbo(self._platform_stub, self.market) self._set_market_mix(self.nominal_alpha) self.action_space = spaces.Box( low=price_bounds[0], high=price_bounds[1], shape=(n_products,), dtype=np.float32, ) self.observation_space = spaces.Dict( { "demand": spaces.Box( low=0.0, high=100.0, shape=(n_products,), dtype=np.float32 ), "prices": spaces.Box( low=price_bounds[0], high=price_bounds[1], shape=(n_products,), dtype=np.float32, ), } ) self._prices = None self._demand = None self._step_count = 0 self._demand_history = [] self._price_history = [] self._revenue_history = [] self._renderer = None self._initial_episode_prices = None self._trajectories = [] # session trajectories for agent prob calculation self.baseline_prices = np.full(self.n_products, self.price_bounds[0]) # load behavioral models for agent probability estimation try: self._human_trans, self._agent_trans = get_transition_models() except Exception: # fallback if behavioral data unavailable self._human_trans, self._agent_trans = None, None def _get_obs(self) -> dict: demand_arr = np.array( [self._demand.get(i, 0.0) for i in range(self.n_products)], dtype=np.float32 ) return {"demand": demand_arr, "prices": self._prices.astype(np.float32)} def _set_market_mix(self, alpha: float): alpha = float(np.clip(alpha, 0.0, 1.0)) n_agents = int(self.N * alpha) self.alpha = alpha self.market.alpha = alpha self.market.Nagents = n_agents self.market.Nhumans = self.N - n_agents def _compute_agent_prob(self, trajectories=None) -> float: trajectories = ( self.market.last_trajectories if trajectories is None else trajectories ) if not trajectories or self._human_trans is None or self._agent_trans is None: return float(self.market.alpha) probs = [] for traj in trajectories: events = trajectory_to_events(traj) if len(events) < 2: continue probs.append( compute_agent_probability(events, self._human_trans, self._agent_trans) ) return float(np.mean(probs)) if probs else float(self.market.alpha) def _compute_reward( self, prices: np.ndarray, demand: dict, agent_prob: float, trajectories: list ) -> tuple[float, dict]: demand_arr = np.array( [demand.get(i, 0.0) for i in range(self.n_products)], dtype=float ) revenue = float(np.dot(prices, demand_arr)) purchases = extract_purchases(trajectories) coi_mix = compute_uplift_coi(prices, purchases, self.baseline_prices) coi_leakage = float(agent_prob * self.info_value) coi_penalty = float(self.lambda_coi * coi_leakage) return float(revenue - coi_penalty), { "revenue": revenue, "coi_mix": float(coi_mix), "coi_base": 0.0, "coi_leakage": coi_leakage, "coi_penalty": coi_penalty, } def _alpha_candidates(self) -> np.ndarray: if self.robust_radius <= 0.0 or self.robust_points == 1: return np.array([self.nominal_alpha], dtype=float) lo = max(0.0, self.nominal_alpha - self.robust_radius) hi = min(1.0, self.nominal_alpha + self.robust_radius) return np.linspace(lo, hi, self.robust_points) def _select_adversarial_alpha(self, prices: np.ndarray) -> float: candidates = self._alpha_candidates() if len(candidates) == 1: return float(candidates[0]) best_alpha, worst_reward = float(candidates[0]), np.inf for alpha in candidates: self._set_market_mix(float(alpha)) demand = self.market.act(prices) trajectories = self.market.last_trajectories agent_prob = self._compute_agent_prob(trajectories) reward, _ = self._compute_reward(prices, demand, agent_prob, trajectories) if reward < worst_reward: worst_reward = reward best_alpha = float(alpha) return best_alpha def _record_history(self): demand_arr = np.array( [self._demand.get(i, 0.0) for i in range(self.n_products)] ) self._demand_history.append(demand_arr) self._price_history.append(self._prices.copy()) self._revenue_history.append(np.sum(self._prices * demand_arr)) def reset(self, seed=None, options=None): super().reset(seed=seed) self._set_market_mix(self.nominal_alpha) self._limbo.reset() self._prices = np.random.uniform(*self.price_bounds, size=self.n_products) self._platform_stub.set_prices(self._prices) self._limbo.step() self._demand = self._limbo.step() self._initial_episode_prices = self._prices.copy() self._step_count = 0 self._demand_history, self._price_history, self._revenue_history = [], [], [] self._trajectories = list(getattr(self.market, "last_trajectories", [])) self._record_history() return self._get_obs(), {} def step(self, action: np.ndarray): self._prices = np.clip(action, *self.price_bounds) alpha_adv = self._select_adversarial_alpha(self._prices) self._set_market_mix(alpha_adv) self._platform_stub.set_prices(self._prices) self._limbo.step() self._demand = self._limbo.step() trajectories = getattr(self.market, "last_trajectories", []) self._step_count += 1 self._trajectories.extend(trajectories) agent_prob = self._compute_agent_prob(trajectories) reward, metrics = self._compute_reward( self._prices, self._demand, agent_prob, trajectories ) self._record_history() terminated = self._step_count >= 100 info = { "step": self._step_count, "agent_prob": agent_prob, "alpha_adv": float(alpha_adv), "wasserstein_radius": float(self.robust_radius), **metrics, "raw_revenue": np.sum( self._prices * np.array([self._demand.get(i, 0.0) for i in range(self.n_products)]) ), } return self._get_obs(), reward, terminated, False, info def _compute_elasticity(self) -> np.ndarray: """point elasticity: e = (dQ/dP) * (P/Q) via finite differences, clipped to [-5, 5]""" if len(self._price_history) < 2: return np.zeros(self.n_products) p, q = np.array(self._price_history), np.array(self._demand_history) dp, dq = np.diff(p, axis=0), np.diff(q, axis=0) valid = np.abs(dp) > 0.5 with np.errstate(divide="ignore", invalid="ignore"): elasticity = np.where( valid, (dq / dp) * (p[:-1] / np.maximum(q[:-1], 1.0)), 0.0 ) elasticity = np.nan_to_num(np.clip(elasticity, -5.0, 5.0), nan=0.0) return ( np.mean(elasticity, axis=0) if len(elasticity) > 0 else np.zeros(self.n_products) ) def render(self): if self.render_mode == "human": if self._renderer is None: self._renderer = DashboardRenderer() self._renderer.render(self) elif self.render_mode == "ansi": return ( f"step={self._step_count}, prices={self._prices}, demand={self._demand}" ) return None def close(self): if self._renderer: self._renderer.close() self._renderer = None if __name__ == "__main__": import wandb from .lib import MetricsCallback class RandomPolicy: """Minimal SB3-compatible random policy for baseline testing.""" def __init__(self, env): self.env = env self.num_timesteps = 0 def learn(self, total_timesteps, callback=None): callback.model = self callback.num_timesteps = 0 callback.locals = {} callback.on_training_start({}, {}) obs, _ = self.env.reset() for step in range(total_timesteps): action = self.env.action_space.sample() obs, reward, term, trunc, info = self.env.step(action) self.num_timesteps = step + 1 callback.num_timesteps = self.num_timesteps callback.locals = {"infos": [info]} callback.on_step() if term or trunc: callback.on_rollout_end() obs, _ = self.env.reset() return self def predict(self, obs, **kwargs): return self.env.action_space.sample(), None wandb.init(project="phantom-pricing", config={"policy": "random", "alpha": 0.3}) env = EconomicMetricsWrapper(PHANTOM(n_products=15, alpha=0.3, render_mode=None)) model = RandomPolicy(env) model.learn(total_timesteps=1000, callback=MetricsCallback()) print(f"Episode revenue: {env.episode_revenue:.1f}") wandb.finish() env.close()