Files
PHANTOM/sim/rl/engine.py

239 lines
11 KiB
Python

import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from typing import Dict, Any
from sim.rl.environment import BusinessLogicConstraints
"""
By default, an engine should have its own demand-estimation mechanism, driven by the observations it receives (the computed features).
From these features we then follow the research structure q -> p (demand to price), using a mechanism that must be both testable and updatable.
"""
class BasePricingEngine(ABC):
    """base interface for all pricing engines

    holds the shared constraints object, a seeded random generator, a step
    counter, and the most recent feedback handed to ``update``.
    """

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        """store the constraints and create the engine's random generator
        args:
            constraints: business constraints object, kept as ``self.c``
            seed: seed passed to ``np.random.default_rng``
        """
        self.c = constraints
        self.rng = np.random.default_rng(seed)
        self.step_count = 0
        # Define the feedback attributes up front so they can be read safely
        # before the first call to update().
        self._clear_feedback()

    def _clear_feedback(self) -> None:
        """forget the feedback recorded by the last ``update`` call"""
        self.last_observation = None
        self.last_reward = None
        self.last_info = None

    @abstractmethod
    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """compute new prices given current state and observation from environment
        args:
            current_prices: current price vector [N]
            observation: dict containing 'price', 'demand', and possibly interaction data
        returns:
            new_prices: updated price vector [N]
        """

    def update(self, observation: Dict[str, Any], reward: float, done: bool, info: Dict[str, Any]) -> None:
        """record the latest environment feedback; engines can override as needed
        args:
            observation: observation dict from the latest step
            reward: reward from the latest step
            done: episode-termination flag (not stored by this default implementation)
            info: auxiliary info dict from the latest step
        """
        self.last_observation = observation
        self.last_reward = reward
        self.last_info = info

    def reset(self):
        """reset engine state for new episode"""
        self.step_count = 0
        # Feedback from the previous episode must not leak into the next one.
        self._clear_feedback()
class WildPricingEngine(BasePricingEngine):
    """production-like pricing using online elasticity estimation via EWMA regression

    each step fits a per-product log-log relation ``logq ~ a + e*logp`` from
    exponentially weighted moments, blends the slope into ``e_hat``, and moves
    prices toward ``unit_cost * e / (e + 1)`` subject to the max-adjustment
    and system price bounds read from the constraints.
    """

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        """draw per-product unit costs and initialise the estimator and tuning knobs"""
        super().__init__(constraints, seed)
        # per-product unit costs (unknown to customers; known to platform)
        self.unit_cost = self.rng.uniform(8.0, 40.0, size=self.c.product_catalogue_size).astype(np.float32)
        self._reset_estimator()
        # knobs typical in production
        self.lr = 0.08             # step size toward the profit-optimal price
        self.ewma = 0.05           # weight of the newest sample in the EWMA moments
        self.eps_explore = 0.03    # probability that a step adds exploration noise
        self.explore_scale = 0.03  # std-dev of the multiplicative exploration noise

    def _reset_estimator(self) -> None:
        """(re)initialise the elasticity estimate and the EWMA regression state"""
        n = self.c.product_catalogue_size
        # online elasticity estimate (start moderately elastic)
        self.e_hat = np.full((n,), -1.3, dtype=np.float32)
        # EWMA state for log-log regression
        self.mu_logp = np.zeros(n, dtype=np.float32)
        self.mu_logq = np.zeros(n, dtype=np.float32)
        self.cov_pq = np.zeros(n, dtype=np.float32)
        self.var_p = np.ones(n, dtype=np.float32)

    def _safe_elasticity(self, e: np.ndarray) -> np.ndarray:
        """clip elasticities to [-5.0, -1.05] so that ``e + 1`` is never zero"""
        return np.clip(e, -5.0, -1.05)

    def reset(self):
        """reset the step counter and the estimator; unit costs are kept"""
        super().reset()
        self._reset_estimator()

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """extract demand from the observation and return the updated price vector
        args:
            current_prices: current price vector [N]
            observation: observation dict passed to ``_extract_demand``
        returns:
            new_prices: float32 price vector after guardrails
        """
        self.step_count += 1
        demand = _extract_demand(observation, self.c.product_catalogue_size)
        return self._update_from_demand(current_prices, demand)

    def _update_from_demand(self, prices: np.ndarray, sold: np.ndarray) -> np.ndarray:
        """update the elasticity estimate from (prices, sold) and propose new prices
        args:
            prices: current price vector
            sold: observed demand per product; negative entries are treated as 0
        returns:
            new_prices: float32 price vector after guardrails
        """
        # log transforms (add 1 to handle zeros)
        logp = np.log(np.clip(prices, 1e-3, None)).astype(np.float32)
        # Floor demand at zero first: log(sold + 1) is NaN / -inf for
        # sold <= -1, and a single such value would permanently poison this
        # product's EWMA moments, elasticity estimate and price.
        units = np.clip(sold, 0.0, None)
        logq = np.log(units + 1.0).astype(np.float32)

        # EWMA moments for per-product regression: logq ≈ a + e*logp
        # NOTE(review): mu_logp / mu_logq start at 0, so the first deviations
        # are measured from 0 rather than from an observed mean — confirm
        # this warm-up behaviour is intended.
        a = self.ewma
        dp = logp - self.mu_logp
        dq = logq - self.mu_logq
        self.mu_logp = (1 - a) * self.mu_logp + a * logp
        self.mu_logq = (1 - a) * self.mu_logq + a * logq
        self.cov_pq = (1 - a) * self.cov_pq + a * (dp * dq)
        self.var_p = (1 - a) * self.var_p + a * (dp * dp + 1e-6)
        e_new = self.cov_pq / (self.var_p + 1e-6)
        self.e_hat = self._safe_elasticity(0.9 * self.e_hat + 0.1 * e_new)

        # profit-optimal price for isoelastic demand (e is clipped to <= -1.05)
        e = self.e_hat
        p_star = self.unit_cost * (e / (e + 1.0))
        # smooth toward p_star
        new_prices = (1 - self.lr) * prices + self.lr * p_star

        # exploration (small random perturbations); one draw gates all products
        if self.rng.random() < self.eps_explore:
            noise = self.rng.normal(0.0, self.explore_scale, size=new_prices.shape).astype(np.float32)
            new_prices = new_prices * (1.0 + noise)

        # apply business guardrails (max change + bounds)
        max_adj = self.c.max_price_adjustment
        ratio = np.clip(new_prices / (prices + 1e-6), 1 - max_adj, 1 + max_adj)
        new_prices = prices * ratio
        new_prices = np.clip(new_prices, self.c.system_min_price, self.c.system_max_price).astype(np.float32)
        return new_prices
class StaticPricingEngine(BasePricingEngine):
    """baseline: the first price vector seen is replayed for the whole episode"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        """start with no frozen price vector"""
        super().__init__(constraints, seed)
        self.fixed_prices = None

    def reset(self):
        """forget the frozen prices so the next episode captures new ones"""
        super().reset()
        self.fixed_prices = None

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """return a copy of the prices captured on the first call
        args:
            current_prices: current price vector [N]; only captured on the first call
            observation: unused
        returns:
            new_prices: copy of the frozen price vector
        """
        self.step_count += 1
        frozen = self.fixed_prices
        if frozen is None:
            # first call of the episode: freeze whatever prices we were handed
            frozen = current_prices.copy()
            self.fixed_prices = frozen
        # hand out a copy so callers cannot mutate the frozen vector
        return frozen.copy()
class SimpleDemandEngine(BasePricingEngine):
    """demand-following pricing: prices go up when demand rises and down when it falls"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        """start with no demand history and a 0.05 adjustment rate"""
        super().__init__(constraints, seed)
        self.prev_demand = None
        self.lr = 0.05

    def reset(self):
        """drop the remembered demand for a new episode"""
        super().reset()
        self.prev_demand = None

    def _guard(self, proposed: np.ndarray, baseline: np.ndarray) -> np.ndarray:
        """limit the per-step change relative to ``baseline`` and clip to system bounds"""
        limit = self.c.max_price_adjustment
        change = np.clip(proposed / (baseline + 1e-6), 1 - limit, 1 + limit)
        bounded = baseline * change
        return np.clip(bounded, self.c.system_min_price, self.c.system_max_price).astype(np.float32)

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """adjust prices in proportion to the relative change in demand
        args:
            current_prices: current price vector [N]
            observation: observation dict passed to ``_extract_demand``
        returns:
            new_prices: a copy of ``current_prices`` on the first call of an
                episode, otherwise the guarded float32 price vector
        """
        self.step_count += 1
        demand = _extract_demand(observation, self.c.product_catalogue_size)

        previous = self.prev_demand
        if previous is None:
            # nothing to compare against yet: remember demand, keep prices
            self.prev_demand = demand.copy()
            return current_prices.copy()

        # signed change in demand, scaled by the previous demand level
        change = demand - previous
        direction = np.sign(change)
        magnitude = np.abs(change)
        price_adj = self.lr * direction * magnitude / (np.abs(previous) + 1.0)
        proposed = current_prices * (1.0 + price_adj)
        self.prev_demand = demand.copy()

        return self._guard(proposed, current_prices)
class RandomWalkEngine(BasePricingEngine):
    """multiplicative random walk with a pull back toward the first prices seen"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        """start with no anchor price and a 0.02 noise scale"""
        super().__init__(constraints, seed)
        self.target_price = None
        self.volatility = 0.02

    def reset(self):
        """forget the anchor price for a new episode"""
        super().reset()
        self.target_price = None

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """perturb prices with gaussian noise and revert slightly toward the anchor
        args:
            current_prices: current price vector [N]
            observation: unused
        returns:
            new_prices: float32 price vector after guardrails
        """
        self.step_count += 1
        if self.target_price is None:
            # the first prices of the episode become the reversion anchor
            self.target_price = current_prices.copy()

        shock = self.rng.normal(0.0, self.volatility, size=current_prices.shape).astype(np.float32)
        pull = 0.01 * (self.target_price - current_prices)
        proposed = current_prices * (1.0 + shock) + pull

        # guardrails: cap the relative move, then clip to the system bounds
        cap = self.c.max_price_adjustment
        lower, upper = 1 - cap, 1 + cap
        move = np.clip(proposed / (current_prices + 1e-6), lower, upper)
        bounded = current_prices * move
        return np.clip(bounded, self.c.system_min_price, self.c.system_max_price).astype(np.float32)
class ThompsonSamplingEngine(BasePricingEngine):
    """per-product bayesian bandit that treats a small grid of prices as discrete arms"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        """create all-ones Beta parameters for 5 price levels per product"""
        super().__init__(constraints, seed)
        self.n_price_levels = 5
        belief_shape = (self.c.product_catalogue_size, self.n_price_levels)
        self.alpha = np.ones(belief_shape, dtype=np.float32)
        self.beta = np.ones(belief_shape, dtype=np.float32)
        self.price_grid = None
        self.last_actions = None

    def reset(self):
        """restore the all-ones beliefs and drop the price grid and last actions"""
        super().reset()
        belief_shape = (self.c.product_catalogue_size, self.n_price_levels)
        self.alpha = np.ones(belief_shape, dtype=np.float32)
        self.beta = np.ones(belief_shape, dtype=np.float32)
        self.price_grid = None
        self.last_actions = None

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """update beliefs from the last arms played, then sample a new arm per product
        args:
            current_prices: current price vector [N]; on the first call it
                defines the grid spanning 0.7x to 1.3x of each price
            observation: observation dict passed to ``_extract_demand``
        returns:
            new_prices: float32 grid prices of the sampled arms, clipped to system bounds
        """
        self.step_count += 1
        n_products = self.c.product_catalogue_size

        if self.price_grid is None:
            # define price grid per product: one row per product, one column per level
            self.price_grid = np.linspace(
                current_prices * 0.7, current_prices * 1.3, self.n_price_levels
            ).T

        demand = _extract_demand(observation, n_products)

        # update beliefs based on last action
        if self.last_actions is not None:
            rows = np.arange(n_products)
            played = self.last_actions
            observed = demand[:n_products]
            sold = observed > 0.5
            missed = ~sold
            # demand above 0.5 adds the demand itself to alpha for the played arm ...
            self.alpha[rows[sold], played[sold]] += observed[sold]
            # ... anything else adds 1 to beta for the played arm
            self.beta[rows[missed], played[missed]] += 1.0

        # thompson sampling: sample from posterior, pick best (one draw per product, in order)
        chosen = np.zeros(n_products, dtype=int)
        for product in range(n_products):
            draws = self.rng.beta(self.alpha[product], self.beta[product]).astype(np.float32)
            chosen[product] = int(np.argmax(draws))
        picked = self.price_grid[np.arange(n_products), chosen].astype(np.float32)

        self.last_actions = chosen
        return np.clip(picked, self.c.system_min_price, self.c.system_max_price).astype(np.float32)
def _extract_demand(observation: Dict[str, Any], n: int) -> np.ndarray:
    """pull the demand vector out of an observation as float32
    args:
        observation: dict that may carry demand under observation['elasticity']['demand']
            (checked first, only when 'elasticity' is a dict) or under observation['demand']
        n: length of the all-zeros fallback vector
    returns:
        demand: float32 array from the first non-None source, else zeros of length n
    """
    candidates = []
    nested = observation.get("elasticity")
    if isinstance(nested, dict):
        candidates.append(nested.get("demand"))
    candidates.append(observation.get("demand"))

    for raw in candidates:
        if raw is not None:
            return np.asarray(raw, dtype=np.float32)
    # no usable demand anywhere in the observation
    return np.zeros(n, dtype=np.float32)