chore: make lib backwards compatible

This commit is contained in:
2026-01-21 19:12:35 +01:00
parent 56308ecb10
commit 2ed200f870
3 changed files with 126 additions and 152 deletions

View File

@@ -8,6 +8,20 @@ import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import sys
from pathlib import Path
# add lib to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'lib'))
from lib.features import (
transition_histogram as _lib_transition_histogram,
temporal_signature as _lib_temporal_signature,
state_coverage as _lib_state_coverage,
transition_entropy as _lib_transition_entropy,
featurize_trajectory as _lib_featurize_trajectory,
parse_timestamp
)
from lib.state import event_to_state, get_event_name, get_timestamp
TASK = 'classification'
LABELS = ['human', 'agent']
@@ -101,91 +115,40 @@ def nt_xent_loss(z_i: torch.Tensor, z_j: torch.Tensor, temperature: float = 0.5)
return F.cross_entropy(sim, labels)
# feature extraction utilities for trajectory -> feature vector
# feature extraction utilities - delegating to lib.features for unified implementation
# these wrappers maintain backwards compatibility for existing imports
def transition_histogram(events: List, state_fn, max_states: int = 50) -> np.ndarray:
"""Compute normalized histogram of state transitions in trajectory"""
if len(events) < 2:
return np.zeros(max_states)
states = [state_fn(e) for e in events]
trans_counts = defaultdict(int)
for s, s_next in zip(states, states[1:]):
trans_counts[(s, s_next)] += 1
total = sum(trans_counts.values())
hist = np.array(list(trans_counts.values())[:max_states], dtype=np.float32)
hist = np.pad(hist, (0, max(0, max_states - len(hist))))
return hist / (total + 1e-10)
return _lib_transition_histogram(events, state_fn, max_states)
def temporal_signature(events: List, ts_fn) -> np.ndarray:
"""Extract temporal features: mean/std/skew of inter-event times"""
if len(events) < 2:
return np.zeros(4, dtype=np.float32)
times = sorted([ts_fn(e) for e in events])
diffs = np.diff(times).astype(np.float32)
if len(diffs) == 0:
return np.zeros(4, dtype=np.float32)
mean_dt, std_dt = np.mean(diffs), np.std(diffs) + 1e-10
skew = np.mean(((diffs - mean_dt) / std_dt) ** 3) if std_dt > 1e-8 else 0.0
return np.array([mean_dt, std_dt, skew, len(diffs)], dtype=np.float32)
return _lib_temporal_signature(events, ts_fn)
def state_coverage(events: List, state_fn, mdp_states: set) -> float:
"""Fraction of MDP states visited by trajectory"""
if not mdp_states:
return 0.0
visited = set(state_fn(e) for e in events)
return len(visited & mdp_states) / len(mdp_states)
return _lib_state_coverage(events, state_fn, mdp_states)
def transition_entropy(events: List, state_fn) -> float:
"""Compute entropy of transition distribution (randomness of navigation)"""
if len(events) < 2:
return 0.0
states = [state_fn(e) for e in events]
trans_counts = defaultdict(int)
for s, s_next in zip(states, states[1:]):
trans_counts[(s, s_next)] += 1
total = sum(trans_counts.values())
probs = [c / total for c in trans_counts.values()]
return -sum(p * np.log(p + 1e-10) for p in probs)
return _lib_transition_entropy(events, state_fn)
def featurize_trajectory(events: List, mdp: Optional[Dict] = None, input_dim: int = 64) -> np.ndarray:
"""Convert trajectory to fixed-dim feature vector"""
def _state_repr(e):
return f"{getattr(e, 'page', None) or 'unk'}|{getattr(e, 'productId', None) or 'none'}|{e.eventName}"
"""Convert trajectory to fixed-dim feature vector - uses lib.features implementation"""
mdp_states = set(mdp.get('states', [])) if mdp else set()
def _ts_fn(e):
ts = getattr(e, 'ts', None)
if isinstance(ts, str):
from datetime import datetime
try:
return datetime.fromisoformat(ts.replace('Z', '+00:00')).timestamp()
except:
return 0.0
return float(ts) if ts else 0.0
return parse_timestamp(get_timestamp(e))
feats = []
feats.extend(transition_histogram(events, _state_repr, max_states=40)) # 40 dims
feats.extend(temporal_signature(events, _ts_fn)) # 4 dims
mdp_states = set(mdp.get('states', [])) if mdp else set()
feats.append(state_coverage(events, _state_repr, mdp_states)) # 1 dim
feats.append(transition_entropy(events, _state_repr)) # 1 dim
feats.append(len(events)) # trajectory length
feats.append(len(set(_state_repr(e) for e in events))) # unique states
def _event_name_fn(e):
return get_event_name(e)
# event type distribution (page_view, hover, cart, purchase indicators)
event_names = [e.eventName for e in events]
feats.append(sum(1 for n in event_names if 'page' in n.lower()) / (len(events) + 1))
feats.append(sum(1 for n in event_names if 'hover' in n.lower()) / (len(events) + 1))
feats.append(sum(1 for n in event_names if 'cart' in n.lower()) / (len(events) + 1))
feats.append(sum(1 for n in event_names if 'purchase' in n.lower() or 'checkout' in n.lower()) / (len(events) + 1))
# pad/truncate to input_dim
feats = np.array(feats[:input_dim], dtype=np.float32)
if len(feats) < input_dim:
feats = np.pad(feats, (0, input_dim - len(feats)))
return feats
return _lib_featurize_trajectory(events, event_to_state, _ts_fn, _event_name_fn, mdp_states, input_dim)
# gradient boosting classifiers for comparison baselines

View File

@@ -6,6 +6,18 @@ from collections import defaultdict
from typing import Dict, List, Tuple, Set
import numpy as np
import graphviz
import sys
from pathlib import Path
# import lib utilities for optional use - models keep their own _state_repr for backwards compat
# with the specific event structure (evt.value.payload)
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / 'lib'))
try:
from lib.state import make_state_repr as lib_make_state_repr
from lib.features import transition_histogram as lib_transition_histogram
except ImportError:
lib_make_state_repr = None
lib_transition_histogram = None
class BehaviorModel:
def __init__(self, src_dir: str, loader_cls=Loader):

View File

@@ -1,7 +1,5 @@
from sys import intern
import gymnasium as gym
from gymnasium import spaces
from matplotlib import interactive
import numpy as np
from dataclasses import dataclass
import pandas as pd
@@ -15,7 +13,7 @@ class BusinessLogicConstraints():
max_price_adjustment: float = 0.30
system_max_price: float = 500.0
system_min_price: float = 1.0
product_catelogue_size: int = 100
product_catalogue_size: int = 100
episode_length: int = 200
sessions_per_step: int = 250
agent_share: float = 0.25
@@ -37,17 +35,42 @@ class BusinessLogicConstraints():
def _sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-x))
class BehavioralProfile:
"""simple markov chain model for generating synthetic interaction events"""
def __init__(self, actor: str, purchase_probs: np.ndarray):
self.actor = actor
self.purchase_probs = purchase_probs
self.states = ['view', 'cart', 'checkout']
# transition matrix: view->cart 0.3, view->view 0.6, view->exit 0.1, cart->checkout 0.5, cart->view 0.4, cart->exit 0.1
self.trans = {'view': {'view': 0.6, 'cart': 0.3, 'exit': 0.1}, 'cart': {'checkout': 0.5, 'view': 0.4, 'exit': 0.1}, 'checkout': {'exit': 1.0}}
if actor == 'agents': # agents browse more before purchasing
self.trans['view'] = {'view': 0.75, 'cart': 0.15, 'exit': 0.1}
self.trans['cart'] = {'checkout': 0.3, 'view': 0.6, 'exit': 0.1}
def sample(self, rng: np.random.Generator) -> Dict[str, Any]:
"""sample single interaction event"""
product_idx = rng.integers(0, len(self.purchase_probs))
state = 'view' # always start with view
# pick next state based on transition probs
trans = self.trans.get(state, {'exit': 1.0})
next_state = rng.choice(list(trans.keys()), p=list(trans.values()))
price_paid = 0.0 if next_state != 'checkout' else float(rng.uniform(50, 200))
return {'action': state, 'product_idx': product_idx, 'actor': 'agent' if self.actor == 'agents' else 'human', 't': 0.0, 'price_paid': price_paid}
def _load_behavioral_profile(actor: str, demand_forcing: np.ndarray) -> BehavioralProfile:
"""returns a behavioral profile for generating synthetic sessions
actor: 'humans' or 'agents'
demand_forcing: per-product purchase probabilities used to weight interactions
"""
return BehavioralProfile(actor, demand_forcing)
class CommercePlatform:
"""
This is just an extension of the state management for the environment, it does not implement anything dynamic just helps us simulate demand.
"""
def __init__(self,
product_catelogue_size: int,
max_price: float,
min_price: float,
constraints: BusinessLogicConstraints):
self.product_catelogue_size = product_catelogue_size
self.product_supply = np.random.uniform(low=10, high=50, size=(self.product_catelogue_size,))
"""state management for the environment, simulates demand"""
def __init__(self, product_catalogue_size: int, max_price: float, min_price: float, constraints: BusinessLogicConstraints):
self.product_catalogue_size = product_catalogue_size
self.product_supply = np.random.uniform(low=10, high=50, size=(self.product_catalogue_size,))
self.max_price = max_price
self.min_price = min_price
self.constraints = constraints
@@ -55,27 +78,12 @@ class CommercePlatform:
self._rng = np.random.default_rng(constraints.seed)
self._last_interaction_df: pd.DataFrame = pd.DataFrame()
def setup_true_demand(self, prices: np.ndarray) -> Dict[str, np.ndarray]:
# ground truth purchase propensities
p = np.clip(prices, self.min_price, self.max_price)
pn = p / self.max_price
human_prob = self.constraints.base_human_demand * (pn ** self.constraints.human_price_elasticity)
agent_prob = self.constraints.base_agent_demand * (pn ** self.constraints.agent_price_elasticity)
return {
"human_purchase_prob": np.clip(human_prob, 0.0, 0.95),
"agent_purchase_prob": np.clip(agent_prob, 0.0, 0.95)
}
def _load_behavioral_profile(actor : str, demand_forcing):
"""
This returns a markov chain with average weights which we get from interaction data of our experiments.
This defines transition probabilities between different events:
search -> view_item_price_binN: 0.7
view_item_price_binN -> add_to_cart: 0.2
we also must reweight with the demand_forcing vector or purchase probabilities per-product
"""
return {"human_purchase_prob": np.clip(human_prob, 0.0, 0.95), "agent_purchase_prob": np.clip(agent_prob, 0.0, 0.95)}
def _simulate_sessions(self, base_prices: np.ndarray) -> pd.DataFrame:
demand = self.setup_true_demand(base_prices)
@@ -162,22 +170,22 @@ class PHANTOMEnv(gym.Env):
self.constraints = BusinessLogicConstraints()
self.action_space = spaces.Box(low=-self.constraints.max_price_adjustment,
high=self.constraints.max_price_adjustment,
shape=(self.constraints.product_catelogue_size,), dtype=np.float32)
shape=(self.constraints.product_catalogue_size,), dtype=np.float32)
self.observation_space = spaces.Dict({
"elasticity": spaces.Dict({
"price": spaces.Box(
low=np.full((self.constraints.product_catelogue_size,), self.constraints.system_min_price, dtype=np.float32),
high=np.full((self.constraints.product_catelogue_size,), self.constraints.system_max_price, dtype=np.float32),
low=np.full((self.constraints.product_catalogue_size,), self.constraints.system_min_price, dtype=np.float32),
high=np.full((self.constraints.product_catalogue_size,), self.constraints.system_max_price, dtype=np.float32),
dtype=np.float32),
"demand": spaces.Box(
low=np.zeros((self.constraints.product_catelogue_size,), dtype=np.float32),
high=np.full((self.constraints.product_catelogue_size,), 1e6, dtype=np.float32),
low=np.zeros((self.constraints.product_catalogue_size,), dtype=np.float32),
high=np.full((self.constraints.product_catalogue_size,), 1e6, dtype=np.float32),
dtype=np.float32),
})
# TODO: define more features that we compute from the interaction data
})
self.commerce_platform = CommercePlatform(
product_catelogue_size=self.constraints.product_catelogue_size,
product_catalogue_size=self.constraints.product_catalogue_size,
max_price=self.constraints.system_max_price,
min_price=self.constraints.system_min_price,
constraints=self.constraints)
@@ -192,12 +200,12 @@ class PHANTOMEnv(gym.Env):
self._rng = np.random.default_rng(seed)
self.commerce_platform._rng = np.random.default_rng(seed)
self.t = 0
init_prices = self._rng.uniform(low=60.0, high=140.0, size=(self.constraints.product_catelogue_size,)).astype(np.float32)
init_prices = self._rng.uniform(low=60.0, high=140.0, size=(self.constraints.product_catalogue_size,)).astype(np.float32)
self._prev_prices = init_prices.copy()
self.state = {
"elasticity": {
"price": init_prices,
"demand": np.zeros((self.constraints.product_catelogue_size,), dtype=np.float32),
"demand": np.zeros((self.constraints.product_catalogue_size,), dtype=np.float32),
}
}
return self.state, {}
@@ -210,38 +218,35 @@ class PHANTOMEnv(gym.Env):
self.constraints.system_max_price).astype(np.float32)
self.state["elasticity"]["price"] = new_prices
# TODO: use the commerce platform to simulate sessions
interactions_df = self.commerce_platform._simulate_sessions(new_prices)
result = self.commerce_platform.compute_interaction_features(interactions_df)
# TODO: implement COI computation to use in reward
COI = 0.0
COI = 0.0 # TODO: implement cost-of-information computation
volatility = 0.0 if self._prev_prices is None else \
float(np.mean(np.abs((new_prices - self._prev_prices) / (self._prev_prices + 1e-6))))
self._prev_prices = new_prices.copy()
revenue_observed = float(result["revenue_observed"])
agent_loss = float(result["agent_loss"])
# extract metrics with safe defaults for incomplete simulation
revenue_observed = float(result.get("revenue_observed", result.get("mean_sale_price", 0.0)))
agent_loss = float(result.get("agent_loss", 0.0))
reward = (revenue_observed
- COI
- self.constraints.w_agent_loss * agent_loss
- self.constraints.w_volatility * volatility
- self.constraints.w_estimation_error
)
- self.constraints.w_estimation_error)
terminated = self.t >= self.constraints.episode_length
info = {
"t": self.t,
"revenue_observed": revenue_observed,
"revenue_oracle": float(result["revenue_oracle"]),
"revenue_oracle": float(result.get("revenue_oracle", revenue_observed)),
"agent_loss": agent_loss,
"ux_volatility": volatility,
"mean_internal_error": err_mean,
"look_to_book": float(result["interaction_features"].get("look_to_book", 0.0)),
"mean_sale_price": float(result["interaction_features"].get("mean_sale_price", 0.0)),
"true_human_purchases_total": float(np.sum(result["true_human_demand"])),
"true_agent_purchases_total": float(np.sum(result["true_agent_purchases"])),
"look_to_book": float(result.get("look_to_book", 0.0)),
"mean_sale_price": float(result.get("mean_sale_price", 0.0)),
"true_human_purchases_total": 0.0, # TODO: track from simulation
"true_agent_purchases_total": 0.0, # TODO: track from simulation
}
return self.state, float(reward), terminated, False, info
@@ -250,9 +255,7 @@ if __name__ == "__main__":
import matplotlib.pyplot as plt
from collections import defaultdict
runs = {}
for use_defense in (False, True):
env = PHANTOMEnv(use_defense=use_defense)
env = PHANTOMEnv(constraints=BusinessLogicConstraints())
obs, _ = env.reset(seed=42)
metrics = defaultdict(list)
total_reward = 0.0
@@ -280,16 +283,15 @@ if __name__ == "__main__":
metrics['agent_purchases'].append(info['true_agent_purchases_total'])
if info['t'] % 20 == 0 or done:
print(f"defense={'ON ' if use_defense else 'OFF'} t={info['t']:03d} p={p_mean:6.2f}±{p_std:4.2f} "
f"q={q_mean:6.2f} rev={info['revenue_observed']:7.2f} oracle={info['revenue_oracle']:7.2f} "
print(f"t={info['t']:03d} p={p_mean:6.2f}±{p_std:4.2f} q={q_mean:6.2f} "
f"rev={info['revenue_observed']:7.2f} oracle={info['revenue_oracle']:7.2f} "
f"loss={info['agent_loss']:6.2f} ux={info['ux_volatility']:.3f} "
f"ltb={info['look_to_book']:5.2f} r={reward:7.2f}")
runs[use_defense] = metrics
print(f"defense={'ON ' if use_defense else 'OFF'} total_reward={total_reward:.2f}\n")
print(f"total_reward={total_reward:.2f}")
fig, axes = plt.subplots(3, 3, figsize=(15, 12))
fig.suptitle('PHANTOM Environment: Defense OFF vs ON', fontsize=14, fontweight='bold')
fig.suptitle('PHANTOM Environment Run', fontsize=14, fontweight='bold')
plot_configs = [
('price_mean', 'Mean Price', 'Price'),
@@ -305,13 +307,10 @@ if __name__ == "__main__":
for idx, (key, title, ylabel) in enumerate(plot_configs):
ax = axes[idx // 3, idx % 3]
for use_defense, label, color in [(False, 'No Defense', 'red'), (True, 'With Defense', 'blue')]:
m = runs[use_defense]
ax.plot(m['t'], m[key], label=label, color=color, alpha=0.7, linewidth=1.5)
ax.plot(metrics['t'], metrics[key], color='blue', alpha=0.7, linewidth=1.5)
ax.set_xlabel('Step')
ax.set_ylabel(ylabel)
ax.set_title(title, fontsize=10, fontweight='bold')
ax.legend(loc='best', fontsize=8)
ax.grid(True, alpha=0.3)
plt.tight_layout()