from abc import ABC, abstractmethod
from typing import Dict, Any

import numpy as np
import pandas as pd

from environment import BusinessLogicConstraints


class BasePricingEngine(ABC):
    """base interface for all pricing engines

    subclasses must implement ``compute_prices``; ``update`` and ``reset``
    are optional hooks with default implementations.
    """

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        self.c = constraints
        self.rng = np.random.default_rng(seed)
        self.step_count = 0

    @abstractmethod
    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """compute new prices given current state and observation from environment

        args:
            current_prices: current price vector [N]
            observation: dict containing 'price', 'demand', and possibly interaction data

        returns:
            new_prices: updated price vector [N]
        """

    def update(self, obs, reward, done, info) -> None:
        """optional feedback hook; the default implementation does nothing

        this used to be declared abstract (and without ``self``). since no
        engine in this module overrides it, that made every concrete engine
        impossible to instantiate. it is now a plain no-op that subclasses
        may override.
        """

    def reset(self):
        """reset engine state for new episode"""
        self.step_count = 0

    def _get_demand(self, observation: Dict[str, Any]) -> np.ndarray:
        """return the 'demand' entry of the observation as an array

        falls back to a float32 zero vector of catalogue size when the key
        is missing or explicitly None.
        """
        demand = observation.get('demand')
        if demand is None:
            demand = np.zeros(self.c.product_catelogue_size, dtype=np.float32)
        return np.asarray(demand)

    def _apply_guardrails(self, prev_prices: np.ndarray, proposed: np.ndarray) -> np.ndarray:
        """apply business guardrails: max relative change per step, then system bounds

        args:
            prev_prices: prices before this step
            proposed: unconstrained candidate prices

        returns:
            float32 prices limited to a relative change of
            ``max_price_adjustment`` and clipped to
            [system_min_price, system_max_price]
        """
        max_adj = self.c.max_price_adjustment
        # 1e-6 keeps the ratio finite when a previous price is zero
        ratio = np.clip(proposed / (prev_prices + 1e-6), 1 - max_adj, 1 + max_adj)
        limited = prev_prices * ratio
        return np.clip(limited, self.c.system_min_price, self.c.system_max_price).astype(np.float32)


class WildPricingEngine(BasePricingEngine):
    """production-like pricing using online elasticity estimation via EWMA regression"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        super().__init__(constraints, seed)

        # per-product unit costs (unknown to customers; known to platform)
        self.unit_cost = self.rng.uniform(
            8.0, 40.0, size=self.c.product_catelogue_size
        ).astype(np.float32)

        self._init_estimator_state()

        # knobs typical in production
        self.lr = 0.08             # step size toward the profit-optimal price
        self.ewma = 0.05           # EWMA weight for the regression moments
        self.eps_explore = 0.03    # probability of perturbing prices on a step
        self.explore_scale = 0.03  # std-dev of the multiplicative perturbation

    def _init_estimator_state(self) -> None:
        """(re)initialise the elasticity estimate and the EWMA regression moments"""
        n = self.c.product_catelogue_size
        # online elasticity estimate (start moderately elastic)
        self.e_hat = np.full((n,), -1.3, dtype=np.float32)
        # EWMA state for log-log regression
        self.mu_logp = np.zeros(n, dtype=np.float32)
        self.mu_logq = np.zeros(n, dtype=np.float32)
        self.cov_pq = np.zeros(n, dtype=np.float32)
        self.var_p = np.ones(n, dtype=np.float32)

    def _safe_elasticity(self, e: np.ndarray) -> np.ndarray:
        """clip elasticities to [-5.0, -1.05] so e / (e + 1) stays finite and positive"""
        return np.clip(e, -5.0, -1.05)

    def reset(self):
        """reset step counter and estimator state (unit costs and rng are kept)"""
        super().reset()
        self._init_estimator_state()

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """update the elasticity estimate from observed demand and return new prices"""
        self.step_count += 1
        # extract demand signal (from env observation) as proxy for sales
        demand = self._get_demand(observation)
        return self._update_from_demand(current_prices, demand)

    def _update_from_demand(self, prices: np.ndarray, sold: np.ndarray) -> np.ndarray:
        """run one EWMA regression step and move prices toward the estimated optimum

        args:
            prices: current price vector
            sold: demand / sales signal for the current prices

        returns:
            float32 price vector after smoothing, optional exploration and guardrails
        """
        # log transforms (add 1 to handle zeros)
        logp = np.log(np.clip(prices, 1e-3, None)).astype(np.float32)
        logq = np.log(sold + 1.0).astype(np.float32)

        # EWMA moments for per-product regression: logq ≈ a + e*logp
        a = self.ewma
        # deviations are taken against the means *before* they are updated
        dp = logp - self.mu_logp
        dq = logq - self.mu_logq
        self.mu_logp = (1 - a) * self.mu_logp + a * logp
        self.mu_logq = (1 - a) * self.mu_logq + a * logq
        self.cov_pq = (1 - a) * self.cov_pq + a * (dp * dq)
        self.var_p = (1 - a) * self.var_p + a * (dp * dp + 1e-6)

        # regression slope, blended into the running estimate and clipped
        e_new = self.cov_pq / (self.var_p + 1e-6)
        self.e_hat = self._safe_elasticity(0.9 * self.e_hat + 0.1 * e_new)

        # profit-optimal price for isoelastic demand (if e < -1)
        e = self.e_hat
        p_star = self.unit_cost * (e / (e + 1.0))

        # smooth toward p_star
        new_prices = (1 - self.lr) * prices + self.lr * p_star

        # exploration (small random perturbations)
        if self.rng.random() < self.eps_explore:
            noise = self.rng.normal(
                0.0, self.explore_scale, size=new_prices.shape
            ).astype(np.float32)
            new_prices = new_prices * (1.0 + noise)

        # apply business guardrails (max change + bounds)
        return self._apply_guardrails(prices, new_prices)


class StaticPricingEngine(BasePricingEngine):
    """baseline: fixed prices throughout episode"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        super().__init__(constraints, seed)
        self.fixed_prices = None

    def reset(self):
        """reset step counter and forget the frozen price vector"""
        super().reset()
        self.fixed_prices = None

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """return a copy of the prices seen on the first call after (re)set"""
        self.step_count += 1
        if self.fixed_prices is None:
            self.fixed_prices = current_prices.copy()
        return self.fixed_prices.copy()


class SimpleDemandEngine(BasePricingEngine):
    """demand-driven pricing: increase price when demand rises, decrease when it falls"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        super().__init__(constraints, seed)
        self.prev_demand = None
        self.lr = 0.05

    def reset(self):
        """reset step counter and forget the previous demand vector"""
        super().reset()
        self.prev_demand = None

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """adjust prices proportionally to the relative change in demand

        the first call after (re)set only records demand and returns a copy
        of ``current_prices`` unchanged.
        """
        self.step_count += 1
        demand = self._get_demand(observation)

        if self.prev_demand is None:
            self.prev_demand = demand.copy()
            return current_prices.copy()

        # simple rule: if demand increases, raise price; if decreases, lower price
        delta_d = demand - self.prev_demand
        # sign(x) * |x| == x, so the signed relative change is used directly
        price_adj = self.lr * delta_d / (np.abs(self.prev_demand) + 1.0)
        new_prices = current_prices * (1.0 + price_adj)

        self.prev_demand = demand.copy()

        # apply constraints
        return self._apply_guardrails(current_prices, new_prices)


class RandomWalkEngine(BasePricingEngine):
    """random walk pricing with mean reversion"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        super().__init__(constraints, seed)
        self.target_price = None
        self.volatility = 0.02

    def reset(self):
        """reset step counter and forget the reversion target"""
        super().reset()
        self.target_price = None

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """perturb prices with gaussian noise and pull them toward the target

        the target is the price vector seen on the first call after (re)set.
        """
        self.step_count += 1

        if self.target_price is None:
            self.target_price = current_prices.copy()

        # random walk with mean reversion toward target
        noise = self.rng.normal(
            0.0, self.volatility, size=current_prices.shape
        ).astype(np.float32)
        reversion = 0.01 * (self.target_price - current_prices)
        new_prices = current_prices * (1.0 + noise) + reversion

        # apply constraints
        return self._apply_guardrails(current_prices, new_prices)


class ThompsonSamplingEngine(BasePricingEngine):
    """bayesian bandit approach per product treating price as discrete action"""

    def __init__(self, constraints: BusinessLogicConstraints, seed: int = 0):
        super().__init__(constraints, seed)
        self.n_price_levels = 5
        self._init_posteriors()
        self.price_grid = None
        self.last_actions = None

    def _init_posteriors(self) -> None:
        """set every (product, price level) Beta posterior back to alpha = beta = 1"""
        shape = (self.c.product_catelogue_size, self.n_price_levels)
        self.alpha = np.ones(shape, dtype=np.float32)
        self.beta = np.ones(shape, dtype=np.float32)

    def reset(self):
        """reset step counter, posteriors, price grid and last actions"""
        super().reset()
        self._init_posteriors()
        self.price_grid = None
        self.last_actions = None

    def compute_prices(self, current_prices: np.ndarray, observation: Dict[str, Any]) -> np.ndarray:
        """update posteriors from the last action's demand, then sample new price levels

        returns:
            float32 prices picked from the per-product grid, clipped to the
            system price bounds
        """
        self.step_count += 1
        n = self.c.product_catelogue_size

        if self.price_grid is None:
            # define price grid per product: n_price_levels points from 70% to
            # 130% of the first prices seen; transposed so rows are products
            lo = current_prices * 0.7
            hi = current_prices * 1.3
            self.price_grid = np.linspace(lo, hi, self.n_price_levels).T

        demand = self._get_demand(observation)
        rows = np.arange(n)

        # update beliefs based on last action
        if self.last_actions is not None:
            reward = demand[:n]
            success = reward > 0.5
            failure = ~success
            # each product appears once, so the fancy-indexed in-place adds
            # never hit the same cell twice
            self.alpha[rows[success], self.last_actions[success]] += reward[success]
            self.beta[rows[failure], self.last_actions[failure]] += 1.0

        # thompson sampling: sample from posterior, pick best
        theta = self.rng.beta(self.alpha, self.beta).astype(np.float32)
        actions = np.argmax(theta, axis=1).astype(int)
        new_prices = self.price_grid[rows, actions].astype(np.float32)

        self.last_actions = actions
        # NOTE(review): unlike the other engines, grid prices are not passed
        # through the max_price_adjustment guardrail - confirm this is intended
        return np.clip(
            new_prices, self.c.system_min_price, self.c.system_max_price
        ).astype(np.float32)