diff --git a/sim/rl/engine.py b/sim/rl/engine.py index ab751e3..ec4d871 100644 --- a/sim/rl/engine.py +++ b/sim/rl/engine.py @@ -76,8 +76,7 @@ class WildPricingEngine(BasePricingEngine): def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray: self.step_count += 1 - # extract demand signal (from env observation) as proxy for sales - demand = observation.get('demand', np.zeros(self.c.product_catalogue_size, dtype=np.float32)) + demand = _extract_demand(observation, self.c.product_catalogue_size) return self._update_from_demand(current_prices, demand) def _update_from_demand(self, prices: np.ndarray, sold: np.ndarray) -> np.ndarray: @@ -141,7 +140,7 @@ class SimpleDemandEngine(BasePricingEngine): def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray: self.step_count += 1 - demand = observation.get('demand', np.zeros(self.c.product_catalogue_size, dtype=np.float32)) + demand = _extract_demand(observation, self.c.product_catalogue_size) if self.prev_demand is None: self.prev_demand = demand.copy() return current_prices.copy() @@ -207,7 +206,7 @@ class ThompsonSamplingEngine(BasePricingEngine): lo = current_prices * 0.7 hi = current_prices * 1.3 self.price_grid = np.linspace(lo, hi, self.n_price_levels).T - demand = observation.get('demand', np.zeros(self.c.product_catalogue_size, dtype=np.float32)) + demand = _extract_demand(observation, self.c.product_catalogue_size) # update beliefs based on last action if self.last_actions is not None: for i in range(self.c.product_catalogue_size): @@ -226,3 +225,14 @@ class ThompsonSamplingEngine(BasePricingEngine): new_prices[i] = self.price_grid[i, actions[i]] self.last_actions = actions return np.clip(new_prices, self.c.system_min_price, self.c.system_max_price).astype(np.float32) + + +def _extract_demand(observation: Dict[str, Any], n: int) -> np.ndarray: + if "elasticity" in observation and isinstance(observation["elasticity"], dict): + d = observation["elasticity"].get("demand") + if d is not None: + return np.asarray(d, dtype=np.float32) + d = observation.get("demand") + if d is not None: + return np.asarray(d, dtype=np.float32) + return np.zeros(n, dtype=np.float32) diff --git a/sim/rl/environment.py b/sim/rl/environment.py index a4cf7c9..94bc8e1 100644 --- a/sim/rl/environment.py +++ b/sim/rl/environment.py @@ -1,715 +1,244 @@ -import gymnasium as gym -from gymnasium import spaces -import numpy as np -from dataclasses import dataclass -from pathlib import Path -import pandas as pd -from types import SimpleNamespace -from typing import Optional, Dict, Any, List, Tuple +from __future__ import annotations -from lib.separability import load_artifacts, score_session, estimate_alpha -from sim.rl.behavior_loader.models import AgentBehaviorModel, BehaviorModel, aggregate_event_transitions +from dataclasses import dataclass +from typing import Any, Dict, Optional, Tuple + +import numpy as np try: - import jax - from sim.rl.jax_core import JAX_AVAILABLE, compile_transitions, fallback_transitions, sample_sessions, compute_metrics - from sim.rl.jax_core import session_features, compute_session_transitions, compute_divergences, estimate_alpha_batch -except ImportError: - JAX_AVAILABLE = False + import gymnasium as gym + from gymnasium import spaces +except ImportError as e: + raise ImportError("sim.rl.environment requires gymnasium") from e -# "learner" agent learning to optimize pricing -# "agent" part of environment creating demand signals that learner processes +from sim.case.thesis_simplified.coi import COIWindow, coi_erosion, compute_coi_window +from sim.case.thesis_simplified.separability import estimate_alpha as estimate_session_alpha +from sim.case.thesis_simplified.simplified import Limbo, Session, put_prices_to_market +from sim.rl.thesis_core import aggregate_demand_by_product, aggregate_purchases, constrain_prices + + +@dataclass(frozen=True) +class BusinessLogicConstraints: + product_catalogue_size: int = 100 + max_steps: int = 2000 + sessions_per_step: int = 250 -@dataclass -class BusinessLogicConstraints(): - max_price_adjustment: float = 0.30 system_max_price: float = 500.0 system_min_price: float = 1.0 - product_catalogue_size: int = 100 - episode_length: int = 2000 - sessions_per_step: int = 250 + max_price_adjustment: float = 0.30 + min_margin_pct: float = 0.05 + agent_share: float = 0.2 - agent_recon_multiplier: float = 6.0 - agent_purchase_probability: float = 0.20 + alpha_drift: float = 0.0 + alpha_bounds: tuple[float, float] = (0.0, 0.8) + coi_strength: float = 0.25 - coi_threshold: float = 4.0 - coi_sigmoid_temp: float = 1.25 - base_human_demand: float = 0.08 - base_agent_demand: float = 0.05 - human_price_elasticity: float = -1.2 # assumptions here - agent_price_elasticity: float = -0.6 - w_agent_loss: float = 1.0 w_volatility: float = 5.0 w_estimation_error: float = 0.25 + seed: int = 7 - human_data_dir: str | None = None - agent_data_dir: str | None = None -def _resolve_behavior_data_dirs(constraints: BusinessLogicConstraints) -> tuple[str, str]: - base = Path(__file__).resolve().parents[2] / "experiments" - human_default = str(base / "collected_data") - agent_default = str(base / "agents" / "collected_data") - human = constraints.human_data_dir or human_default - agent = constraints.agent_data_dir or agent_default - return human, agent - - -def _sigmoid(x: np.ndarray) -> np.ndarray: - return 1.0 / (1.0 + np.exp(-x)) - -EVENT_PAGE_MAP = { - "session_start": "/", - "page_view": "/", - "view_item_page": "/products", - "learn_more_about_item": "/products/details", - "add_item_to_cart": "/cart", - "checkout_start": "/checkout", - "purchase_complete": "/checkout", - "session_end": "/checkout/success", -} - -# map real collected event names to canonical simulation states -EVENT_CANONICAL_MAP = { - "page_view": "session_start", - "hover_over_paragraph": "view_item_page", - "hover_over_title": "view_item_page", - "view_item_page": "view_item_page", - "learn_more_about_item": "learn_more_about_item", - "add_item_to_cart": "add_item_to_cart", - "checkout_start": "purchase_complete", - "remove_item": "view_item_page", -} - - -def _canonicalize_transitions(raw_trans: Dict[str, Dict[str, float]]) -> Dict[str, Dict[str, float]]: - """Map real event transition names to canonical simulation states.""" - canonical: Dict[str, Dict[str, float]] = {} - for src, dsts in raw_trans.items(): - src_canon = EVENT_CANONICAL_MAP.get(src, src) - if src_canon not in canonical: - canonical[src_canon] = {} - for dst, prob in dsts.items(): - dst_canon = EVENT_CANONICAL_MAP.get(dst, dst) - canonical[src_canon][dst_canon] = canonical[src_canon].get(dst_canon, 0.0) + prob - # re-normalize after aggregation - for src in canonical: - total = sum(canonical[src].values()) - if total > 0: - canonical[src] = {k: v / total for k, v in canonical[src].items()} - return canonical - - -class BehavioralProfile: - """Synthetic Markov profile used to generate interaction sessions. - Uses aggregate_event_transitions from models.py to build transition kernels from real data.""" - - def __init__(self, actor: str, purchase_probs: np.ndarray, *, human_data_dir: str, agent_data_dir: str): - self.actor = actor - self.purchase_probs = np.clip(purchase_probs, 0.0, 0.95) - self.states = [ - "session_start", - "view_item_page", - "learn_more_about_item", - "add_item_to_cart", - "purchase_complete", - "session_end", - ] - model = AgentBehaviorModel(agent_data_dir) if actor == "agents" else BehaviorModel(human_data_dir) - mdp = model.build_MDP() - raw_trans = aggregate_event_transitions(mdp) if mdp.get("transitions") else {} - self.transitions = _canonicalize_transitions(raw_trans) if raw_trans else self._fallback_transitions() - self._ensure_terminal_states() - self.dwell_params = self._extract_dwell_params(mdp) - - def _ensure_terminal_states(self): - # guarantee purchase_complete leads to session_end and session_start exists - if "purchase_complete" not in self.transitions: - self.transitions["purchase_complete"] = {"session_end": 1.0} - elif "session_end" not in self.transitions.get("purchase_complete", {}): - self.transitions["purchase_complete"]["session_end"] = 1.0 - total = sum(self.transitions["purchase_complete"].values()) - self.transitions["purchase_complete"] = {k: v/total for k, v in self.transitions["purchase_complete"].items()} - if "session_start" not in self.transitions: - self.transitions["session_start"] = {"view_item_page": 0.7, "learn_more_about_item": 0.2, "session_end": 0.1} - - def _fallback_transitions(self) -> Dict[str, Dict[str, float]]: - return { - "session_start": {"view_item_page": 0.85, "session_end": 0.15}, - "view_item_page": {"learn_more_about_item": 0.4, "add_item_to_cart": 0.3, "view_item_page": 0.2, "session_end": 0.1}, - "learn_more_about_item": {"add_item_to_cart": 0.5, "view_item_page": 0.3, "session_end": 0.2}, - "add_item_to_cart": {"purchase_complete": 0.6, "view_item_page": 0.25, "session_end": 0.15}, - "purchase_complete": {"session_end": 1.0}, - } - - def _extract_dwell_params(self, mdp: Dict) -> Dict[str, Tuple[float, float]]: - state_vals = mdp.get("state_values", {}) - params = {} - for state in self.states: - # try canonical and raw state names - val = state_vals.get(state, 0.5) - for raw, canon in EVENT_CANONICAL_MAP.items(): - if canon == state and raw in state_vals: - val = state_vals[raw] - break - shape = 1.5 + val * 2.0 - scale = 0.8 + (1.0 - val) * 1.2 - params[state] = (shape, scale) - return params - - def _transition_probs(self, state: str, product_idx: int) -> Dict[str, float]: - probs = dict(self.transitions.get(state, {"session_end": 1.0})) - if state == "add_item_to_cart": - base = probs.get("purchase_complete", 0.0) - demand_factor = float(self.purchase_probs[int(product_idx)]) - if self.actor == "agents": - demand_factor *= 0.7 - adjusted = np.clip(base * 0.5 + demand_factor * 0.5, 0.0, 0.95) - remainder = max(1e-6, 1.0 - adjusted) - other_total = sum(v for k, v in probs.items() if k != "purchase_complete") - scale = remainder / max(other_total, 1e-6) - for key in probs: - if key == "purchase_complete": - probs[key] = adjusted - else: - probs[key] = probs[key] * scale - total = sum(probs.values()) - if total <= 0: - return {"session_end": 1.0} - return {state: val / total for state, val in probs.items()} - - def sample_session( - self, - rng: np.random.Generator, - session_id: str, - prices: np.ndarray, - unit_cost: np.ndarray, - ) -> Tuple[List[Dict[str, Any]], List[SimpleNamespace]]: - """Generate a single session trajectory respecting business constraints.""" - events: List[Dict[str, Any]] = [] - feature_events: List[SimpleNamespace] = [] - state = "session_start" - t = 0.0 - product_idx = int(rng.integers(0, len(prices))) - product_id = f"product-{product_idx:04d}" - - - # enforce price >= cost constraint (lipschitz bound on pricing) - # This is a sort of last resort to not let an pricing learner go rogue - cost = float(unit_cost[product_idx]) - constrained_price = max(float(prices[product_idx]), cost * 1.05) # 5% min margin - - while state != "session_end" and len(events) < 40: - if state != "session_start": - row = { - "session_id": session_id, - "actor": "agent" if self.actor == "agents" else "human", - "eventName": state, - "product_idx": product_idx, - "productId": product_id, - "price_offered": constrained_price, - "price_paid": 0.0, - "page": EVENT_PAGE_MAP.get(state, "/"), - "ts": t, - "unit_cost": cost, - "base_price": float(prices[product_idx]), - } - if state == "purchase_complete": - noise = float(rng.normal(0.0, 0.015)) - row["price_paid"] = max(constrained_price * (1.0 + noise), cost) - events.append(row) - feature_events.append( - SimpleNamespace( - eventName=row["eventName"], - page=row["page"], - productId=row["productId"], - ts=row["ts"], - ) - ) - - transitions = self._transition_probs(state, product_idx) - next_state = rng.choice(list(transitions.keys()), p=list(transitions.values())) - shape, scale = self.dwell_params.get(state, (2.0, 1.0)) - dwell = max(0.3, rng.gamma(shape=shape, scale=scale)) - t += dwell - state = next_state - - return events, feature_events - - -def _load_behavioral_profile( - actor: str, - demand_forcing: np.ndarray, - *, - human_data_dir: str, - agent_data_dir: str, -) -> BehavioralProfile: - """returns a behavioral profile for generating synthetic sessions - actor: 'humans' or 'agents' - demand_forcing: per-product purchase probabilities used to weight interactions - """ - return BehavioralProfile(actor, demand_forcing, human_data_dir=human_data_dir, agent_data_dir=agent_data_dir) - - -class CommercePlatform: - """state management for the environment, simulates demand""" - def __init__(self, product_catalogue_size: int, max_price: float, min_price: float, constraints: BusinessLogicConstraints): - self.product_catalogue_size = product_catalogue_size - self.max_price = max_price - self.min_price = min_price - self.constraints = constraints - self.simulation_history: List[Dict[str, Any]] = [] - self._rng = np.random.default_rng(constraints.seed) - self._last_interaction_df: pd.DataFrame = pd.DataFrame() - self.unit_cost = np.random.uniform(low=15.0, high=60.0, size=(self.product_catalogue_size,)).astype(np.float32) - self.base_price = np.random.uniform(low=60.0, high=140.0, size=(self.product_catalogue_size,)).astype(np.float32) - self.alpha_hat = constraints.agent_share - self._human_data_dir, self._agent_data_dir = _resolve_behavior_data_dirs(constraints) - try: - self.separability_artifacts = load_artifacts() - except FileNotFoundError: - self.separability_artifacts = None - - def setup_true_demand(self, prices: np.ndarray) -> Dict[str, np.ndarray]: - p = np.clip(prices, self.min_price, self.max_price) - cost = np.clip(self.unit_cost, self.min_price * 0.2, self.max_price) - margin = np.clip((p - cost) / np.maximum(cost, 1e-3), -0.9, 2.0) - # isoelastic demand approximation - human_prob = self.constraints.base_human_demand * np.exp(self.constraints.human_price_elasticity * margin) - agent_prob = self.constraints.base_agent_demand * np.exp(self.constraints.agent_price_elasticity * margin) - return { - "human_purchase_prob": np.clip(human_prob, 0.0, 0.95), - "agent_purchase_prob": np.clip(agent_prob, 0.0, 0.95), - } - - def _simulate_sessions(self, prices: np.ndarray) -> Tuple[pd.DataFrame, Dict[str, Any]]: - demand = self.setup_true_demand(prices) - T = self.constraints.sessions_per_step - effective_share = float(np.clip(self.alpha_hat, 0.0, 0.95)) - n_agent_sessions = max(1, int(round(T * effective_share))) - n_human_sessions = max(1, T - n_agent_sessions) - - session_map = { - "humans": n_human_sessions, - "agents": n_agent_sessions, - } - pprob_map = { - "humans": demand["human_purchase_prob"], - "agents": demand["agent_purchase_prob"], - } - - rows: List[Dict[str, Any]] = [] - session_scores: List[Dict[str, float]] = [] - demand_human = np.zeros_like(prices, dtype=np.float32) - demand_agent = np.zeros_like(prices, dtype=np.float32) - - for actor, n_sessions in session_map.items(): - profile = _load_behavioral_profile( - actor, - pprob_map[actor], - human_data_dir=self._human_data_dir, - agent_data_dir=self._agent_data_dir, - ) - for idx in range(n_sessions): - session_id = f"{actor}_{idx:06d}" - session_rows, feature_events = profile.sample_session( - self._rng, session_id, prices, self.unit_cost - ) - rows.extend(session_rows) - if session_rows: - df_session = pd.DataFrame(session_rows) - purchases = df_session[df_session["eventName"] == "purchase_complete"] - if not purchases.empty: - counts = purchases.groupby("product_idx").size() - if actor == "agents": - demand_agent[counts.index.to_numpy(dtype=int)] += counts.to_numpy(dtype=np.float32) - else: - demand_human[counts.index.to_numpy(dtype=int)] += counts.to_numpy(dtype=np.float32) - if self.separability_artifacts and feature_events: - score = score_session(feature_events, self.separability_artifacts) - session_scores.append(score) - - interactions_df = pd.DataFrame(rows) - diagnostics = { - "alpha_hat": float(self.alpha_hat), - "session_scores": session_scores, - "demand_human": demand_human, - "demand_agent": demand_agent, - } - - if session_scores: - alphas = [ - estimate_alpha(s["prob_agent"], s["delta_h"], s["delta_a"], temperature=2.0) - for s in session_scores - ] - mean_alpha = float(np.mean(alphas)) - # exponential moving average for stability - self.alpha_hat = 0.7 * self.alpha_hat + 0.3 * mean_alpha - diagnostics.update( - { - "alpha_hat": float(self.alpha_hat), - "delta_h_mean": float(np.mean([s["delta_h"] for s in session_scores])), - "delta_a_mean": float(np.mean([s["delta_a"] for s in session_scores])), - "prob_agent_mean": float(np.mean([s["prob_agent"] for s in session_scores])), - } - ) - - self._last_interaction_df = interactions_df - return interactions_df, diagnostics - - def compute_interaction_features(self, interaction_df: pd.DataFrame) -> Dict[str, float]: - if interaction_df.empty: - return { - "revenue_observed": 0.0, - "revenue_oracle": 0.0, - "agent_loss": 0.0, - "true_human_purchases": 0.0, - "true_agent_purchases": 0.0, - "mean_sale_price": 0.0, - "look_to_book": 0.0, - "coi": 0.0, - "expected_premium": 0.0, - } - - purchases = interaction_df[interaction_df["eventName"] == "purchase_complete"] - human_purchases = purchases[purchases["actor"] == "human"] - agent_purchases = purchases[purchases["actor"] == "agent"] - - revenue_observed = float(purchases["price_paid"].sum()) - revenue_oracle = float(purchases["base_price"].sum()) - agent_loss = float((agent_purchases["base_price"] - agent_purchases["price_paid"]).sum()) - - mean_sale_price = float(purchases["price_paid"].mean()) if not purchases.empty else 0.0 - views = float((interaction_df["eventName"] == "view_item_page").sum()) - look_to_book = float(views / (len(purchases) + 1e-6)) - true_human = float(len(human_purchases)) - true_agent = float(len(agent_purchases)) - - human_prices = human_purchases["price_offered"] if not human_purchases.empty else pd.Series(dtype=float) - human_costs = human_purchases["unit_cost"] if not human_purchases.empty else pd.Series(dtype=float) - human_base = human_purchases["base_price"] if not human_purchases.empty else pd.Series(dtype=float) - coi = 0.0 - if not human_prices.empty and not human_costs.empty: - # COI = E[P] - p_min where p_min is cost, accounting for expected premium (base - realized) - margin = human_prices.mean() - human_costs.mean() - expected_premium = human_base.mean() - human_prices.mean() if not human_base.empty else 0.0 - coi = float(np.maximum(0.0, margin - expected_premium * 0.5)) - - return { - "revenue_observed": revenue_observed, - "revenue_oracle": revenue_oracle, - "agent_loss": agent_loss, - "true_human_purchases": true_human, - "true_agent_purchases": true_agent, - "mean_sale_price": mean_sale_price, - "look_to_book": look_to_book, - "coi": coi, - "expected_premium": float(expected_premium) if not human_base.empty else 0.0, - } - - def _session_feature_table(self, df: pd.DataFrame) -> pd.DataFrame: - """Extract per-session behavioral features for separability analysis.""" - if df.empty: - return pd.DataFrame() - g = df.groupby("session_id", sort=False) - session_duration = g["ts"].max() - g["ts"].min() - total_interactions = g.size() - avg_time_between = g["ts"].apply(lambda x: float(np.diff(np.sort(x.to_numpy())).mean()) if len(x) > 1 else 0.0) - interaction_velocity = total_interactions / (session_duration + 1e-6) - views = g.apply(lambda x: int((x["eventName"] == "view_item_page").sum()), include_groups=False) - cart_adds = g.apply(lambda x: int((x["eventName"] == "add_item_to_cart").sum()), include_groups=False) - purchases = g.apply(lambda x: int((x["eventName"] == "purchase_complete").sum()), include_groups=False) - learn_more = g.apply(lambda x: int((x["eventName"] == "learn_more_about_item").sum()), include_groups=False) - conversion_rate = purchases / (views + 1e-6) - is_agent = g["actor"].apply(lambda s: bool((s == "agent").any()), include_groups=False) - # price sensitivity features - price_variance = g["price_offered"].var().fillna(0.0) - avg_price_seen = g["price_offered"].mean().fillna(0.0) - products_viewed = g["product_idx"].nunique() - - return pd.DataFrame({ - "session_duration_sec": session_duration.astype(float), - "avg_time_between_events": avg_time_between.astype(float), - "total_interactions": total_interactions.astype(int), - "interaction_velocity": interaction_velocity.astype(float), - "item_views": views.astype(int), - "cart_adds": cart_adds.astype(int), - "purchases": purchases.astype(int), - "learn_more_clicks": learn_more.astype(int), - "conversion_rate": conversion_rate.astype(float), - "price_variance": price_variance.astype(float), - "avg_price_seen": avg_price_seen.astype(float), - "products_viewed": products_viewed.astype(int), - "is_agent": is_agent.astype(bool), - }).reset_index() - - def get_interaction_data(self) -> np.ndarray: - if self._last_interaction_df.empty: - return np.array([], dtype=object) - return self._last_interaction_df.to_dict(orient="records") +def make_env(constraints: Optional[BusinessLogicConstraints] = None) -> "PHANTOMEnv": + return PHANTOMEnv(constraints=constraints or BusinessLogicConstraints()) class PHANTOMEnv(gym.Env): - metadata = {"render_modes": []} + metadata = {"render_modes": ["human", "ansi"]} - def __init__(self, constraints: Optional[BusinessLogicConstraints] = None, use_jax: bool = True): + def __init__(self, constraints: Optional[BusinessLogicConstraints] = None): super().__init__() - self.constraints = constraints if isinstance(constraints, BusinessLogicConstraints) else BusinessLogicConstraints() - self.use_jax = use_jax and JAX_AVAILABLE - self.action_space = spaces.Box(low=-self.constraints.max_price_adjustment, - high=self.constraints.max_price_adjustment, - shape=(self.constraints.product_catalogue_size,), dtype=np.float32) - n_products = self.constraints.product_catalogue_size - self.observation_space = spaces.Dict({ - "elasticity": spaces.Dict({ - "price": spaces.Box( - low=np.full((n_products,), self.constraints.system_min_price, dtype=np.float32), - high=np.full((n_products,), self.constraints.system_max_price, dtype=np.float32), - dtype=np.float32), - "demand": spaces.Box( - low=np.zeros((n_products,), dtype=np.float32), - high=np.full((n_products,), 1e6, dtype=np.float32), - dtype=np.float32), - }), - "market": spaces.Dict({ - "alpha_hat": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32), - "revenue_rate": spaces.Box(low=0.0, high=1e6, shape=(1,), dtype=np.float32), - "conversion_rate": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32), - "price_volatility": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32), - }), - "cost": spaces.Box(low=0.0, high=self.constraints.system_max_price, shape=(n_products,), dtype=np.float32), - }) - self.commerce_platform = CommercePlatform( - product_catalogue_size=self.constraints.product_catalogue_size, - max_price=self.constraints.system_max_price, - min_price=self.constraints.system_min_price, - constraints=self.constraints) - self._rng = np.random.default_rng(self.constraints.seed) - self.t = 0 - self._prev_prices: Optional[np.ndarray] = None - self.state: Dict[str, Any] = {} - self._jax_key = None - self._jax_trans = None - if self.use_jax: - self._jax_key = jax.random.PRNGKey(self.constraints.seed) - self._init_jax_transitions() + self.c = constraints or BusinessLogicConstraints() + self.n = int(self.c.product_catalogue_size) - def _init_jax_transitions(self): - try: - human_dir, agent_dir = _resolve_behavior_data_dirs(self.constraints) - human_profile = _load_behavioral_profile( - "humans", - np.ones(self.constraints.product_catalogue_size) * 0.1, - human_data_dir=human_dir, - agent_data_dir=agent_dir, - ) - agent_profile = _load_behavioral_profile( - "agents", - np.ones(self.constraints.product_catalogue_size) * 0.1, - human_data_dir=human_dir, - agent_data_dir=agent_dir, - ) - self._jax_trans = compile_transitions(human_profile, agent_profile).to_jax() - except Exception: - self._jax_trans = fallback_transitions().to_jax() + self._rng = np.random.default_rng(self.c.seed) + self._t = 0 + self._alpha_true = float(self.c.agent_share) + self._alpha_hat = float(self.c.agent_share) + self._costs = np.zeros(self.n, dtype=np.float32) + self._refs = np.zeros(self.n, dtype=np.float32) + self._prices: Optional[np.ndarray] = None + self._last_sessions: list[Session] = [] + self._last_coi: COIWindow | None = None + self._limbo = Limbo() + + self.action_space = spaces.Box( + low=np.full((self.n,), self.c.system_min_price, dtype=np.float32), + high=np.full((self.n,), self.c.system_max_price, dtype=np.float32), + dtype=np.float32, + ) + self.observation_space = spaces.Dict( + { + "elasticity": spaces.Dict( + { + "price": spaces.Box( + low=np.full((self.n,), self.c.system_min_price, dtype=np.float32), + high=np.full((self.n,), self.c.system_max_price, dtype=np.float32), + dtype=np.float32, + ), + "demand": spaces.Box( + low=np.zeros((self.n,), dtype=np.float32), + high=np.full((self.n,), 1e9, dtype=np.float32), + dtype=np.float32, + ), + } + ), + "market": spaces.Dict( + { + "alpha_hat": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32), + "revenue_rate": spaces.Box(low=0.0, high=1e12, shape=(1,), dtype=np.float32), + "conversion_rate": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32), + "price_volatility": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32), + } + ), + "cost": spaces.Box( + low=np.zeros((self.n,), dtype=np.float32), + high=np.full((self.n,), self.c.system_max_price, dtype=np.float32), + dtype=np.float32, + ), + } + ) + + def _reset_catalogue(self) -> None: + self._costs = self._rng.uniform(15.0, 60.0, size=self.n).astype(np.float32) + margins = self._rng.uniform(0.2, 0.6, size=self.n).astype(np.float32) + self._refs = (self._costs * (1.0 + margins)).astype(np.float32) + self._prices = self._refs.copy() + + def _observe_market( + self, prices: np.ndarray + ) -> tuple[list[Session], Dict[str, float], np.ndarray, np.ndarray, float, float, int]: + sessions, demand_map = put_prices_to_market( + prices, + costs=self._costs, + alpha=self._alpha_true, + n_sessions=int(self.c.sessions_per_step), + seed=int(self._rng.integers(0, 2**31 - 1)), + ) + demand_by_product = aggregate_demand_by_product(sessions, demand_map, self.n) + purchases, revenue, cost, n_agents = aggregate_purchases(sessions, self._costs, self.n) + conversion = float(np.sum(purchases) / max(len(sessions), 1)) + return sessions, demand_map, demand_by_product, purchases, revenue, cost, n_agents + + def _update_alpha_hat(self, sessions: list[Session]) -> float: + scores = [estimate_session_alpha(s) for s in sessions if s.events] + if not scores: + return self._alpha_hat + alpha_step = float(np.mean(scores)) + self._alpha_hat = 0.8 * self._alpha_hat + 0.2 * alpha_step + self._alpha_hat = float(np.clip(self._alpha_hat, 0.0, 1.0)) + return self._alpha_hat + + def _reward(self, prices: np.ndarray, revenue: float, cost: float, volatility: float) -> float: + profit = float(revenue - cost) + coi_leak = float(self._last_coi.leak) if self._last_coi else 0.0 + alpha_err = abs(self._alpha_hat - self._alpha_true) + return profit - self.c.coi_strength * coi_leak - self.c.w_volatility * volatility - self.c.w_estimation_error * alpha_err + + def _build_obs( + self, + prices: np.ndarray, + demand_by_product: np.ndarray, + revenue: float, + conversion: float, + volatility: float, + ) -> Dict[str, Any]: + return { + "elasticity": {"price": prices.astype(np.float32), "demand": demand_by_product.astype(np.float32)}, + "market": { + "alpha_hat": np.array([self._alpha_hat], dtype=np.float32), + "revenue_rate": np.array([revenue], dtype=np.float32), + "conversion_rate": np.array([conversion], dtype=np.float32), + "price_volatility": np.array([volatility], dtype=np.float32), + }, + "cost": self._costs.astype(np.float32), + } def reset(self, seed: Optional[int] = None, options: Optional[dict] = None): super().reset(seed=seed) if seed is not None: self._rng = np.random.default_rng(seed) - self.commerce_platform._rng = np.random.default_rng(seed) - if self.use_jax: - self._jax_key = jax.random.PRNGKey(seed) - self.commerce_platform.alpha_hat = self.constraints.agent_share - self.t = 0 - init_prices = self._rng.uniform( - low=60.0, - high=140.0, - size=(self.constraints.product_catalogue_size,), - ).astype(np.float32) - self.commerce_platform.unit_cost = self._rng.uniform( - low=15.0, - high=60.0, - size=(self.constraints.product_catalogue_size,), - ).astype(np.float32) - self.commerce_platform.base_price = init_prices.copy() - self._prev_prices = init_prices.copy() - self.state = { - "elasticity": { - "price": init_prices, - "demand": np.zeros((self.constraints.product_catalogue_size,), dtype=np.float32), - }, - "market": { - "alpha_hat": np.array([self.constraints.agent_share], dtype=np.float32), - "revenue_rate": np.array([0.0], dtype=np.float32), - "conversion_rate": np.array([0.0], dtype=np.float32), - "price_volatility": np.array([0.0], dtype=np.float32), - }, - "cost": self.commerce_platform.unit_cost.astype(np.float32), - } - return self.state, {} + self._t = 0 + self._alpha_true = float(np.clip(self.c.agent_share, *self.c.alpha_bounds)) + self._alpha_hat = float(self.c.agent_share) + self._reset_catalogue() + self._limbo = Limbo() + self._last_sessions = [] + self._last_coi = None - def _step_jax(self, new_prices: np.ndarray) -> Tuple[Dict, Dict]: - self._jax_key, subkey = jax.random.split(self._jax_key) - alpha = float(np.clip(self.commerce_platform.alpha_hat, 0.0, 0.95)) - n_agent = max(1, int(self.constraints.sessions_per_step * alpha)) - n_human = max(1, self.constraints.sessions_per_step - n_agent) - batch = sample_sessions(subkey, self._jax_trans, n_human, n_agent, len(new_prices)) - sim = compute_metrics(batch, new_prices, self.commerce_platform.unit_cost, self.commerce_platform.base_price) - result = {"revenue_observed": sim.revenue, "revenue_oracle": sim.revenue_oracle, - "agent_loss": sim.agent_loss, "coi": sim.coi, "look_to_book": sim.look_to_book, - "mean_sale_price": sim.mean_sale_price, "true_human_purchases": sim.n_human_purchases, - "true_agent_purchases": sim.n_agent_purchases} - diagnostics = {"demand_human": sim.demand_human, "demand_agent": sim.demand_agent, "alpha_hat": alpha} - return result, diagnostics + prices = self._prices if self._prices is not None else np.zeros(self.n, dtype=np.float32) + obs = self._build_obs(prices, np.zeros(self.n, dtype=np.float32), 0.0, 0.0, 0.0) + return obs, {"alpha_true": self._alpha_true} - def step(self, action: np.ndarray): - self.t += 1 - base_prices = self.state["elasticity"]["price"].astype(np.float32) - new_prices = np.clip(base_prices * (1.0 + action.astype(np.float32)), - self.constraints.system_min_price, - self.constraints.system_max_price).astype(np.float32) + def step(self, action: np.ndarray) -> Tuple[Dict[str, Any], float, bool, bool, Dict[str, Any]]: + if self._prices is None: + raise RuntimeError("reset() must be called before step()") - self.state["elasticity"]["price"] = new_prices - if self.use_jax: - result, diagnostics = self._step_jax(new_prices) - else: - interactions_df, diagnostics = self.commerce_platform._simulate_sessions(new_prices) - result = self.commerce_platform.compute_interaction_features(interactions_df) - COI = float(result.get("coi", 0.0)) - - demand_vector = diagnostics.get("demand_human", np.zeros_like(new_prices)) + diagnostics.get( - "demand_agent", np.zeros_like(new_prices) + prev = self._prices + prices = constrain_prices( + prev, + np.asarray(action, dtype=np.float32), + costs=self._costs, + min_price=float(self.c.system_min_price), + max_price=float(self.c.system_max_price), + max_adjustment=float(self.c.max_price_adjustment), + min_margin_pct=float(self.c.min_margin_pct), ) - self.state["elasticity"]["demand"] = demand_vector.astype(np.float32) + self._prices = prices + self._limbo.add_update("prices", prices) - volatility = 0.0 if self._prev_prices is None else \ - float(np.mean(np.abs((new_prices - self._prev_prices) / (self._prev_prices + 1e-6)))) - self._prev_prices = new_prices.copy() + sessions, demand_map, demand_by_product, purchases, revenue, cost, n_agents = self._observe_market(prices) + self._last_sessions = sessions + self._limbo.add_update("demand", demand_map) - # update market observation features - total_demand = float(np.sum(demand_vector)) - total_purchases = float(result.get("true_human_purchases", 0.0) + result.get("true_agent_purchases", 0.0)) - conv_rate = total_purchases / max(total_demand, 1.0) - self.state["market"] = { - "alpha_hat": np.array([float(diagnostics.get("alpha_hat", self.commerce_platform.alpha_hat))], dtype=np.float32), - "revenue_rate": np.array([float(result.get("revenue_observed", 0.0))], dtype=np.float32), - "conversion_rate": np.array([float(np.clip(conv_rate, 0.0, 1.0))], dtype=np.float32), - "price_volatility": np.array([float(volatility)], dtype=np.float32), - } - self.state["cost"] = self.commerce_platform.unit_cost.astype(np.float32) + self._update_alpha_hat(self._last_sessions) + self._last_coi = compute_coi_window(self._last_sessions, self._costs, demand_mapping=demand_map) - # extract metrics with safe defaults for incomplete simulation - revenue_observed = float(result.get("revenue_observed", 0.0)) - agent_loss = float(result.get("agent_loss", 0.0)) + self._alpha_true = float(np.clip(self._alpha_true + self.c.alpha_drift, *self.c.alpha_bounds)) + volatility = float(np.std((prices - prev) / (prev + 1e-6))) + reward = float(self._reward(prices, revenue, cost, volatility)) + conversion = float(np.sum(purchases) / max(len(self._last_sessions), 1)) - reward = (revenue_observed - - COI - - self.constraints.w_agent_loss * agent_loss - - self.constraints.w_volatility * volatility - - self.constraints.w_estimation_error) + self._t += 1 + terminated = self._t >= int(self.c.max_steps) - terminated = self.t >= self.constraints.episode_length + obs = self._build_obs(prices, demand_by_product, revenue, conversion, min(volatility, 1.0)) info = { - "t": self.t, - "revenue_observed": revenue_observed, - "revenue_oracle": float(result.get("revenue_oracle", revenue_observed)), - "agent_loss": agent_loss, - "ux_volatility": volatility, - "look_to_book": float(result.get("look_to_book", 0.0)), - "mean_sale_price": float(result.get("mean_sale_price", 0.0)), - "true_human_purchases_total": float(result.get("true_human_purchases", 0.0)), - "true_agent_purchases_total": float(result.get("true_agent_purchases", 0.0)), - "coi": COI, - "alpha_hat": diagnostics.get("alpha_hat", self.commerce_platform.alpha_hat), - "mean_human_demand": float(np.mean(diagnostics.get("demand_human", np.zeros_like(new_prices)))), - "mean_agent_demand": float(np.mean(diagnostics.get("demand_agent", np.zeros_like(new_prices)))), + "step": self._t, + "reward": reward, + "revenue": float(revenue), + "profit": float(revenue - cost), + "n_sessions": int(self.c.sessions_per_step), + "n_agents": int(n_agents), + "alpha_true": float(self._alpha_true), + "alpha_hat": float(self._alpha_hat), + "alpha_error": float(abs(self._alpha_hat - self._alpha_true)), + "price_std": float(np.std(prices)), + "price_volatility": float(volatility), } - if "delta_h_mean" in diagnostics: + if self._last_coi is not None: info.update( { - "delta_h_mean": diagnostics["delta_h_mean"], - "delta_a_mean": diagnostics["delta_a_mean"], - "prob_agent_mean": diagnostics["prob_agent_mean"], + "coi_policy": float(self._last_coi.policy), + "coi_agent": float(self._last_coi.agent), + "coi_leakage": float(self._last_coi.leak), + "coi_survival": float(self._last_coi.survival_ratio), + "coi_erosion": float(coi_erosion(self._last_coi.policy, self._last_coi.agent)), } ) - return self.state, float(reward), terminated, False, info + return obs, reward, terminated, False, info + def render(self, mode: str = "human") -> str | None: + if self._prices is None: + return None + out = ( + f"t={self._t}/{self.c.max_steps} " + f"alpha_true={self._alpha_true:.3f} alpha_hat={self._alpha_hat:.3f} " + f"price_std={float(np.std(self._prices)):.2f}" + ) + if mode == "human": + print(out) + return out -if __name__ == "__main__": - import matplotlib.pyplot as plt - from collections import defaultdict - - env = PHANTOMEnv(constraints=BusinessLogicConstraints()) - obs, _ = env.reset(seed=42) - metrics = defaultdict(list) - total_reward = 0.0 - done = False - - while not done: - action = env.action_space.sample() - obs, reward, done, _, info = env.step(action) - total_reward += reward - p_mean = float(np.mean(obs["elasticity"]["price"])) - q_mean = float(np.mean(obs["elasticity"]["demand"])) - p_std = float(np.std(obs["elasticity"]["price"])) - - metrics['t'].append(info['t']) - metrics['price_mean'].append(p_mean) - metrics['price_std'].append(p_std) - metrics['demand_mean'].append(q_mean) - metrics['revenue_observed'].append(info['revenue_observed']) - metrics['revenue_oracle'].append(info['revenue_oracle']) - metrics['agent_loss'].append(info['agent_loss']) - metrics['ux_volatility'].append(info['ux_volatility']) - metrics['look_to_book'].append(info['look_to_book']) - metrics['reward'].append(reward) - metrics['human_purchases'].append(info['true_human_purchases_total']) - metrics['agent_purchases'].append(info['true_agent_purchases_total']) - metrics['coi'].append(info.get('coi', 0.0)) - metrics['alpha_hat'].append(info.get('alpha_hat', env.commerce_platform.alpha_hat)) - metrics['mean_human_demand'].append(info.get('mean_human_demand', 0.0)) - metrics['mean_agent_demand'].append(info.get('mean_agent_demand', 0.0)) - metrics['delta_h_mean'].append(info.get('delta_h_mean', 0.0)) - metrics['delta_a_mean'].append(info.get('delta_a_mean', 0.0)) - metrics['prob_agent_mean'].append(info.get('prob_agent_mean', 0.0)) - - if info['t'] % 20 == 0 or done: - print(f"t={info['t']:03d} p={p_mean:6.2f}±{p_std:4.2f} q={q_mean:6.2f} " - f"rev={info['revenue_observed']:7.2f} oracle={info['revenue_oracle']:7.2f} " - f"loss={info['agent_loss']:6.2f} ux={info['ux_volatility']:.3f} " - f"coi={info.get('coi', 0.0):6.2f} alpha={info.get('alpha_hat', 0.0):4.2f} " - f"ltb={info['look_to_book']:5.2f} r={reward:7.2f}") - - print(f"total_reward={total_reward:.2f}") - - fig, axes = plt.subplots(3, 4, figsize=(18, 12)) - fig.suptitle('PHANTOM Environment Run', fontsize=14, fontweight='bold') - - plot_configs = [ - ('price_mean', 'Mean Price', 'Price'), - ('demand_mean', 'Mean Demand (All)', 'Demand'), - ('mean_human_demand', 'Mean Human Demand', 'Count'), - ('mean_agent_demand', 'Mean Agent Demand', 'Count'), - ('revenue_observed', 'Revenue (Observed)', 'Revenue'), - ('agent_loss', 'Agent Loss (Oracle - Observed)', 'Loss'), - ('coi', 'Cost of Information', 'COI'), - ('alpha_hat', 'Estimated α̂', 'alpha'), - ('ux_volatility', 'UX Volatility (Price Change)', 'Volatility'), - ('look_to_book', 'Look-to-Book Ratio', 'Ratio'), - ('reward', 'Step Reward', 'Reward'), - ('prob_agent_mean', 'Avg Agent Probability', 'Probability'), - ] - - for idx, (key, title, ylabel) in enumerate(plot_configs): - ax = axes[idx // 4, idx % 4] - ax.plot(metrics['t'], metrics[key], color='blue', alpha=0.7, linewidth=1.5) - ax.set_xlabel('Step') - ax.set_ylabel(ylabel) - ax.set_title(title, fontsize=10, fontweight='bold') - ax.grid(True, alpha=0.3) - - plt.tight_layout() - plt.savefig('phantom_env_comparison.png', dpi=150, bbox_inches='tight') - print("Plot saved to phantom_env_comparison.png") - plt.show() + def close(self) -> None: + return