mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 16:43:36 +00:00
337 lines
13 KiB
Python
337 lines
13 KiB
Python
import gymnasium as gym
|
|
from gymnasium import spaces
|
|
import numpy as np
|
|
from .engine import Limbo, MarketEngine, PricingEngine
|
|
from .lib.render import DashboardRenderer
|
|
from .lib.coi import (
|
|
compute_uplift_coi,
|
|
extract_purchases,
|
|
compute_agent_probability,
|
|
)
|
|
from .lib.behavior import get_transition_models, trajectory_to_events
|
|
from .lib.wrappers import EconomicMetricsWrapper
|
|
|
|
|
|
class _ActionPricingEngine(PricingEngine):
|
|
def __init__(self, n_products: int, price_bounds: tuple):
|
|
self._prices = np.full(n_products, price_bounds[0], dtype=float)
|
|
|
|
def set_prices(self, prices: np.ndarray):
|
|
self._prices = np.asarray(prices, dtype=float)
|
|
|
|
def act(self, _):
|
|
return self._prices
|
|
|
|
|
|
class PHANTOM(gym.Env):
|
|
"""Gymnasium wrapper for Limbo pricing-market simulation implementing thesis COI framework
|
|
|
|
reward = R(p,d) - λ·COI_leak(p,τ') per thesis Section on DR-RL
|
|
COI_leak uses behavioral divergence to estimate agent probability f(τ')
|
|
robust inner step: min over alpha in Wasserstein interval around nominal alpha
|
|
actions are discrete global price-scale moves
|
|
"""
|
|
|
|
metadata = {"render_modes": ["human", "ansi"]}
|
|
|
|
def __init__(
|
|
self,
|
|
n_products: int = 10,
|
|
alpha: float = 0.3,
|
|
N: int = 100,
|
|
human_params: tuple = (50.0, 10.0),
|
|
agent_params: tuple = (45.0, 15.0),
|
|
noise_std: float = 1.0,
|
|
price_bounds: tuple = (10.0, 150.0),
|
|
lambda_coi: float = 0.1,
|
|
coi_window: int = 10,
|
|
robust_radius: float = 0.0,
|
|
robust_points: int = 5,
|
|
info_value: float = 1.0,
|
|
action_levels: int = 9,
|
|
action_scale_low: float = 0.9,
|
|
action_scale_high: float = 1.1,
|
|
render_mode: str = None,
|
|
):
|
|
super().__init__()
|
|
self.n_products = n_products
|
|
self.price_bounds = price_bounds
|
|
self.lambda_coi = lambda_coi
|
|
self.coi_window = coi_window
|
|
self.render_mode = render_mode
|
|
self.alpha = float(alpha)
|
|
self.nominal_alpha = float(alpha)
|
|
self.N = N
|
|
self.human_params = human_params
|
|
self.agent_params = agent_params
|
|
self.robust_radius = max(0.0, float(robust_radius))
|
|
self.robust_points = max(1, int(robust_points))
|
|
self.info_value = float(info_value)
|
|
self.action_levels = max(2, int(action_levels))
|
|
self._action_scales = np.linspace(
|
|
float(action_scale_low), float(action_scale_high), self.action_levels
|
|
)
|
|
|
|
self.market = MarketEngine(
|
|
alpha=alpha,
|
|
N=N,
|
|
human_params=human_params,
|
|
agent_params=agent_params,
|
|
noise_std=noise_std,
|
|
)
|
|
self._platform_stub = _ActionPricingEngine(n_products, price_bounds)
|
|
self._limbo = Limbo(self._platform_stub, self.market)
|
|
self._set_market_mix(self.nominal_alpha)
|
|
|
|
self.action_space = spaces.Discrete(self.action_levels)
|
|
self.observation_space = spaces.Dict(
|
|
{
|
|
"demand": spaces.Box(
|
|
low=0.0, high=100.0, shape=(n_products,), dtype=np.float32
|
|
),
|
|
"prices": spaces.Box(
|
|
low=price_bounds[0],
|
|
high=price_bounds[1],
|
|
shape=(n_products,),
|
|
dtype=np.float32,
|
|
),
|
|
}
|
|
)
|
|
|
|
self._prices = None
|
|
self._demand = None
|
|
self._step_count = 0
|
|
self._demand_history = []
|
|
self._price_history = []
|
|
self._revenue_history = []
|
|
self._renderer = None
|
|
self._initial_episode_prices = None
|
|
self._trajectories = [] # session trajectories for agent prob calculation
|
|
self.baseline_prices = np.full(self.n_products, self.price_bounds[0])
|
|
|
|
# load behavioral models for agent probability estimation
|
|
try:
|
|
self._human_trans, self._agent_trans = get_transition_models()
|
|
except Exception:
|
|
# fallback if behavioral data unavailable
|
|
self._human_trans, self._agent_trans = None, None
|
|
|
|
def _get_obs(self) -> dict:
|
|
demand_arr = np.array(
|
|
[self._demand.get(i, 0.0) for i in range(self.n_products)], dtype=np.float32
|
|
)
|
|
return {"demand": demand_arr, "prices": self._prices.astype(np.float32)}
|
|
|
|
def _set_market_mix(self, alpha: float):
|
|
alpha = float(np.clip(alpha, 0.0, 1.0))
|
|
n_agents = int(self.N * alpha)
|
|
self.alpha = alpha
|
|
self.market.alpha = alpha
|
|
self.market.Nagents = n_agents
|
|
self.market.Nhumans = self.N - n_agents
|
|
|
|
def _decode_action(self, action) -> np.ndarray:
|
|
base = (
|
|
self._prices
|
|
if self._prices is not None
|
|
else np.full(self.n_products, self.price_bounds[0], dtype=float)
|
|
)
|
|
if np.isscalar(action):
|
|
idx = int(np.clip(int(action), 0, self.action_levels - 1))
|
|
return np.clip(base * self._action_scales[idx], *self.price_bounds)
|
|
a = np.asarray(action)
|
|
if a.size == 1:
|
|
idx = int(np.clip(int(a.reshape(-1)[0]), 0, self.action_levels - 1))
|
|
return np.clip(base * self._action_scales[idx], *self.price_bounds)
|
|
return np.clip(a.astype(float), *self.price_bounds)
|
|
|
|
def _compute_agent_prob(self, trajectories=None) -> float:
|
|
trajectories = (
|
|
self.market.last_trajectories if trajectories is None else trajectories
|
|
)
|
|
if not trajectories or self._human_trans is None or self._agent_trans is None:
|
|
return float(self.market.alpha)
|
|
probs = []
|
|
for traj in trajectories:
|
|
events = trajectory_to_events(traj)
|
|
if len(events) < 2:
|
|
continue
|
|
probs.append(
|
|
compute_agent_probability(events, self._human_trans, self._agent_trans)
|
|
)
|
|
return float(np.mean(probs)) if probs else float(self.market.alpha)
|
|
|
|
def _compute_reward(
|
|
self, prices: np.ndarray, demand: dict, agent_prob: float, trajectories: list
|
|
) -> tuple[float, dict]:
|
|
demand_arr = np.array(
|
|
[demand.get(i, 0.0) for i in range(self.n_products)], dtype=float
|
|
)
|
|
revenue = float(np.dot(prices, demand_arr))
|
|
purchases = extract_purchases(trajectories)
|
|
coi_mix = compute_uplift_coi(prices, purchases, self.baseline_prices)
|
|
coi_leakage = float(agent_prob * self.info_value)
|
|
coi_penalty = float(self.lambda_coi * coi_leakage)
|
|
return float(revenue - coi_penalty), {
|
|
"revenue": revenue,
|
|
"coi_mix": float(coi_mix),
|
|
"coi_base": 0.0,
|
|
"coi_leakage": coi_leakage,
|
|
"coi_penalty": coi_penalty,
|
|
}
|
|
|
|
def _alpha_candidates(self) -> np.ndarray:
|
|
if self.robust_radius <= 0.0 or self.robust_points == 1:
|
|
return np.array([self.nominal_alpha], dtype=float)
|
|
lo = max(0.0, self.nominal_alpha - self.robust_radius)
|
|
hi = min(1.0, self.nominal_alpha + self.robust_radius)
|
|
return np.linspace(lo, hi, self.robust_points)
|
|
|
|
def _select_adversarial_alpha(self, prices: np.ndarray) -> float:
|
|
candidates = self._alpha_candidates()
|
|
if len(candidates) == 1:
|
|
return float(candidates[0])
|
|
best_alpha, worst_reward = float(candidates[0]), np.inf
|
|
for alpha in candidates:
|
|
self._set_market_mix(float(alpha))
|
|
demand = self.market.act(prices)
|
|
trajectories = self.market.last_trajectories
|
|
agent_prob = self._compute_agent_prob(trajectories)
|
|
reward, _ = self._compute_reward(prices, demand, agent_prob, trajectories)
|
|
if reward < worst_reward:
|
|
worst_reward = reward
|
|
best_alpha = float(alpha)
|
|
return best_alpha
|
|
|
|
def _record_history(self):
|
|
demand_arr = np.array(
|
|
[self._demand.get(i, 0.0) for i in range(self.n_products)]
|
|
)
|
|
self._demand_history.append(demand_arr)
|
|
self._price_history.append(self._prices.copy())
|
|
self._revenue_history.append(np.sum(self._prices * demand_arr))
|
|
|
|
def reset(self, seed=None, options=None):
|
|
super().reset(seed=seed)
|
|
self._set_market_mix(self.nominal_alpha)
|
|
self._limbo.reset()
|
|
self._prices = np.random.uniform(*self.price_bounds, size=self.n_products)
|
|
self._platform_stub.set_prices(self._prices)
|
|
self._limbo.step()
|
|
self._demand = self._limbo.step()
|
|
self._initial_episode_prices = self._prices.copy()
|
|
self._step_count = 0
|
|
self._demand_history, self._price_history, self._revenue_history = [], [], []
|
|
self._trajectories = list(getattr(self.market, "last_trajectories", []))
|
|
self._record_history()
|
|
return self._get_obs(), {}
|
|
|
|
def step(self, action):
|
|
self._prices = self._decode_action(action)
|
|
alpha_adv = self._select_adversarial_alpha(self._prices)
|
|
self._set_market_mix(alpha_adv)
|
|
self._platform_stub.set_prices(self._prices)
|
|
self._limbo.step()
|
|
self._demand = self._limbo.step()
|
|
trajectories = getattr(self.market, "last_trajectories", [])
|
|
self._step_count += 1
|
|
self._trajectories.extend(trajectories)
|
|
|
|
agent_prob = self._compute_agent_prob(trajectories)
|
|
reward, metrics = self._compute_reward(
|
|
self._prices, self._demand, agent_prob, trajectories
|
|
)
|
|
self._record_history()
|
|
terminated = self._step_count >= 100
|
|
|
|
info = {
|
|
"step": self._step_count,
|
|
"agent_prob": agent_prob,
|
|
"alpha_adv": float(alpha_adv),
|
|
"wasserstein_radius": float(self.robust_radius),
|
|
**metrics,
|
|
"raw_revenue": np.sum(
|
|
self._prices
|
|
* np.array([self._demand.get(i, 0.0) for i in range(self.n_products)])
|
|
),
|
|
}
|
|
return self._get_obs(), reward, terminated, False, info
|
|
|
|
def _compute_elasticity(self) -> np.ndarray:
|
|
"""point elasticity: e = (dQ/dP) * (P/Q) via finite differences, clipped to [-5, 5]"""
|
|
if len(self._price_history) < 2:
|
|
return np.zeros(self.n_products)
|
|
p, q = np.array(self._price_history), np.array(self._demand_history)
|
|
dp, dq = np.diff(p, axis=0), np.diff(q, axis=0)
|
|
valid = np.abs(dp) > 0.5
|
|
with np.errstate(divide="ignore", invalid="ignore"):
|
|
elasticity = np.where(
|
|
valid, (dq / dp) * (p[:-1] / np.maximum(q[:-1], 1.0)), 0.0
|
|
)
|
|
elasticity = np.nan_to_num(np.clip(elasticity, -5.0, 5.0), nan=0.0)
|
|
return (
|
|
np.mean(elasticity, axis=0)
|
|
if len(elasticity) > 0
|
|
else np.zeros(self.n_products)
|
|
)
|
|
|
|
def render(self):
|
|
if self.render_mode == "human":
|
|
if self._renderer is None:
|
|
self._renderer = DashboardRenderer()
|
|
self._renderer.render(self)
|
|
elif self.render_mode == "ansi":
|
|
return (
|
|
f"step={self._step_count}, prices={self._prices}, demand={self._demand}"
|
|
)
|
|
return None
|
|
|
|
def close(self):
|
|
if self._renderer:
|
|
self._renderer.close()
|
|
self._renderer = None
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import wandb
|
|
from .lib import MetricsCallback
|
|
|
|
class RandomPolicy:
|
|
"""Minimal SB3-compatible random policy for baseline testing."""
|
|
|
|
def __init__(self, env):
|
|
self.env = env
|
|
self.num_timesteps = 0
|
|
|
|
def learn(self, total_timesteps, callback=None):
|
|
callback.model = self
|
|
callback.num_timesteps = 0
|
|
callback.locals = {}
|
|
callback.on_training_start({}, {})
|
|
|
|
obs, _ = self.env.reset()
|
|
for step in range(total_timesteps):
|
|
action = self.env.action_space.sample()
|
|
obs, reward, term, trunc, info = self.env.step(action)
|
|
self.num_timesteps = step + 1
|
|
callback.num_timesteps = self.num_timesteps
|
|
callback.locals = {"infos": [info]}
|
|
callback.on_step()
|
|
if term or trunc:
|
|
callback.on_rollout_end()
|
|
obs, _ = self.env.reset()
|
|
return self
|
|
|
|
def predict(self, obs, **kwargs):
|
|
return self.env.action_space.sample(), None
|
|
|
|
wandb.init(project="phantom-pricing", config={"policy": "random", "alpha": 0.3})
|
|
env = EconomicMetricsWrapper(PHANTOM(n_products=15, alpha=0.3, render_mode=None))
|
|
|
|
model = RandomPolicy(env)
|
|
model.learn(total_timesteps=1000, callback=MetricsCallback())
|
|
|
|
print(f"Episode revenue: {env.episode_revenue:.1f}")
|
|
wandb.finish()
|
|
env.close()
|