From 1224841a82167e54457899427067731b174000c9 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Sat, 24 Jan 2026 23:51:57 +0100 Subject: [PATCH] preliminary improved runs --- lab/case/thesis/simplified.py | 373 +++++------------------------- lab/case/thesis/simplified_env.py | 293 +++++++---------------- lab/case/thesis/train.py | 277 +++++++++++----------- 3 files changed, 279 insertions(+), 664 deletions(-) diff --git a/lab/case/thesis/simplified.py b/lab/case/thesis/simplified.py index 59aef75..3c58fdd 100644 --- a/lab/case/thesis/simplified.py +++ b/lab/case/thesis/simplified.py @@ -1,11 +1,11 @@ """Minimal implementation of thesis pricing system. Implements the core loop: prices -> sessions -> demand -> prices -with behavioral separability and robust pricing objective (Eq 23). +with behavioral separability and robust pricing objective. Objects: -- Session trajectories τ_s from mixture of H/A behavioral profiles -- Demand proxy q̂ via weighted action aggregation (Eq 2) +- Session trajectories tau_s from mixture of H/A behavioral profiles +- Demand proxy q_hat via weighted action aggregation - COI leakage penalty for agent reconnaissance - Limbo: alternating price/demand history for trajectory analysis """ @@ -14,11 +14,10 @@ from dataclasses import dataclass, field from typing import Dict, List, Tuple import numpy as np +from .coi import COIWindow, compute_coi_window, coi_erosion +from .separability import TRANS_H, TRANS_A, kl_div, build_kernel, compute_divergence, estimate_alpha + ACTION_WEIGHTS = {"add_to_cart": 0.8, "checkout": 0.9, "purchase": 1.0, "view": 0.15, "detail": 0.25, "hover": 0.3, "start": 0.05, "end": 0.0} -TRANS_H = {"start": {"view": 0.85, "end": 0.15}, "view": {"detail": 0.4, "cart": 0.3, "view": 0.2, "end": 0.1}, - "detail": {"cart": 0.5, "view": 0.3, "end": 0.2}, "cart": {"purchase": 0.6, "view": 0.25, "end": 0.15}, "purchase": {"end": 1.0}} -TRANS_A = {"start": {"view": 0.95, "end": 0.05}, "view": {"detail": 0.6, "view": 0.25, "cart": 0.1, "end": 0.05}, - "detail": {"view": 0.5, "cart": 0.15, "detail": 0.3, "end": 0.05}, "cart": {"view": 0.4, "purchase": 0.2, "end": 0.4}, "purchase": {"end": 1.0}} @dataclass @@ -38,235 +37,52 @@ class Session: def compute_demand(session: Session) -> float: - """Compute demand proxy q̂ = Σ_k ω(a_k) for session (Eq 2).""" + """Compute demand proxy q_hat = sum_k omega(a_k) for session.""" return sum(ACTION_WEIGHTS.get(e.action, 0.1) for e in session.events) -def kl_div(p: Dict[str, float], q: Dict[str, float]) -> float: - """KL divergence D_KL(p || q) for transition kernels.""" - eps = 1e-10 - keys = set(p.keys()) | set(q.keys()) - return sum(p.get(k, eps) * np.log((p.get(k, eps) + eps) / (q.get(k, eps) + eps)) for k in keys) - - -def build_kernel(events: List[Event]) -> Dict[str, Dict[str, float]]: - """Build empirical transition kernel from trajectory.""" - trans: Dict[str, Dict[str, int]] = {} - prev = "start" - for e in events: - curr = e.action - trans.setdefault(prev, {}) - trans[prev][curr] = trans[prev].get(curr, 0) + 1 - prev = curr - kernel = {} - for s, dsts in trans.items(): - total = sum(dsts.values()) - kernel[s] = {d: c / total for d, c in dsts.items()} if total > 0 else {} - return kernel - - -def compute_divergence(session: Session) -> Tuple[float, float]: - """Compute Δ_H, Δ_A divergence signals (Eq 20-21).""" - kernel = build_kernel(session.events) - delta_h = sum(kl_div(kernel.get(s, {}), TRANS_H.get(s, {})) for s in kernel) / max(len(kernel), 1) - delta_a = sum(kl_div(kernel.get(s, {}), TRANS_A.get(s, {})) for s in kernel) / max(len(kernel), 1) - return delta_h, delta_a - - -def estimate_alpha(session: Session, beta: float = 2.0) -> float: - """Per-session contamination estimate α̂(τ') = σ(β(Δ_H - Δ_A)).""" - dh, da = compute_divergence(session) - return 1.0 / (1.0 + np.exp(-beta * (dh - da))) if (dh + da) > 0 else 0.5 - - -@dataclass(frozen=True) -class COIWindow: - """Windowed COI metrics computed from realized price exposures. - - COI_policy is the definition-level KPI: E[p_shown] - p_min. - COI_agent is the theorem-level object: E[p^(1)] - p_min, where p^(1) is the minimum price realized under agent querying. - In this simplified simulator, p^(1) is approximated as the minimum price exposed to any agent in the window (per product). - Leak is the observable gap between them. - """ - - policy: float - agent: float - leak: float - survival_ratio: float - policy_by_product: np.ndarray - agent_by_product: np.ndarray - demand_weights: np.ndarray - - -def _prices_by_product(sessions: List[Session]) -> Dict[int, List[float]]: - prices: Dict[int, List[float]] = {} - for s in sessions: - for e in s.events: - prices.setdefault(e.product_idx, []).append(float(e.price_seen)) - return prices - - -def _min_session_prices_by_product(sessions: List[Session]) -> Dict[int, List[float]]: - mins: Dict[int, List[float]] = {} - for s in sessions: - by_p: Dict[int, float] = {} - for e in s.events: - pidx = int(e.product_idx) - price = float(e.price_seen) - by_p[pidx] = price if pidx not in by_p else min(by_p[pidx], price) - for pidx, pmin in by_p.items(): - mins.setdefault(pidx, []).append(pmin) - return mins - - -def _min_price_across_sessions_by_product(sessions: List[Session]) -> Dict[int, float]: - mins: Dict[int, float] = {} - for s in sessions: - for e in s.events: - pidx = int(e.product_idx) - price = float(e.price_seen) - mins[pidx] = price if pidx not in mins else min(mins[pidx], price) - return mins - - -def _demand_weights_by_product( - sessions: List[Session], - demand_mapping: Dict[str, float], - n_products: int, -) -> np.ndarray: - w = np.zeros(n_products, dtype=float) - sessions_by_id = {s.sid: s for s in sessions} - for sid, q in demand_mapping.items(): - sess = sessions_by_id.get(sid) - if not sess or not sess.events: - continue - pidx = int(sess.events[0].product_idx) - w[pidx] += float(q) - s = float(np.sum(w)) - return (w / s) if s > 0 else w - - -def compute_coi_window( - sessions: List[Session], - costs: np.ndarray, - demand_mapping: Dict[str, float] | None = None, -) -> COIWindow: - n_products = int(len(costs)) - prices = _prices_by_product(sessions) - agent_min_across = _min_price_across_sessions_by_product([s for s in sessions if s.actor == "A"]) - - policy_by_product = np.zeros(n_products, dtype=float) - agent_by_product = np.zeros(n_products, dtype=float) - seen = np.array([(i in prices) for i in range(n_products)], dtype=bool) - agent_seen = np.array([(i in agent_min_across) for i in range(n_products)], dtype=bool) - - for pidx, ps in prices.items(): - if 0 <= pidx < n_products and ps: - policy_by_product[pidx] = float(np.mean(ps) - float(costs[pidx])) - - for pidx, pmin in agent_min_across.items(): - if 0 <= pidx < n_products: - agent_by_product[pidx] = float(pmin - float(costs[pidx])) - - # If no agent exposure exists for a product in the window, there is no realized erosion for that product. - agent_by_product[seen & ~agent_seen] = policy_by_product[seen & ~agent_seen] - - demand_weights = ( - _demand_weights_by_product(sessions, demand_mapping, n_products) - if demand_mapping is not None - else np.zeros(n_products, dtype=float) - ) - - has_weights = float(np.sum(demand_weights)) > 0 - if has_weights: - policy = float(np.dot(demand_weights, policy_by_product)) - agent = float(np.dot(demand_weights, agent_by_product)) - else: - if not bool(np.any(seen)): - policy = 0.0 - agent = 0.0 - else: - policy = float(np.mean(policy_by_product[seen])) - agent = float(np.mean(agent_by_product[seen])) - - leak = float(max(policy - agent, 0.0)) - survival_ratio = float(np.clip(agent / policy, 0.0, 1.0)) if policy > 0 else 0.0 - - return COIWindow( - policy=policy, - agent=agent, - leak=leak, - survival_ratio=survival_ratio, - policy_by_product=policy_by_product, - agent_by_product=agent_by_product, - demand_weights=demand_weights, - ) - - -def sample_trajectory( - rng: np.random.Generator, - trans: Dict, - prices: np.ndarray, - costs: np.ndarray, - theta: Dict[str, float], - is_agent: bool, - session_price_noise: float = 0.02, - surge: float = 0.08, - max_markup_mult: float = 1.8, -) -> Tuple[List[Event], int]: +def sample_trajectory(rng: np.random.Generator, trans: Dict, prices: np.ndarray, costs: np.ndarray, theta: Dict[str, float], + is_agent: bool, session_noise: float = 0.02, surge: float = 0.08, max_mult: float = 1.8) -> Tuple[List[Event], int]: """Sample session trajectory from behavioral kernel.""" - state, t, pidx = "start", 0.0, int(rng.integers(0, len(prices))) - cost = float(costs[pidx]) - base_price = float(prices[pidx]) * float(1.0 + rng.normal(0.0, session_price_noise)) - base_price = float(np.clip(base_price, cost * 1.01, float(prices[pidx]) * 2.0)) - current_price = base_price - signal = 0.0 + pidx = int(rng.integers(0, len(prices))) + cost, base = float(costs[pidx]), float(prices[pidx]) * (1.0 + rng.normal(0.0, session_noise)) + base = float(np.clip(base, cost * 1.01, float(prices[pidx]) * 2.0)) + price, signal, state, t = base, 0.0, "start", 0.0 events = [] - # TODO: instead of this very controlled setup implement same session samplin as in models.py + while state != "end" and len(events) < 30: probs = trans.get(state, {"end": 1.0}) nxt = rng.choice(list(probs.keys()), p=list(probs.values())) - - if nxt == "purchase": - price_sens = float(theta.get("price_sens", 2.0)) - base_conv = float(theta.get("base_conv", 0.2)) - rel = max((current_price - cost) / (cost + 1e-6), 0.0) - p_buy = float(np.clip(base_conv * np.exp(-price_sens * rel), 0.0, 1.0)) + if nxt == "purchase": # purchase conversion check + rel = max((price - cost) / (cost + 1e-6), 0.0) + p_buy = float(np.clip(theta.get("base_conv", 0.2) * np.exp(-theta.get("price_sens", 2.0) * rel), 0.0, 1.0)) if rng.random() > p_buy: nxt = "end" - state = nxt if state not in {"start", "end"}: - events.append(Event(action=state, product_idx=pidx, price_seen=float(current_price), ts=t)) + events.append(Event(action=state, product_idx=pidx, price_seen=float(price), ts=t)) signal += float(ACTION_WEIGHTS.get(state, 0.1)) - current_price = float(np.clip(base_price * (1.0 + surge * signal), cost * 1.01, base_price * max_markup_mult)) - + price = float(np.clip(base * (1.0 + surge * signal), cost * 1.01, base * max_mult)) t += max(0.2, rng.gamma(1.5, 0.8) if is_agent else rng.gamma(2.0, 1.2)) return events, pidx def put_prices_to_market(prices: np.ndarray, costs: np.ndarray, alpha: float = 0.2, n_sessions: int = 50, seed: int | None = None) -> Tuple[List[Session], Dict[str, float]]: - """Generate sessions from mixture model - - Returns: - sessions: list of Session objects with events and product attribution - demand_mapping: session_id -> demand proxy q̂ - """ + """Generate sessions from mixture model. Returns sessions and demand mapping sid -> q_hat.""" rng = np.random.default_rng(seed) - sessions, demand_mapping = [], {} - + sessions, demand = [], {} for i in range(n_sessions): sid = f"s{i:04d}" is_agent = rng.random() < alpha trans = TRANS_A if is_agent else TRANS_H - theta = {"price_sens": rng.uniform(0.05, 0.2), "base_conv": 0.01} if is_agent else {"price_sens": rng.uniform(1.5, 4.0), "base_conv": rng.uniform(0.2, 0.5)} + theta = {"price_sens": rng.uniform(0.05, 0.2), "base_conv": 0.01} if is_agent else \ + {"price_sens": rng.uniform(1.5, 4.0), "base_conv": rng.uniform(0.2, 0.5)} events, _ = sample_trajectory(rng, trans, prices, costs=costs, theta=theta, is_agent=is_agent) session = Session(sid=sid, events=events, actor="A" if is_agent else "H", theta=theta) sessions.append(session) - demand_mapping[sid] = compute_demand(session) - - return sessions, demand_mapping + demand[sid] = compute_demand(session) + return sessions, demand @dataclass @@ -286,13 +102,7 @@ class Limbo: def add_update(self, utype: str, data: np.ndarray | Dict[str, float]) -> Dict: self.history.append(LimboUpdate(utype=utype, data=data, t=self._t)) self._t += 1 - return self.on_update(utype) - - def on_update(self, utype: str) -> Dict: - """React to update: after prices -> return observed demand; after demand -> signal price update needed.""" - if utype == "prices": - return {"action": "observe_demand", "msg": "awaiting market response"} - return {"action": "set_prices", "msg": "demand observed, update prices"} + return {"action": "observe_demand" if utype == "prices" else "set_prices"} def get_prices_history(self) -> List[np.ndarray]: return [u.data for u in self.history if u.utype == "prices"] @@ -304,21 +114,18 @@ class Limbo: class System: """Main pricing system implementing robust Stackelberg objective. - Manages the alternating loop: - 1. Set prices p_t - 2. Observe demand response Q̂(p_t) - 3. Estimate contamination α from behavioral signals - 4. Compute next prices via robust objective (Eq 23) + Manages the alternating loop: set prices p_t -> observe demand Q_hat(p_t) -> + estimate contamination alpha from behavioral signals -> compute next prices. """ def __init__(self, n_products: int = 10, costs: np.ndarray | None = None, lambda_coi: float = 0.5, seed: int | None = 42): self.n = n_products self.rng = np.random.default_rng(seed) self.costs = costs if costs is not None else self.rng.uniform(10, 50, n_products) - self.refs = self.costs * (1 + self.rng.uniform(0.2, 0.5, n_products)) # base prices with margin + self.refs = self.costs * (1 + self.rng.uniform(0.2, 0.5, n_products)) self.lambda_coi = lambda_coi self.limbo = Limbo() - self._alpha_est = 0.2 # current contamination estimate + self._alpha_est = 0.2 self._sessions: List[Session] = [] self._last_sessions: List[Session] = [] self._last_coi: COIWindow | None = None @@ -328,127 +135,73 @@ class System: return self._alpha_est def _estimate_alpha_from_sessions(self) -> float: - """Aggregate per-session α̂ estimates.""" if not self._sessions: return self._alpha_est - alphas = [estimate_alpha(s) for s in self._sessions[-50:]] # use recent sessions - return float(np.mean(alphas)) + return float(np.mean([estimate_alpha(s) for s in self._sessions[-50:]])) def _revenue_under_demand(self, prices: np.ndarray, demand: Dict[str, float]) -> float: - """Compute expected revenue R(p, d) from demand proxy.""" - agg_demand = np.zeros(self.n) + agg = np.zeros(self.n) for sid, q in demand.items(): - if self._sessions: - sess = next((s for s in self._sessions if s.sid == sid), None) - if sess and sess.events: - pidx = sess.events[0].product_idx - agg_demand[pidx] += q - return float(np.dot(prices, agg_demand)) + sess = next((s for s in self._sessions if s.sid == sid), None) + if sess and sess.events: + agg[sess.events[0].product_idx] += q + return float(np.dot(prices, agg)) def _compute_coi_window(self, demand: Dict[str, float]) -> COIWindow: if not self._last_sessions: zeros = np.zeros(self.n, dtype=float) - return COIWindow( - policy=0.0, - agent=0.0, - leak=0.0, - survival_ratio=0.0, - policy_by_product=zeros, - agent_by_product=zeros, - demand_weights=zeros, - ) + return COIWindow(policy=0.0, agent=0.0, leak=0.0, survival_ratio=0.0, + policy_by_product=zeros, agent_by_product=zeros, demand_weights=zeros) return compute_coi_window(self._last_sessions, self.costs, demand_mapping=demand) def _objective(self, prices: np.ndarray, demand: Dict[str, float]) -> float: - """Robust objective: R(p,d) - λ·COI_leak (Eq 23 simplified).""" - revenue = self._revenue_under_demand(prices, demand) - cost = float(np.sum(self.costs)) # fixed cost approximation - profit = revenue - cost + """Robust objective: R(p,d) - lambda * COI_leak.""" + profit = self._revenue_under_demand(prices, demand) - float(np.sum(self.costs)) self._last_coi = self._compute_coi_window(demand) return profit - self.lambda_coi * self._last_coi.leak def compute_prices(self, demand: Dict[str, float] | None = None) -> np.ndarray: - """Compute next prices via simple gradient-like update on robust objective. - - In a full implementation this would be replaced by DR-RL policy output. - Here we use a heuristic: adjust margins based on α estimate. - """ + """Compute next prices via heuristic margin adjustment based on alpha estimate.""" self._alpha_est = self._estimate_alpha_from_sessions() - - # base margin adjustment: higher α -> lower margins (defensive pricing) - margin_scale = 1.0 - 0.5 * self._alpha_est # reduce margins under high contamination + margin_scale = 1.0 - 0.5 * self._alpha_est # defensive pricing under high contamination margins = (self.refs - self.costs) * margin_scale - - # add small noise for exploration noise = self.rng.normal(0, 0.02, self.n) * self.costs prices = np.clip(self.costs + margins + noise, self.costs * 1.02, self.refs * 1.3) - self.limbo.add_update("prices", prices) return prices def observe_demand(self, prices: np.ndarray, alpha_true: float = 0.2, n_sessions: int = 50) -> Dict[str, float]: - """Observe market response to prices.""" - sessions, demand_map = put_prices_to_market(prices, costs=self.costs, alpha=alpha_true, n_sessions=n_sessions, seed=int(self.rng.integers(0, 10000))) + sessions, demand_map = put_prices_to_market(prices, costs=self.costs, alpha=alpha_true, + n_sessions=n_sessions, seed=int(self.rng.integers(0, 10000))) self._last_sessions = sessions - self._sessions.extend(sessions) # store actual sessions for correct product attribution + self._sessions.extend(sessions) self.limbo.add_update("demand", demand_map) return demand_map def step(self, alpha_true: float = 0.2, n_sessions: int = 50) -> Tuple[np.ndarray, Dict[str, float], float, COIWindow]: - """Single simulation step: prices -> demand -> reward.""" demand_hist = self.limbo.get_demand_history() prices = self.compute_prices(demand_hist[-1] if demand_hist else None) demand = self.observe_demand(prices, alpha_true, n_sessions) reward = self._objective(prices, demand) - coi = self._last_coi or self._compute_coi_window(demand) - return prices, demand, reward, coi + return prices, demand, reward, self._last_coi or self._compute_coi_window(demand) def run(self, n_steps: int = 100, alpha_true: float = 0.2) -> Dict: - """Run simulation for n_steps, return trajectory.""" - trajectory = { - "prices": [], - "demand": [], - "rewards": [], - "alpha_est": [], - "alpha_true": alpha_true, - "coi_policy": [], - "coi_agent": [], - "coi_leak": [], - "coi_survival": [], - } + traj = {"prices": [], "demand": [], "rewards": [], "alpha_est": [], "alpha_true": alpha_true, + "coi_policy": [], "coi_agent": [], "coi_leak": [], "coi_survival": []} for _ in range(n_steps): p, d, r, coi = self.step(alpha_true) - trajectory["prices"].append(p) - trajectory["demand"].append(d) - trajectory["rewards"].append(r) - trajectory["alpha_est"].append(self._alpha_est) - trajectory["coi_policy"].append(coi.policy) - trajectory["coi_agent"].append(coi.agent) - trajectory["coi_leak"].append(coi.leak) - trajectory["coi_survival"].append(coi.survival_ratio) - return trajectory - - -def coi_erosion(n_agents: int, price_std: float) -> float: - """COI erosion from Theorem 1: as N->inf, min(p_1..p_N)->p_min.""" - if n_agents <= 1: - return 0.0 - log_n = np.log(n_agents) - shift = price_std * (np.sqrt(2 * log_n) - (np.log(log_n) + np.log(4 * np.pi)) / (2 * np.sqrt(2 * log_n) + 1e-6)) - return float(min(shift / (price_std * 2 + 1e-6), 1.0)) + traj["prices"].append(p); traj["demand"].append(d); traj["rewards"].append(r) + traj["alpha_est"].append(self._alpha_est) + traj["coi_policy"].append(coi.policy); traj["coi_agent"].append(coi.agent) + traj["coi_leak"].append(coi.leak); traj["coi_survival"].append(coi.survival_ratio) + return traj if __name__ == "__main__": - # quick demo sys = System(n_products=5, seed=42) traj = sys.run(n_steps=20, alpha_true=0.25) - print( - f"avg reward: {np.mean(traj['rewards']):.2f}, " - f"final α̂: {traj['alpha_est'][-1]:.3f}, " - f"COI_policy: {np.mean(traj['coi_policy']):.3f}, " - f"COI_agent: {np.mean(traj['coi_agent']):.3f}, " - f"leak: {np.mean(traj['coi_leak']):.3f}" - ) + print(f"avg reward: {np.mean(traj['rewards']):.2f}, final alpha_hat: {traj['alpha_est'][-1]:.3f}, " + f"COI_policy: {np.mean(traj['coi_policy']):.3f}, COI_agent: {np.mean(traj['coi_agent']):.3f}, leak: {np.mean(traj['coi_leak']):.3f}") prices = np.array([20.0, 35.0, 50.0, 25.0, 40.0]) costs = np.array([15.0, 28.0, 40.0, 18.0, 30.0]) @@ -456,16 +209,10 @@ if __name__ == "__main__": print(f'sessions: {len(sessions)}, agents: {sum(1 for s in sessions if s.actor=="A")}') for n in [1, 5, 10, 50, 100]: - ero = coi_erosion(n, price_std=5.0) - print(f'N={n:3d} agents -> COI erosion: {ero:.3f}') + print(f'N={n:3d} agents -> COI erosion: {coi_erosion(n, price_std=5.0):.3f}') - # test separability - events = [Event('view', 0, 20.0, 0.1), Event('detail', 0, 20.0, 0.5), Event('cart', 0, 20.0, 1.0), - Event('purchase', 0, 20.0, 2.0)] - sess_h = Session(sid='test', events=events, actor='H') - print(f'human-like session α̂: {estimate_alpha(sess_h):.3f}') + events = [Event('view', 0, 20.0, 0.1), Event('detail', 0, 20.0, 0.5), Event('cart', 0, 20.0, 1.0), Event('purchase', 0, 20.0, 2.0)] + print(f'human-like session alpha_hat: {estimate_alpha(Session(sid="test", events=events, actor="H")):.3f}') - events_a = [Event('view', 0, 20.0, 0.1), Event('detail', 0, 20.0, 0.2), Event('view', 0, 20.0, 0.3), - Event('detail', 0, 20.0, 0.4)] - sess_a = Session(sid='test2', events=events_a, actor='A') - print(f'agent-like session α̂: {estimate_alpha(sess_a):.3f}') + events_a = [Event('view', 0, 20.0, 0.1), Event('detail', 0, 20.0, 0.2), Event('view', 0, 20.0, 0.3), Event('detail', 0, 20.0, 0.4)] + print(f'agent-like session alpha_hat: {estimate_alpha(Session(sid="test2", events=events_a, actor="A")):.3f}') diff --git a/lab/case/thesis/simplified_env.py b/lab/case/thesis/simplified_env.py index e59ae41..e4cd84c 100644 --- a/lab/case/thesis/simplified_env.py +++ b/lab/case/thesis/simplified_env.py @@ -19,58 +19,45 @@ try: except ImportError: HAS_GYM = False -from .simplified import ( - System, - Session, - Event, - Limbo, - put_prices_to_market, - compute_coi_window, - compute_demand, - estimate_alpha, - coi_erosion, - TRANS_H, - TRANS_A, -) +from .simplified import System, Session, Event, Limbo, put_prices_to_market, compute_demand, estimate_alpha +from .coi import COIWindow, compute_coi_window, coi_erosion @dataclass class EnvConfig: - """Configuration for pricing environment.""" n_products: int = 5 max_steps: int = 200 sessions_per_step: int = 30 - alpha_true: float = 0.2 # true contamination level - alpha_drift: float = 0.0 # per-step drift in α + alpha_true: float = 0.2 + alpha_drift: float = 0.0 alpha_bounds: Tuple[float, float] = (0.0, 0.6) - lambda_coi: float = 0.5 # COI penalty weight - lambda_vol: float = 0.1 # volatility penalty weight - reward_mode: str = "robust" # revenue | profit | robust | coi_aware + lambda_coi: float = 0.5 + lambda_vol: float = 0.1 + reward_mode: str = "robust" # revenue | profit | robust | coi_aware normalize_reward: bool = True seed: int | None = 42 +def aggregate_purchases(sessions: list[Session], n_products: int, costs: np.ndarray) -> Tuple[np.ndarray, float, float]: + """Aggregate purchases from sessions, returns (counts, revenue, cost).""" + purchases = np.zeros(n_products, dtype=float) + revenue, cost = 0.0, 0.0 + for sess in sessions: + for e in sess.events: + if e.action == "purchase" and 0 <= e.product_idx < n_products: + purchases[e.product_idx] += 1.0 + revenue += float(e.price_seen) + cost += float(costs[e.product_idx]) + return purchases, revenue, cost + + class PricingEnv(gym.Env if HAS_GYM else object): """RL environment for dynamic pricing under agent contamination. - Implements the thesis formulation where: - - Platform sets prices p_t - - Market responds with mixture demand Q(p) = (1-α)D_H + αD_A - - Agent estimates contamination α̂ from behavioral signals - - Reward balances profit vs COI leakage - - Observation space (normalized): - [0:n] - current prices / ref_prices - [n:2n] - aggregated demand per product - [2n] - estimated contamination α̂ - [2n+1] - true contamination α (if observable, else 0) - [2n+2:3n+2] - current margins (prices - costs) / costs - [3n+2] - step / max_steps - - Action space: - price multipliers in [0.5, 1.5] applied to reference prices + Platform sets prices p_t, market responds with mixture demand Q(p) = (1-alpha)*D_H + alpha*D_A. + Agent estimates contamination alpha_hat from behavioral signals. + Reward balances profit vs COI leakage. """ - metadata = {"render_modes": ["human", "ansi"]} def __init__(self, cfg: EnvConfig | None = None): @@ -86,34 +73,23 @@ class PricingEnv(gym.Env if HAS_GYM else object): self._episode_rewards: list[float] = [] self._demand_agg = np.zeros(self.n) - # gymnasium spaces self.action_space = spaces.Box(low=0.5, high=1.5, shape=(self.n,), dtype=np.float32) - obs_dim = self.n + self.n + 1 + 1 + self.n + 1 # prices + demand + α̂ + α + margins + t + obs_dim = self.n + self.n + 1 + 1 + self.n + 1 # prices + demand + alpha_hat + alpha + margins + t self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(obs_dim,), dtype=np.float32) def _build_obs(self) -> np.ndarray: - """Construct observation vector.""" if self._sys is None: return np.zeros(self.observation_space.shape[0], dtype=np.float32) - prices = self._last_prices if self._last_prices is not None else self._sys.refs - price_ratio = prices / (self._sys.refs + 1e-6) - demand_norm = self._demand_agg / (np.sum(self._demand_agg) + 1e-6) - margins = (prices - self._sys.costs) / (self._sys.costs + 1e-6) - t_norm = self._t / self.cfg.max_steps - - obs = np.concatenate([ - price_ratio, # [0:n] - demand_norm, # [n:2n] - [self._sys.alpha], # [2n] estimated α̂ - [self._alpha], # [2n+1] true α - margins, # [2n+2:3n+2] - [t_norm], # [3n+2] - ]) - return obs.astype(np.float32) + return np.concatenate([ + prices / (self._sys.refs + 1e-6), + self._demand_agg / (np.sum(self._demand_agg) + 1e-6), + [self._sys.alpha, self._alpha], + (prices - self._sys.costs) / (self._sys.costs + 1e-6), + [self._t / self.cfg.max_steps], + ]).astype(np.float32) def _compute_reward(self, prices: np.ndarray, demand: Dict[str, float]) -> float: - """Compute reward based on configured mode.""" cfg, sys = self.cfg, self._sys if sys is None: return 0.0 @@ -123,159 +99,77 @@ class PricingEnv(gym.Env if HAS_GYM else object): for sid, q in demand.items(): sess = next((s for s in sys._sessions if s.sid == sid), None) if sess and sess.events: - pidx = sess.events[0].product_idx - agg[pidx] += q + agg[sess.events[0].product_idx] += q self._demand_agg = agg - revenue = 0.0 - cost = 0.0 - purchases = np.zeros(self.n, dtype=float) - for sess in sys._last_sessions: - for e in sess.events: - if e.action != "purchase": - continue - pidx = int(e.product_idx) - if 0 <= pidx < self.n: - purchases[pidx] += 1.0 - revenue += float(e.price_seen) - cost += float(sys.costs[pidx]) - profit = float(revenue - cost) + _, revenue, cost = aggregate_purchases(sys._last_sessions, self.n, sys.costs) + profit = revenue - cost - # volatility penalty (price changes) vol_penalty = 0.0 if self._last_prices is not None: - price_change = np.abs(prices - self._last_prices) / (sys.refs + 1e-6) - vol_penalty = cfg.lambda_vol * float(np.mean(price_change)) + vol_penalty = cfg.lambda_vol * float(np.mean(np.abs(prices - self._last_prices) / (sys.refs + 1e-6))) coi = compute_coi_window(sys._last_sessions, sys.costs, demand_mapping=demand) - coi_leak = float(coi.leak) + leak = float(coi.leak) - if cfg.reward_mode == "revenue": - r = revenue - elif cfg.reward_mode == "profit": - r = profit - elif cfg.reward_mode == "robust": - # robust objective: profit - λ_coi * COI_leak - λ_vol * volatility - r = profit - cfg.lambda_coi * coi_leak - vol_penalty - elif cfg.reward_mode == "coi_aware": - # adaptive: heavier penalty at high contamination - adaptive_lambda = cfg.lambda_coi * (1 + 2 * sys.alpha) - r = profit - adaptive_lambda * coi_leak - vol_penalty - else: - r = profit - - if cfg.normalize_reward: - r = r / (float(np.sum(sys.refs)) + 1e-6) # normalize by potential revenue - - return float(r) + reward_fns = { + "revenue": lambda: revenue, + "profit": lambda: profit, + "robust": lambda: profit - cfg.lambda_coi * leak - vol_penalty, + "coi_aware": lambda: profit - cfg.lambda_coi * (1 + 2 * sys.alpha) * leak - vol_penalty, + } + r = reward_fns.get(cfg.reward_mode, lambda: profit)() + return float(r / (float(np.sum(sys.refs)) + 1e-6)) if cfg.normalize_reward else float(r) def reset(self, seed: int | None = None, options: dict | None = None) -> Tuple[np.ndarray, dict]: - """Reset environment to initial state.""" seed = seed if seed is not None else self.cfg.seed self._sys = System(n_products=self.n, lambda_coi=self.cfg.lambda_coi, seed=seed) - self._t = 0 - self._alpha = self.cfg.alpha_true - self._last_prices = None - self._last_demand = None - self._episode_rewards = [] - self._demand_agg = np.zeros(self.n) - - info = {"alpha_true": self._alpha, "alpha_est": self._sys.alpha, - "costs": self._sys.costs.copy(), "refs": self._sys.refs.copy()} - return self._build_obs(), info + self._t, self._alpha = 0, self.cfg.alpha_true + self._last_prices, self._last_demand = None, None + self._episode_rewards, self._demand_agg = [], np.zeros(self.n) + return self._build_obs(), {"alpha_true": self._alpha, "alpha_est": self._sys.alpha, + "costs": self._sys.costs.copy(), "refs": self._sys.refs.copy()} def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, dict]: - """Execute one environment step. - - Args: - action: price multipliers in [0.5, 1.5] - - Returns: - obs, reward, terminated, truncated, info - """ if self._sys is None: raise RuntimeError("call reset() first") - # convert action to prices action = np.clip(action, 0.5, 1.5) - prices = self._sys.refs * action.astype(np.float64) - prices = np.clip(prices, self._sys.costs * 1.01, self._sys.refs * 2.0) - - # # drift contamination - # if self.cfg.alpha_drift != 0: - # self._alpha = np.clip( - # self._alpha + self.cfg.alpha_drift * self._sys.rng.normal(), - # *self.cfg.alpha_bounds) - - # observe demand + prices = np.clip(self._sys.refs * action.astype(np.float64), self._sys.costs * 1.01, self._sys.refs * 2.0) demand = self._sys.observe_demand(prices, alpha_true=self._alpha, n_sessions=self.cfg.sessions_per_step) self._sys.limbo.add_update("prices", prices) - - # update α estimate self._sys._alpha_est = self._sys._estimate_alpha_from_sessions() reward = self._compute_reward(prices, demand) self._episode_rewards.append(reward) - - self._last_prices = prices.copy() - self._last_demand = demand + self._last_prices, self._last_demand = prices.copy(), demand self._t += 1 - terminated = self._t >= self.cfg.max_steps - truncated = False - - # compute metrics for tracking - revenue = 0.0 - cost = 0.0 - n_purchases = 0 - for sess in self._sys._last_sessions: - for e in sess.events: - if e.action != "purchase": - continue - n_purchases += 1 - revenue += float(e.price_seen) - cost += float(self._sys.costs[int(e.product_idx)]) - profit = float(revenue - cost) + # compute info metrics using shared helper + purchases, revenue, cost = aggregate_purchases(self._sys._last_sessions, self.n, self._sys.costs) n_agents = int(self._alpha * self.cfg.sessions_per_step) - price_std = float(np.std(prices)) coi = compute_coi_window(self._sys._last_sessions, self._sys.costs, demand_mapping=demand) info = { - "alpha_true": self._alpha, - "alpha_est": self._sys.alpha, + "alpha_true": self._alpha, "alpha_est": self._sys.alpha, "alpha_error": abs(self._alpha - self._sys.alpha), - "revenue": float(revenue), - "profit": float(profit), - "cost": float(cost), - "n_purchases": int(n_purchases), + "revenue": float(revenue), "profit": float(revenue - cost), "cost": float(cost), + "n_purchases": int(np.sum(purchases)), "avg_margin": float(np.mean((prices - self._sys.costs) / self._sys.costs)), - "n_sessions": len(demand), - "n_agents": n_agents, - "price_std": price_std, - "coi_erosion": coi_erosion(max(1, n_agents), price_std), - "coi_policy": float(coi.policy), - "coi_agent": float(coi.agent), - "coi_leakage": float(coi.leak), - "coi_survival": float(coi.survival_ratio), - "cumulative_reward": sum(self._episode_rewards), - "step": self._t, + "n_sessions": len(demand), "n_agents": n_agents, "price_std": float(np.std(prices)), + "coi_erosion": coi_erosion(max(1, n_agents), float(np.std(prices))), + "coi_policy": float(coi.policy), "coi_agent": float(coi.agent), + "coi_leakage": float(coi.leak), "coi_survival": float(coi.survival_ratio), + "cumulative_reward": sum(self._episode_rewards), "step": self._t, } - - return self._build_obs(), reward, terminated, truncated, info + return self._build_obs(), reward, self._t >= self.cfg.max_steps, False, info def render(self, mode: str = "human") -> str | None: - """Render environment state.""" if self._sys is None or self._last_prices is None: return None - - lines = [ - f"t={self._t}/{self.cfg.max_steps}", - f"α_true={self._alpha:.3f} α̂={self._sys.alpha:.3f}", - f"prices: {self._last_prices.round(1)}", - f"demand: {self._demand_agg.round(2)}", - f"reward: {self._episode_rewards[-1] if self._episode_rewards else 0:.3f}", - ] - out = " | ".join(lines) + out = f"t={self._t}/{self.cfg.max_steps} | alpha_true={self._alpha:.3f} alpha_hat={self._sys.alpha:.3f} | " \ + f"prices: {self._last_prices.round(1)} | demand: {self._demand_agg.round(2)} | " \ + f"reward: {self._episode_rewards[-1] if self._episode_rewards else 0:.3f}" if mode == "human": print(out) return out @@ -285,10 +179,7 @@ class PricingEnv(gym.Env if HAS_GYM else object): class ContaminationSweepEnv(PricingEnv): - """Environment that sweeps through contamination levels during training. - - Useful for curriculum learning: start with low α, gradually increase. - """ + """Environment that sweeps through contamination levels during training.""" def __init__(self, cfg: EnvConfig | None = None, alpha_schedule: list[float] | None = None): super().__init__(cfg) @@ -296,7 +187,6 @@ class ContaminationSweepEnv(PricingEnv): self._schedule_idx = 0 def reset(self, seed: int | None = None, options: dict | None = None) -> Tuple[np.ndarray, dict]: - # advance schedule on reset if options and options.get("advance_schedule", False): self._schedule_idx = (self._schedule_idx + 1) % len(self._schedule) self.cfg.alpha_true = self._schedule[self._schedule_idx] @@ -306,8 +196,7 @@ class ContaminationSweepEnv(PricingEnv): class AdversarialEnv(PricingEnv): """Environment with adversarial contamination dynamics. - The contamination level responds to pricing policy: if prices are too predictable, - agents learn to exploit and α increases. + Contamination increases when prices are predictable (agents exploit). """ def __init__(self, cfg: EnvConfig | None = None, exploitation_rate: float = 0.02): @@ -317,20 +206,13 @@ class AdversarialEnv(PricingEnv): def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, dict]: obs, reward, term, trunc, info = super().step(action) - - # track price history for predictability if self._last_prices is not None: self._price_history.append(self._last_prices.copy()) - - # increase α if prices are predictable (low variance over recent history) + predictability = 0.0 if len(self._price_history) > 10: - recent = np.array(self._price_history[-10:]) - predictability = 1.0 / (float(np.std(recent)) + 0.1) - self._alpha = np.clip( - self._alpha + self._exploit_rate * predictability * self._sys.rng.random(), - *self.cfg.alpha_bounds) - - info["predictability"] = predictability if len(self._price_history) > 10 else 0.0 + predictability = 1.0 / (float(np.std(self._price_history[-10:])) + 0.1) + self._alpha = np.clip(self._alpha + self._exploit_rate * predictability * self._sys.rng.random(), *self.cfg.alpha_bounds) + info["predictability"] = predictability return obs, reward, term, trunc, info def reset(self, seed: int | None = None, options: dict | None = None) -> Tuple[np.ndarray, dict]: @@ -339,39 +221,20 @@ class AdversarialEnv(PricingEnv): def make_env(cfg: EnvConfig | None = None, env_type: str = "standard") -> PricingEnv: - """Factory for creating pricing environments.""" - if env_type == "sweep": - return ContaminationSweepEnv(cfg) - elif env_type == "adversarial": - return AdversarialEnv(cfg) - return PricingEnv(cfg) + return {"sweep": ContaminationSweepEnv, "adversarial": AdversarialEnv}.get(env_type, PricingEnv)(cfg) -# simple baseline policies for benchmarking -def fixed_price_policy(refs: np.ndarray, margin: float = 0.0) -> np.ndarray: - """Fixed markup policy: always return ref * (1 + margin).""" - return np.ones(len(refs), dtype=np.float32) * (1.0 + margin) - - -def random_policy(n: int, rng: np.random.Generator | None = None) -> np.ndarray: - """Random policy for exploration baseline.""" - rng = rng or np.random.default_rng() - return rng.uniform(0.7, 1.3, n).astype(np.float32) - - -def adaptive_policy(obs: np.ndarray, n: int, base_margin: float = 0.1) -> np.ndarray: - """Simple adaptive policy: reduce margins when α̂ is high.""" - alpha_est = obs[2 * n] # α̂ is at position 2n in observation - margin_scale = 1.0 - 0.4 * alpha_est # defensive when α̂ high - return np.ones(n, dtype=np.float32) * (1.0 + base_margin * margin_scale) +# baseline policies +fixed_price_policy = lambda refs, margin=0.0: np.ones(len(refs), dtype=np.float32) * (1.0 + margin) +random_policy = lambda n, rng=None: (rng or np.random.default_rng()).uniform(0.7, 1.3, n).astype(np.float32) +adaptive_policy = lambda obs, n, base=0.1: np.ones(n, dtype=np.float32) * (1.0 + base * (1.0 - 0.4 * obs[2 * n])) if __name__ == "__main__": - # demo run cfg = EnvConfig(n_products=100, max_steps=100, alpha_true=0.25, reward_mode="robust") env = make_env(cfg) obs, info = env.reset() - print(f"initial: α={info['alpha_true']:.2f}") + print(f"initial: alpha={info['alpha_true']:.2f}") total_reward = 0.0 for t in range(cfg.max_steps): @@ -383,4 +246,4 @@ if __name__ == "__main__": if done: break - print(f"\ntotal reward: {total_reward:.2f}, final α̂: {info['alpha_est']:.3f}") + print(f"\ntotal reward: {total_reward:.2f}, final alpha_hat: {info['alpha_est']:.3f}") diff --git a/lab/case/thesis/train.py b/lab/case/thesis/train.py index cc152b5..753a5f1 100644 --- a/lab/case/thesis/train.py +++ b/lab/case/thesis/train.py @@ -6,7 +6,8 @@ Tracks COI erosion, alpha estimation error, and economic KPIs per thesis formula from __future__ import annotations import argparse import json -from dataclasses import dataclass, asdict +from concurrent.futures import ProcessPoolExecutor, as_completed +from dataclasses import dataclass, asdict, field from pathlib import Path from typing import Dict, List, Callable, Any import numpy as np @@ -27,10 +28,9 @@ except ImportError: HAS_TB = False from .simplified_env import PricingEnv, EnvConfig, make_env, adaptive_policy, fixed_price_policy, random_policy -from .simplified import coi_erosion +from .coi import coi_erosion -# thesis-aligned KPIs tracked per episode @dataclass class EpisodeMetrics: reward: float = 0.0 @@ -43,10 +43,24 @@ class EpisodeMetrics: n_agents: int = 0 steps: int = 0 + def accumulate(self, info: Dict[str, Any]) -> None: + self.steps += 1 + self.reward += info.get('reward', 0) + self.revenue += info.get('revenue', 0) + self.profit += info.get('profit', 0) + self.coi_erosion += info.get('coi_erosion', 0) + self.coi_leakage += info.get('coi_leakage', 0) + self.alpha_error += abs(info.get('alpha_true', 0) - info.get('alpha_est', 0)) + self.avg_margin += info.get('avg_margin', 0) + self.n_agents += info.get('n_agents', 0) + + def normalized(self) -> Dict[str, float]: + s = max(self.steps, 1) + return {k: getattr(self, k) / s for k in ['revenue', 'profit', 'coi_erosion', 'coi_leakage', 'alpha_error', 'avg_margin', 'n_agents']} + @dataclass class ExperimentConfig: - """Full experiment specification for reproducibility.""" algo: str = "ppo" total_timesteps: int = 100_000 n_envs: int = 4 @@ -65,17 +79,14 @@ class ExperimentConfig: self.experiment_name = f"{self.algo}_a{self.alpha_true:.2f}_{self.reward_mode}" -# unified policy interface wrapping all baselines class Policy: """Unified policy interface for baselines and trained models.""" def __init__(self, policy_fn: Callable[[np.ndarray, int], np.ndarray], name: str): - self._fn = policy_fn - self.name = name + self._fn, self.name = policy_fn, name def predict(self, obs: np.ndarray, deterministic: bool = True) -> tuple[np.ndarray, None]: - n = (len(obs) - 3) // 3 - return self._fn(obs, n), None + return self._fn(obs, (len(obs) - 3) // 3), None @staticmethod def fixed(margin: float = 0.15) -> "Policy": @@ -91,99 +102,97 @@ class Policy: @staticmethod def myopic(greed: float = 0.3) -> "Policy": - """Myopic: maximize immediate margin, ignore alpha.""" def _fn(obs: np.ndarray, n: int) -> np.ndarray: demand_norm = obs[n:2*n] if len(obs) > 2*n else np.ones(n) * 0.5 - mult = 1.0 + greed * (1 + np.mean(demand_norm)) - return np.ones(n, dtype=np.float32) * np.clip(mult, 0.5, 1.5) + return np.ones(n, dtype=np.float32) * np.clip(1.0 + greed * (1 + np.mean(demand_norm)), 0.5, 1.5) return Policy(_fn, f"myopic_{greed:.1f}") -class MetricsCallback(BaseCallback): - """Tracks thesis-aligned metrics during RL training.""" +def log_metrics(writer: SummaryWriter | None, metrics: Dict[str, float], prefix: str, step: int) -> None: + if writer is None: + return + for k, v in metrics.items(): + writer.add_scalar(f'{prefix}/{k}', v, step) + +class MetricsCallback(BaseCallback): def __init__(self, writer: SummaryWriter | None, verbose: int = 0): super().__init__(verbose) self._writer = writer - self._ep = EpisodeMetrics() - self._buffer: List[EpisodeMetrics] = [] def _on_step(self) -> bool: + if self._writer is None: + return True for info in self.locals.get('infos', []): - self._ep.steps += 1 - self._ep.reward += info.get('reward', 0) - self._ep.revenue += info.get('revenue', 0) - self._ep.profit += info.get('profit', 0) - self._ep.coi_erosion += info.get('coi_erosion', 0) - self._ep.coi_leakage += info.get('coi_leakage', 0) - self._ep.alpha_error += abs(info.get('alpha_true', 0) - info.get('alpha_est', 0)) - self._ep.avg_margin += info.get('avg_margin', 0) - self._ep.n_agents += info.get('n_agents', 0) + t = self.num_timesteps + self._writer.add_scalar('economics/revenue', info.get('revenue', 0), t) + self._writer.add_scalar('economics/profit', info.get('profit', 0), t) + self._writer.add_scalar('economics/margin', info.get('avg_margin', 0), t) + self._writer.add_scalar('coi/erosion', info.get('coi_erosion', 0), t) + self._writer.add_scalar('coi/leakage', info.get('coi_leakage', 0), t) + self._writer.add_scalar('alpha/estimation_error', abs(info.get('alpha_true', 0) - info.get('alpha_est', 0)), t) + self._writer.add_scalar('agents/count', info.get('n_agents', 0), t) return True - def _on_rollout_end(self) -> None: - if self._ep.steps == 0 or self._writer is None: - return - s, step = self._ep.steps, self.num_timesteps - self._writer.add_scalar('economics/revenue', self._ep.revenue / s, step) - self._writer.add_scalar('economics/profit', self._ep.profit / s, step) - self._writer.add_scalar('economics/margin', self._ep.avg_margin / s, step) - self._writer.add_scalar('coi/erosion', self._ep.coi_erosion / s, step) - self._writer.add_scalar('coi/leakage', self._ep.coi_leakage / s, step) - self._writer.add_scalar('alpha/estimation_error', self._ep.alpha_error / s, step) - self._writer.add_scalar('agents/count', self._ep.n_agents / s, step) - self._buffer.append(self._ep) - self._ep = EpisodeMetrics() - def make_vec_env(cfg: ExperimentConfig, n_envs: int = 1) -> DummyVecEnv: def _make(): - env_cfg = EnvConfig(n_products=cfg.n_products, max_steps=cfg.max_steps, - alpha_true=cfg.alpha_true, reward_mode=cfg.reward_mode, seed=cfg.seed) - return Monitor(make_env(env_cfg)) + return Monitor(make_env(EnvConfig(n_products=cfg.n_products, max_steps=cfg.max_steps, + alpha_true=cfg.alpha_true, reward_mode=cfg.reward_mode, seed=cfg.seed))) return DummyVecEnv([_make for _ in range(n_envs)]) -def evaluate_policy(policy: Policy | Any, cfg: ExperimentConfig, n_episodes: int = 20) -> Dict[str, float]: - """Evaluate policy and return thesis-aligned metrics.""" - env_cfg = EnvConfig(n_products=cfg.n_products, max_steps=cfg.max_steps, - alpha_true=cfg.alpha_true, reward_mode=cfg.reward_mode, seed=cfg.seed + 999) - env = make_env(env_cfg) +def run_episodes(policy: Policy | Any, env: PricingEnv, n_episodes: int) -> List[EpisodeMetrics]: + """Run policy for n episodes and collect metrics.""" metrics = [] - for _ in range(n_episodes): obs, _ = env.reset() - ep = EpisodeMetrics() - done = False + ep, done = EpisodeMetrics(), False while not done: action, _ = policy.predict(obs, deterministic=True) obs, reward, term, trunc, info = env.step(action) done = term or trunc + ep.accumulate(info) ep.reward += reward - ep.revenue += info.get('revenue', 0) - ep.profit += info.get('profit', 0) - ep.coi_erosion += info.get('coi_erosion', 0) - ep.coi_leakage += info.get('coi_leakage', 0) - ep.alpha_error += abs(info['alpha_true'] - info['alpha_est']) - ep.avg_margin += info.get('avg_margin', 0) - ep.steps += 1 metrics.append(ep) + return metrics - n = len(metrics) + +def evaluate_policy(policy: Policy | Any, cfg: ExperimentConfig, n_episodes: int = 20) -> Dict[str, float]: + env = make_env(EnvConfig(n_products=cfg.n_products, max_steps=cfg.max_steps, + alpha_true=cfg.alpha_true, reward_mode=cfg.reward_mode, seed=cfg.seed + 999)) + metrics = run_episodes(policy, env, n_episodes) return { - 'reward_mean': np.mean([m.reward for m in metrics]), - 'reward_std': np.std([m.reward for m in metrics]), - 'revenue_mean': np.mean([m.revenue / m.steps for m in metrics]), - 'profit_mean': np.mean([m.profit / m.steps for m in metrics]), - 'coi_erosion_mean': np.mean([m.coi_erosion / m.steps for m in metrics]), - 'coi_leakage_mean': np.mean([m.coi_leakage / m.steps for m in metrics]), - 'alpha_error_mean': np.mean([m.alpha_error / m.steps for m in metrics]), - 'margin_mean': np.mean([m.avg_margin / m.steps for m in metrics]), + 'reward_mean': np.mean([m.reward for m in metrics]), 'reward_std': np.std([m.reward for m in metrics]), + **{f'{k}_mean': np.mean([m.normalized()[k] for m in metrics]) + for k in ['revenue', 'profit', 'coi_erosion', 'coi_leakage', 'alpha_error', 'avg_margin']}, } +def run_baseline(policy: Policy, vec_env: DummyVecEnv, total_steps: int, writer: SummaryWriter | None): + obs, n_envs = vec_env.reset(), vec_env.num_envs + ep_rewards = np.zeros(n_envs) + + for step in range(0, total_steps, n_envs): + actions = np.array([policy.predict(obs[i])[0] for i in range(n_envs)]) + obs, rewards, dones, infos = vec_env.step(actions) + ep_rewards += rewards + for i, info in enumerate(infos): + if writer: + writer.add_scalar('economics/revenue', info.get('revenue', 0), step) + writer.add_scalar('economics/profit', info.get('profit', 0), step) + writer.add_scalar('economics/margin', info.get('avg_margin', 0), step) + writer.add_scalar('coi/erosion', info.get('coi_erosion', 0), step) + writer.add_scalar('coi/leakage', info.get('coi_leakage', 0), step) + writer.add_scalar('alpha/estimation_error', abs(info.get('alpha_true', 0) - info.get('alpha_est', 0)), step) + writer.add_scalar('agents/count', info.get('n_agents', 0), step) + if dones[i]: + if writer: + writer.add_scalar('rollout/ep_reward', ep_rewards[i], step) + ep_rewards[i] = 0 + + def train(cfg: ExperimentConfig) -> Dict[str, Any]: - """Train RL agent or evaluate baseline policy.""" is_baseline = cfg.algo.lower() in ["fixed", "adaptive", "random", "myopic"] if not HAS_SB3 and not is_baseline: raise ImportError("stable-baselines3 required: pip install stable-baselines3[extra]") @@ -194,85 +203,65 @@ def train(cfg: ExperimentConfig) -> Dict[str, Any]: json.dump(asdict(cfg), f, indent=2) writer = SummaryWriter(log_path) if HAS_TB else None - train_env = make_vec_env(cfg, cfg.n_envs) - eval_env = make_vec_env(cfg, 1) + train_env, eval_env = make_vec_env(cfg, cfg.n_envs), make_vec_env(cfg, 1) if is_baseline: - policy_map = {"fixed": Policy.fixed(), "adaptive": Policy.adaptive(), - "random": Policy.random(), "myopic": Policy.myopic()} - policy = policy_map[cfg.algo.lower()] + policy = {"fixed": Policy.fixed, "adaptive": Policy.adaptive, "random": Policy.random, "myopic": Policy.myopic}[cfg.algo.lower()]() run_baseline(policy, train_env, cfg.total_timesteps, writer) final_metrics = evaluate_policy(policy, cfg) else: - algo_cls = {"ppo": PPO, "sac": SAC, "a2c": A2C}.get(cfg.algo.lower()) - if algo_cls is None: - raise ValueError(f"unknown algo: {cfg.algo}") + algo_cls = {"ppo": PPO, "sac": SAC, "a2c": A2C}[cfg.algo.lower()] common = dict(verbose=1, seed=cfg.seed, tensorboard_log=str(log_path), device="auto") - # TODO: setup hyper parameter passing to train different variations (no free lunch) - if cfg.algo.lower() == "ppo": - model = PPO("MlpPolicy", train_env, learning_rate=3e-4, n_steps=2048, - batch_size=64, n_epochs=10, gamma=0.99, gae_lambda=0.95, - clip_range=0.2, ent_coef=0.01, **common) - elif cfg.algo.lower() == "sac": - model = SAC("MlpPolicy", train_env, learning_rate=3e-4, buffer_size=100_000, - batch_size=256, tau=0.005, gamma=0.99, **common) - else: - model = A2C("MlpPolicy", train_env, learning_rate=7e-4, n_steps=5, gamma=0.99, **common) + model = { + "ppo": lambda: PPO("MlpPolicy", train_env, learning_rate=3e-4, n_steps=2048, batch_size=64, n_epochs=10, gamma=0.99, gae_lambda=0.95, clip_range=0.2, ent_coef=0.01, **common), + "sac": lambda: SAC("MlpPolicy", train_env, learning_rate=3e-4, buffer_size=100_000, batch_size=256, tau=0.005, gamma=0.99, **common), + "a2c": lambda: A2C("MlpPolicy", train_env, learning_rate=7e-4, n_steps=5, gamma=0.99, **common), + }[cfg.algo.lower()]() cb = MetricsCallback(writer) - eval_cb = EvalCallback(eval_env, best_model_save_path=str(log_path / "best"), - log_path=str(log_path), eval_freq=cfg.eval_freq, - n_eval_episodes=cfg.n_eval_episodes, deterministic=True) + eval_cb = EvalCallback(eval_env, best_model_save_path=str(log_path / "best"), log_path=str(log_path), + eval_freq=cfg.eval_freq, n_eval_episodes=cfg.n_eval_episodes, deterministic=True) model.learn(cfg.total_timesteps, callback=[cb, eval_cb], progress_bar=True) model.save(log_path / "final_model") policy = model final_metrics = evaluate_policy(model, cfg) if writer: - for k, v in final_metrics.items(): - writer.add_scalar(f'final/{k}', v, cfg.total_timesteps) + log_metrics(writer, final_metrics, 'final', cfg.total_timesteps) writer.close() - train_env.close() - eval_env.close() + train_env.close(); eval_env.close() with open(log_path / "results.json", "w") as f: json.dump(final_metrics, f, indent=2) return {"path": str(log_path), "metrics": final_metrics} -def run_baseline(policy: Policy, vec_env: DummyVecEnv, total_steps: int, writer: SummaryWriter | None): - """Run baseline policy through environment with logging.""" - obs = vec_env.reset() - n_envs = vec_env.num_envs - ep_rewards = np.zeros(n_envs) - all_rewards, coi_buf, alpha_buf = [], [], [] - - for step in range(0, total_steps, n_envs): - actions = np.array([policy.predict(obs[i])[0] for i in range(n_envs)]) - obs, rewards, dones, infos = vec_env.step(actions) - ep_rewards += rewards - for i, info in enumerate(infos): - coi_buf.append(info.get('coi_erosion', 0)) - alpha_buf.append(abs(info.get('alpha_true', 0) - info.get('alpha_est', 0))) - if dones[i]: - all_rewards.append(ep_rewards[i]) - ep_rewards[i] = 0 - if writer and step % 1000 < n_envs and all_rewards: - writer.add_scalar('rollout/ep_rew_mean', np.mean(all_rewards[-20:]), step) - writer.add_scalar('coi/erosion', np.mean(coi_buf[-100:]), step) - writer.add_scalar('alpha/estimation_error', np.mean(alpha_buf[-100:]), step) +def _train_alpha(args: tuple) -> tuple[str, Dict]: + """Worker for parallel sweep - must be top-level for pickling.""" + cfg_dict, alpha = args + cfg_dict["alpha_true"] = alpha + cfg_dict["experiment_name"] = f"{cfg_dict['algo']}_a{alpha:.2f}_{cfg_dict['reward_mode']}" + sweep_cfg = ExperimentConfig(**cfg_dict) + print(f"[alpha={alpha:.2f}] starting") + metrics = train(sweep_cfg)["metrics"] + print(f"[alpha={alpha:.2f}] done") + return f"alpha_{alpha:.2f}", metrics -def run_sweep(cfg: ExperimentConfig, alphas: List[float] | None = None) -> Dict[str, Dict]: - """Run experiment across contamination levels for scientific comparison.""" - alphas = alphas or [0.0, 0.1, 0.2, 0.3, 0.4, 0.5] - results = {} - for alpha in alphas: - sweep_cfg = ExperimentConfig(**{**asdict(cfg), "alpha_true": alpha, - "experiment_name": f"{cfg.algo}_a{alpha:.2f}_{cfg.reward_mode}"}) - print(f"\n=== α={alpha:.2f} ===") - out = train(sweep_cfg) - results[f"alpha_{alpha:.2f}"] = out["metrics"] +def run_sweep(cfg: ExperimentConfig, alphas: List[float] | None = None, max_workers: int | None = None) -> Dict[str, Dict]: + alphas = alphas or [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9] + cfg_dict = asdict(cfg) + + if max_workers == 1: # sequential fallback + results = dict(_train_alpha((cfg_dict.copy(), a)) for a in alphas) + else: + with ProcessPoolExecutor(max_workers=max_workers) as pool: + futures = {pool.submit(_train_alpha, (cfg_dict.copy(), a)): a for a in alphas} + results = {} + for fut in as_completed(futures): + key, metrics = fut.result() + results[key] = metrics + summary_path = Path(cfg.log_dir) / f"sweep_{cfg.algo}_{cfg.reward_mode}.json" with open(summary_path, "w") as f: json.dump(results, f, indent=2) @@ -280,23 +269,38 @@ def run_sweep(cfg: ExperimentConfig, alphas: List[float] | None = None) -> Dict[ return results -def compare_policies(cfg: ExperimentConfig, policies: List[str] | None = None) -> Dict[str, Dict]: - """Compare multiple policies at same contamination level.""" +def _train_policy(args: tuple) -> tuple[str, Dict]: + """Worker for parallel policy comparison.""" + cfg_dict, algo = args + cfg_dict["algo"] = algo + cfg_dict["experiment_name"] = f"cmp_{algo}_a{cfg_dict['alpha_true']:.2f}" + cmp_cfg = ExperimentConfig(**cfg_dict) + print(f"[{algo}] starting") + metrics = train(cmp_cfg)["metrics"] + print(f"[{algo}] done") + return algo, metrics + + +def compare_policies(cfg: ExperimentConfig, policies: List[str] | None = None, max_workers: int | None = None) -> Dict[str, Dict]: policies = policies or ["fixed", "adaptive", "myopic", "random"] - results = {} - for algo in policies: - cmp_cfg = ExperimentConfig(**{**asdict(cfg), "algo": algo, - "experiment_name": f"cmp_{algo}_a{cfg.alpha_true:.2f}"}) - print(f"\n=== {algo} ===") - out = train(cmp_cfg) - results[algo] = out["metrics"] + cfg_dict = asdict(cfg) + + if max_workers == 1: + results = dict(_train_policy((cfg_dict.copy(), p)) for p in policies) + else: + with ProcessPoolExecutor(max_workers=max_workers) as pool: + futures = {pool.submit(_train_policy, (cfg_dict.copy(), p)): p for p in policies} + results = {} + for fut in as_completed(futures): + algo, metrics = fut.result() + results[algo] = metrics + cmp_path = Path(cfg.log_dir) / f"compare_a{cfg.alpha_true:.2f}.json" with open(cmp_path, "w") as f: json.dump(results, f, indent=2) print(f"\nComparison saved to {cmp_path}") for algo, m in results.items(): - print(f" {algo:12s}: reward={m['reward_mean']:.2f} coi_erosion={m['coi_erosion_mean']:.4f} " - f"alpha_err={m['alpha_error_mean']:.4f}") + print(f" {algo:12s}: reward={m['reward_mean']:.2f} coi_erosion={m['coi_erosion_mean']:.4f} alpha_err={m['alpha_error_mean']:.4f}") return results @@ -312,6 +316,7 @@ def main(): parser.add_argument("--log-dir", default="lab/case/thesis/runs") parser.add_argument("--sweep", action="store_true", help="run contamination sweep") parser.add_argument("--compare", action="store_true", help="compare all baselines") + parser.add_argument("--workers", type=int, default=None, help="max parallel workers for sweep (None=auto, 1=sequential)") args = parser.parse_args() cfg = ExperimentConfig(algo=args.algo, total_timesteps=args.steps, alpha_true=args.alpha, @@ -319,9 +324,9 @@ def main(): n_envs=args.n_envs, seed=args.seed, log_dir=args.log_dir) if args.sweep: - run_sweep(cfg) + run_sweep(cfg, max_workers=args.workers) elif args.compare: - compare_policies(cfg) + compare_policies(cfg, max_workers=args.workers) else: result = train(cfg) print(f"\nTraining complete: {result['path']}")