chore: rough migration of environment configuration

This commit is contained in:
2026-01-26 14:12:41 +01:00
parent cd6c3d6006
commit fa2aca8b13
2 changed files with 216 additions and 677 deletions

View File

@@ -1,715 +1,244 @@
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from dataclasses import dataclass
from pathlib import Path
import pandas as pd
from types import SimpleNamespace
from typing import Optional, Dict, Any, List, Tuple
from __future__ import annotations
from lib.separability import load_artifacts, score_session, estimate_alpha
from sim.rl.behavior_loader.models import AgentBehaviorModel, BehaviorModel, aggregate_event_transitions
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
import numpy as np
try:
import jax
from sim.rl.jax_core import JAX_AVAILABLE, compile_transitions, fallback_transitions, sample_sessions, compute_metrics
from sim.rl.jax_core import session_features, compute_session_transitions, compute_divergences, estimate_alpha_batch
except ImportError:
JAX_AVAILABLE = False
import gymnasium as gym
from gymnasium import spaces
except ImportError as e:
raise ImportError("sim.rl.environment requires gymnasium") from e
# "learner" agent learning to optimize pricing
# "agent" part of environment creating demand signals that learner processes
from sim.case.thesis_simplified.coi import COIWindow, coi_erosion, compute_coi_window
from sim.case.thesis_simplified.separability import estimate_alpha as estimate_session_alpha
from sim.case.thesis_simplified.simplified import Limbo, Session, put_prices_to_market
from sim.rl.thesis_core import aggregate_demand_by_product, aggregate_purchases, constrain_prices
@dataclass(frozen=True)
class BusinessLogicConstraints:
product_catalogue_size: int = 100
max_steps: int = 2000
sessions_per_step: int = 250
@dataclass
class BusinessLogicConstraints():
max_price_adjustment: float = 0.30
system_max_price: float = 500.0
system_min_price: float = 1.0
product_catalogue_size: int = 100
episode_length: int = 2000
sessions_per_step: int = 250
max_price_adjustment: float = 0.30
min_margin_pct: float = 0.05
agent_share: float = 0.2
agent_recon_multiplier: float = 6.0
agent_purchase_probability: float = 0.20
alpha_drift: float = 0.0
alpha_bounds: tuple[float, float] = (0.0, 0.8)
coi_strength: float = 0.25
coi_threshold: float = 4.0
coi_sigmoid_temp: float = 1.25
base_human_demand: float = 0.08
base_agent_demand: float = 0.05
human_price_elasticity: float = -1.2 # assumptions here
agent_price_elasticity: float = -0.6
w_agent_loss: float = 1.0
w_volatility: float = 5.0
w_estimation_error: float = 0.25
seed: int = 7
human_data_dir: str | None = None
agent_data_dir: str | None = None
def _resolve_behavior_data_dirs(constraints: BusinessLogicConstraints) -> tuple[str, str]:
base = Path(__file__).resolve().parents[2] / "experiments"
human_default = str(base / "collected_data")
agent_default = str(base / "agents" / "collected_data")
human = constraints.human_data_dir or human_default
agent = constraints.agent_data_dir or agent_default
return human, agent
def _sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-x))
EVENT_PAGE_MAP = {
"session_start": "/",
"page_view": "/",
"view_item_page": "/products",
"learn_more_about_item": "/products/details",
"add_item_to_cart": "/cart",
"checkout_start": "/checkout",
"purchase_complete": "/checkout",
"session_end": "/checkout/success",
}
# map real collected event names to canonical simulation states
EVENT_CANONICAL_MAP = {
"page_view": "session_start",
"hover_over_paragraph": "view_item_page",
"hover_over_title": "view_item_page",
"view_item_page": "view_item_page",
"learn_more_about_item": "learn_more_about_item",
"add_item_to_cart": "add_item_to_cart",
"checkout_start": "purchase_complete",
"remove_item": "view_item_page",
}
def _canonicalize_transitions(raw_trans: Dict[str, Dict[str, float]]) -> Dict[str, Dict[str, float]]:
"""Map real event transition names to canonical simulation states."""
canonical: Dict[str, Dict[str, float]] = {}
for src, dsts in raw_trans.items():
src_canon = EVENT_CANONICAL_MAP.get(src, src)
if src_canon not in canonical:
canonical[src_canon] = {}
for dst, prob in dsts.items():
dst_canon = EVENT_CANONICAL_MAP.get(dst, dst)
canonical[src_canon][dst_canon] = canonical[src_canon].get(dst_canon, 0.0) + prob
# re-normalize after aggregation
for src in canonical:
total = sum(canonical[src].values())
if total > 0:
canonical[src] = {k: v / total for k, v in canonical[src].items()}
return canonical
class BehavioralProfile:
"""Synthetic Markov profile used to generate interaction sessions.
Uses aggregate_event_transitions from models.py to build transition kernels from real data."""
def __init__(self, actor: str, purchase_probs: np.ndarray, *, human_data_dir: str, agent_data_dir: str):
self.actor = actor
self.purchase_probs = np.clip(purchase_probs, 0.0, 0.95)
self.states = [
"session_start",
"view_item_page",
"learn_more_about_item",
"add_item_to_cart",
"purchase_complete",
"session_end",
]
model = AgentBehaviorModel(agent_data_dir) if actor == "agents" else BehaviorModel(human_data_dir)
mdp = model.build_MDP()
raw_trans = aggregate_event_transitions(mdp) if mdp.get("transitions") else {}
self.transitions = _canonicalize_transitions(raw_trans) if raw_trans else self._fallback_transitions()
self._ensure_terminal_states()
self.dwell_params = self._extract_dwell_params(mdp)
def _ensure_terminal_states(self):
# guarantee purchase_complete leads to session_end and session_start exists
if "purchase_complete" not in self.transitions:
self.transitions["purchase_complete"] = {"session_end": 1.0}
elif "session_end" not in self.transitions.get("purchase_complete", {}):
self.transitions["purchase_complete"]["session_end"] = 1.0
total = sum(self.transitions["purchase_complete"].values())
self.transitions["purchase_complete"] = {k: v/total for k, v in self.transitions["purchase_complete"].items()}
if "session_start" not in self.transitions:
self.transitions["session_start"] = {"view_item_page": 0.7, "learn_more_about_item": 0.2, "session_end": 0.1}
def _fallback_transitions(self) -> Dict[str, Dict[str, float]]:
return {
"session_start": {"view_item_page": 0.85, "session_end": 0.15},
"view_item_page": {"learn_more_about_item": 0.4, "add_item_to_cart": 0.3, "view_item_page": 0.2, "session_end": 0.1},
"learn_more_about_item": {"add_item_to_cart": 0.5, "view_item_page": 0.3, "session_end": 0.2},
"add_item_to_cart": {"purchase_complete": 0.6, "view_item_page": 0.25, "session_end": 0.15},
"purchase_complete": {"session_end": 1.0},
}
def _extract_dwell_params(self, mdp: Dict) -> Dict[str, Tuple[float, float]]:
state_vals = mdp.get("state_values", {})
params = {}
for state in self.states:
# try canonical and raw state names
val = state_vals.get(state, 0.5)
for raw, canon in EVENT_CANONICAL_MAP.items():
if canon == state and raw in state_vals:
val = state_vals[raw]
break
shape = 1.5 + val * 2.0
scale = 0.8 + (1.0 - val) * 1.2
params[state] = (shape, scale)
return params
def _transition_probs(self, state: str, product_idx: int) -> Dict[str, float]:
probs = dict(self.transitions.get(state, {"session_end": 1.0}))
if state == "add_item_to_cart":
base = probs.get("purchase_complete", 0.0)
demand_factor = float(self.purchase_probs[int(product_idx)])
if self.actor == "agents":
demand_factor *= 0.7
adjusted = np.clip(base * 0.5 + demand_factor * 0.5, 0.0, 0.95)
remainder = max(1e-6, 1.0 - adjusted)
other_total = sum(v for k, v in probs.items() if k != "purchase_complete")
scale = remainder / max(other_total, 1e-6)
for key in probs:
if key == "purchase_complete":
probs[key] = adjusted
else:
probs[key] = probs[key] * scale
total = sum(probs.values())
if total <= 0:
return {"session_end": 1.0}
return {state: val / total for state, val in probs.items()}
def sample_session(
self,
rng: np.random.Generator,
session_id: str,
prices: np.ndarray,
unit_cost: np.ndarray,
) -> Tuple[List[Dict[str, Any]], List[SimpleNamespace]]:
"""Generate a single session trajectory respecting business constraints."""
events: List[Dict[str, Any]] = []
feature_events: List[SimpleNamespace] = []
state = "session_start"
t = 0.0
product_idx = int(rng.integers(0, len(prices)))
product_id = f"product-{product_idx:04d}"
# enforce price >= cost constraint (lipschitz bound on pricing)
# This is a sort of last resort to not let an pricing learner go rogue
cost = float(unit_cost[product_idx])
constrained_price = max(float(prices[product_idx]), cost * 1.05) # 5% min margin
while state != "session_end" and len(events) < 40:
if state != "session_start":
row = {
"session_id": session_id,
"actor": "agent" if self.actor == "agents" else "human",
"eventName": state,
"product_idx": product_idx,
"productId": product_id,
"price_offered": constrained_price,
"price_paid": 0.0,
"page": EVENT_PAGE_MAP.get(state, "/"),
"ts": t,
"unit_cost": cost,
"base_price": float(prices[product_idx]),
}
if state == "purchase_complete":
noise = float(rng.normal(0.0, 0.015))
row["price_paid"] = max(constrained_price * (1.0 + noise), cost)
events.append(row)
feature_events.append(
SimpleNamespace(
eventName=row["eventName"],
page=row["page"],
productId=row["productId"],
ts=row["ts"],
)
)
transitions = self._transition_probs(state, product_idx)
next_state = rng.choice(list(transitions.keys()), p=list(transitions.values()))
shape, scale = self.dwell_params.get(state, (2.0, 1.0))
dwell = max(0.3, rng.gamma(shape=shape, scale=scale))
t += dwell
state = next_state
return events, feature_events
def _load_behavioral_profile(
actor: str,
demand_forcing: np.ndarray,
*,
human_data_dir: str,
agent_data_dir: str,
) -> BehavioralProfile:
"""returns a behavioral profile for generating synthetic sessions
actor: 'humans' or 'agents'
demand_forcing: per-product purchase probabilities used to weight interactions
"""
return BehavioralProfile(actor, demand_forcing, human_data_dir=human_data_dir, agent_data_dir=agent_data_dir)
class CommercePlatform:
"""state management for the environment, simulates demand"""
def __init__(self, product_catalogue_size: int, max_price: float, min_price: float, constraints: BusinessLogicConstraints):
self.product_catalogue_size = product_catalogue_size
self.max_price = max_price
self.min_price = min_price
self.constraints = constraints
self.simulation_history: List[Dict[str, Any]] = []
self._rng = np.random.default_rng(constraints.seed)
self._last_interaction_df: pd.DataFrame = pd.DataFrame()
self.unit_cost = np.random.uniform(low=15.0, high=60.0, size=(self.product_catalogue_size,)).astype(np.float32)
self.base_price = np.random.uniform(low=60.0, high=140.0, size=(self.product_catalogue_size,)).astype(np.float32)
self.alpha_hat = constraints.agent_share
self._human_data_dir, self._agent_data_dir = _resolve_behavior_data_dirs(constraints)
try:
self.separability_artifacts = load_artifacts()
except FileNotFoundError:
self.separability_artifacts = None
def setup_true_demand(self, prices: np.ndarray) -> Dict[str, np.ndarray]:
p = np.clip(prices, self.min_price, self.max_price)
cost = np.clip(self.unit_cost, self.min_price * 0.2, self.max_price)
margin = np.clip((p - cost) / np.maximum(cost, 1e-3), -0.9, 2.0)
# isoelastic demand approximation
human_prob = self.constraints.base_human_demand * np.exp(self.constraints.human_price_elasticity * margin)
agent_prob = self.constraints.base_agent_demand * np.exp(self.constraints.agent_price_elasticity * margin)
return {
"human_purchase_prob": np.clip(human_prob, 0.0, 0.95),
"agent_purchase_prob": np.clip(agent_prob, 0.0, 0.95),
}
def _simulate_sessions(self, prices: np.ndarray) -> Tuple[pd.DataFrame, Dict[str, Any]]:
demand = self.setup_true_demand(prices)
T = self.constraints.sessions_per_step
effective_share = float(np.clip(self.alpha_hat, 0.0, 0.95))
n_agent_sessions = max(1, int(round(T * effective_share)))
n_human_sessions = max(1, T - n_agent_sessions)
session_map = {
"humans": n_human_sessions,
"agents": n_agent_sessions,
}
pprob_map = {
"humans": demand["human_purchase_prob"],
"agents": demand["agent_purchase_prob"],
}
rows: List[Dict[str, Any]] = []
session_scores: List[Dict[str, float]] = []
demand_human = np.zeros_like(prices, dtype=np.float32)
demand_agent = np.zeros_like(prices, dtype=np.float32)
for actor, n_sessions in session_map.items():
profile = _load_behavioral_profile(
actor,
pprob_map[actor],
human_data_dir=self._human_data_dir,
agent_data_dir=self._agent_data_dir,
)
for idx in range(n_sessions):
session_id = f"{actor}_{idx:06d}"
session_rows, feature_events = profile.sample_session(
self._rng, session_id, prices, self.unit_cost
)
rows.extend(session_rows)
if session_rows:
df_session = pd.DataFrame(session_rows)
purchases = df_session[df_session["eventName"] == "purchase_complete"]
if not purchases.empty:
counts = purchases.groupby("product_idx").size()
if actor == "agents":
demand_agent[counts.index.to_numpy(dtype=int)] += counts.to_numpy(dtype=np.float32)
else:
demand_human[counts.index.to_numpy(dtype=int)] += counts.to_numpy(dtype=np.float32)
if self.separability_artifacts and feature_events:
score = score_session(feature_events, self.separability_artifacts)
session_scores.append(score)
interactions_df = pd.DataFrame(rows)
diagnostics = {
"alpha_hat": float(self.alpha_hat),
"session_scores": session_scores,
"demand_human": demand_human,
"demand_agent": demand_agent,
}
if session_scores:
alphas = [
estimate_alpha(s["prob_agent"], s["delta_h"], s["delta_a"], temperature=2.0)
for s in session_scores
]
mean_alpha = float(np.mean(alphas))
# exponential moving average for stability
self.alpha_hat = 0.7 * self.alpha_hat + 0.3 * mean_alpha
diagnostics.update(
{
"alpha_hat": float(self.alpha_hat),
"delta_h_mean": float(np.mean([s["delta_h"] for s in session_scores])),
"delta_a_mean": float(np.mean([s["delta_a"] for s in session_scores])),
"prob_agent_mean": float(np.mean([s["prob_agent"] for s in session_scores])),
}
)
self._last_interaction_df = interactions_df
return interactions_df, diagnostics
def compute_interaction_features(self, interaction_df: pd.DataFrame) -> Dict[str, float]:
if interaction_df.empty:
return {
"revenue_observed": 0.0,
"revenue_oracle": 0.0,
"agent_loss": 0.0,
"true_human_purchases": 0.0,
"true_agent_purchases": 0.0,
"mean_sale_price": 0.0,
"look_to_book": 0.0,
"coi": 0.0,
"expected_premium": 0.0,
}
purchases = interaction_df[interaction_df["eventName"] == "purchase_complete"]
human_purchases = purchases[purchases["actor"] == "human"]
agent_purchases = purchases[purchases["actor"] == "agent"]
revenue_observed = float(purchases["price_paid"].sum())
revenue_oracle = float(purchases["base_price"].sum())
agent_loss = float((agent_purchases["base_price"] - agent_purchases["price_paid"]).sum())
mean_sale_price = float(purchases["price_paid"].mean()) if not purchases.empty else 0.0
views = float((interaction_df["eventName"] == "view_item_page").sum())
look_to_book = float(views / (len(purchases) + 1e-6))
true_human = float(len(human_purchases))
true_agent = float(len(agent_purchases))
human_prices = human_purchases["price_offered"] if not human_purchases.empty else pd.Series(dtype=float)
human_costs = human_purchases["unit_cost"] if not human_purchases.empty else pd.Series(dtype=float)
human_base = human_purchases["base_price"] if not human_purchases.empty else pd.Series(dtype=float)
coi = 0.0
if not human_prices.empty and not human_costs.empty:
# COI = E[P] - p_min where p_min is cost, accounting for expected premium (base - realized)
margin = human_prices.mean() - human_costs.mean()
expected_premium = human_base.mean() - human_prices.mean() if not human_base.empty else 0.0
coi = float(np.maximum(0.0, margin - expected_premium * 0.5))
return {
"revenue_observed": revenue_observed,
"revenue_oracle": revenue_oracle,
"agent_loss": agent_loss,
"true_human_purchases": true_human,
"true_agent_purchases": true_agent,
"mean_sale_price": mean_sale_price,
"look_to_book": look_to_book,
"coi": coi,
"expected_premium": float(expected_premium) if not human_base.empty else 0.0,
}
def _session_feature_table(self, df: pd.DataFrame) -> pd.DataFrame:
"""Extract per-session behavioral features for separability analysis."""
if df.empty:
return pd.DataFrame()
g = df.groupby("session_id", sort=False)
session_duration = g["ts"].max() - g["ts"].min()
total_interactions = g.size()
avg_time_between = g["ts"].apply(lambda x: float(np.diff(np.sort(x.to_numpy())).mean()) if len(x) > 1 else 0.0)
interaction_velocity = total_interactions / (session_duration + 1e-6)
views = g.apply(lambda x: int((x["eventName"] == "view_item_page").sum()), include_groups=False)
cart_adds = g.apply(lambda x: int((x["eventName"] == "add_item_to_cart").sum()), include_groups=False)
purchases = g.apply(lambda x: int((x["eventName"] == "purchase_complete").sum()), include_groups=False)
learn_more = g.apply(lambda x: int((x["eventName"] == "learn_more_about_item").sum()), include_groups=False)
conversion_rate = purchases / (views + 1e-6)
is_agent = g["actor"].apply(lambda s: bool((s == "agent").any()), include_groups=False)
# price sensitivity features
price_variance = g["price_offered"].var().fillna(0.0)
avg_price_seen = g["price_offered"].mean().fillna(0.0)
products_viewed = g["product_idx"].nunique()
return pd.DataFrame({
"session_duration_sec": session_duration.astype(float),
"avg_time_between_events": avg_time_between.astype(float),
"total_interactions": total_interactions.astype(int),
"interaction_velocity": interaction_velocity.astype(float),
"item_views": views.astype(int),
"cart_adds": cart_adds.astype(int),
"purchases": purchases.astype(int),
"learn_more_clicks": learn_more.astype(int),
"conversion_rate": conversion_rate.astype(float),
"price_variance": price_variance.astype(float),
"avg_price_seen": avg_price_seen.astype(float),
"products_viewed": products_viewed.astype(int),
"is_agent": is_agent.astype(bool),
}).reset_index()
def get_interaction_data(self) -> np.ndarray:
if self._last_interaction_df.empty:
return np.array([], dtype=object)
return self._last_interaction_df.to_dict(orient="records")
def make_env(constraints: Optional[BusinessLogicConstraints] = None) -> "PHANTOMEnv":
return PHANTOMEnv(constraints=constraints or BusinessLogicConstraints())
class PHANTOMEnv(gym.Env):
metadata = {"render_modes": []}
metadata = {"render_modes": ["human", "ansi"]}
def __init__(self, constraints: Optional[BusinessLogicConstraints] = None, use_jax: bool = True):
def __init__(self, constraints: Optional[BusinessLogicConstraints] = None):
super().__init__()
self.constraints = constraints if isinstance(constraints, BusinessLogicConstraints) else BusinessLogicConstraints()
self.use_jax = use_jax and JAX_AVAILABLE
self.action_space = spaces.Box(low=-self.constraints.max_price_adjustment,
high=self.constraints.max_price_adjustment,
shape=(self.constraints.product_catalogue_size,), dtype=np.float32)
n_products = self.constraints.product_catalogue_size
self.observation_space = spaces.Dict({
"elasticity": spaces.Dict({
"price": spaces.Box(
low=np.full((n_products,), self.constraints.system_min_price, dtype=np.float32),
high=np.full((n_products,), self.constraints.system_max_price, dtype=np.float32),
dtype=np.float32),
"demand": spaces.Box(
low=np.zeros((n_products,), dtype=np.float32),
high=np.full((n_products,), 1e6, dtype=np.float32),
dtype=np.float32),
}),
"market": spaces.Dict({
"alpha_hat": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
"revenue_rate": spaces.Box(low=0.0, high=1e6, shape=(1,), dtype=np.float32),
"conversion_rate": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
"price_volatility": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
}),
"cost": spaces.Box(low=0.0, high=self.constraints.system_max_price, shape=(n_products,), dtype=np.float32),
})
self.commerce_platform = CommercePlatform(
product_catalogue_size=self.constraints.product_catalogue_size,
max_price=self.constraints.system_max_price,
min_price=self.constraints.system_min_price,
constraints=self.constraints)
self._rng = np.random.default_rng(self.constraints.seed)
self.t = 0
self._prev_prices: Optional[np.ndarray] = None
self.state: Dict[str, Any] = {}
self._jax_key = None
self._jax_trans = None
if self.use_jax:
self._jax_key = jax.random.PRNGKey(self.constraints.seed)
self._init_jax_transitions()
self.c = constraints or BusinessLogicConstraints()
self.n = int(self.c.product_catalogue_size)
def _init_jax_transitions(self):
try:
human_dir, agent_dir = _resolve_behavior_data_dirs(self.constraints)
human_profile = _load_behavioral_profile(
"humans",
np.ones(self.constraints.product_catalogue_size) * 0.1,
human_data_dir=human_dir,
agent_data_dir=agent_dir,
)
agent_profile = _load_behavioral_profile(
"agents",
np.ones(self.constraints.product_catalogue_size) * 0.1,
human_data_dir=human_dir,
agent_data_dir=agent_dir,
)
self._jax_trans = compile_transitions(human_profile, agent_profile).to_jax()
except Exception:
self._jax_trans = fallback_transitions().to_jax()
self._rng = np.random.default_rng(self.c.seed)
self._t = 0
self._alpha_true = float(self.c.agent_share)
self._alpha_hat = float(self.c.agent_share)
self._costs = np.zeros(self.n, dtype=np.float32)
self._refs = np.zeros(self.n, dtype=np.float32)
self._prices: Optional[np.ndarray] = None
self._last_sessions: list[Session] = []
self._last_coi: COIWindow | None = None
self._limbo = Limbo()
self.action_space = spaces.Box(
low=np.full((self.n,), self.c.system_min_price, dtype=np.float32),
high=np.full((self.n,), self.c.system_max_price, dtype=np.float32),
dtype=np.float32,
)
self.observation_space = spaces.Dict(
{
"elasticity": spaces.Dict(
{
"price": spaces.Box(
low=np.full((self.n,), self.c.system_min_price, dtype=np.float32),
high=np.full((self.n,), self.c.system_max_price, dtype=np.float32),
dtype=np.float32,
),
"demand": spaces.Box(
low=np.zeros((self.n,), dtype=np.float32),
high=np.full((self.n,), 1e9, dtype=np.float32),
dtype=np.float32,
),
}
),
"market": spaces.Dict(
{
"alpha_hat": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
"revenue_rate": spaces.Box(low=0.0, high=1e12, shape=(1,), dtype=np.float32),
"conversion_rate": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
"price_volatility": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
}
),
"cost": spaces.Box(
low=np.zeros((self.n,), dtype=np.float32),
high=np.full((self.n,), self.c.system_max_price, dtype=np.float32),
dtype=np.float32,
),
}
)
def _reset_catalogue(self) -> None:
self._costs = self._rng.uniform(15.0, 60.0, size=self.n).astype(np.float32)
margins = self._rng.uniform(0.2, 0.6, size=self.n).astype(np.float32)
self._refs = (self._costs * (1.0 + margins)).astype(np.float32)
self._prices = self._refs.copy()
def _observe_market(
self, prices: np.ndarray
) -> tuple[list[Session], Dict[str, float], np.ndarray, np.ndarray, float, float, int]:
sessions, demand_map = put_prices_to_market(
prices,
costs=self._costs,
alpha=self._alpha_true,
n_sessions=int(self.c.sessions_per_step),
seed=int(self._rng.integers(0, 2**31 - 1)),
)
demand_by_product = aggregate_demand_by_product(sessions, demand_map, self.n)
purchases, revenue, cost, n_agents = aggregate_purchases(sessions, self._costs, self.n)
conversion = float(np.sum(purchases) / max(len(sessions), 1))
return sessions, demand_map, demand_by_product, purchases, revenue, cost, n_agents
def _update_alpha_hat(self, sessions: list[Session]) -> float:
scores = [estimate_session_alpha(s) for s in sessions if s.events]
if not scores:
return self._alpha_hat
alpha_step = float(np.mean(scores))
self._alpha_hat = 0.8 * self._alpha_hat + 0.2 * alpha_step
self._alpha_hat = float(np.clip(self._alpha_hat, 0.0, 1.0))
return self._alpha_hat
def _reward(self, prices: np.ndarray, revenue: float, cost: float, volatility: float) -> float:
profit = float(revenue - cost)
coi_leak = float(self._last_coi.leak) if self._last_coi else 0.0
alpha_err = abs(self._alpha_hat - self._alpha_true)
return profit - self.c.coi_strength * coi_leak - self.c.w_volatility * volatility - self.c.w_estimation_error * alpha_err
def _build_obs(
self,
prices: np.ndarray,
demand_by_product: np.ndarray,
revenue: float,
conversion: float,
volatility: float,
) -> Dict[str, Any]:
return {
"elasticity": {"price": prices.astype(np.float32), "demand": demand_by_product.astype(np.float32)},
"market": {
"alpha_hat": np.array([self._alpha_hat], dtype=np.float32),
"revenue_rate": np.array([revenue], dtype=np.float32),
"conversion_rate": np.array([conversion], dtype=np.float32),
"price_volatility": np.array([volatility], dtype=np.float32),
},
"cost": self._costs.astype(np.float32),
}
def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
super().reset(seed=seed)
if seed is not None:
self._rng = np.random.default_rng(seed)
self.commerce_platform._rng = np.random.default_rng(seed)
if self.use_jax:
self._jax_key = jax.random.PRNGKey(seed)
self.commerce_platform.alpha_hat = self.constraints.agent_share
self.t = 0
init_prices = self._rng.uniform(
low=60.0,
high=140.0,
size=(self.constraints.product_catalogue_size,),
).astype(np.float32)
self.commerce_platform.unit_cost = self._rng.uniform(
low=15.0,
high=60.0,
size=(self.constraints.product_catalogue_size,),
).astype(np.float32)
self.commerce_platform.base_price = init_prices.copy()
self._prev_prices = init_prices.copy()
self.state = {
"elasticity": {
"price": init_prices,
"demand": np.zeros((self.constraints.product_catalogue_size,), dtype=np.float32),
},
"market": {
"alpha_hat": np.array([self.constraints.agent_share], dtype=np.float32),
"revenue_rate": np.array([0.0], dtype=np.float32),
"conversion_rate": np.array([0.0], dtype=np.float32),
"price_volatility": np.array([0.0], dtype=np.float32),
},
"cost": self.commerce_platform.unit_cost.astype(np.float32),
}
return self.state, {}
self._t = 0
self._alpha_true = float(np.clip(self.c.agent_share, *self.c.alpha_bounds))
self._alpha_hat = float(self.c.agent_share)
self._reset_catalogue()
self._limbo = Limbo()
self._last_sessions = []
self._last_coi = None
def _step_jax(self, new_prices: np.ndarray) -> Tuple[Dict, Dict]:
self._jax_key, subkey = jax.random.split(self._jax_key)
alpha = float(np.clip(self.commerce_platform.alpha_hat, 0.0, 0.95))
n_agent = max(1, int(self.constraints.sessions_per_step * alpha))
n_human = max(1, self.constraints.sessions_per_step - n_agent)
batch = sample_sessions(subkey, self._jax_trans, n_human, n_agent, len(new_prices))
sim = compute_metrics(batch, new_prices, self.commerce_platform.unit_cost, self.commerce_platform.base_price)
result = {"revenue_observed": sim.revenue, "revenue_oracle": sim.revenue_oracle,
"agent_loss": sim.agent_loss, "coi": sim.coi, "look_to_book": sim.look_to_book,
"mean_sale_price": sim.mean_sale_price, "true_human_purchases": sim.n_human_purchases,
"true_agent_purchases": sim.n_agent_purchases}
diagnostics = {"demand_human": sim.demand_human, "demand_agent": sim.demand_agent, "alpha_hat": alpha}
return result, diagnostics
prices = self._prices if self._prices is not None else np.zeros(self.n, dtype=np.float32)
obs = self._build_obs(prices, np.zeros(self.n, dtype=np.float32), 0.0, 0.0, 0.0)
return obs, {"alpha_true": self._alpha_true}
def step(self, action: np.ndarray):
self.t += 1
base_prices = self.state["elasticity"]["price"].astype(np.float32)
new_prices = np.clip(base_prices * (1.0 + action.astype(np.float32)),
self.constraints.system_min_price,
self.constraints.system_max_price).astype(np.float32)
def step(self, action: np.ndarray) -> Tuple[Dict[str, Any], float, bool, bool, Dict[str, Any]]:
if self._prices is None:
raise RuntimeError("reset() must be called before step()")
self.state["elasticity"]["price"] = new_prices
if self.use_jax:
result, diagnostics = self._step_jax(new_prices)
else:
interactions_df, diagnostics = self.commerce_platform._simulate_sessions(new_prices)
result = self.commerce_platform.compute_interaction_features(interactions_df)
COI = float(result.get("coi", 0.0))
demand_vector = diagnostics.get("demand_human", np.zeros_like(new_prices)) + diagnostics.get(
"demand_agent", np.zeros_like(new_prices)
prev = self._prices
prices = constrain_prices(
prev,
np.asarray(action, dtype=np.float32),
costs=self._costs,
min_price=float(self.c.system_min_price),
max_price=float(self.c.system_max_price),
max_adjustment=float(self.c.max_price_adjustment),
min_margin_pct=float(self.c.min_margin_pct),
)
self.state["elasticity"]["demand"] = demand_vector.astype(np.float32)
self._prices = prices
self._limbo.add_update("prices", prices)
volatility = 0.0 if self._prev_prices is None else \
float(np.mean(np.abs((new_prices - self._prev_prices) / (self._prev_prices + 1e-6))))
self._prev_prices = new_prices.copy()
sessions, demand_map, demand_by_product, purchases, revenue, cost, n_agents = self._observe_market(prices)
self._last_sessions = sessions
self._limbo.add_update("demand", demand_map)
# update market observation features
total_demand = float(np.sum(demand_vector))
total_purchases = float(result.get("true_human_purchases", 0.0) + result.get("true_agent_purchases", 0.0))
conv_rate = total_purchases / max(total_demand, 1.0)
self.state["market"] = {
"alpha_hat": np.array([float(diagnostics.get("alpha_hat", self.commerce_platform.alpha_hat))], dtype=np.float32),
"revenue_rate": np.array([float(result.get("revenue_observed", 0.0))], dtype=np.float32),
"conversion_rate": np.array([float(np.clip(conv_rate, 0.0, 1.0))], dtype=np.float32),
"price_volatility": np.array([float(volatility)], dtype=np.float32),
}
self.state["cost"] = self.commerce_platform.unit_cost.astype(np.float32)
self._update_alpha_hat(self._last_sessions)
self._last_coi = compute_coi_window(self._last_sessions, self._costs, demand_mapping=demand_map)
# extract metrics with safe defaults for incomplete simulation
revenue_observed = float(result.get("revenue_observed", 0.0))
agent_loss = float(result.get("agent_loss", 0.0))
self._alpha_true = float(np.clip(self._alpha_true + self.c.alpha_drift, *self.c.alpha_bounds))
volatility = float(np.std((prices - prev) / (prev + 1e-6)))
reward = float(self._reward(prices, revenue, cost, volatility))
conversion = float(np.sum(purchases) / max(len(self._last_sessions), 1))
reward = (revenue_observed
- COI
- self.constraints.w_agent_loss * agent_loss
- self.constraints.w_volatility * volatility
- self.constraints.w_estimation_error)
self._t += 1
terminated = self._t >= int(self.c.max_steps)
terminated = self.t >= self.constraints.episode_length
obs = self._build_obs(prices, demand_by_product, revenue, conversion, min(volatility, 1.0))
info = {
"t": self.t,
"revenue_observed": revenue_observed,
"revenue_oracle": float(result.get("revenue_oracle", revenue_observed)),
"agent_loss": agent_loss,
"ux_volatility": volatility,
"look_to_book": float(result.get("look_to_book", 0.0)),
"mean_sale_price": float(result.get("mean_sale_price", 0.0)),
"true_human_purchases_total": float(result.get("true_human_purchases", 0.0)),
"true_agent_purchases_total": float(result.get("true_agent_purchases", 0.0)),
"coi": COI,
"alpha_hat": diagnostics.get("alpha_hat", self.commerce_platform.alpha_hat),
"mean_human_demand": float(np.mean(diagnostics.get("demand_human", np.zeros_like(new_prices)))),
"mean_agent_demand": float(np.mean(diagnostics.get("demand_agent", np.zeros_like(new_prices)))),
"step": self._t,
"reward": reward,
"revenue": float(revenue),
"profit": float(revenue - cost),
"n_sessions": int(self.c.sessions_per_step),
"n_agents": int(n_agents),
"alpha_true": float(self._alpha_true),
"alpha_hat": float(self._alpha_hat),
"alpha_error": float(abs(self._alpha_hat - self._alpha_true)),
"price_std": float(np.std(prices)),
"price_volatility": float(volatility),
}
if "delta_h_mean" in diagnostics:
if self._last_coi is not None:
info.update(
{
"delta_h_mean": diagnostics["delta_h_mean"],
"delta_a_mean": diagnostics["delta_a_mean"],
"prob_agent_mean": diagnostics["prob_agent_mean"],
"coi_policy": float(self._last_coi.policy),
"coi_agent": float(self._last_coi.agent),
"coi_leakage": float(self._last_coi.leak),
"coi_survival": float(self._last_coi.survival_ratio),
"coi_erosion": float(coi_erosion(self._last_coi.policy, self._last_coi.agent)),
}
)
return self.state, float(reward), terminated, False, info
return obs, reward, terminated, False, info
def render(self, mode: str = "human") -> str | None:
if self._prices is None:
return None
out = (
f"t={self._t}/{self.c.max_steps} "
f"alpha_true={self._alpha_true:.3f} alpha_hat={self._alpha_hat:.3f} "
f"price_std={float(np.std(self._prices)):.2f}"
)
if mode == "human":
print(out)
return out
if __name__ == "__main__":
import matplotlib.pyplot as plt
from collections import defaultdict
env = PHANTOMEnv(constraints=BusinessLogicConstraints())
obs, _ = env.reset(seed=42)
metrics = defaultdict(list)
total_reward = 0.0
done = False
while not done:
action = env.action_space.sample()
obs, reward, done, _, info = env.step(action)
total_reward += reward
p_mean = float(np.mean(obs["elasticity"]["price"]))
q_mean = float(np.mean(obs["elasticity"]["demand"]))
p_std = float(np.std(obs["elasticity"]["price"]))
metrics['t'].append(info['t'])
metrics['price_mean'].append(p_mean)
metrics['price_std'].append(p_std)
metrics['demand_mean'].append(q_mean)
metrics['revenue_observed'].append(info['revenue_observed'])
metrics['revenue_oracle'].append(info['revenue_oracle'])
metrics['agent_loss'].append(info['agent_loss'])
metrics['ux_volatility'].append(info['ux_volatility'])
metrics['look_to_book'].append(info['look_to_book'])
metrics['reward'].append(reward)
metrics['human_purchases'].append(info['true_human_purchases_total'])
metrics['agent_purchases'].append(info['true_agent_purchases_total'])
metrics['coi'].append(info.get('coi', 0.0))
metrics['alpha_hat'].append(info.get('alpha_hat', env.commerce_platform.alpha_hat))
metrics['mean_human_demand'].append(info.get('mean_human_demand', 0.0))
metrics['mean_agent_demand'].append(info.get('mean_agent_demand', 0.0))
metrics['delta_h_mean'].append(info.get('delta_h_mean', 0.0))
metrics['delta_a_mean'].append(info.get('delta_a_mean', 0.0))
metrics['prob_agent_mean'].append(info.get('prob_agent_mean', 0.0))
if info['t'] % 20 == 0 or done:
print(f"t={info['t']:03d} p={p_mean:6.2f}±{p_std:4.2f} q={q_mean:6.2f} "
f"rev={info['revenue_observed']:7.2f} oracle={info['revenue_oracle']:7.2f} "
f"loss={info['agent_loss']:6.2f} ux={info['ux_volatility']:.3f} "
f"coi={info.get('coi', 0.0):6.2f} alpha={info.get('alpha_hat', 0.0):4.2f} "
f"ltb={info['look_to_book']:5.2f} r={reward:7.2f}")
print(f"total_reward={total_reward:.2f}")
fig, axes = plt.subplots(3, 4, figsize=(18, 12))
fig.suptitle('PHANTOM Environment Run', fontsize=14, fontweight='bold')
plot_configs = [
('price_mean', 'Mean Price', 'Price'),
('demand_mean', 'Mean Demand (All)', 'Demand'),
('mean_human_demand', 'Mean Human Demand', 'Count'),
('mean_agent_demand', 'Mean Agent Demand', 'Count'),
('revenue_observed', 'Revenue (Observed)', 'Revenue'),
('agent_loss', 'Agent Loss (Oracle - Observed)', 'Loss'),
('coi', 'Cost of Information', 'COI'),
('alpha_hat', 'Estimated α̂', 'alpha'),
('ux_volatility', 'UX Volatility (Price Change)', 'Volatility'),
('look_to_book', 'Look-to-Book Ratio', 'Ratio'),
('reward', 'Step Reward', 'Reward'),
('prob_agent_mean', 'Avg Agent Probability', 'Probability'),
]
for idx, (key, title, ylabel) in enumerate(plot_configs):
ax = axes[idx // 4, idx % 4]
ax.plot(metrics['t'], metrics[key], color='blue', alpha=0.7, linewidth=1.5)
ax.set_xlabel('Step')
ax.set_ylabel(ylabel)
ax.set_title(title, fontsize=10, fontweight='bold')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('phantom_env_comparison.png', dpi=150, bbox_inches='tight')
print("Plot saved to phantom_env_comparison.png")
plt.show()
def close(self) -> None:
return