# File: PHANTOM/sim/rl/environment.py
#
# Viewer metadata: 245 lines, 10 KiB, Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
import numpy as np
try:
import gymnasium as gym
from gymnasium import spaces
except ImportError as e:
raise ImportError("sim.rl.environment requires gymnasium") from e
from sim.case.thesis_simplified.coi import COIWindow, coi_erosion, compute_coi_window
from sim.case.thesis_simplified.separability import estimate_alpha as estimate_session_alpha
from sim.case.thesis_simplified.simplified import Limbo, Session, put_prices_to_market
from sim.rl.thesis_core import aggregate_demand_by_product, aggregate_purchases, constrain_prices
@dataclass(frozen=True)
class BusinessLogicConstraints:
    """Immutable configuration bundle consumed by ``PHANTOMEnv``.

    Field order is part of the interface (positional construction), so new
    fields should only ever be appended.
    """

    # --- problem size / episode length ---
    # Number of products; sizes the action vector and per-product observations.
    product_catalogue_size: int = 100
    # Step count at which ``PHANTOMEnv.step`` reports the episode as finished.
    max_steps: int = 2_000
    # Number of sessions requested from the market simulation on every step.
    sessions_per_step: int = 250

    # --- pricing limits (bounds of the price spaces, inputs to constrain_prices) ---
    system_max_price: float = 500.0
    system_min_price: float = 1.0
    # Passed to ``constrain_prices`` as ``max_adjustment``.
    max_price_adjustment: float = 0.3
    # Passed to ``constrain_prices`` as ``min_margin_pct``.
    min_margin_pct: float = 0.05

    # --- alpha dynamics ---
    # Starting value for both the true alpha and its running estimate.
    agent_share: float = 0.2
    # Amount added to the true alpha after every step (before clipping).
    alpha_drift: float = 0.0
    # (low, high) clip range applied to the true alpha.
    alpha_bounds: tuple[float, float] = (0.0, 0.8)

    # --- reward weights ---
    # Weight of the COI leak penalty.
    coi_strength: float = 0.25
    # Weight of the price-volatility penalty.
    w_volatility: float = 5.0
    # Weight of the |alpha_hat - alpha_true| penalty.
    w_estimation_error: float = 0.25

    # Seed for the environment's default random generator.
    seed: int = 7
def make_env(constraints: Optional[BusinessLogicConstraints] = None) -> "PHANTOMEnv":
    """Build a ``PHANTOMEnv``, using default constraints when none are supplied.

    Args:
        constraints: Configuration for the environment. A falsy value
            (normally ``None``) is replaced by ``BusinessLogicConstraints()``.

    Returns:
        A freshly constructed environment; ``reset()`` has not been called yet.
    """
    if not constraints:
        constraints = BusinessLogicConstraints()
    return PHANTOMEnv(constraints=constraints)
class PHANTOMEnv(gym.Env):
    """Gymnasium pricing environment whose action is one price per product.

    On every step the proposed prices are passed through ``constrain_prices``
    and put to a simulated market via ``put_prices_to_market``.  The resulting
    sessions refresh ``alpha_hat`` (a smoothed estimate of ``alpha_true``,
    which starts at ``constraints.agent_share``) and the COI window.  The
    reward is profit minus weighted COI-leak, price-volatility and
    alpha-estimation-error penalties.
    """

    metadata = {"render_modes": ["human", "ansi"]}

    def __init__(self, constraints: Optional[BusinessLogicConstraints] = None):
        """Initialise episode state and declare the action/observation spaces.

        Args:
            constraints: Environment configuration; ``BusinessLogicConstraints()``
                is used when omitted.
        """
        super().__init__()
        self.c = constraints or BusinessLogicConstraints()
        self.n = int(self.c.product_catalogue_size)
        self._rng = np.random.default_rng(self.c.seed)
        self._t = 0
        self._alpha_true = float(self.c.agent_share)
        self._alpha_hat = float(self.c.agent_share)
        self._costs = np.zeros(self.n, dtype=np.float32)
        self._refs = np.zeros(self.n, dtype=np.float32)
        # Stays None until reset() draws a catalogue; step() checks for this.
        self._prices: Optional[np.ndarray] = None
        self._last_sessions: list[Session] = []
        self._last_coi: COIWindow | None = None
        self._limbo = Limbo()

        # Action: one absolute price per product, within the system price range.
        self.action_space = spaces.Box(
            low=np.full((self.n,), self.c.system_min_price, dtype=np.float32),
            high=np.full((self.n,), self.c.system_max_price, dtype=np.float32),
            dtype=np.float32,
        )
        self.observation_space = spaces.Dict(
            {
                "elasticity": spaces.Dict(
                    {
                        "price": spaces.Box(
                            low=np.full((self.n,), self.c.system_min_price, dtype=np.float32),
                            high=np.full((self.n,), self.c.system_max_price, dtype=np.float32),
                            dtype=np.float32,
                        ),
                        "demand": spaces.Box(
                            low=np.zeros((self.n,), dtype=np.float32),
                            high=np.full((self.n,), 1e9, dtype=np.float32),
                            dtype=np.float32,
                        ),
                    }
                ),
                "market": spaces.Dict(
                    {
                        "alpha_hat": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
                        "revenue_rate": spaces.Box(low=0.0, high=1e12, shape=(1,), dtype=np.float32),
                        "conversion_rate": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
                        "price_volatility": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
                    }
                ),
                "cost": spaces.Box(
                    low=np.zeros((self.n,), dtype=np.float32),
                    high=np.full((self.n,), self.c.system_max_price, dtype=np.float32),
                    dtype=np.float32,
                ),
            }
        )

    def _reset_catalogue(self) -> None:
        """Draw new per-product costs and reference prices; start prices at the references."""
        self._costs = self._rng.uniform(15.0, 60.0, size=self.n).astype(np.float32)
        margins = self._rng.uniform(0.2, 0.6, size=self.n).astype(np.float32)
        # Reference price = cost marked up by a random 20-60 % margin.
        self._refs = (self._costs * (1.0 + margins)).astype(np.float32)
        self._prices = self._refs.copy()

    def _observe_market(
        self, prices: np.ndarray
    ) -> tuple[list[Session], Dict[str, float], np.ndarray, np.ndarray, float, float, int]:
        """Simulate one step of market sessions at ``prices`` and aggregate the outcome.

        Args:
            prices: Per-product prices offered to the market.

        Returns:
            ``(sessions, demand_map, demand_by_product, purchases, revenue,
            cost, n_agents)`` as produced by ``put_prices_to_market``,
            ``aggregate_demand_by_product`` and ``aggregate_purchases``.
        """
        sessions, demand_map = put_prices_to_market(
            prices,
            costs=self._costs,
            alpha=self._alpha_true,
            n_sessions=int(self.c.sessions_per_step),
            # The market seed is derived from the environment's own generator.
            seed=int(self._rng.integers(0, 2**31 - 1)),
        )
        demand_by_product = aggregate_demand_by_product(sessions, demand_map, self.n)
        purchases, revenue, cost, n_agents = aggregate_purchases(sessions, self._costs, self.n)
        # The conversion rate is derived by step() from `purchases`; it is not
        # part of this method's return value, so it is not computed here.
        return sessions, demand_map, demand_by_product, purchases, revenue, cost, n_agents

    def _update_alpha_hat(self, sessions: list[Session]) -> float:
        """Blend this step's per-session alpha estimates into the running estimate.

        Sessions without events are skipped.  If no session has events, the
        estimate is left untouched.

        Args:
            sessions: Sessions observed during the current step.

        Returns:
            The (possibly updated) value of ``alpha_hat``.
        """
        scores = [estimate_session_alpha(s) for s in sessions if s.events]
        if not scores:
            return self._alpha_hat
        alpha_step = float(np.mean(scores))
        # Exponential smoothing: 80 % previous estimate, 20 % new evidence.
        self._alpha_hat = 0.8 * self._alpha_hat + 0.2 * alpha_step
        self._alpha_hat = float(np.clip(self._alpha_hat, 0.0, 1.0))
        return self._alpha_hat

    def _reward(self, prices: np.ndarray, revenue: float, cost: float, volatility: float) -> float:
        """Compute the step reward: profit minus weighted penalties.

        Args:
            prices: Current prices (accepted for interface symmetry; not used).
            revenue: Revenue realised this step.
            cost: Cost incurred this step.
            volatility: Price-volatility measure for this step.

        Returns:
            ``profit - coi_strength * leak - w_volatility * volatility
            - w_estimation_error * |alpha_hat - alpha_true|``.
        """
        profit = float(revenue - cost)
        # Explicit None check (matching step()) so that a COI window object
        # that happens to be falsy is not mistaken for "no window yet".
        coi_leak = float(self._last_coi.leak) if self._last_coi is not None else 0.0
        alpha_err = abs(self._alpha_hat - self._alpha_true)
        return (
            profit
            - self.c.coi_strength * coi_leak
            - self.c.w_volatility * volatility
            - self.c.w_estimation_error * alpha_err
        )

    def _build_obs(
        self,
        prices: np.ndarray,
        demand_by_product: np.ndarray,
        revenue: float,
        conversion: float,
        volatility: float,
    ) -> Dict[str, Any]:
        """Assemble the nested observation dict matching ``observation_space``'s keys.

        Args:
            prices: Per-product prices.
            demand_by_product: Per-product demand.
            revenue: Step revenue, reported under ``market/revenue_rate``.
            conversion: Reported under ``market/conversion_rate``.
            volatility: Reported under ``market/price_volatility``.

        Returns:
            Dict with ``"elasticity"``, ``"market"`` and ``"cost"`` entries,
            all values cast to ``float32`` arrays.
        """
        return {
            "elasticity": {"price": prices.astype(np.float32), "demand": demand_by_product.astype(np.float32)},
            "market": {
                "alpha_hat": np.array([self._alpha_hat], dtype=np.float32),
                "revenue_rate": np.array([revenue], dtype=np.float32),
                "conversion_rate": np.array([conversion], dtype=np.float32),
                "price_volatility": np.array([volatility], dtype=np.float32),
            },
            "cost": self._costs.astype(np.float32),
        }

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode with a freshly drawn catalogue.

        Args:
            seed: When given, also re-seeds the environment's private generator.
            options: Accepted for Gymnasium API compatibility; not used.

        Returns:
            ``(observation, info)`` where the observation carries the starting
            prices with zero demand/revenue/conversion/volatility and ``info``
            holds ``alpha_true``.
        """
        super().reset(seed=seed)
        if seed is not None:
            self._rng = np.random.default_rng(seed)
        self._t = 0
        self._alpha_true = float(np.clip(self.c.agent_share, *self.c.alpha_bounds))
        self._alpha_hat = float(self.c.agent_share)
        self._reset_catalogue()
        self._limbo = Limbo()
        self._last_sessions = []
        self._last_coi = None
        prices = self._prices if self._prices is not None else np.zeros(self.n, dtype=np.float32)
        obs = self._build_obs(prices, np.zeros(self.n, dtype=np.float32), 0.0, 0.0, 0.0)
        return obs, {"alpha_true": self._alpha_true}

    def step(self, action: np.ndarray) -> Tuple[Dict[str, Any], float, bool, bool, Dict[str, Any]]:
        """Apply a price action, simulate the market and return the transition.

        Args:
            action: Proposed per-product prices; constrained before being applied.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always ``False``.

        Raises:
            RuntimeError: If called before ``reset()``.
        """
        if self._prices is None:
            raise RuntimeError("reset() must be called before step()")
        prev = self._prices
        prices = constrain_prices(
            prev,
            np.asarray(action, dtype=np.float32),
            costs=self._costs,
            min_price=float(self.c.system_min_price),
            max_price=float(self.c.system_max_price),
            max_adjustment=float(self.c.max_price_adjustment),
            min_margin_pct=float(self.c.min_margin_pct),
        )
        self._prices = prices
        self._limbo.add_update("prices", prices)

        sessions, demand_map, demand_by_product, purchases, revenue, cost, n_agents = self._observe_market(prices)
        self._last_sessions = sessions
        self._limbo.add_update("demand", demand_map)
        self._update_alpha_hat(self._last_sessions)
        self._last_coi = compute_coi_window(self._last_sessions, self._costs, demand_mapping=demand_map)

        # The true alpha drifts *before* the reward is computed, so the
        # estimation-error penalty is measured against the post-drift value.
        self._alpha_true = float(np.clip(self._alpha_true + self.c.alpha_drift, *self.c.alpha_bounds))
        # Spread of relative price changes; epsilon guards against division by zero.
        volatility = float(np.std((prices - prev) / (prev + 1e-6)))
        reward = float(self._reward(prices, revenue, cost, volatility))
        conversion = float(np.sum(purchases) / max(len(self._last_sessions), 1))

        self._t += 1
        # NOTE(review): Gymnasium describes a pure time limit as `truncated`
        # rather than `terminated`; kept as-is so existing callers see the
        # same flags - confirm which semantics the training code expects.
        terminated = self._t >= int(self.c.max_steps)
        # Volatility is capped at 1.0 in the observation to respect the
        # declared bound; the uncapped value is reported in `info`.
        obs = self._build_obs(prices, demand_by_product, revenue, conversion, min(volatility, 1.0))
        info = {
            "step": self._t,
            "reward": reward,
            "revenue": float(revenue),
            "profit": float(revenue - cost),
            "n_sessions": int(self.c.sessions_per_step),
            "n_agents": int(n_agents),
            "alpha_true": float(self._alpha_true),
            "alpha_hat": float(self._alpha_hat),
            "alpha_error": float(abs(self._alpha_hat - self._alpha_true)),
            "price_std": float(np.std(prices)),
            "price_volatility": float(volatility),
        }
        if self._last_coi is not None:
            info.update(
                {
                    "coi_policy": float(self._last_coi.policy),
                    "coi_agent": float(self._last_coi.agent),
                    "coi_leakage": float(self._last_coi.leak),
                    "coi_survival": float(self._last_coi.survival_ratio),
                    "coi_erosion": float(coi_erosion(self._last_coi.policy, self._last_coi.agent)),
                }
            )
        return obs, reward, terminated, False, info

    def render(self, mode: str = "human") -> str | None:
        """Return a one-line status summary, printing it when ``mode == "human"``.

        Args:
            mode: ``"human"`` prints the summary; any other value only returns it.

        Returns:
            The summary string, or ``None`` if ``reset()`` has not been called.
        """
        if self._prices is None:
            return None
        out = (
            f"t={self._t}/{self.c.max_steps} "
            f"alpha_true={self._alpha_true:.3f} alpha_hat={self._alpha_hat:.3f} "
            f"price_std={float(np.std(self._prices)):.2f}"
        )
        if mode == "human":
            print(out)
        return out

    def close(self) -> None:
        """Release resources; this environment holds none, so it does nothing."""
        return