From c8c44d0453c235d03318c5a0f6019bf24456a02c Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Wed, 17 Dec 2025 17:41:16 +0100 Subject: [PATCH] refactor to align moer with research in the env sims --- sim/rl/engine.py | 220 ++++++++++++++++++++++++++++++++++++ sim/rl/environment.py | 255 ++++++++++-------------------------------- sim/rl/train.py | 149 ++++++++++++++++++++++++ 3 files changed, 431 insertions(+), 193 deletions(-) create mode 100644 sim/rl/engine.py create mode 100644 sim/rl/train.py diff --git a/sim/rl/engine.py b/sim/rl/engine.py new file mode 100644 index 0000000..6d913f3 --- /dev/null +++ b/sim/rl/engine.py @@ -0,0 +1,220 @@ +import numpy as np +import pandas as pd +from abc import ABC, abstractmethod +from typing import Dict, Any +from environment import BusinessLogicConstraints + + +class BasePricingEngine(ABC): + """base interface for all pricing engines""" + def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0): + self.c = constraints + self.rng = np.random.default_rng(seed) + self.step_count = 0 + + @abstractmethod + def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray: + """compute new prices given current state and observation from environment + + args: + current_prices: current price vector [N] + observation: dict containing 'price', 'demand', and possibly interaction data + + returns: + new_prices: updated price vector [N] + """ + pass + + @abstractmethod + def update(obs, reward, done, info): + pass + + + + def reset(self): + """reset engine state for new episode""" + self.step_count = 0 + + +class WildPricingEngine(BasePricingEngine): + """production-like pricing using online elasticity estimation via EWMA regression""" + def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0): + super().__init__(constraints, seed) + # per-product unit costs (unknown to customers; known to platform) + self.unit_cost = self.rng.uniform(8.0, 40.0, size=self.c.product_catelogue_size).astype(np.float32) + # online elasticity estimate (start moderately elastic) + self.e_hat = np.full((self.c.product_catelogue_size,), -1.3, dtype=np.float32) + # EWMA state for log-log regression + self.mu_logp = np.zeros(self.c.product_catelogue_size, dtype=np.float32) + self.mu_logq = np.zeros(self.c.product_catelogue_size, dtype=np.float32) + self.cov_pq = np.zeros(self.c.product_catelogue_size, dtype=np.float32) + self.var_p = np.ones(self.c.product_catelogue_size, dtype=np.float32) + # knobs typical in production + self.lr = 0.08 + self.ewma = 0.05 + self.eps_explore = 0.03 + self.explore_scale = 0.03 + + def _safe_elasticity(self, e: np.ndarray) -> np.ndarray: + return np.clip(e, -5.0, -1.05) + + def reset(self): + super().reset() + self.e_hat = np.full((self.c.product_catelogue_size,), -1.3, dtype=np.float32) + self.mu_logp = np.zeros(self.c.product_catelogue_size, dtype=np.float32) + self.mu_logq = np.zeros(self.c.product_catelogue_size, dtype=np.float32) + self.cov_pq = np.zeros(self.c.product_catelogue_size, dtype=np.float32) + self.var_p = np.ones(self.c.product_catelogue_size, dtype=np.float32) + + def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray: + self.step_count += 1 + # extract demand signal (from env observation) as proxy for sales + demand = observation.get('demand', np.zeros(self.c.product_catelogue_size, dtype=np.float32)) + return self._update_from_demand(current_prices, demand) + + def _update_from_demand(self, prices: np.ndarray, sold: np.ndarray) -> np.ndarray: + # log transforms (add 1 to handle zeros) + logp = np.log(np.clip(prices, 1e-3, None)).astype(np.float32) + logq = np.log(sold + 1.0).astype(np.float32) + # EWMA moments for per-product regression: logq ≈ a + e*logp + a = self.ewma + dp = logp - self.mu_logp + dq = logq - self.mu_logq + self.mu_logp = (1 - a) * self.mu_logp + a * logp + self.mu_logq = (1 - a) * self.mu_logq + a * logq + self.cov_pq = (1 - a) * self.cov_pq + a * (dp * dq) + self.var_p = (1 - a) * self.var_p + a * (dp * dp + 1e-6) + e_new = self.cov_pq / (self.var_p + 1e-6) + self.e_hat = self._safe_elasticity(0.9 * self.e_hat + 0.1 * e_new) + # profit-optimal price for isoelastic demand (if e < -1) + e = self.e_hat + p_star = self.unit_cost * (e / (e + 1.0)) + # smooth toward p_star + new_prices = (1 - self.lr) * prices + self.lr * p_star + # exploration (small random perturbations) + if self.rng.random() < self.eps_explore: + noise = self.rng.normal(0.0, self.explore_scale, size=new_prices.shape).astype(np.float32) + new_prices = new_prices * (1.0 + noise) + # apply business guardrails (max change + bounds) + max_adj = self.c.max_price_adjustment + ratio = np.clip(new_prices / (prices + 1e-6), 1 - max_adj, 1 + max_adj) + new_prices = prices * ratio + new_prices = np.clip(new_prices, self.c.system_min_price, self.c.system_max_price).astype(np.float32) + return new_prices + + +class StaticPricingEngine(BasePricingEngine): + """baseline: fixed prices throughout episode""" + def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0): + super().__init__(constraints, seed) + self.fixed_prices = None + + def reset(self): + super().reset() + self.fixed_prices = None + + def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray: + self.step_count += 1 + if self.fixed_prices is None: + self.fixed_prices = current_prices.copy() + return self.fixed_prices.copy() + + +class SimpleDemandEngine(BasePricingEngine): + """demand-driven pricing: increase price when demand rises, decrease when it falls""" + def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0): + super().__init__(constraints, seed) + self.prev_demand = None + self.lr = 0.05 + + def reset(self): + super().reset() + self.prev_demand = None + + def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray: + self.step_count += 1 + demand = observation.get('demand', np.zeros(self.c.product_catelogue_size, dtype=np.float32)) + if self.prev_demand is None: + self.prev_demand = demand.copy() + return current_prices.copy() + # simple rule: if demand increases, raise price; if decreases, lower price + delta_d = demand - self.prev_demand + price_adj = self.lr * np.sign(delta_d) * np.abs(delta_d) / (np.abs(self.prev_demand) + 1.0) + new_prices = current_prices * (1.0 + price_adj) + self.prev_demand = demand.copy() + # apply constraints + max_adj = self.c.max_price_adjustment + ratio = np.clip(new_prices / (current_prices + 1e-6), 1 - max_adj, 1 + max_adj) + new_prices = current_prices * ratio + return np.clip(new_prices, self.c.system_min_price, self.c.system_max_price).astype(np.float32) + + +class RandomWalkEngine(BasePricingEngine): + """random walk pricing with mean reversion""" + def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0): + super().__init__(constraints, seed) + self.target_price = None + self.volatility = 0.02 + + def reset(self): + super().reset() + self.target_price = None + + def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray: + self.step_count += 1 + if self.target_price is None: + self.target_price = current_prices.copy() + # random walk with mean reversion toward target + noise = self.rng.normal(0.0, self.volatility, size=current_prices.shape).astype(np.float32) + reversion = 0.01 * (self.target_price - current_prices) + new_prices = current_prices * (1.0 + noise) + reversion + # apply constraints + max_adj = self.c.max_price_adjustment + ratio = np.clip(new_prices / (current_prices + 1e-6), 1 - max_adj, 1 + max_adj) + new_prices = current_prices * ratio + return np.clip(new_prices, self.c.system_min_price, self.c.system_max_price).astype(np.float32) + + +class ThompsonSamplingEngine(BasePricingEngine): + """bayesian bandit approach per product treating price as discrete action""" + def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0): + super().__init__(constraints, seed) + self.n_price_levels = 5 + self.alpha = np.ones((self.c.product_catelogue_size, self.n_price_levels), dtype=np.float32) + self.beta = np.ones((self.c.product_catelogue_size, self.n_price_levels), dtype=np.float32) + self.price_grid = None + self.last_actions = None + + def reset(self): + super().reset() + self.alpha = np.ones((self.c.product_catelogue_size, self.n_price_levels), dtype=np.float32) + self.beta = np.ones((self.c.product_catelogue_size, self.n_price_levels), dtype=np.float32) + self.price_grid = None + self.last_actions = None + + def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray: + self.step_count += 1 + if self.price_grid is None: + # define price grid per product + lo = current_prices * 0.7 + hi = current_prices * 1.3 + self.price_grid = np.linspace(lo, hi, self.n_price_levels).T + demand = observation.get('demand', np.zeros(self.c.product_catelogue_size, dtype=np.float32)) + # update beliefs based on last action + if self.last_actions is not None: + for i in range(self.c.product_catelogue_size): + a = self.last_actions[i] + reward = demand[i] + if reward > 0.5: + self.alpha[i, a] += reward + else: + self.beta[i, a] += 1.0 + # thompson sampling: sample from posterior, pick best + new_prices = np.zeros(self.c.product_catelogue_size, dtype=np.float32) + actions = np.zeros(self.c.product_catelogue_size, dtype=int) + for i in range(self.c.product_catelogue_size): + theta = self.rng.beta(self.alpha[i], self.beta[i]).astype(np.float32) + actions[i] = int(np.argmax(theta)) + new_prices[i] = self.price_grid[i, actions[i]] + self.last_actions = actions + return np.clip(new_prices, self.c.system_min_price, self.c.system_max_price).astype(np.float32) diff --git a/sim/rl/environment.py b/sim/rl/environment.py index 19f9ad4..fd725f8 100644 --- a/sim/rl/environment.py +++ b/sim/rl/environment.py @@ -1,5 +1,7 @@ +from sys import intern import gymnasium as gym from gymnasium import spaces +from matplotlib import interactive import numpy as np from dataclasses import dataclass import pandas as pd @@ -24,7 +26,7 @@ class BusinessLogicConstraints(): coi_sigmoid_temp: float = 1.25 base_human_demand: float = 0.08 base_agent_demand: float = 0.05 - human_price_elasticity: float = -1.2 + human_price_elasticity: float = -1.2 # assumptions here agent_price_elasticity: float = -0.6 w_agent_loss: float = 1.0 w_volatility: float = 5.0 @@ -35,31 +37,25 @@ class BusinessLogicConstraints(): def _sigmoid(x: np.ndarray) -> np.ndarray: return 1.0 / (1.0 + np.exp(-x)) - -def simple_agent_detector(session_df: pd.DataFrame) -> pd.Series: - # baseline heuristic: high velocity + low conversion - v = session_df.get("interaction_velocity", pd.Series(0.0, index=session_df.index)) - cr = session_df.get("conversion_rate", pd.Series(0.0, index=session_df.index)) - total = session_df.get("total_interactions", pd.Series(0, index=session_df.index)) - return (total >= 12) & (v >= 0.20) & (cr <= 0.01) - - class CommercePlatform: - def __init__(self, product_catelogue_size: int, max_price: float, min_price: float, - constraints: BusinessLogicConstraints, agent_detector: Optional[Callable[[pd.DataFrame], pd.Series]] = None, - use_defense: bool = False): + """ + This is just an extension of the state management for the environment, it does not implement anything dynamic just helps us simulate demand. + """ + def __init__(self, + product_catelogue_size: int, + max_price: float, + min_price: float, + constraints: BusinessLogicConstraints): self.product_catelogue_size = product_catelogue_size + self.product_supply = np.random.uniform(low=10, high=50, size=(self.product_catelogue_size,)) self.max_price = max_price self.min_price = min_price self.constraints = constraints - self.use_defense = use_defense - self.agent_detector = agent_detector self.simulation_history: List[Dict[str, Any]] = [] self._rng = np.random.default_rng(constraints.seed) - self._popularity = self._rng.lognormal(mean=0.0, sigma=0.6, size=self.product_catelogue_size) - self._popularity = self._popularity / (self._popularity.mean() + 1e-12) self._last_interaction_df: pd.DataFrame = pd.DataFrame() + def setup_true_demand(self, prices: np.ndarray) -> Dict[str, np.ndarray]: # ground truth purchase propensities p = np.clip(prices, self.min_price, self.max_price) @@ -67,14 +63,19 @@ class CommercePlatform: human_prob = self.constraints.base_human_demand * (pn ** self.constraints.human_price_elasticity) agent_prob = self.constraints.base_agent_demand * (pn ** self.constraints.agent_price_elasticity) return { - "human_purchase_prob": np.clip(human_prob * self._popularity, 0.0, 0.95), - "agent_purchase_prob": np.clip(agent_prob * self._popularity, 0.0, 0.95) + "human_purchase_prob": np.clip(human_prob, 0.0, 0.95), + "agent_purchase_prob": np.clip(agent_prob, 0.0, 0.95) } - def _session_markup_multiplier(self, signal_score: float) -> float: - # session-based COI markup based on demand signal expression - x = (signal_score - self.constraints.coi_threshold) / max(self.constraints.coi_sigmoid_temp, 1e-6) - return 1.0 + self.constraints.coi_strength * float(_sigmoid(np.array([x]))[0]) + def _load_behavioral_profile(actor : str, demand_forcing): + """ + This returns a markov chain with average weights which we get from interaction data of our experiments. + This defines transition probabilities between different events: + search -> view_item_price_binN: 0.7 + view_item_price_binN -> add_to_cart: 0.2 + we also must reweight with the demand_forcing vector or purchase probabilities per-product + """ + def _simulate_sessions(self, base_prices: np.ndarray) -> pd.DataFrame: demand = self.setup_true_demand(base_prices) @@ -84,94 +85,32 @@ class CommercePlatform: T = self.constraints.sessions_per_step n_agent_sessions = int(round(T * self.constraints.agent_share)) n_human_sessions = T - n_agent_sessions - - # human sessions: normal browse with possible purchase - for s in range(n_human_sessions): - session_id = f"h_{len(events)}_{s}" - k = int(self._rng.integers(1, 4)) - prod_ids = self._rng.choice(self.product_catelogue_size, size=k, replace=False) - t = 0.0 - inter_times = self._rng.gamma(shape=2.0, scale=3.0, size=3 * k) - signal_score = 0.0 - purchased_any = False - - for i, pid in enumerate(prod_ids): - t += float(inter_times[i]) - price_shown = float(base_prices[pid]) - events.append({ - "session_id": session_id, "actor": "human", "agent_id": None, "product_id": int(pid), - "action": "view", "t": t, "price_shown": price_shown, "is_purchase": 0, - "price_paid": 0.0, "oracle_price_paid": 0.0, "signal_score": 0.0, - }) - signal_score += 1.0 - - if self._rng.random() < 0.35: - t += float(inter_times[i + k]) - events.append({ - "session_id": session_id, "actor": "human", "agent_id": None, "product_id": int(pid), - "action": "cart", "t": t, "price_shown": price_shown, "is_purchase": 0, - "price_paid": 0.0, "oracle_price_paid": 0.0, "signal_score": 0.0, - }) - signal_score += 2.0 - - if (not purchased_any) and (self._rng.random() < float(human_pprob[pid])): - t += float(inter_times[i + 2 * k]) - mult = self._session_markup_multiplier(signal_score) - price_paid = float(np.clip(base_prices[pid] * mult, self.min_price, self.max_price)) - events.append({ - "session_id": session_id, "actor": "human", "agent_id": None, "product_id": int(pid), - "action": "purchase", "t": t, "price_shown": float(base_prices[pid]), "is_purchase": 1, - "price_paid": price_paid, "oracle_price_paid": price_paid, "signal_score": signal_score, - }) - purchased_any = True - - # agent sessions: split recon/purchase to circumvent COI n_agent_ids = max(1, n_agent_sessions // 2) - for a in range(n_agent_ids): - agent_id = f"a_{a}" - recon_session_id = f"{agent_id}_recon" - t = 0.0 - n_views = int(self._rng.poisson(lam=8) * self.constraints.agent_recon_multiplier) + 5 - inter_times = self._rng.gamma(shape=2.0, scale=0.6, size=max(n_views, 1)) - prod_ids = self._rng.integers(0, self.product_catelogue_size, size=n_views) - recon_signal = 0.0 + session_map = { + 'humans': n_human_sessions, + 'agents': n_agent_ids + } + pprob_map = { + 'humans': human_pprob, + 'agents': agent_pprob + } + joint_events = [] + for actor, n_sessions in session_map.items(): + bp = _load_behavioral_profile(actor, pprob_map[actor]) + counter = 0 + events = [] + while counter < n_sessions: + session_events = [] + while len(session_events) == 0 or session_events[-1]['action'] == 'checkout': + interaction_event = bp.sample(self._rng) + interaction_event['session_id'] = f'{actor}_{counter:06d}' + # TODO any other assignments + session_events.append(interaction_event) + events.extend(session_events) + counter += 1 + joint_events.extend(events) - for i, pid in enumerate(prod_ids): - t += float(inter_times[i]) - events.append({ - "session_id": recon_session_id, "actor": "agent", "agent_id": agent_id, "product_id": int(pid), - "action": "view", "t": t, "price_shown": float(base_prices[pid]), "is_purchase": 0, - "price_paid": 0.0, "oracle_price_paid": 0.0, "signal_score": 0.0, - }) - recon_signal += 1.0 - - # clean purchase session with minimal interactions - if self._rng.random() < self.constraints.agent_purchase_probability: - purchase_session_id = f"{agent_id}_clean" - pid = int(self._rng.integers(0, self.product_catelogue_size)) - t2 = 0.0 - clean_signal = 0.0 - t2 += float(self._rng.gamma(shape=2.0, scale=0.7)) - events.append({ - "session_id": purchase_session_id, "actor": "agent", "agent_id": agent_id, "product_id": pid, - "action": "view", "t": t2, "price_shown": float(base_prices[pid]), "is_purchase": 0, - "price_paid": 0.0, "oracle_price_paid": 0.0, "signal_score": 0.0, - }) - clean_signal += 1.0 - - if self._rng.random() < float(agent_pprob[pid]): - t2 += float(self._rng.gamma(shape=2.0, scale=0.7)) - obs_mult = self._session_markup_multiplier(clean_signal) - obs_paid = float(np.clip(base_prices[pid] * obs_mult, self.min_price, self.max_price)) - oracle_mult = self._session_markup_multiplier(recon_signal) # oracle links recon->purchase - oracle_paid = float(np.clip(base_prices[pid] * oracle_mult, self.min_price, self.max_price)) - events.append({ - "session_id": purchase_session_id, "actor": "agent", "agent_id": agent_id, "product_id": pid, - "action": "purchase", "t": t2, "price_shown": float(base_prices[pid]), "is_purchase": 1, - "price_paid": obs_paid, "oracle_price_paid": oracle_paid, "signal_score": clean_signal, - }) - - return pd.DataFrame(events) + return pd.DataFrame(joint_events) def compute_interaction_features(self, interaction_df: pd.DataFrame) -> Dict[str, float]: if interaction_df.empty: @@ -183,6 +122,7 @@ class CommercePlatform: return {"mean_sale_price": mean_sale_price, "look_to_book": float(views / (buys + 1e-6))} def _session_feature_table(self, df: pd.DataFrame) -> pd.DataFrame: + # TODO: adapt this if df.empty: return pd.DataFrame() g = df.groupby("session_id", sort=False) @@ -208,73 +148,6 @@ class CommercePlatform: "is_agent": is_agent.astype(bool), }).reset_index() - def demand_estimate(self, interaction_df: pd.DataFrame, exclude_sessions: Optional[pd.Series] = None) -> np.ndarray: - # proxy demand from weighted interaction events - if interaction_df.empty: - return np.zeros(self.product_catelogue_size, dtype=np.float32) - df = interaction_df - if exclude_sessions is not None: - bad_sessions = set(exclude_sessions.loc[exclude_sessions].index) - df = df[~df["session_id"].isin(bad_sessions)] - weights = {"view": 0.15, "cart": 0.75, "purchase": 2.5} - w = df["action"].map(weights).fillna(0.0).to_numpy(dtype=float) - prod = df["product_id"].to_numpy(dtype=int) - q_hat = np.zeros(self.product_catelogue_size, dtype=float) - np.add.at(q_hat, prod, w) - return q_hat.astype(np.float32) - - def run_pricing_simulation(self, prices: np.ndarray) -> Dict[str, Any]: - interaction_df = self._simulate_sessions(prices) - self._last_interaction_df = interaction_df - session_df = self._session_feature_table(interaction_df) - - predicted_agent_sessions = None - if (self.use_defense and self.agent_detector is not None and not session_df.empty): - predicted_agent_sessions = self.agent_detector(session_df.set_index("session_id")) - - q_hat_naive = self.demand_estimate(interaction_df, exclude_sessions=None) - q_hat_defended = self.demand_estimate(interaction_df, exclude_sessions=predicted_agent_sessions) \ - if predicted_agent_sessions is not None else q_hat_naive.copy() - - true_human = np.zeros(self.product_catelogue_size, dtype=float) - true_agent = np.zeros(self.product_catelogue_size, dtype=float) - if not interaction_df.empty: - purchases = interaction_df[interaction_df["action"] == "purchase"] - if not purchases.empty: - for _, r in purchases.iterrows(): - if r["actor"] == "human": - true_human[int(r["product_id"])] += 1.0 - else: - true_agent[int(r["product_id"])] += 1.0 - - revenue_observed = float(interaction_df["price_paid"].sum()) if not interaction_df.empty else 0.0 - revenue_oracle = float(interaction_df["oracle_price_paid"].sum()) if not interaction_df.empty else 0.0 - agent_loss = max(0.0, revenue_oracle - revenue_observed) - - eps = 1e-6 - internal_error_naive = np.abs(true_human - q_hat_naive) / (true_human + eps) - internal_error_def = np.abs(true_human - q_hat_defended) / (true_human + eps) - interaction_features = self.compute_interaction_features(interaction_df) - - summary = { - "prices": prices.copy(), - "interaction_df": interaction_df, - "session_df": session_df, - "q_hat_naive": q_hat_naive, - "q_hat_defended": q_hat_defended, - "true_human_demand": true_human.astype(np.float32), - "true_agent_purchases": true_agent.astype(np.float32), - "internal_error_naive": internal_error_naive.astype(np.float32), - "internal_error_defended": internal_error_def.astype(np.float32), - "interaction_features": interaction_features, - "revenue_observed": revenue_observed, - "revenue_oracle": revenue_oracle, - "agent_loss": agent_loss, - "predicted_agent_sessions": predicted_agent_sessions, - } - self.simulation_history.append(summary) - return summary - def get_interaction_data(self) -> np.ndarray: if self._last_interaction_df.empty: return np.array([], dtype=object) @@ -284,7 +157,7 @@ class CommercePlatform: class PHANTOMEnv(gym.Env): metadata = {"render_modes": []} - def __init__(self, use_defense: bool = False): + def __init__(self, constraints): super().__init__() self.constraints = BusinessLogicConstraints() self.action_space = spaces.Box(low=-self.constraints.max_price_adjustment, @@ -301,14 +174,13 @@ class PHANTOMEnv(gym.Env): high=np.full((self.constraints.product_catelogue_size,), 1e6, dtype=np.float32), dtype=np.float32), }) + # TODO: define more features that we compute from the interaction data }) self.commerce_platform = CommercePlatform( product_catelogue_size=self.constraints.product_catelogue_size, max_price=self.constraints.system_max_price, min_price=self.constraints.system_min_price, - constraints=self.constraints, - agent_detector=simple_agent_detector, - use_defense=use_defense) + constraints=self.constraints) self._rng = np.random.default_rng(self.constraints.seed) self.t = 0 self._prev_prices: Optional[np.ndarray] = None @@ -336,17 +208,13 @@ class PHANTOMEnv(gym.Env): new_prices = np.clip(base_prices * (1.0 + action.astype(np.float32)), self.constraints.system_min_price, self.constraints.system_max_price).astype(np.float32) - result = self.commerce_platform.run_pricing_simulation(new_prices) - - if self.commerce_platform.use_defense: - demand_est = result["q_hat_defended"] - internal_err = result["internal_error_defended"] - else: - demand_est = result["q_hat_naive"] - internal_err = result["internal_error_naive"] self.state["elasticity"]["price"] = new_prices - self.state["elasticity"]["demand"] = demand_est + # TODO: use the commerce platform to simulate sessions + interactions_df = self.commerce_platform._simulate_sessions(new_prices) + result = self.commerce_platform.compute_interaction_features(interactions_df) + # TODO: implement COI computation to use in reward + COI = 0.0 volatility = 0.0 if self._prev_prices is None else \ float(np.mean(np.abs((new_prices - self._prev_prices) / (self._prev_prices + 1e-6)))) @@ -354,12 +222,13 @@ class PHANTOMEnv(gym.Env): revenue_observed = float(result["revenue_observed"]) agent_loss = float(result["agent_loss"]) - err_mean = float(np.mean(internal_err)) reward = (revenue_observed - - self.constraints.w_agent_loss * agent_loss - - self.constraints.w_volatility * volatility - - self.constraints.w_estimation_error * err_mean) + - COI + - self.constraints.w_agent_loss * agent_loss + - self.constraints.w_volatility * volatility + - self.constraints.w_estimation_error + ) terminated = self.t >= self.constraints.episode_length info = { diff --git a/sim/rl/train.py b/sim/rl/train.py new file mode 100644 index 0000000..41a87ab --- /dev/null +++ b/sim/rl/train.py @@ -0,0 +1,149 @@ +import numpy as np +import logging +from pathlib import Path +from typing import Dict, Type, Optional +import pickle +from torch import neg_ +from torch.utils.tensorboard import SummaryWriter +from environment import PHANTOMEnv, FastTrainingConstraints, BusinessLogicConstraints +from engine import (BasePricingEngine, WildPricingEngine, StaticPricingEngine, + SimpleDemandEngine, RandomWalkEngine, ThompsonSamplingEngine) + +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger(__name__) + + + +""" +Target training loop: +have base prices p0 from env reset and run the env step, collect reward and metrics +pass this to the pricing engine which computes the price action to take based on previous reward by learning +the new action gets passed to the step +so we alternate, step -> reward -> engine (produces price delta) -> step with price delta -> reward +to make sure the reinforcement learning inside the engine can learn we need to have trajectory of prices +CURRENT SOLUTION BELOW does not implement correct learning or updates. +""" + +class EngineTrainer: + """wrapper to run pricing engines through episodes and collect metrics""" + def __init__(self, engine: BasePricingEngine, env: PHANTOMEnv, + tb_writer: Optional[SummaryWriter] = None): + self.engine = engine + self.env = env + self.episode_metrics = [] + self.tb_writer = tb_writer + self.global_step = 0 + + def train(self, n_episodes: int, seed: int = 42): + + obs, _ = self.env.reset(seed=seed) + prices = None + for ep in range(n_episodes): + prices = self.engine.compute_prices(prices, obs + obs, reward, done, _, info = self.env.step(prices) + self.engine.update(obs, reward, done, info) + return self + + + + + + + return self.episode_metrics + + def evaluate(self, n_episodes: int = 10, seed: int = 100) -> Dict: + """evaluate trained engine""" + results = {k: [] for k in ['total_reward', 'revenue_observed', 'revenue_oracle', + 'agent_loss', 'ux_volatility', 'look_to_book']} + for ep in range(n_episodes): + metrics = self.run_episode(seed=seed + ep) + for k in results: results[k].append(metrics[k]) + return {k: (np.mean(v), np.std(v)) for k, v in results.items()} + + +def make_env(fast: bool = True): + constraints = FastTrainingConstraints() if fast else BusinessLogicConstraints() + return PHANTOMEnv(constraints=constraints) + + +def train_engine(engine_cls: Type[BasePricingEngine], env: PHANTOMEnv, + n_episodes: int, seed: int = 42, + tb_writer: Optional[SummaryWriter] = None) -> EngineTrainer: + constraints = env.constraints + engine = engine_cls(constraints=constraints, seed=seed) + trainer = EngineTrainer(engine, env, tb_writer=tb_writer) + trainer.train(n_episodes, seed=seed) + return trainer + + +def save_trainer(trainer: EngineTrainer, path: Path): + """save engine state and metrics""" + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, 'wb') as f: + pickle.dump({ + 'engine': trainer.engine, + 'metrics': trainer.episode_metrics + }, f) + logger.info(f"Saved trainer to {path}") + + +def load_trainer(path: Path, env: PHANTOMEnv, + tb_writer: Optional[SummaryWriter] = None) -> EngineTrainer: + """load saved engine""" + with open(path, 'rb') as f: + data = pickle.load(f) + trainer = EngineTrainer(data['engine'], env, tb_writer=tb_writer) + trainer.episode_metrics = data['metrics'] + return trainer + + +if __name__ == "__main__": + base_dir = Path("./runs") + base_dir.mkdir(exist_ok=True) + + engines = { + "Wild": WildPricingEngine, + "Static": StaticPricingEngine, +# "SimpleDemand": SimpleDemandEngine, + "RandomWalk": RandomWalkEngine, + "ThompsonSampling": ThompsonSamplingEngine, + } + defenses = [False, True] + n_train_episodes = 50 + n_eval_episodes = 10 + seed = 42 + fast_mode = True + + logger.info(f"Training config: {n_train_episodes} episodes per engine, fast_mode={fast_mode}") + + trained_trainers = {} + + for engine_name, engine_cls in engines.items(): + for use_defense in defenses: + defense_label = "defense_on" if use_defense else "defense_off" + run_name = f"{engine_name}_{defense_label}" + log_dir = base_dir / run_name + log_dir.mkdir(parents=True, exist_ok=True) + + logger.info(f"Training {engine_name} with defense={use_defense}") + logger.info(f"Log directory: {log_dir}") + + env = make_env(fast=fast_mode) + tb_writer = SummaryWriter(log_dir=str(log_dir)) + trainer = train_engine(engine_cls, env, n_train_episodes, seed, tb_writer=tb_writer) + tb_writer.close() + + save_path = log_dir / "trainer.pkl" + save_trainer(trainer, save_path) + + trained_trainers[run_name] = (trainer, env) + + logger.info("Starting evaluation") + + for run_name, (trainer, env) in trained_trainers.items(): + logger.info(f"Evaluating {run_name}") + results = trainer.evaluate(n_episodes=n_eval_episodes, seed=seed + 1000) + for metric, (mean, std) in results.items(): + logger.info(f" {metric:20s}: {mean:10.2f} ± {std:6.2f}") + + logger.info(f"Results saved to: {base_dir}")