minor refactors to codebase to implement DRO

This commit is contained in:
2026-02-14 14:53:30 +01:00
parent 895eea5674
commit bc6c481d03
6 changed files with 195 additions and 75 deletions

View File

@@ -12,11 +12,23 @@ from .lib.behavior import get_transition_models, trajectory_to_events
from .lib.wrappers import EconomicMetricsWrapper
class _ActionPricingEngine(PricingEngine):
def __init__(self, n_products: int, price_bounds: tuple):
self._prices = np.full(n_products, price_bounds[0], dtype=float)
def set_prices(self, prices: np.ndarray):
self._prices = np.asarray(prices, dtype=float)
def act(self, _):
return self._prices
class PHANTOM(gym.Env):
"""Gymnasium wrapper for Limbo pricing-market simulation implementing thesis COI framework
reward = R(p,d) - λ·COI_leak(p,τ') per thesis Section on DR-RL
COI_leak uses behavioral divergence to estimate agent probability f(τ')
robust inner step: min over alpha in Wasserstein interval around nominal alpha
"""
metadata = {"render_modes": ["human", "ansi"]}
@@ -32,6 +44,9 @@ class PHANTOM(gym.Env):
price_bounds: tuple = (10.0, 150.0),
lambda_coi: float = 0.1,
coi_window: int = 10,
robust_radius: float = 0.0,
robust_points: int = 5,
info_value: float = 1.0,
render_mode: str = None,
):
super().__init__()
@@ -40,10 +55,14 @@ class PHANTOM(gym.Env):
self.lambda_coi = lambda_coi
self.coi_window = coi_window
self.render_mode = render_mode
self.alpha = alpha
self.alpha = float(alpha)
self.nominal_alpha = float(alpha)
self.N = N
self.human_params = human_params
self.agent_params = agent_params
self.robust_radius = max(0.0, float(robust_radius))
self.robust_points = max(1, int(robust_points))
self.info_value = float(info_value)
self.market = MarketEngine(
alpha=alpha,
@@ -52,8 +71,9 @@ class PHANTOM(gym.Env):
agent_params=agent_params,
noise_std=noise_std,
)
self._platform_stub = PricingEngine()
self._platform_stub = _ActionPricingEngine(n_products, price_bounds)
self._limbo = Limbo(self._platform_stub, self.market)
self._set_market_mix(self.nominal_alpha)
self.action_space = spaces.Box(
low=price_bounds[0],
@@ -99,53 +119,72 @@ class PHANTOM(gym.Env):
)
return {"demand": demand_arr, "prices": self._prices.astype(np.float32)}
def _compute_agent_prob(self) -> float:
"""estimate agent probability from accumulated trajectories using KL divergence"""
if (
not self._trajectories
or self._human_trans is None
or self._agent_trans is None
):
return self.alpha # fallback to contamination level
def _set_market_mix(self, alpha: float):
alpha = float(np.clip(alpha, 0.0, 1.0))
n_agents = int(self.N * alpha)
self.alpha = alpha
self.market.alpha = alpha
self.market.Nagents = n_agents
self.market.Nhumans = self.N - n_agents
# aggregate all trajectories from this episode
all_events = []
for traj in self._trajectories:
all_events.extend(trajectory_to_events(traj))
if len(all_events) < 2:
return self.alpha
return compute_agent_probability(
all_events, self._human_trans, self._agent_trans
def _compute_agent_prob(self, trajectories=None) -> float:
trajectories = (
self.market.last_trajectories if trajectories is None else trajectories
)
if not trajectories or self._human_trans is None or self._agent_trans is None:
return float(self.market.alpha)
probs = []
for traj in trajectories:
events = trajectory_to_events(traj)
if len(events) < 2:
continue
probs.append(
compute_agent_probability(events, self._human_trans, self._agent_trans)
)
return float(np.mean(probs)) if probs else float(self.market.alpha)
def _compute_reward(self, prices: np.ndarray, demand: dict) -> tuple[float, dict]:
revenue = sum(prices[i] * demand.get(i, 0.0) for i in range(self.n_products))
trajs_mix = self.market.last_trajectories
purchases_mix = extract_purchases(trajs_mix)
coi_mix = compute_uplift_coi(prices, purchases_mix, self.baseline_prices)
old_state = (self.market.alpha, self.market.Nagents, self.market.Nhumans)
self.market.alpha, self.market.Nagents, self.market.Nhumans = 0.0, 0, self.N
self.market.act(prices)
purchases_base = extract_purchases(self.market.last_trajectories)
coi_base = compute_uplift_coi(prices, purchases_base, self.baseline_prices)
self.market.alpha, self.market.Nagents, self.market.Nhumans = old_state
coi_leakage = max(0.0, coi_base - coi_mix)
coi_penalty = max(self.lambda_coi * coi_leakage, 1000) / 1000
coi_penalty *= revenue
def _compute_reward(
self, prices: np.ndarray, demand: dict, agent_prob: float, trajectories: list
) -> tuple[float, dict]:
demand_arr = np.array(
[demand.get(i, 0.0) for i in range(self.n_products)], dtype=float
)
revenue = float(np.dot(prices, demand_arr))
purchases = extract_purchases(trajectories)
coi_mix = compute_uplift_coi(prices, purchases, self.baseline_prices)
coi_leakage = float(agent_prob * self.info_value)
coi_penalty = float(self.lambda_coi * coi_leakage)
return float(revenue - coi_penalty), {
"revenue": float(revenue),
"revenue": revenue,
"coi_mix": float(coi_mix),
"coi_base": float(coi_base),
"coi_leakage": float(coi_leakage),
"coi_penalty": float(coi_penalty),
"coi_base": 0.0,
"coi_leakage": coi_leakage,
"coi_penalty": coi_penalty,
}
def _alpha_candidates(self) -> np.ndarray:
if self.robust_radius <= 0.0 or self.robust_points == 1:
return np.array([self.nominal_alpha], dtype=float)
lo = max(0.0, self.nominal_alpha - self.robust_radius)
hi = min(1.0, self.nominal_alpha + self.robust_radius)
return np.linspace(lo, hi, self.robust_points)
def _select_adversarial_alpha(self, prices: np.ndarray) -> float:
candidates = self._alpha_candidates()
if len(candidates) == 1:
return float(candidates[0])
best_alpha, worst_reward = float(candidates[0]), np.inf
for alpha in candidates:
self._set_market_mix(float(alpha))
demand = self.market.act(prices)
trajectories = self.market.last_trajectories
agent_prob = self._compute_agent_prob(trajectories)
reward, _ = self._compute_reward(prices, demand, agent_prob, trajectories)
if reward < worst_reward:
worst_reward = reward
best_alpha = float(alpha)
return best_alpha
def _record_history(self):
demand_arr = np.array(
[self._demand.get(i, 0.0) for i in range(self.n_products)]
@@ -156,32 +195,42 @@ class PHANTOM(gym.Env):
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._set_market_mix(self.nominal_alpha)
self._limbo.reset()
self._prices = np.random.uniform(*self.price_bounds, size=self.n_products)
self._platform_stub.set_prices(self._prices)
self._limbo.step()
self._demand = self._limbo.step()
self._initial_episode_prices = self._prices.copy()
self._demand = self.market.act(self._prices)
self._step_count = 0
self._demand_history, self._price_history, self._revenue_history = [], [], []
self._trajectories = []
self._trajectories = list(getattr(self.market, "last_trajectories", []))
self._record_history()
return self._get_obs(), {}
def step(self, action: np.ndarray):
self._prices = np.clip(action, *self.price_bounds)
self._demand = self.market.act(self._prices)
alpha_adv = self._select_adversarial_alpha(self._prices)
self._set_market_mix(alpha_adv)
self._platform_stub.set_prices(self._prices)
self._limbo.step()
self._demand = self._limbo.step()
trajectories = getattr(self.market, "last_trajectories", [])
self._step_count += 1
self._trajectories.extend(trajectories)
agent_prob = self._compute_agent_prob(trajectories)
reward, metrics = self._compute_reward(
self._prices, self._demand, agent_prob, trajectories
)
self._record_history()
# capture trajectories generated by market for agent prob estimation
if hasattr(self.market, "last_trajectories"):
self._trajectories.extend(self.market.last_trajectories)
agent_prob = self._compute_agent_prob()
reward, metrics = self._compute_reward(self._prices, self._demand)
terminated = self._step_count >= 100
info = {
"step": self._step_count,
"agent_prob": agent_prob,
"alpha_adv": float(alpha_adv),
"wasserstein_radius": float(self.robust_radius),
**metrics,
"raw_revenue": np.sum(
self._prices