From 772772b5b93323212f2473312de62ad238d9e733 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Thu, 29 Jan 2026 10:01:53 +0100 Subject: [PATCH] chore: better wrapping amd more performant --- engine/lib/behavior.py | 43 ++++++------- engine/lib/demand.py | 3 +- engine/wrapper.py | 141 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 162 insertions(+), 25 deletions(-) diff --git a/engine/lib/behavior.py b/engine/lib/behavior.py index 69b6649..1822dde 100644 --- a/engine/lib/behavior.py +++ b/engine/lib/behavior.py @@ -6,33 +6,32 @@ from .demand import generate_demand base_dir = "/home/velocitatem/Documents/Projects/PHANTOM/experiments" human_dir, agent_dir = f"{base_dir}/collected_data/", f"{base_dir}/agents/collected_data/" +_cache = {} # lazy cache for models and base pivots + +def _get_base_pivot(human: bool): + key = 'human' if human else 'agent' + if key not in _cache: + model = BehaviorModel(human_dir) if human else AgentBehaviorModel(agent_dir) + mdp = model.build_MDP() + _cache[key] = pd.DataFrame(aggregate_event_transitions(mdp)).fillna(0.0) + return _cache[key] def adjust_behavior_to_condition(condition, transition_matrix): - # transition matrix just maps probability of eventA to eventB - # we enhance this that eventA-productI to eventB-productJ... based on the condition of interest - # this is to simulate the impact of demand onto the behavior - # NxN -> (N*(P + 1))x(N*(P + 1)) where P is number of products - new_transitions = transition_matrix.copy() - for col in new_transitions.columns: - for product in range(len(condition)): - # adjust the transition probability based on the demand condition - newname = f"{col}_product{product}" - new_transitions[newname] = new_transitions[col] * (condition[product] / np.sum(condition)) - for row in transition_matrix.index: - for product in range(len(condition)): - newname = f"{row}_product{product}" - new_transitions.loc[newname] = new_transitions.loc[row] * (condition[product] / np.sum(condition)) + # expand NxN transition matrix to (N*P)x(N*P) weighted by demand condition + cond_norm = condition / np.sum(condition) + n_products = len(condition) + base_vals = transition_matrix.values + base_cols, base_rows = transition_matrix.columns.tolist(), transition_matrix.index.tolist() - return new_transitions.fillna(0.0) + # expand via kronecker-like tiling: each cell becomes a P*P block weighted by outer product of cond_norm + expanded = np.kron(base_vals, np.outer(cond_norm, cond_norm)) + new_cols = [f"{c}_product{p}" for c in base_cols for p in range(n_products)] + new_rows = [f"{r}_product{p}" for r in base_rows for p in range(n_products)] + return pd.DataFrame(expanded, index=new_rows, columns=new_cols) def sample_behavior(condition, human=True, max_len=40): - model = BehaviorModel(human_dir) if human else AgentBehaviorModel(agent_dir) - mdp = model.build_MDP() - raw_events = aggregate_event_transitions(mdp) # this gets us transtition between events (blind to products or prices) - # staet: {state: p} is raw_events we needc a matrix a pivot table - events_pivot = pd.DataFrame(raw_events).fillna(0.0) - # now adjust the transition matrix based on the condition to get a more informed transition matrix - adjusted_transitions = adjust_behavior_to_condition(condition, events_pivot) + base_pivot = _get_base_pivot(human) + adjusted_transitions = adjust_behavior_to_condition(condition, base_pivot) trajectory = [np.random.choice(adjusted_transitions.index)] while len(trajectory) < max_len or 'checkout' in trajectory[-1]: diff --git a/engine/lib/demand.py b/engine/lib/demand.py index 687afe8..7215f7c 100644 --- a/engine/lib/demand.py +++ b/engine/lib/demand.py @@ -8,7 +8,8 @@ def generate_demand(prices, distribution_method = np.random.normal, distribution product_valuations = distribution_method(*distribution_params, size=len(prices)) # assumption 2: demand decreases as price increases, following a simple linear model demand = np.maximum(0, product_valuations - prices) # demand cannot be negative - demand = demand / np.sum(demand) * 100 # normalize to total demand of 1000 units so demand output is within [0, 100] + total = np.sum(demand) + demand = demand / total * 100 if total > 0 else demand # normalize to percentage, avoid div by zero logger.info(f"Generated demand for prices {prices}: {demand} with valuations from distribution {distribution_params}") return demand diff --git a/engine/wrapper.py b/engine/wrapper.py index 1cf5815..5543670 100644 --- a/engine/wrapper.py +++ b/engine/wrapper.py @@ -1,5 +1,142 @@ import gymnasium as gym from gymnasium import spaces -from engine import Limbo +import numpy as np +import matplotlib.pyplot as plt +from .engine import Limbo, MarketEngine, PricingEngine -class PHANTOM(gym.Env, Limbo): + +class PHANTOM(gym.Env): + """Gymnasium wrapper for the Limbo pricing-market simulation. Platform sets prices, market responds with demand.""" + metadata = {"render_modes": ["human", "ansi"]} + + def __init__(self, + n_products: int = 10, + alpha: float = 0.3, + N: int = 100, + price_bounds: tuple = (10.0, 150.0), + lambda_coi: float = 0.1, # coi leakage penalty weight + render_mode: str = None): + super().__init__() + self.n_products = n_products + self.price_bounds = price_bounds + self.lambda_coi = lambda_coi + self.render_mode = render_mode + + self.market = MarketEngine(alpha=alpha, N=N) + self._platform_stub = PricingEngine() + self._limbo = Limbo(self._platform_stub, self.market) + + # action: continuous prices for each product + self.action_space = spaces.Box( + low=price_bounds[0], high=price_bounds[1], + shape=(n_products,), dtype=np.float32 + ) + # observation: demand estimate + previous prices + self.observation_space = spaces.Dict({ + "demand": spaces.Box(low=0.0, high=100.0, shape=(n_products,), dtype=np.float32), + "prices": spaces.Box(low=price_bounds[0], high=price_bounds[1], shape=(n_products,), dtype=np.float32), + }) + + self._prices = None + self._demand = None + self._step_count = 0 + self._demand_history = [] # list of demand arrays over time + self._price_history = [] # list of price arrays over time + self._fig, self._axes = None, None + + def _get_obs(self) -> dict: + demand_arr = np.array([self._demand.get(i, 0.0) for i in range(self.n_products)], dtype=np.float32) + return {"demand": demand_arr, "prices": self._prices.astype(np.float32)} + + def _compute_reward(self, prices: np.ndarray, demand: dict) -> float: + demand_arr = np.array([demand.get(i, 0.0) for i in range(self.n_products)]) + revenue = np.sum(prices * demand_arr) # revenue = price * quantity proxy + base_price = self.price_bounds[0] + return float(revenue)# - self.lambda_coi * coi_leak) + + def _record_history(self): + demand_arr = np.array([self._demand.get(i, 0.0) for i in range(self.n_products)]) + self._demand_history.append(demand_arr) + self._price_history.append(self._prices.copy()) + + def reset(self, seed=None, options=None): + super().reset(seed=seed) + self._prices = np.random.uniform(*self.price_bounds, size=self.n_products) + self._demand = self.market.act(self._prices) + self._step_count = 0 + self._demand_history, self._price_history = [], [] + self._record_history() + if self._fig: plt.close(self._fig) + self._fig, self._axes = None, None + return self._get_obs(), {} + + def step(self, action: np.ndarray): + self._prices = np.clip(action, *self.price_bounds) + self._demand = self.market.act(self._prices) + self._step_count += 1 + self._record_history() + + reward = self._compute_reward(self._prices, self._demand) + terminated = self._step_count >= 100 + truncated = False + + return self._get_obs(), reward, terminated, truncated, {"step": self._step_count} + + def render(self): + if self.render_mode == "human": + if self._fig is None: + plt.ion() + self._fig, self._axes = plt.subplots(2, 2, figsize=(12, 8)) + self._fig.suptitle("PHANTOM Environment") + + demand_mat = np.array(self._demand_history).T # shape: (n_products, timesteps) + price_mat = np.array(self._price_history).T + revenue_per_step = np.sum(demand_mat * price_mat, axis=0) # revenue = demand * price + demand_variance = np.var(demand_mat, axis=0) # how spread demand is across products + + for row in self._axes: + for ax in row: ax.clear() + + self._axes[0, 0].imshow(demand_mat, aspect='auto', cmap='viridis', origin='lower') + self._axes[0, 0].set_xlabel("Time Step") + self._axes[0, 0].set_ylabel("Product") + self._axes[0, 0].set_title("Demand Over Time") + + self._axes[0, 1].imshow(price_mat, aspect='auto', cmap='plasma', origin='lower') + self._axes[0, 1].set_xlabel("Time Step") + self._axes[0, 1].set_ylabel("Product") + self._axes[0, 1].set_title("Price Over Time") + + self._axes[1, 0].plot(revenue_per_step, color='teal', linewidth=1.5) + self._axes[1, 0].set_xlabel("Time Step") + self._axes[1, 0].set_ylabel("Revenue") + self._axes[1, 0].set_title("Revenue per Step") + + self._axes[1, 1].plot(demand_variance, color='orangered', linewidth=1.5) + self._axes[1, 1].set_xlabel("Time Step") + self._axes[1, 1].set_ylabel("Variance") + self._axes[1, 1].set_title("Demand Variance") + + self._fig.tight_layout() + self._fig.canvas.draw() + self._fig.canvas.flush_events() + plt.pause(0.01) + + elif self.render_mode == "ansi": + return f"step={self._step_count}, prices={self._prices}, demand={self._demand}" + return None + + def close(self): + if self._fig: plt.close(self._fig) + self._fig, self._axes = None, None + + +if __name__ == "__main__": + env = PHANTOM(n_products=100, render_mode="human") + obs, _ = env.reset() + for _ in range(100): + action = env.action_space.sample() + obs, reward, term, trunc, info = env.step(action) + env.render() + print(f"Reward: {reward:.2f}") + if term: break