import gymnasium as gym from gymnasium import spaces import numpy as np from .engine import Limbo, MarketEngine, PricingEngine from .lib.render import DashboardRenderer from .lib.coi import ( compute_uplift_coi, extract_purchases, compute_agent_probability, ) from .lib.behavior import get_transition_models, trajectory_to_events from .lib.wrappers import EconomicMetricsWrapper class PHANTOM(gym.Env): """Gymnasium wrapper for Limbo pricing-market simulation implementing thesis COI framework reward = R(p,d) - λ·COI_leak(p,τ') per thesis Section on DR-RL COI_leak uses behavioral divergence to estimate agent probability f(τ') """ metadata = {"render_modes": ["human", "ansi"]} def __init__( self, n_products: int = 10, alpha: float = 0.3, N: int = 100, human_params: tuple = (50.0, 10.0), agent_params: tuple = (45.0, 15.0), noise_std: float = 1.0, price_bounds: tuple = (10.0, 150.0), lambda_coi: float = 0.1, coi_window: int = 10, render_mode: str = None, ): super().__init__() self.n_products = n_products self.price_bounds = price_bounds self.lambda_coi = lambda_coi self.coi_window = coi_window self.render_mode = render_mode self.alpha = alpha self.N = N self.human_params = human_params self.agent_params = agent_params self.market = MarketEngine( alpha=alpha, N=N, human_params=human_params, agent_params=agent_params, noise_std=noise_std, ) self._platform_stub = PricingEngine() self._limbo = Limbo(self._platform_stub, self.market) self.action_space = spaces.Box( low=price_bounds[0], high=price_bounds[1], shape=(n_products,), dtype=np.float32, ) self.observation_space = spaces.Dict( { "demand": spaces.Box( low=0.0, high=100.0, shape=(n_products,), dtype=np.float32 ), "prices": spaces.Box( low=price_bounds[0], high=price_bounds[1], shape=(n_products,), dtype=np.float32, ), } ) self._prices = None self._demand = None self._step_count = 0 self._demand_history = [] self._price_history = [] self._revenue_history = [] self._renderer = None self._initial_episode_prices = None self._trajectories = [] # session trajectories for agent prob calculation self.baseline_prices = np.full(self.n_products, self.price_bounds[0]) # load behavioral models for agent probability estimation try: self._human_trans, self._agent_trans = get_transition_models() except Exception: # fallback if behavioral data unavailable self._human_trans, self._agent_trans = None, None def _get_obs(self) -> dict: demand_arr = np.array( [self._demand.get(i, 0.0) for i in range(self.n_products)], dtype=np.float32 ) return {"demand": demand_arr, "prices": self._prices.astype(np.float32)} def _compute_agent_prob(self) -> float: """estimate agent probability from accumulated trajectories using KL divergence""" if ( not self._trajectories or self._human_trans is None or self._agent_trans is None ): return self.alpha # fallback to contamination level # aggregate all trajectories from this episode all_events = [] for traj in self._trajectories: all_events.extend(trajectory_to_events(traj)) if len(all_events) < 2: return self.alpha return compute_agent_probability( all_events, self._human_trans, self._agent_trans ) def _compute_reward(self, prices: np.ndarray, demand: dict) -> tuple[float, dict]: revenue = sum(prices[i] * demand.get(i, 0.0) for i in range(self.n_products)) trajs_mix = self.market.last_trajectories purchases_mix = extract_purchases(trajs_mix) coi_mix = compute_uplift_coi(prices, purchases_mix, self.baseline_prices) old_state = (self.market.alpha, self.market.Nagents, self.market.Nhumans) self.market.alpha, self.market.Nagents, self.market.Nhumans = 0.0, 0, self.N self.market.act(prices) purchases_base = extract_purchases(self.market.last_trajectories) coi_base = compute_uplift_coi(prices, purchases_base, self.baseline_prices) self.market.alpha, self.market.Nagents, self.market.Nhumans = old_state coi_leakage = max(0.0, coi_base - coi_mix) coi_penalty = max(self.lambda_coi * coi_leakage, 1000) / 1000 coi_penalty *= revenue return float(revenue - coi_penalty), { "revenue": float(revenue), "coi_mix": float(coi_mix), "coi_base": float(coi_base), "coi_leakage": float(coi_leakage), "coi_penalty": float(coi_penalty), } def _record_history(self): demand_arr = np.array( [self._demand.get(i, 0.0) for i in range(self.n_products)] ) self._demand_history.append(demand_arr) self._price_history.append(self._prices.copy()) self._revenue_history.append(np.sum(self._prices * demand_arr)) def reset(self, seed=None, options=None): super().reset(seed=seed) self._prices = np.random.uniform(*self.price_bounds, size=self.n_products) self._initial_episode_prices = self._prices.copy() self._demand = self.market.act(self._prices) self._step_count = 0 self._demand_history, self._price_history, self._revenue_history = [], [], [] self._trajectories = [] self._record_history() return self._get_obs(), {} def step(self, action: np.ndarray): self._prices = np.clip(action, *self.price_bounds) self._demand = self.market.act(self._prices) self._step_count += 1 self._record_history() # capture trajectories generated by market for agent prob estimation if hasattr(self.market, "last_trajectories"): self._trajectories.extend(self.market.last_trajectories) agent_prob = self._compute_agent_prob() reward, metrics = self._compute_reward(self._prices, self._demand) terminated = self._step_count >= 100 info = { "step": self._step_count, "agent_prob": agent_prob, **metrics, "raw_revenue": np.sum( self._prices * np.array([self._demand.get(i, 0.0) for i in range(self.n_products)]) ), } return self._get_obs(), reward, terminated, False, info def _compute_elasticity(self) -> np.ndarray: """point elasticity: e = (dQ/dP) * (P/Q) via finite differences, clipped to [-5, 5]""" if len(self._price_history) < 2: return np.zeros(self.n_products) p, q = np.array(self._price_history), np.array(self._demand_history) dp, dq = np.diff(p, axis=0), np.diff(q, axis=0) valid = np.abs(dp) > 0.5 with np.errstate(divide="ignore", invalid="ignore"): elasticity = np.where( valid, (dq / dp) * (p[:-1] / np.maximum(q[:-1], 1.0)), 0.0 ) elasticity = np.nan_to_num(np.clip(elasticity, -5.0, 5.0), nan=0.0) return ( np.mean(elasticity, axis=0) if len(elasticity) > 0 else np.zeros(self.n_products) ) def render(self): if self.render_mode == "human": if self._renderer is None: self._renderer = DashboardRenderer() self._renderer.render(self) elif self.render_mode == "ansi": return ( f"step={self._step_count}, prices={self._prices}, demand={self._demand}" ) return None def close(self): if self._renderer: self._renderer.close() self._renderer = None if __name__ == "__main__": import wandb from .lib import MetricsCallback class RandomPolicy: """Minimal SB3-compatible random policy for baseline testing.""" def __init__(self, env): self.env = env self.num_timesteps = 0 def learn(self, total_timesteps, callback=None): callback.model = self callback.num_timesteps = 0 callback.locals = {} callback.on_training_start({}, {}) obs, _ = self.env.reset() for step in range(total_timesteps): action = self.env.action_space.sample() obs, reward, term, trunc, info = self.env.step(action) self.num_timesteps = step + 1 callback.num_timesteps = self.num_timesteps callback.locals = {"infos": [info]} callback.on_step() if term or trunc: callback.on_rollout_end() obs, _ = self.env.reset() return self def predict(self, obs, **kwargs): return self.env.action_space.sample(), None wandb.init(project="phantom-pricing", config={"policy": "random", "alpha": 0.3}) env = EconomicMetricsWrapper(PHANTOM(n_products=15, alpha=0.3, render_mode=None)) model = RandomPolicy(env) model.learn(total_timesteps=1000, callback=MetricsCallback()) print(f"Episode revenue: {env.episode_revenue:.1f}") wandb.finish() env.close()