From 4033e73ba1694f5758986727bb2729f7325a9d48 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Sat, 24 Jan 2026 15:16:41 +0100 Subject: [PATCH] feat: consistent failure case --- lab/case/thesis/simplified.py | 233 ++++++++++++++++++++++++++---- lab/case/thesis/simplified_env.py | 73 +++++++--- lab/case/thesis/train.py | 12 +- 3 files changed, 264 insertions(+), 54 deletions(-) diff --git a/lab/case/thesis/simplified.py b/lab/case/thesis/simplified.py index 00ed43a..59aef75 100644 --- a/lab/case/thesis/simplified.py +++ b/lab/case/thesis/simplified.py @@ -79,22 +79,175 @@ def estimate_alpha(session: Session, beta: float = 2.0) -> float: return 1.0 / (1.0 + np.exp(-beta * (dh - da))) if (dh + da) > 0 else 0.5 -def sample_trajectory(rng: np.random.Generator, trans: Dict, prices: np.ndarray, is_agent: bool) -> Tuple[List[Event], int]: +@dataclass(frozen=True) +class COIWindow: + """Windowed COI metrics computed from realized price exposures. + + COI_policy is the definition-level KPI: E[p_shown] - p_min. + COI_agent is the theorem-level object: E[p^(1)] - p_min, where p^(1) is the minimum price realized under agent querying. + In this simplified simulator, p^(1) is approximated as the minimum price exposed to any agent in the window (per product). + Leak is the observable gap between them. + """ + + policy: float + agent: float + leak: float + survival_ratio: float + policy_by_product: np.ndarray + agent_by_product: np.ndarray + demand_weights: np.ndarray + + +def _prices_by_product(sessions: List[Session]) -> Dict[int, List[float]]: + prices: Dict[int, List[float]] = {} + for s in sessions: + for e in s.events: + prices.setdefault(e.product_idx, []).append(float(e.price_seen)) + return prices + + +def _min_session_prices_by_product(sessions: List[Session]) -> Dict[int, List[float]]: + mins: Dict[int, List[float]] = {} + for s in sessions: + by_p: Dict[int, float] = {} + for e in s.events: + pidx = int(e.product_idx) + price = float(e.price_seen) + by_p[pidx] = price if pidx not in by_p else min(by_p[pidx], price) + for pidx, pmin in by_p.items(): + mins.setdefault(pidx, []).append(pmin) + return mins + + +def _min_price_across_sessions_by_product(sessions: List[Session]) -> Dict[int, float]: + mins: Dict[int, float] = {} + for s in sessions: + for e in s.events: + pidx = int(e.product_idx) + price = float(e.price_seen) + mins[pidx] = price if pidx not in mins else min(mins[pidx], price) + return mins + + +def _demand_weights_by_product( + sessions: List[Session], + demand_mapping: Dict[str, float], + n_products: int, +) -> np.ndarray: + w = np.zeros(n_products, dtype=float) + sessions_by_id = {s.sid: s for s in sessions} + for sid, q in demand_mapping.items(): + sess = sessions_by_id.get(sid) + if not sess or not sess.events: + continue + pidx = int(sess.events[0].product_idx) + w[pidx] += float(q) + s = float(np.sum(w)) + return (w / s) if s > 0 else w + + +def compute_coi_window( + sessions: List[Session], + costs: np.ndarray, + demand_mapping: Dict[str, float] | None = None, +) -> COIWindow: + n_products = int(len(costs)) + prices = _prices_by_product(sessions) + agent_min_across = _min_price_across_sessions_by_product([s for s in sessions if s.actor == "A"]) + + policy_by_product = np.zeros(n_products, dtype=float) + agent_by_product = np.zeros(n_products, dtype=float) + seen = np.array([(i in prices) for i in range(n_products)], dtype=bool) + agent_seen = np.array([(i in agent_min_across) for i in range(n_products)], dtype=bool) + + for pidx, ps in prices.items(): + if 0 <= pidx < n_products and ps: + policy_by_product[pidx] = float(np.mean(ps) - float(costs[pidx])) + + for pidx, pmin in agent_min_across.items(): + if 0 <= pidx < n_products: + agent_by_product[pidx] = float(pmin - float(costs[pidx])) + + # If no agent exposure exists for a product in the window, there is no realized erosion for that product. + agent_by_product[seen & ~agent_seen] = policy_by_product[seen & ~agent_seen] + + demand_weights = ( + _demand_weights_by_product(sessions, demand_mapping, n_products) + if demand_mapping is not None + else np.zeros(n_products, dtype=float) + ) + + has_weights = float(np.sum(demand_weights)) > 0 + if has_weights: + policy = float(np.dot(demand_weights, policy_by_product)) + agent = float(np.dot(demand_weights, agent_by_product)) + else: + if not bool(np.any(seen)): + policy = 0.0 + agent = 0.0 + else: + policy = float(np.mean(policy_by_product[seen])) + agent = float(np.mean(agent_by_product[seen])) + + leak = float(max(policy - agent, 0.0)) + survival_ratio = float(np.clip(agent / policy, 0.0, 1.0)) if policy > 0 else 0.0 + + return COIWindow( + policy=policy, + agent=agent, + leak=leak, + survival_ratio=survival_ratio, + policy_by_product=policy_by_product, + agent_by_product=agent_by_product, + demand_weights=demand_weights, + ) + + +def sample_trajectory( + rng: np.random.Generator, + trans: Dict, + prices: np.ndarray, + costs: np.ndarray, + theta: Dict[str, float], + is_agent: bool, + session_price_noise: float = 0.02, + surge: float = 0.08, + max_markup_mult: float = 1.8, +) -> Tuple[List[Event], int]: """Sample session trajectory from behavioral kernel.""" state, t, pidx = "start", 0.0, int(rng.integers(0, len(prices))) + cost = float(costs[pidx]) + base_price = float(prices[pidx]) * float(1.0 + rng.normal(0.0, session_price_noise)) + base_price = float(np.clip(base_price, cost * 1.01, float(prices[pidx]) * 2.0)) + current_price = base_price + signal = 0.0 events = [] + # TODO: instead of this very controlled setup implement same session samplin as in models.py while state != "end" and len(events) < 30: - if state != "start": - events.append(Event(action=state, product_idx=pidx, price_seen=float(prices[pidx]), ts=t)) probs = trans.get(state, {"end": 1.0}) - state = rng.choice(list(probs.keys()), p=list(probs.values())) + nxt = rng.choice(list(probs.keys()), p=list(probs.values())) + + if nxt == "purchase": + price_sens = float(theta.get("price_sens", 2.0)) + base_conv = float(theta.get("base_conv", 0.2)) + rel = max((current_price - cost) / (cost + 1e-6), 0.0) + p_buy = float(np.clip(base_conv * np.exp(-price_sens * rel), 0.0, 1.0)) + if rng.random() > p_buy: + nxt = "end" + + state = nxt + if state not in {"start", "end"}: + events.append(Event(action=state, product_idx=pidx, price_seen=float(current_price), ts=t)) + signal += float(ACTION_WEIGHTS.get(state, 0.1)) + current_price = float(np.clip(base_price * (1.0 + surge * signal), cost * 1.01, base_price * max_markup_mult)) + t += max(0.2, rng.gamma(1.5, 0.8) if is_agent else rng.gamma(2.0, 1.2)) return events, pidx -def put_prices_to_market(prices: np.ndarray, alpha: float = 0.2, n_sessions: int = 50, +def put_prices_to_market(prices: np.ndarray, costs: np.ndarray, alpha: float = 0.2, n_sessions: int = 50, seed: int | None = None) -> Tuple[List[Session], Dict[str, float]]: - """Generate sessions from mixture model Q(p) = (1-α)E[d_H] + αE[d_A] (Eq 3). + """Generate sessions from mixture model Returns: sessions: list of Session objects with events and product attribution @@ -108,7 +261,7 @@ def put_prices_to_market(prices: np.ndarray, alpha: float = 0.2, n_sessions: int is_agent = rng.random() < alpha trans = TRANS_A if is_agent else TRANS_H theta = {"price_sens": rng.uniform(0.05, 0.2), "base_conv": 0.01} if is_agent else {"price_sens": rng.uniform(1.5, 4.0), "base_conv": rng.uniform(0.2, 0.5)} - events, _ = sample_trajectory(rng, trans, prices, is_agent) + events, _ = sample_trajectory(rng, trans, prices, costs=costs, theta=theta, is_agent=is_agent) session = Session(sid=sid, events=events, actor="A" if is_agent else "H", theta=theta) sessions.append(session) demand_mapping[sid] = compute_demand(session) @@ -167,6 +320,8 @@ class System: self.limbo = Limbo() self._alpha_est = 0.2 # current contamination estimate self._sessions: List[Session] = [] + self._last_sessions: List[Session] = [] + self._last_coi: COIWindow | None = None @property def alpha(self) -> float: @@ -190,24 +345,27 @@ class System: agg_demand[pidx] += q return float(np.dot(prices, agg_demand)) - def _coi_leakage(self, prices: np.ndarray, n_agents: int = 1) -> float: - """COI leakage tied to Theorem 1: erosion from order statistic collapse. - - As N agents query, min(p_1..p_N) → p_min and COI → 0. - Leakage = erosion_rate × margin_at_risk - """ - price_std = float(np.std(prices)) - erosion = coi_erosion(max(1, n_agents), price_std) - margin_at_risk = float(np.mean(prices - self.costs)) - return erosion * margin_at_risk + def _compute_coi_window(self, demand: Dict[str, float]) -> COIWindow: + if not self._last_sessions: + zeros = np.zeros(self.n, dtype=float) + return COIWindow( + policy=0.0, + agent=0.0, + leak=0.0, + survival_ratio=0.0, + policy_by_product=zeros, + agent_by_product=zeros, + demand_weights=zeros, + ) + return compute_coi_window(self._last_sessions, self.costs, demand_mapping=demand) def _objective(self, prices: np.ndarray, demand: Dict[str, float]) -> float: """Robust objective: R(p,d) - λ·COI_leak (Eq 23 simplified).""" revenue = self._revenue_under_demand(prices, demand) cost = float(np.sum(self.costs)) # fixed cost approximation profit = revenue - cost - coi_penalty = self.lambda_coi * self._coi_leakage(prices) * float(np.mean(prices - self.costs)) - return profit - coi_penalty + self._last_coi = self._compute_coi_window(demand) + return profit - self.lambda_coi * self._last_coi.leak def compute_prices(self, demand: Dict[str, float] | None = None) -> np.ndarray: """Compute next prices via simple gradient-like update on robust objective. @@ -230,28 +388,44 @@ class System: def observe_demand(self, prices: np.ndarray, alpha_true: float = 0.2, n_sessions: int = 50) -> Dict[str, float]: """Observe market response to prices.""" - sessions, demand_map = put_prices_to_market(prices, alpha=alpha_true, n_sessions=n_sessions, seed=int(self.rng.integers(0, 10000))) + sessions, demand_map = put_prices_to_market(prices, costs=self.costs, alpha=alpha_true, n_sessions=n_sessions, seed=int(self.rng.integers(0, 10000))) + self._last_sessions = sessions self._sessions.extend(sessions) # store actual sessions for correct product attribution self.limbo.add_update("demand", demand_map) return demand_map - def step(self, alpha_true: float = 0.2, n_sessions: int = 50) -> Tuple[np.ndarray, Dict[str, float], float]: + def step(self, alpha_true: float = 0.2, n_sessions: int = 50) -> Tuple[np.ndarray, Dict[str, float], float, COIWindow]: """Single simulation step: prices -> demand -> reward.""" demand_hist = self.limbo.get_demand_history() prices = self.compute_prices(demand_hist[-1] if demand_hist else None) demand = self.observe_demand(prices, alpha_true, n_sessions) reward = self._objective(prices, demand) - return prices, demand, reward + coi = self._last_coi or self._compute_coi_window(demand) + return prices, demand, reward, coi def run(self, n_steps: int = 100, alpha_true: float = 0.2) -> Dict: """Run simulation for n_steps, return trajectory.""" - trajectory = {"prices": [], "demand": [], "rewards": [], "alpha_est": [], "alpha_true": alpha_true} + trajectory = { + "prices": [], + "demand": [], + "rewards": [], + "alpha_est": [], + "alpha_true": alpha_true, + "coi_policy": [], + "coi_agent": [], + "coi_leak": [], + "coi_survival": [], + } for _ in range(n_steps): - p, d, r = self.step(alpha_true) + p, d, r, coi = self.step(alpha_true) trajectory["prices"].append(p) trajectory["demand"].append(d) trajectory["rewards"].append(r) trajectory["alpha_est"].append(self._alpha_est) + trajectory["coi_policy"].append(coi.policy) + trajectory["coi_agent"].append(coi.agent) + trajectory["coi_leak"].append(coi.leak) + trajectory["coi_survival"].append(coi.survival_ratio) return trajectory @@ -268,10 +442,17 @@ if __name__ == "__main__": # quick demo sys = System(n_products=5, seed=42) traj = sys.run(n_steps=20, alpha_true=0.25) - print(f"avg reward: {np.mean(traj['rewards']):.2f}, final α̂: {traj['alpha_est'][-1]:.3f}") + print( + f"avg reward: {np.mean(traj['rewards']):.2f}, " + f"final α̂: {traj['alpha_est'][-1]:.3f}, " + f"COI_policy: {np.mean(traj['coi_policy']):.3f}, " + f"COI_agent: {np.mean(traj['coi_agent']):.3f}, " + f"leak: {np.mean(traj['coi_leak']):.3f}" + ) prices = np.array([20.0, 35.0, 50.0, 25.0, 40.0]) - sessions, demand = put_prices_to_market(prices, alpha=0.3, n_sessions=20, seed=123) + costs = np.array([15.0, 28.0, 40.0, 18.0, 30.0]) + sessions, demand = put_prices_to_market(prices, costs=costs, alpha=0.3, n_sessions=20, seed=123) print(f'sessions: {len(sessions)}, agents: {sum(1 for s in sessions if s.actor=="A")}') for n in [1, 5, 10, 50, 100]: diff --git a/lab/case/thesis/simplified_env.py b/lab/case/thesis/simplified_env.py index af4af87..e59ae41 100644 --- a/lab/case/thesis/simplified_env.py +++ b/lab/case/thesis/simplified_env.py @@ -19,8 +19,19 @@ try: except ImportError: HAS_GYM = False -from .simplified import (System, Session, Event, Limbo, put_prices_to_market, - compute_demand, estimate_alpha, coi_erosion, TRANS_H, TRANS_A) +from .simplified import ( + System, + Session, + Event, + Limbo, + put_prices_to_market, + compute_coi_window, + compute_demand, + estimate_alpha, + coi_erosion, + TRANS_H, + TRANS_A, +) @dataclass @@ -116,9 +127,19 @@ class PricingEnv(gym.Env if HAS_GYM else object): agg[pidx] += q self._demand_agg = agg - revenue = float(np.dot(prices, agg)) - cost = float(np.dot(sys.costs, np.clip(agg, 0, 1))) # simplified cost model - profit = revenue - cost + revenue = 0.0 + cost = 0.0 + purchases = np.zeros(self.n, dtype=float) + for sess in sys._last_sessions: + for e in sess.events: + if e.action != "purchase": + continue + pidx = int(e.product_idx) + if 0 <= pidx < self.n: + purchases[pidx] += 1.0 + revenue += float(e.price_seen) + cost += float(sys.costs[pidx]) + profit = float(revenue - cost) # volatility penalty (price changes) vol_penalty = 0.0 @@ -126,9 +147,8 @@ class PricingEnv(gym.Env if HAS_GYM else object): price_change = np.abs(prices - self._last_prices) / (sys.refs + 1e-6) vol_penalty = cfg.lambda_vol * float(np.mean(price_change)) - # COI leakage penalty - avg_margin = float(np.mean(prices - sys.costs)) - coi_leak = sys.alpha * avg_margin + coi = compute_coi_window(sys._last_sessions, sys.costs, demand_mapping=demand) + coi_leak = float(coi.leak) if cfg.reward_mode == "revenue": r = revenue @@ -181,11 +201,11 @@ class PricingEnv(gym.Env if HAS_GYM else object): prices = self._sys.refs * action.astype(np.float64) prices = np.clip(prices, self._sys.costs * 1.01, self._sys.refs * 2.0) - # drift contamination - if self.cfg.alpha_drift != 0: - self._alpha = np.clip( - self._alpha + self.cfg.alpha_drift * self._sys.rng.normal(), - *self.cfg.alpha_bounds) + # # drift contamination + # if self.cfg.alpha_drift != 0: + # self._alpha = np.clip( + # self._alpha + self.cfg.alpha_drift * self._sys.rng.normal(), + # *self.cfg.alpha_bounds) # observe demand demand = self._sys.observe_demand(prices, alpha_true=self._alpha, n_sessions=self.cfg.sessions_per_step) @@ -205,25 +225,38 @@ class PricingEnv(gym.Env if HAS_GYM else object): truncated = False # compute metrics for tracking - revenue = float(np.dot(prices, self._demand_agg)) - cost = float(np.dot(self._sys.costs, np.clip(self._demand_agg, 0, 1))) - profit = revenue - cost + revenue = 0.0 + cost = 0.0 + n_purchases = 0 + for sess in self._sys._last_sessions: + for e in sess.events: + if e.action != "purchase": + continue + n_purchases += 1 + revenue += float(e.price_seen) + cost += float(self._sys.costs[int(e.product_idx)]) + profit = float(revenue - cost) n_agents = int(self._alpha * self.cfg.sessions_per_step) price_std = float(np.std(prices)) + coi = compute_coi_window(self._sys._last_sessions, self._sys.costs, demand_mapping=demand) info = { "alpha_true": self._alpha, "alpha_est": self._sys.alpha, "alpha_error": abs(self._alpha - self._sys.alpha), - "revenue": revenue, - "profit": profit, - "cost": cost, + "revenue": float(revenue), + "profit": float(profit), + "cost": float(cost), + "n_purchases": int(n_purchases), "avg_margin": float(np.mean((prices - self._sys.costs) / self._sys.costs)), "n_sessions": len(demand), "n_agents": n_agents, "price_std": price_std, "coi_erosion": coi_erosion(max(1, n_agents), price_std), - "coi_leakage": self._sys.alpha * float(np.mean(prices - self._sys.costs)), + "coi_policy": float(coi.policy), + "coi_agent": float(coi.agent), + "coi_leakage": float(coi.leak), + "coi_survival": float(coi.survival_ratio), "cumulative_reward": sum(self._episode_rewards), "step": self._t, } diff --git a/lab/case/thesis/train.py b/lab/case/thesis/train.py index cd134fd..cc152b5 100644 --- a/lab/case/thesis/train.py +++ b/lab/case/thesis/train.py @@ -2,11 +2,6 @@ Trains pricing policies using stable-baselines3 with TensorBoard logging. Tracks COI erosion, alpha estimation error, and economic KPIs per thesis formulation. - -Usage: - python -m lab.case.thesis.train --algo ppo --alpha 0.3 --steps 100000 - python -m lab.case.thesis.train --algo adaptive --sweep # run alpha sweep - tensorboard --logdir lab/case/thesis/runs """ from __future__ import annotations import argparse @@ -41,9 +36,9 @@ class EpisodeMetrics: reward: float = 0.0 revenue: float = 0.0 profit: float = 0.0 - coi_erosion: float = 0.0 # theorem 1: order statistic erosion - coi_leakage: float = 0.0 # per-step leakage penalty - alpha_error: float = 0.0 # |α - α̂| + coi_erosion: float = 0.0 + coi_leakage: float = 0.0 + alpha_error: float = 0.0 avg_margin: float = 0.0 n_agents: int = 0 steps: int = 0 @@ -213,6 +208,7 @@ def train(cfg: ExperimentConfig) -> Dict[str, Any]: if algo_cls is None: raise ValueError(f"unknown algo: {cfg.algo}") common = dict(verbose=1, seed=cfg.seed, tensorboard_log=str(log_path), device="auto") + # TODO: setup hyper parameter passing to train different variations (no free lunch) if cfg.algo.lower() == "ppo": model = PPO("MlpPolicy", train_env, learning_rate=3e-4, n_steps=2048, batch_size=64, n_epochs=10, gamma=0.99, gae_lambda=0.95,