From bc6c481d034081b6a60c7b944305f86c757e4eac Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Sat, 14 Feb 2026 14:53:30 +0100 Subject: [PATCH] minor refactors to codebase to implement DRO --- .gitignore | 1 + engine/engine.py | 12 +++- engine/lib/__init__.py | 2 +- engine/lib/demand.py | 76 ++++++++++++++++---- engine/train.py | 26 +++++-- engine/wrapper.py | 153 +++++++++++++++++++++++++++-------------- 6 files changed, 195 insertions(+), 75 deletions(-) diff --git a/.gitignore b/.gitignore index 19bb041..f18e3d4 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ sim/rl/behavior_loader/*.pdf tests/e2e/node_modules/** lab/case/thesis/runs*/ sim/case/thesis_simplified/runs*/ +PHANTOM_web/* diff --git a/engine/engine.py b/engine/engine.py index 000f03f..8ca6339 100644 --- a/engine/engine.py +++ b/engine/engine.py @@ -19,15 +19,18 @@ class MarketEngine: agent_params: tuple, demand_distribution=np.random.normal, noise_std: float = 1.0, + action_weights: dict | None = None, ): # no defaults for D_H, D_A - force explicit experiment design self.alpha = alpha + self.N = int(N) self.Nagents = int(N * alpha) self.Nhumans = int(N * (1 - alpha)) self.human_params = human_params self.agent_params = agent_params self.noise_std = noise_std self.demand_dist = demand_distribution + self.action_weights = action_weights def act(self, prices): # generate separate demands d() per actor type @@ -48,7 +51,7 @@ class MarketEngine: agent_t = [sample_behavior(demand_a, human=False) for _ in range(self.Nagents)] # store trajectories for agent probability calculation self.last_trajectories = human_t + agent_t - return estimate_demand(self.last_trajectories) + return estimate_demand(self.last_trajectories, self.action_weights) def measure(self): pass @@ -72,13 +75,16 @@ class Limbo: self.output = None def step(self): - # we could code golf this a little bit if self.platform_turn: self.output = self.platform.act(self.output) else: self.output = self.market.act(self.output) - print(self.output) self.platform_turn = not self.platform_turn + return self.output + + def reset(self): + self.platform_turn = True + self.output = None if __name__ == "__main__": diff --git a/engine/lib/__init__.py b/engine/lib/__init__.py index 3d22207..04a06c2 100644 --- a/engine/lib/__init__.py +++ b/engine/lib/__init__.py @@ -1,4 +1,4 @@ -from .demand import estimate_demand, generate_demand_for_actor +from .demand import estimate_demand, estimate_weighted_demand, generate_demand_for_actor from .behavior import sample_behavior, get_transition_models, trajectory_to_events from .render import DashboardRenderer, style_axis from .wrappers import EconomicMetricsWrapper diff --git a/engine/lib/demand.py b/engine/lib/demand.py index d9f7edb..cb37c3d 100644 --- a/engine/lib/demand.py +++ b/engine/lib/demand.py @@ -1,9 +1,23 @@ -import logging import numpy as np -from logging import getLogger -logger = getLogger(__name__) -def generate_demand_for_actor(prices: np.ndarray, params: tuple, noise_std: float = 1.0, distribution_method=np.random.normal) -> np.ndarray: +CATEGORY_WEIGHTS = {"cart": 4.0, "dwell": 2.0, "nav": 1.0, "filter": 0.5} +ACTION_CATEGORIES = { + "cart": {"add_item", "add_to_cart", "remove", "checkout", "purchase"}, + "dwell": {"hover_title", "hover_paragraph", "hover_link"}, + "nav": {"page_view", "view_item", "view", "learn_more"}, + "filter": {"search", "filter_date", "filter_price", "sort"}, +} +DEFAULT_ACTION_WEIGHTS = { + a: CATEGORY_WEIGHTS[c] for c, actions in ACTION_CATEGORIES.items() for a in actions +} + + +def generate_demand_for_actor( + prices: np.ndarray, + params: tuple, + noise_std: float = 1.0, + distribution_method=np.random.normal, +) -> np.ndarray: """d(p;0) = max(0, valuation - price) + epsi for single actor type params: (mean, std) for valuation distribution D_H or D_A""" val = distribution_method(*params, size=len(prices)) @@ -13,17 +27,50 @@ def generate_demand_for_actor(prices: np.ndarray, params: tuple, noise_std: floa return demand / total * 100 if total > 0 else demand -def estimate_demand(trajectories): - demand_estimate = {} +def estimate_demand(trajectories, action_weights=None): + return estimate_weighted_demand(trajectories, action_weights) + + +def _parse_event_state(state: str): + if "_product" not in state: + return state, None + action, raw_pid = state.rsplit("_product", 1) + return action, int(raw_pid) if raw_pid.isdigit() else None + + +def _weight_for_action(action: str, action_weights: dict) -> float: + if action in action_weights: + return action_weights[action] + if action.startswith("hover"): + return CATEGORY_WEIGHTS["dwell"] + if action.startswith("filter") or action in {"search", "sort"}: + return CATEGORY_WEIGHTS["filter"] + if action.startswith("add") or action in {"checkout", "purchase", "remove"}: + return CATEGORY_WEIGHTS["cart"] + return CATEGORY_WEIGHTS["nav"] + + +def estimate_weighted_demand(trajectories, action_weights=None): + action_weights = ( + DEFAULT_ACTION_WEIGHTS if action_weights is None else action_weights + ) + scores = {} for traj in trajectories: - for event in traj: - if 'view_product' in event: - product_id = int(event.split('_')[-1].replace('product', '')) - demand_estimate[product_id] = demand_estimate.get(product_id, 0) + 1 - total_views = sum(demand_estimate.values()) - for product_id in demand_estimate: - demand_estimate[product_id] = (demand_estimate[product_id] / total_views) * 100 # normalize to percentage - return demand_estimate + for state in traj: + action, product_id = _parse_event_state(state) + if product_id is None: + continue + w = _weight_for_action(action, action_weights) + if w <= 0: + continue + scores[product_id] = scores.get(product_id, 0.0) + w + total = sum(scores.values()) + return ( + {pid: (score / total) * 100 for pid, score in scores.items()} + if total > 0 + else {} + ) + # Example usage if __name__ == "__main__": @@ -36,6 +83,7 @@ if __name__ == "__main__": print("Human Demand:", demand_h) print("Agent Demand:", demand_a) from .behavior import sample_behavior + N, alpha = 200, 0.3 n_h, n_a = int(N * (1 - alpha)), int(N * alpha) human_t = [sample_behavior(demand_h, human=True) for _ in range(n_h)] diff --git a/engine/train.py b/engine/train.py index f733895..00aa274 100644 --- a/engine/train.py +++ b/engine/train.py @@ -6,11 +6,26 @@ from .lib import EconomicMetricsWrapper, MetricsCallback wandb.init( project="phantom-pricing", - config={"alpha": 0.3, "n_products": 10, "total_timesteps": 50000} + config={ + "alpha": 0.3, + "n_products": 10, + "total_timesteps": 50000, + "robust_radius": 0.15, + "robust_points": 5, + "lambda_coi": 0.2, + }, ) -env = EconomicMetricsWrapper(PHANTOM(n_products=10, alpha=0.3, render_mode=None)) -eval_env = EconomicMetricsWrapper(PHANTOM(n_products=10, alpha=0.3, render_mode=None)) +env_kwargs = { + "n_products": 10, + "alpha": 0.3, + "lambda_coi": 0.2, + "robust_radius": 0.15, + "robust_points": 5, + "render_mode": None, +} +env = EconomicMetricsWrapper(PHANTOM(**env_kwargs)) +eval_env = EconomicMetricsWrapper(PHANTOM(**env_kwargs)) model = SAC( "MultiInputPolicy", @@ -31,11 +46,12 @@ model.save("phantom_sac") wandb.finish() # test trained policy -env = PHANTOM(n_products=10, alpha=0.3, render_mode=None) +env = PHANTOM(**env_kwargs) obs, _ = env.reset() for _ in range(100): action, _ = model.predict(obs, deterministic=True) obs, reward, term, trunc, _ = env.step(action) env.render() - if term or trunc: break + if term or trunc: + break env.close() diff --git a/engine/wrapper.py b/engine/wrapper.py index fe1e6bb..e1ea79b 100644 --- a/engine/wrapper.py +++ b/engine/wrapper.py @@ -12,11 +12,23 @@ from .lib.behavior import get_transition_models, trajectory_to_events from .lib.wrappers import EconomicMetricsWrapper +class _ActionPricingEngine(PricingEngine): + def __init__(self, n_products: int, price_bounds: tuple): + self._prices = np.full(n_products, price_bounds[0], dtype=float) + + def set_prices(self, prices: np.ndarray): + self._prices = np.asarray(prices, dtype=float) + + def act(self, _): + return self._prices + + class PHANTOM(gym.Env): """Gymnasium wrapper for Limbo pricing-market simulation implementing thesis COI framework reward = R(p,d) - λ·COI_leak(p,τ') per thesis Section on DR-RL COI_leak uses behavioral divergence to estimate agent probability f(τ') + robust inner step: min over alpha in Wasserstein interval around nominal alpha """ metadata = {"render_modes": ["human", "ansi"]} @@ -32,6 +44,9 @@ class PHANTOM(gym.Env): price_bounds: tuple = (10.0, 150.0), lambda_coi: float = 0.1, coi_window: int = 10, + robust_radius: float = 0.0, + robust_points: int = 5, + info_value: float = 1.0, render_mode: str = None, ): super().__init__() @@ -40,10 +55,14 @@ class PHANTOM(gym.Env): self.lambda_coi = lambda_coi self.coi_window = coi_window self.render_mode = render_mode - self.alpha = alpha + self.alpha = float(alpha) + self.nominal_alpha = float(alpha) self.N = N self.human_params = human_params self.agent_params = agent_params + self.robust_radius = max(0.0, float(robust_radius)) + self.robust_points = max(1, int(robust_points)) + self.info_value = float(info_value) self.market = MarketEngine( alpha=alpha, @@ -52,8 +71,9 @@ class PHANTOM(gym.Env): agent_params=agent_params, noise_std=noise_std, ) - self._platform_stub = PricingEngine() + self._platform_stub = _ActionPricingEngine(n_products, price_bounds) self._limbo = Limbo(self._platform_stub, self.market) + self._set_market_mix(self.nominal_alpha) self.action_space = spaces.Box( low=price_bounds[0], @@ -99,53 +119,72 @@ class PHANTOM(gym.Env): ) return {"demand": demand_arr, "prices": self._prices.astype(np.float32)} - def _compute_agent_prob(self) -> float: - """estimate agent probability from accumulated trajectories using KL divergence""" - if ( - not self._trajectories - or self._human_trans is None - or self._agent_trans is None - ): - return self.alpha # fallback to contamination level + def _set_market_mix(self, alpha: float): + alpha = float(np.clip(alpha, 0.0, 1.0)) + n_agents = int(self.N * alpha) + self.alpha = alpha + self.market.alpha = alpha + self.market.Nagents = n_agents + self.market.Nhumans = self.N - n_agents - # aggregate all trajectories from this episode - all_events = [] - for traj in self._trajectories: - all_events.extend(trajectory_to_events(traj)) - - if len(all_events) < 2: - return self.alpha - - return compute_agent_probability( - all_events, self._human_trans, self._agent_trans + def _compute_agent_prob(self, trajectories=None) -> float: + trajectories = ( + self.market.last_trajectories if trajectories is None else trajectories ) + if not trajectories or self._human_trans is None or self._agent_trans is None: + return float(self.market.alpha) + probs = [] + for traj in trajectories: + events = trajectory_to_events(traj) + if len(events) < 2: + continue + probs.append( + compute_agent_probability(events, self._human_trans, self._agent_trans) + ) + return float(np.mean(probs)) if probs else float(self.market.alpha) - def _compute_reward(self, prices: np.ndarray, demand: dict) -> tuple[float, dict]: - revenue = sum(prices[i] * demand.get(i, 0.0) for i in range(self.n_products)) - - trajs_mix = self.market.last_trajectories - purchases_mix = extract_purchases(trajs_mix) - coi_mix = compute_uplift_coi(prices, purchases_mix, self.baseline_prices) - - old_state = (self.market.alpha, self.market.Nagents, self.market.Nhumans) - self.market.alpha, self.market.Nagents, self.market.Nhumans = 0.0, 0, self.N - self.market.act(prices) - purchases_base = extract_purchases(self.market.last_trajectories) - coi_base = compute_uplift_coi(prices, purchases_base, self.baseline_prices) - self.market.alpha, self.market.Nagents, self.market.Nhumans = old_state - - coi_leakage = max(0.0, coi_base - coi_mix) - coi_penalty = max(self.lambda_coi * coi_leakage, 1000) / 1000 - coi_penalty *= revenue - + def _compute_reward( + self, prices: np.ndarray, demand: dict, agent_prob: float, trajectories: list + ) -> tuple[float, dict]: + demand_arr = np.array( + [demand.get(i, 0.0) for i in range(self.n_products)], dtype=float + ) + revenue = float(np.dot(prices, demand_arr)) + purchases = extract_purchases(trajectories) + coi_mix = compute_uplift_coi(prices, purchases, self.baseline_prices) + coi_leakage = float(agent_prob * self.info_value) + coi_penalty = float(self.lambda_coi * coi_leakage) return float(revenue - coi_penalty), { - "revenue": float(revenue), + "revenue": revenue, "coi_mix": float(coi_mix), - "coi_base": float(coi_base), - "coi_leakage": float(coi_leakage), - "coi_penalty": float(coi_penalty), + "coi_base": 0.0, + "coi_leakage": coi_leakage, + "coi_penalty": coi_penalty, } + def _alpha_candidates(self) -> np.ndarray: + if self.robust_radius <= 0.0 or self.robust_points == 1: + return np.array([self.nominal_alpha], dtype=float) + lo = max(0.0, self.nominal_alpha - self.robust_radius) + hi = min(1.0, self.nominal_alpha + self.robust_radius) + return np.linspace(lo, hi, self.robust_points) + + def _select_adversarial_alpha(self, prices: np.ndarray) -> float: + candidates = self._alpha_candidates() + if len(candidates) == 1: + return float(candidates[0]) + best_alpha, worst_reward = float(candidates[0]), np.inf + for alpha in candidates: + self._set_market_mix(float(alpha)) + demand = self.market.act(prices) + trajectories = self.market.last_trajectories + agent_prob = self._compute_agent_prob(trajectories) + reward, _ = self._compute_reward(prices, demand, agent_prob, trajectories) + if reward < worst_reward: + worst_reward = reward + best_alpha = float(alpha) + return best_alpha + def _record_history(self): demand_arr = np.array( [self._demand.get(i, 0.0) for i in range(self.n_products)] @@ -156,32 +195,42 @@ class PHANTOM(gym.Env): def reset(self, seed=None, options=None): super().reset(seed=seed) + self._set_market_mix(self.nominal_alpha) + self._limbo.reset() self._prices = np.random.uniform(*self.price_bounds, size=self.n_products) + self._platform_stub.set_prices(self._prices) + self._limbo.step() + self._demand = self._limbo.step() self._initial_episode_prices = self._prices.copy() - self._demand = self.market.act(self._prices) self._step_count = 0 self._demand_history, self._price_history, self._revenue_history = [], [], [] - self._trajectories = [] + self._trajectories = list(getattr(self.market, "last_trajectories", [])) self._record_history() return self._get_obs(), {} def step(self, action: np.ndarray): self._prices = np.clip(action, *self.price_bounds) - self._demand = self.market.act(self._prices) + alpha_adv = self._select_adversarial_alpha(self._prices) + self._set_market_mix(alpha_adv) + self._platform_stub.set_prices(self._prices) + self._limbo.step() + self._demand = self._limbo.step() + trajectories = getattr(self.market, "last_trajectories", []) self._step_count += 1 + self._trajectories.extend(trajectories) + + agent_prob = self._compute_agent_prob(trajectories) + reward, metrics = self._compute_reward( + self._prices, self._demand, agent_prob, trajectories + ) self._record_history() - - # capture trajectories generated by market for agent prob estimation - if hasattr(self.market, "last_trajectories"): - self._trajectories.extend(self.market.last_trajectories) - - agent_prob = self._compute_agent_prob() - reward, metrics = self._compute_reward(self._prices, self._demand) terminated = self._step_count >= 100 info = { "step": self._step_count, "agent_prob": agent_prob, + "alpha_adv": float(alpha_adv), + "wasserstein_radius": float(self.robust_radius), **metrics, "raw_revenue": np.sum( self._prices