"""PHANTOM pricing environment: a Gymnasium env around a simulated storefront.

Terminology used throughout this file:

* "learner" - the agent learning to optimize pricing.
* "agent"   - part of the environment; creates demand signals that the
  learner processes.
"""
from sys import intern
from dataclasses import dataclass
from typing import Callable, Optional, Dict, Any, List

import gymnasium as gym
from gymnasium import spaces
from matplotlib import interactive
import numpy as np
import pandas as pd

# Safety valve for the session sampler: a behavioral profile that never emits
# a "checkout" event would otherwise loop forever.
_MAX_EVENTS_PER_SESSION = 10_000


@dataclass
class BusinessLogicConstraints():
    """Tunable constants shared by the platform simulator and the environment."""

    # Actions are relative price changes bounded to +/- this fraction.
    max_price_adjustment: float = 0.30
    # Hard price bounds; used for clipping and for the observation space.
    system_max_price: float = 500.0
    system_min_price: float = 1.0
    # Number of products (spelling kept: it is part of the public interface).
    product_catelogue_size: int = 100
    # Number of steps after which ``step`` reports ``terminated``.
    episode_length: int = 200
    # Sessions simulated per environment step, and the share driven by agents.
    sessions_per_step: int = 250
    agent_share: float = 0.25
    # The next six fields are not read by any code visible in this file yet.
    agent_recon_multiplier: float = 6.0
    agent_purchase_probability: float = 0.20
    coi_strength: float = 0.25
    coi_threshold: float = 4.0
    coi_sigmoid_temp: float = 1.25
    # Ground-truth demand curve parameters (see ``setup_true_demand``).
    base_human_demand: float = 0.08
    base_agent_demand: float = 0.05
    human_price_elasticity: float = -1.2  # assumptions here
    agent_price_elasticity: float = -0.6
    # Reward weights (see ``PHANTOMEnv.step``).
    w_agent_loss: float = 1.0
    w_volatility: float = 5.0
    w_estimation_error: float = 0.25
    # Default seed for the random generators.
    seed: int = 7


def _sigmoid(x: np.ndarray) -> np.ndarray:
    """Return the element-wise logistic function ``1 / (1 + exp(-x))``."""
    return 1.0 / (1.0 + np.exp(-x))


class CommercePlatform:
    """State-management helper used by the environment to simulate demand.

    It does not implement any dynamics of its own; it only produces simulated
    interaction data for a given price vector.
    """

    def __init__(self, product_catelogue_size: int, max_price: float,
                 min_price: float, constraints: BusinessLogicConstraints):
        """Store the configuration and draw the initial per-product supply.

        Args:
            product_catelogue_size: Number of products.
            max_price: Upper price bound used for clipping and normalisation.
            min_price: Lower price bound used for clipping.
            constraints: Shared simulation constants (also provides the seed).
        """
        self.product_catelogue_size = product_catelogue_size
        self.max_price = max_price
        self.min_price = min_price
        self.constraints = constraints
        self.simulation_history: List[Dict[str, Any]] = []
        self._rng = np.random.default_rng(constraints.seed)
        # Drawn from the seeded generator (not the global ``np.random``) so
        # that ``constraints.seed`` makes the platform reproducible.
        self.product_supply = self._rng.uniform(
            low=10, high=50, size=(self.product_catelogue_size,))
        self._last_interaction_df: pd.DataFrame = pd.DataFrame()

    def setup_true_demand(self, prices: np.ndarray) -> Dict[str, np.ndarray]:
        """Return ground-truth purchase propensities for ``prices``.

        Prices are clipped to ``[min_price, max_price]``, normalised by
        ``max_price`` and raised to the configured elasticity; the result is
        scaled by the base demand and clipped to ``[0, 0.95]``.

        Returns:
            Dict with keys ``"human_purchase_prob"`` and
            ``"agent_purchase_prob"``, each shaped like ``prices``.
        """
        clipped = np.clip(prices, self.min_price, self.max_price)
        normalised = clipped / self.max_price
        cfg = self.constraints
        human_prob = cfg.base_human_demand * (normalised ** cfg.human_price_elasticity)
        agent_prob = cfg.base_agent_demand * (normalised ** cfg.agent_price_elasticity)
        return {
            "human_purchase_prob": np.clip(human_prob, 0.0, 0.95),
            "agent_purchase_prob": np.clip(agent_prob, 0.0, 0.95),
        }

    @staticmethod
    def _load_behavioral_profile(actor: str, demand_forcing):
        """Return the behavioral profile (a Markov chain) for ``actor``.

        The chain carries average weights obtained from interaction data of
        our experiments. It defines transition probabilities between events,
        for example::

            search -> view_item_price_binN: 0.7
            view_item_price_binN -> add_to_cart: 0.2

        The weights must also be re-weighted with the ``demand_forcing``
        vector (purchase probabilities per product).

        The returned object is expected to expose ``sample(rng)`` returning a
        mutable mapping with at least an ``"action"`` key, because that is how
        ``_simulate_sessions`` uses it.

        Raises:
            NotImplementedError: Always, until the experimental transition
                weights are wired in. Previously this silently returned
                ``None`` and failed later with an unrelated error.
        """
        raise NotImplementedError(
            f"No behavioral profile is available for actor {actor!r}: "
            "the Markov-chain transition weights have not been loaded yet."
        )

    def _simulate_sessions(self, base_prices: np.ndarray) -> pd.DataFrame:
        """Simulate one step worth of human and agent sessions.

        Each session samples events from the actor's behavioral profile until
        a ``"checkout"`` event is produced (or the safety cap is reached).
        The resulting frame is also kept in ``self._last_interaction_df``.

        Args:
            base_prices: Current price vector, one entry per product.

        Returns:
            One row per sampled event, with ``session_id`` and ``actor``
            columns added to whatever the profile emits.
        """
        demand = self.setup_true_demand(base_prices)

        total_sessions = self.constraints.sessions_per_step
        n_agent_sessions = int(round(total_sessions * self.constraints.agent_share))
        n_human_sessions = total_sessions - n_agent_sessions
        # NOTE(review): agents get half of their session share (minimum one),
        # even when ``agent_share`` is 0 - kept as in the original; confirm
        # whether ``n_agent_sessions`` was intended here.
        n_agent_ids = max(1, n_agent_sessions // 2)

        # (profile key / session-id prefix, actor label, sessions, forcing).
        # The singular actor label matches what ``_session_feature_table``
        # compares against.
        session_specs = [
            ("humans", "human", n_human_sessions, demand["human_purchase_prob"]),
            ("agents", "agent", n_agent_ids, demand["agent_purchase_prob"]),
        ]

        joint_events: List[Dict[str, Any]] = []
        for profile_key, actor_label, n_sessions, purchase_prob in session_specs:
            if n_sessions <= 0:
                continue
            profile = self._load_behavioral_profile(profile_key, purchase_prob)
            for session_idx in range(n_sessions):
                session_id = f'{profile_key}_{session_idx:06d}'
                # Sample until the session reaches checkout. The original
                # condition was inverted and only kept sampling *after* a
                # checkout.
                # TODO: treat other terminal events (if any) as session ends.
                for _ in range(_MAX_EVENTS_PER_SESSION):
                    interaction_event = profile.sample(self._rng)
                    interaction_event['session_id'] = session_id
                    # Do not override an actor tag supplied by the profile.
                    interaction_event.setdefault('actor', actor_label)
                    # TODO any other assignments
                    joint_events.append(interaction_event)
                    if interaction_event['action'] == 'checkout':
                        break

        interactions = pd.DataFrame(joint_events)
        self._last_interaction_df = interactions
        return interactions

    def compute_interaction_features(self, interaction_df: pd.DataFrame) -> Dict[str, float]:
        """Aggregate step-level features from raw interaction events.

        Args:
            interaction_df: Event frame with an ``action`` column and, for
                purchase rows, a ``price_paid`` column.

        Returns:
            ``mean_sale_price`` (mean ``price_paid`` over purchases, 0.0 when
            there are none) and ``look_to_book`` (views per purchase, with a
            small epsilon guarding the division). Both are 0.0 for an empty
            frame.
        """
        if interaction_df.empty:
            return {"mean_sale_price": 0.0, "look_to_book": 0.0}

        actions = interaction_df["action"]
        purchases = interaction_df[actions == "purchase"]
        mean_sale_price = 0.0 if purchases.empty else float(purchases["price_paid"].mean())
        views = float((actions == "view").sum())
        buys = float(len(purchases))
        return {
            "mean_sale_price": mean_sale_price,
            "look_to_book": float(views / (buys + 1e-6)),
        }

    def _session_feature_table(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build one row of behavioural features per session.

        Uses vectorised grouped aggregations rather than per-group Python
        callbacks, which also avoids depending on the ``include_groups``
        keyword of ``GroupBy.apply``.

        Args:
            df: Event frame with ``session_id``, ``t``, ``action`` and
                ``actor`` columns.

        Returns:
            A frame with one row per session (in order of first appearance),
            or an empty frame when ``df`` is empty.
        """
        # TODO: adapt this
        if df.empty:
            return pd.DataFrame()

        session_key = df["session_id"]
        grouped = df.groupby("session_id", sort=False)

        timestamps = grouped["t"]
        session_duration = (timestamps.max() - timestamps.min()).astype(float)
        total_interactions = grouped.size()
        # The mean of consecutive gaps telescopes to (max - min) / (n - 1);
        # single-event sessions have zero duration and therefore yield 0.0.
        avg_time_between = session_duration / (total_interactions - 1).clip(lower=1)
        interaction_velocity = total_interactions / (session_duration + 1e-6)

        def count_action(action_name: str) -> pd.Series:
            """Count events of ``action_name`` per session."""
            return df["action"].eq(action_name).groupby(session_key, sort=False).sum()

        views = count_action("view")
        cart_adds = count_action("cart")
        purchases = count_action("purchase")
        conversion_rate = purchases / (views + 1e-6)
        is_agent = df["actor"].eq("agent").groupby(session_key, sort=False).any()

        return pd.DataFrame({
            "session_duration_sec": session_duration.astype(float),
            "avg_time_between_events": avg_time_between.astype(float),
            "total_interactions": total_interactions.astype(int),
            "interaction_velocity": interaction_velocity.astype(float),
            "item_views": views.astype(int),
            "cart_adds": cart_adds.astype(int),
            "purchases": purchases.astype(int),
            "conversion_rate": conversion_rate.astype(float),
            "is_agent": is_agent.astype(bool),
        }).reset_index()

    def get_interaction_data(self) -> np.ndarray:
        """Return the most recent interaction events as an object array.

        Returns:
            A 1-D ``dtype=object`` array holding one record dict per event;
            empty when no sessions have been simulated yet. Both branches now
            return an array, matching the annotation.
        """
        if self._last_interaction_df.empty:
            return np.array([], dtype=object)
        records = self._last_interaction_df.to_dict(orient="records")
        return np.array(records, dtype=object)


class PHANTOMEnv(gym.Env):
    """Gymnasium environment in which the learner adjusts product prices."""

    metadata = {"render_modes": []}

    def __init__(self, constraints: Optional[BusinessLogicConstraints] = None,
                 use_defense: bool = False):
        """Create the action/observation spaces and the platform simulator.

        Args:
            constraints: Simulation constants; defaults are used when omitted.
                (The argument used to be ignored.)
            use_defense: Flag stored on the instance. No logic in this file
                consumes it yet.
        """
        super().__init__()
        self.constraints = constraints if constraints is not None else BusinessLogicConstraints()
        self.use_defense = bool(use_defense)

        cfg = self.constraints
        n_products = cfg.product_catelogue_size
        self.action_space = spaces.Box(
            low=-cfg.max_price_adjustment,
            high=cfg.max_price_adjustment,
            shape=(n_products,),
            dtype=np.float32,
        )
        self.observation_space = spaces.Dict({
            "elasticity": spaces.Dict({
                "price": spaces.Box(
                    low=np.full((n_products,), cfg.system_min_price, dtype=np.float32),
                    high=np.full((n_products,), cfg.system_max_price, dtype=np.float32),
                    dtype=np.float32),
                "demand": spaces.Box(
                    low=np.zeros((n_products,), dtype=np.float32),
                    high=np.full((n_products,), 1e6, dtype=np.float32),
                    dtype=np.float32),
            })
            # TODO: define more features that we compute from the interaction data
        })
        self.commerce_platform = CommercePlatform(
            product_catelogue_size=n_products,
            max_price=cfg.system_max_price,
            min_price=cfg.system_min_price,
            constraints=cfg)
        self._rng = np.random.default_rng(cfg.seed)
        self.t = 0
        self._prev_prices: Optional[np.ndarray] = None
        self.state: Dict[str, Any] = {}

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode with freshly drawn prices and zero demand.

        Args:
            seed: When given, re-seeds both the environment and the platform
                generators.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        if seed is not None:
            self._rng = np.random.default_rng(seed)
            self.commerce_platform._rng = np.random.default_rng(seed)
        self.t = 0

        n_products = self.constraints.product_catelogue_size
        init_prices = self._rng.uniform(
            low=60.0, high=140.0, size=(n_products,)).astype(np.float32)
        self._prev_prices = init_prices.copy()
        self.state = {
            "elasticity": {
                "price": init_prices,
                "demand": np.zeros((n_products,), dtype=np.float32),
            }
        }
        return self.state, {}

    @staticmethod
    def _summarize_interactions(interactions_df: pd.DataFrame) -> Dict[str, float]:
        """Derive revenue and purchase counts from the interaction frame.

        Missing columns simply leave the corresponding value at 0.0.
        Assumes purchase rows carry ``price_paid`` and an ``actor`` column
        whose value is ``"agent"`` for agent traffic - TODO confirm against
        the behavioral profile schema.
        """
        summary = {
            "revenue_observed": 0.0,
            "human_purchases": 0.0,
            "agent_purchases": 0.0,
        }
        if interactions_df.empty or "action" not in interactions_df.columns:
            return summary

        purchases = interactions_df[interactions_df["action"] == "purchase"]
        if "price_paid" in purchases.columns:
            summary["revenue_observed"] = float(purchases["price_paid"].sum())
        if "actor" in purchases.columns:
            by_agent = purchases["actor"] == "agent"
            summary["agent_purchases"] = float(by_agent.sum())
            summary["human_purchases"] = float((~by_agent).sum())
        return summary

    def step(self, action: np.ndarray):
        """Apply relative price changes, simulate sessions and score the step.

        Args:
            action: Per-product relative price adjustment. Values are clipped
                to the action-space bounds before being applied.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` becomes True once ``episode_length`` steps have
            elapsed; ``truncated`` is always False.
        """
        self.t += 1
        cfg = self.constraints

        # Accept any array-like and enforce the business bound on adjustments.
        adjustment = np.clip(
            np.asarray(action, dtype=np.float32),
            -cfg.max_price_adjustment,
            cfg.max_price_adjustment,
        )
        base_prices = self.state["elasticity"]["price"].astype(np.float32)
        new_prices = np.clip(
            base_prices * (1.0 + adjustment),
            cfg.system_min_price,
            cfg.system_max_price,
        ).astype(np.float32)
        self.state["elasticity"]["price"] = new_prices
        # TODO: update state["elasticity"]["demand"] from the interaction data

        interactions_df = self.commerce_platform._simulate_sessions(new_prices)
        features = self.commerce_platform.compute_interaction_features(interactions_df)
        summary = self._summarize_interactions(interactions_df)

        # TODO: implement COI computation to use in reward
        COI = 0.0

        if self._prev_prices is None:
            volatility = 0.0
        else:
            relative_change = (new_prices - self._prev_prices) / (self._prev_prices + 1e-6)
            volatility = float(np.mean(np.abs(relative_change)))
        self._prev_prices = new_prices.copy()

        revenue_observed = summary["revenue_observed"]
        # TODO: compute the oracle revenue. Until then it mirrors the observed
        # revenue so that agent_loss (oracle - observed) is a neutral 0.0.
        revenue_oracle = revenue_observed
        agent_loss = revenue_oracle - revenue_observed
        # TODO: compute the learner's internal estimation error.
        err_mean = 0.0

        reward = (
            revenue_observed
            - COI
            - cfg.w_agent_loss * agent_loss
            - cfg.w_volatility * volatility
            # The weight now scales the error instead of acting as a constant.
            - cfg.w_estimation_error * err_mean
        )
        terminated = self.t >= cfg.episode_length
        info = {
            "t": self.t,
            "revenue_observed": revenue_observed,
            "revenue_oracle": revenue_oracle,
            "agent_loss": agent_loss,
            "ux_volatility": volatility,
            "mean_internal_error": err_mean,
            "look_to_book": float(features.get("look_to_book", 0.0)),
            "mean_sale_price": float(features.get("mean_sale_price", 0.0)),
            "true_human_purchases_total": summary["human_purchases"],
            "true_agent_purchases_total": summary["agent_purchases"],
        }
        return self.state, float(reward), terminated, False, info


def main() -> None:
    """Run random-action episodes with defense off/on and plot the metrics."""
    import matplotlib.pyplot as plt
    from collections import defaultdict

    runs = {}
    for use_defense in (False, True):
        env = PHANTOMEnv(use_defense=use_defense)
        obs, _ = env.reset(seed=42)
        metrics = defaultdict(list)
        total_reward = 0.0
        done = False
        while not done:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            total_reward += reward

            p_mean = float(np.mean(obs["elasticity"]["price"]))
            q_mean = float(np.mean(obs["elasticity"]["demand"]))
            p_std = float(np.std(obs["elasticity"]["price"]))

            metrics['t'].append(info['t'])
            metrics['price_mean'].append(p_mean)
            metrics['price_std'].append(p_std)
            metrics['demand_mean'].append(q_mean)
            metrics['revenue_observed'].append(info['revenue_observed'])
            metrics['revenue_oracle'].append(info['revenue_oracle'])
            metrics['agent_loss'].append(info['agent_loss'])
            metrics['ux_volatility'].append(info['ux_volatility'])
            metrics['look_to_book'].append(info['look_to_book'])
            metrics['reward'].append(reward)
            metrics['human_purchases'].append(info['true_human_purchases_total'])
            metrics['agent_purchases'].append(info['true_agent_purchases_total'])

            if info['t'] % 20 == 0 or done:
                print(f"defense={'ON ' if use_defense else 'OFF'} t={info['t']:03d} p={p_mean:6.2f}±{p_std:4.2f} "
                      f"q={q_mean:6.2f} rev={info['revenue_observed']:7.2f} oracle={info['revenue_oracle']:7.2f} "
                      f"loss={info['agent_loss']:6.2f} ux={info['ux_volatility']:.3f} "
                      f"ltb={info['look_to_book']:5.2f} r={reward:7.2f}")

        runs[use_defense] = metrics
        print(f"defense={'ON ' if use_defense else 'OFF'} total_reward={total_reward:.2f}\n")

    fig, axes = plt.subplots(3, 3, figsize=(15, 12))
    fig.suptitle('PHANTOM Environment: Defense OFF vs ON', fontsize=14, fontweight='bold')

    plot_configs = [
        ('price_mean', 'Mean Price', 'Price'),
        ('demand_mean', 'Mean Demand Estimate', 'Demand'),
        ('revenue_observed', 'Revenue (Observed)', 'Revenue'),
        ('agent_loss', 'Agent Loss (Oracle - Observed)', 'Loss'),
        ('ux_volatility', 'UX Volatility (Price Change)', 'Volatility'),
        ('look_to_book', 'Look-to-Book Ratio', 'Ratio'),
        ('reward', 'Step Reward', 'Reward'),
        ('human_purchases', 'Human Purchases', 'Count'),
        ('agent_purchases', 'Agent Purchases', 'Count'),
    ]
    series_styles = [(False, 'No Defense', 'red'), (True, 'With Defense', 'blue')]

    for idx, (key, title, ylabel) in enumerate(plot_configs):
        ax = axes[idx // 3, idx % 3]
        for use_defense, label, color in series_styles:
            m = runs[use_defense]
            ax.plot(m['t'], m[key], label=label, color=color, alpha=0.7, linewidth=1.5)
        ax.set_xlabel('Step')
        ax.set_ylabel(ylabel)
        ax.set_title(title, fontsize=10, fontweight='bold')
        ax.legend(loc='best', fontsize=8)
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('phantom_env_comparison.png', dpi=150, bbox_inches='tight')
    print("Plot saved to phantom_env_comparison.png")
    plt.show()


if __name__ == "__main__":
    main()