feat: consistent failure case

This commit is contained in:
2026-01-24 15:16:41 +01:00
parent bae51daa1c
commit 4033e73ba1
3 changed files with 264 additions and 54 deletions

View File

@@ -79,22 +79,175 @@ def estimate_alpha(session: Session, beta: float = 2.0) -> float:
return 1.0 / (1.0 + np.exp(-beta * (dh - da))) if (dh + da) > 0 else 0.5
def sample_trajectory(rng: np.random.Generator, trans: Dict, prices: np.ndarray, is_agent: bool) -> Tuple[List[Event], int]:
@dataclass(frozen=True)
class COIWindow:
"""Windowed COI metrics computed from realized price exposures.
COI_policy is the definition-level KPI: E[p_shown] - p_min.
COI_agent is the theorem-level object: E[p^(1)] - p_min, where p^(1) is the minimum price realized under agent querying.
In this simplified simulator, p^(1) is approximated as the minimum price exposed to any agent in the window (per product).
Leak is the observable gap between them.
"""
policy: float
agent: float
leak: float
survival_ratio: float
policy_by_product: np.ndarray
agent_by_product: np.ndarray
demand_weights: np.ndarray
def _prices_by_product(sessions: List[Session]) -> Dict[int, List[float]]:
prices: Dict[int, List[float]] = {}
for s in sessions:
for e in s.events:
prices.setdefault(e.product_idx, []).append(float(e.price_seen))
return prices
def _min_session_prices_by_product(sessions: List[Session]) -> Dict[int, List[float]]:
mins: Dict[int, List[float]] = {}
for s in sessions:
by_p: Dict[int, float] = {}
for e in s.events:
pidx = int(e.product_idx)
price = float(e.price_seen)
by_p[pidx] = price if pidx not in by_p else min(by_p[pidx], price)
for pidx, pmin in by_p.items():
mins.setdefault(pidx, []).append(pmin)
return mins
def _min_price_across_sessions_by_product(sessions: List[Session]) -> Dict[int, float]:
mins: Dict[int, float] = {}
for s in sessions:
for e in s.events:
pidx = int(e.product_idx)
price = float(e.price_seen)
mins[pidx] = price if pidx not in mins else min(mins[pidx], price)
return mins
def _demand_weights_by_product(
sessions: List[Session],
demand_mapping: Dict[str, float],
n_products: int,
) -> np.ndarray:
w = np.zeros(n_products, dtype=float)
sessions_by_id = {s.sid: s for s in sessions}
for sid, q in demand_mapping.items():
sess = sessions_by_id.get(sid)
if not sess or not sess.events:
continue
pidx = int(sess.events[0].product_idx)
w[pidx] += float(q)
s = float(np.sum(w))
return (w / s) if s > 0 else w
def compute_coi_window(
sessions: List[Session],
costs: np.ndarray,
demand_mapping: Dict[str, float] | None = None,
) -> COIWindow:
n_products = int(len(costs))
prices = _prices_by_product(sessions)
agent_min_across = _min_price_across_sessions_by_product([s for s in sessions if s.actor == "A"])
policy_by_product = np.zeros(n_products, dtype=float)
agent_by_product = np.zeros(n_products, dtype=float)
seen = np.array([(i in prices) for i in range(n_products)], dtype=bool)
agent_seen = np.array([(i in agent_min_across) for i in range(n_products)], dtype=bool)
for pidx, ps in prices.items():
if 0 <= pidx < n_products and ps:
policy_by_product[pidx] = float(np.mean(ps) - float(costs[pidx]))
for pidx, pmin in agent_min_across.items():
if 0 <= pidx < n_products:
agent_by_product[pidx] = float(pmin - float(costs[pidx]))
# If no agent exposure exists for a product in the window, there is no realized erosion for that product.
agent_by_product[seen & ~agent_seen] = policy_by_product[seen & ~agent_seen]
demand_weights = (
_demand_weights_by_product(sessions, demand_mapping, n_products)
if demand_mapping is not None
else np.zeros(n_products, dtype=float)
)
has_weights = float(np.sum(demand_weights)) > 0
if has_weights:
policy = float(np.dot(demand_weights, policy_by_product))
agent = float(np.dot(demand_weights, agent_by_product))
else:
if not bool(np.any(seen)):
policy = 0.0
agent = 0.0
else:
policy = float(np.mean(policy_by_product[seen]))
agent = float(np.mean(agent_by_product[seen]))
leak = float(max(policy - agent, 0.0))
survival_ratio = float(np.clip(agent / policy, 0.0, 1.0)) if policy > 0 else 0.0
return COIWindow(
policy=policy,
agent=agent,
leak=leak,
survival_ratio=survival_ratio,
policy_by_product=policy_by_product,
agent_by_product=agent_by_product,
demand_weights=demand_weights,
)
def sample_trajectory(
rng: np.random.Generator,
trans: Dict,
prices: np.ndarray,
costs: np.ndarray,
theta: Dict[str, float],
is_agent: bool,
session_price_noise: float = 0.02,
surge: float = 0.08,
max_markup_mult: float = 1.8,
) -> Tuple[List[Event], int]:
"""Sample session trajectory from behavioral kernel."""
state, t, pidx = "start", 0.0, int(rng.integers(0, len(prices)))
cost = float(costs[pidx])
base_price = float(prices[pidx]) * float(1.0 + rng.normal(0.0, session_price_noise))
base_price = float(np.clip(base_price, cost * 1.01, float(prices[pidx]) * 2.0))
current_price = base_price
signal = 0.0
events = []
# TODO: instead of this very controlled setup implement same session samplin as in models.py
while state != "end" and len(events) < 30:
if state != "start":
events.append(Event(action=state, product_idx=pidx, price_seen=float(prices[pidx]), ts=t))
probs = trans.get(state, {"end": 1.0})
state = rng.choice(list(probs.keys()), p=list(probs.values()))
nxt = rng.choice(list(probs.keys()), p=list(probs.values()))
if nxt == "purchase":
price_sens = float(theta.get("price_sens", 2.0))
base_conv = float(theta.get("base_conv", 0.2))
rel = max((current_price - cost) / (cost + 1e-6), 0.0)
p_buy = float(np.clip(base_conv * np.exp(-price_sens * rel), 0.0, 1.0))
if rng.random() > p_buy:
nxt = "end"
state = nxt
if state not in {"start", "end"}:
events.append(Event(action=state, product_idx=pidx, price_seen=float(current_price), ts=t))
signal += float(ACTION_WEIGHTS.get(state, 0.1))
current_price = float(np.clip(base_price * (1.0 + surge * signal), cost * 1.01, base_price * max_markup_mult))
t += max(0.2, rng.gamma(1.5, 0.8) if is_agent else rng.gamma(2.0, 1.2))
return events, pidx
def put_prices_to_market(prices: np.ndarray, alpha: float = 0.2, n_sessions: int = 50,
def put_prices_to_market(prices: np.ndarray, costs: np.ndarray, alpha: float = 0.2, n_sessions: int = 50,
seed: int | None = None) -> Tuple[List[Session], Dict[str, float]]:
"""Generate sessions from mixture model Q(p) = (1-α)E[d_H] + αE[d_A] (Eq 3).
"""Generate sessions from mixture model
Returns:
sessions: list of Session objects with events and product attribution
@@ -108,7 +261,7 @@ def put_prices_to_market(prices: np.ndarray, alpha: float = 0.2, n_sessions: int
is_agent = rng.random() < alpha
trans = TRANS_A if is_agent else TRANS_H
theta = {"price_sens": rng.uniform(0.05, 0.2), "base_conv": 0.01} if is_agent else {"price_sens": rng.uniform(1.5, 4.0), "base_conv": rng.uniform(0.2, 0.5)}
events, _ = sample_trajectory(rng, trans, prices, is_agent)
events, _ = sample_trajectory(rng, trans, prices, costs=costs, theta=theta, is_agent=is_agent)
session = Session(sid=sid, events=events, actor="A" if is_agent else "H", theta=theta)
sessions.append(session)
demand_mapping[sid] = compute_demand(session)
@@ -167,6 +320,8 @@ class System:
self.limbo = Limbo()
self._alpha_est = 0.2 # current contamination estimate
self._sessions: List[Session] = []
self._last_sessions: List[Session] = []
self._last_coi: COIWindow | None = None
@property
def alpha(self) -> float:
@@ -190,24 +345,27 @@ class System:
agg_demand[pidx] += q
return float(np.dot(prices, agg_demand))
def _coi_leakage(self, prices: np.ndarray, n_agents: int = 1) -> float:
"""COI leakage tied to Theorem 1: erosion from order statistic collapse.
As N agents query, min(p_1..p_N) → p_min and COI → 0.
Leakage = erosion_rate × margin_at_risk
"""
price_std = float(np.std(prices))
erosion = coi_erosion(max(1, n_agents), price_std)
margin_at_risk = float(np.mean(prices - self.costs))
return erosion * margin_at_risk
def _compute_coi_window(self, demand: Dict[str, float]) -> COIWindow:
if not self._last_sessions:
zeros = np.zeros(self.n, dtype=float)
return COIWindow(
policy=0.0,
agent=0.0,
leak=0.0,
survival_ratio=0.0,
policy_by_product=zeros,
agent_by_product=zeros,
demand_weights=zeros,
)
return compute_coi_window(self._last_sessions, self.costs, demand_mapping=demand)
def _objective(self, prices: np.ndarray, demand: Dict[str, float]) -> float:
"""Robust objective: R(p,d) - λ·COI_leak (Eq 23 simplified)."""
revenue = self._revenue_under_demand(prices, demand)
cost = float(np.sum(self.costs)) # fixed cost approximation
profit = revenue - cost
coi_penalty = self.lambda_coi * self._coi_leakage(prices) * float(np.mean(prices - self.costs))
return profit - coi_penalty
self._last_coi = self._compute_coi_window(demand)
return profit - self.lambda_coi * self._last_coi.leak
def compute_prices(self, demand: Dict[str, float] | None = None) -> np.ndarray:
"""Compute next prices via simple gradient-like update on robust objective.
@@ -230,28 +388,44 @@ class System:
def observe_demand(self, prices: np.ndarray, alpha_true: float = 0.2, n_sessions: int = 50) -> Dict[str, float]:
"""Observe market response to prices."""
sessions, demand_map = put_prices_to_market(prices, alpha=alpha_true, n_sessions=n_sessions, seed=int(self.rng.integers(0, 10000)))
sessions, demand_map = put_prices_to_market(prices, costs=self.costs, alpha=alpha_true, n_sessions=n_sessions, seed=int(self.rng.integers(0, 10000)))
self._last_sessions = sessions
self._sessions.extend(sessions) # store actual sessions for correct product attribution
self.limbo.add_update("demand", demand_map)
return demand_map
def step(self, alpha_true: float = 0.2, n_sessions: int = 50) -> Tuple[np.ndarray, Dict[str, float], float]:
def step(self, alpha_true: float = 0.2, n_sessions: int = 50) -> Tuple[np.ndarray, Dict[str, float], float, COIWindow]:
"""Single simulation step: prices -> demand -> reward."""
demand_hist = self.limbo.get_demand_history()
prices = self.compute_prices(demand_hist[-1] if demand_hist else None)
demand = self.observe_demand(prices, alpha_true, n_sessions)
reward = self._objective(prices, demand)
return prices, demand, reward
coi = self._last_coi or self._compute_coi_window(demand)
return prices, demand, reward, coi
def run(self, n_steps: int = 100, alpha_true: float = 0.2) -> Dict:
"""Run simulation for n_steps, return trajectory."""
trajectory = {"prices": [], "demand": [], "rewards": [], "alpha_est": [], "alpha_true": alpha_true}
trajectory = {
"prices": [],
"demand": [],
"rewards": [],
"alpha_est": [],
"alpha_true": alpha_true,
"coi_policy": [],
"coi_agent": [],
"coi_leak": [],
"coi_survival": [],
}
for _ in range(n_steps):
p, d, r = self.step(alpha_true)
p, d, r, coi = self.step(alpha_true)
trajectory["prices"].append(p)
trajectory["demand"].append(d)
trajectory["rewards"].append(r)
trajectory["alpha_est"].append(self._alpha_est)
trajectory["coi_policy"].append(coi.policy)
trajectory["coi_agent"].append(coi.agent)
trajectory["coi_leak"].append(coi.leak)
trajectory["coi_survival"].append(coi.survival_ratio)
return trajectory
@@ -268,10 +442,17 @@ if __name__ == "__main__":
# quick demo
sys = System(n_products=5, seed=42)
traj = sys.run(n_steps=20, alpha_true=0.25)
print(f"avg reward: {np.mean(traj['rewards']):.2f}, final α̂: {traj['alpha_est'][-1]:.3f}")
print(
f"avg reward: {np.mean(traj['rewards']):.2f}, "
f"final α̂: {traj['alpha_est'][-1]:.3f}, "
f"COI_policy: {np.mean(traj['coi_policy']):.3f}, "
f"COI_agent: {np.mean(traj['coi_agent']):.3f}, "
f"leak: {np.mean(traj['coi_leak']):.3f}"
)
prices = np.array([20.0, 35.0, 50.0, 25.0, 40.0])
sessions, demand = put_prices_to_market(prices, alpha=0.3, n_sessions=20, seed=123)
costs = np.array([15.0, 28.0, 40.0, 18.0, 30.0])
sessions, demand = put_prices_to_market(prices, costs=costs, alpha=0.3, n_sessions=20, seed=123)
print(f'sessions: {len(sessions)}, agents: {sum(1 for s in sessions if s.actor=="A")}')
for n in [1, 5, 10, 50, 100]:

View File

@@ -19,8 +19,19 @@ try:
except ImportError:
HAS_GYM = False
from .simplified import (System, Session, Event, Limbo, put_prices_to_market,
compute_demand, estimate_alpha, coi_erosion, TRANS_H, TRANS_A)
from .simplified import (
System,
Session,
Event,
Limbo,
put_prices_to_market,
compute_coi_window,
compute_demand,
estimate_alpha,
coi_erosion,
TRANS_H,
TRANS_A,
)
@dataclass
@@ -116,9 +127,19 @@ class PricingEnv(gym.Env if HAS_GYM else object):
agg[pidx] += q
self._demand_agg = agg
revenue = float(np.dot(prices, agg))
cost = float(np.dot(sys.costs, np.clip(agg, 0, 1))) # simplified cost model
profit = revenue - cost
revenue = 0.0
cost = 0.0
purchases = np.zeros(self.n, dtype=float)
for sess in sys._last_sessions:
for e in sess.events:
if e.action != "purchase":
continue
pidx = int(e.product_idx)
if 0 <= pidx < self.n:
purchases[pidx] += 1.0
revenue += float(e.price_seen)
cost += float(sys.costs[pidx])
profit = float(revenue - cost)
# volatility penalty (price changes)
vol_penalty = 0.0
@@ -126,9 +147,8 @@ class PricingEnv(gym.Env if HAS_GYM else object):
price_change = np.abs(prices - self._last_prices) / (sys.refs + 1e-6)
vol_penalty = cfg.lambda_vol * float(np.mean(price_change))
# COI leakage penalty
avg_margin = float(np.mean(prices - sys.costs))
coi_leak = sys.alpha * avg_margin
coi = compute_coi_window(sys._last_sessions, sys.costs, demand_mapping=demand)
coi_leak = float(coi.leak)
if cfg.reward_mode == "revenue":
r = revenue
@@ -181,11 +201,11 @@ class PricingEnv(gym.Env if HAS_GYM else object):
prices = self._sys.refs * action.astype(np.float64)
prices = np.clip(prices, self._sys.costs * 1.01, self._sys.refs * 2.0)
# drift contamination
if self.cfg.alpha_drift != 0:
self._alpha = np.clip(
self._alpha + self.cfg.alpha_drift * self._sys.rng.normal(),
*self.cfg.alpha_bounds)
# # drift contamination
# if self.cfg.alpha_drift != 0:
# self._alpha = np.clip(
# self._alpha + self.cfg.alpha_drift * self._sys.rng.normal(),
# *self.cfg.alpha_bounds)
# observe demand
demand = self._sys.observe_demand(prices, alpha_true=self._alpha, n_sessions=self.cfg.sessions_per_step)
@@ -205,25 +225,38 @@ class PricingEnv(gym.Env if HAS_GYM else object):
truncated = False
# compute metrics for tracking
revenue = float(np.dot(prices, self._demand_agg))
cost = float(np.dot(self._sys.costs, np.clip(self._demand_agg, 0, 1)))
profit = revenue - cost
revenue = 0.0
cost = 0.0
n_purchases = 0
for sess in self._sys._last_sessions:
for e in sess.events:
if e.action != "purchase":
continue
n_purchases += 1
revenue += float(e.price_seen)
cost += float(self._sys.costs[int(e.product_idx)])
profit = float(revenue - cost)
n_agents = int(self._alpha * self.cfg.sessions_per_step)
price_std = float(np.std(prices))
coi = compute_coi_window(self._sys._last_sessions, self._sys.costs, demand_mapping=demand)
info = {
"alpha_true": self._alpha,
"alpha_est": self._sys.alpha,
"alpha_error": abs(self._alpha - self._sys.alpha),
"revenue": revenue,
"profit": profit,
"cost": cost,
"revenue": float(revenue),
"profit": float(profit),
"cost": float(cost),
"n_purchases": int(n_purchases),
"avg_margin": float(np.mean((prices - self._sys.costs) / self._sys.costs)),
"n_sessions": len(demand),
"n_agents": n_agents,
"price_std": price_std,
"coi_erosion": coi_erosion(max(1, n_agents), price_std),
"coi_leakage": self._sys.alpha * float(np.mean(prices - self._sys.costs)),
"coi_policy": float(coi.policy),
"coi_agent": float(coi.agent),
"coi_leakage": float(coi.leak),
"coi_survival": float(coi.survival_ratio),
"cumulative_reward": sum(self._episode_rewards),
"step": self._t,
}

View File

@@ -2,11 +2,6 @@
Trains pricing policies using stable-baselines3 with TensorBoard logging.
Tracks COI erosion, alpha estimation error, and economic KPIs per thesis formulation.
Usage:
python -m lab.case.thesis.train --algo ppo --alpha 0.3 --steps 100000
python -m lab.case.thesis.train --algo adaptive --sweep # run alpha sweep
tensorboard --logdir lab/case/thesis/runs
"""
from __future__ import annotations
import argparse
@@ -41,9 +36,9 @@ class EpisodeMetrics:
reward: float = 0.0
revenue: float = 0.0
profit: float = 0.0
coi_erosion: float = 0.0 # theorem 1: order statistic erosion
coi_leakage: float = 0.0 # per-step leakage penalty
alpha_error: float = 0.0 # |α - α̂|
coi_erosion: float = 0.0
coi_leakage: float = 0.0
alpha_error: float = 0.0
avg_margin: float = 0.0
n_agents: int = 0
steps: int = 0
@@ -213,6 +208,7 @@ def train(cfg: ExperimentConfig) -> Dict[str, Any]:
if algo_cls is None:
raise ValueError(f"unknown algo: {cfg.algo}")
common = dict(verbose=1, seed=cfg.seed, tensorboard_log=str(log_path), device="auto")
# TODO: setup hyper parameter passing to train different variations (no free lunch)
if cfg.algo.lower() == "ppo":
model = PPO("MlpPolicy", train_env, learning_rate=3e-4, n_steps=2048,
batch_size=64, n_epochs=10, gamma=0.99, gae_lambda=0.95,