Created detailed documentation for implementing a multi-task learning system to improve agent detection and dynamic pricing:
- GAMEPLAN_MULTITASK_PRICING.md: Complete 50+ page technical specification covering feature engineering, supervised learning, multi-task neural networks, a synthetic simulator, and a knowledge distillation approach
- ARCHITECTURE_OVERVIEW.md: Quick reference with visual diagrams comparing the current rule-based system to the proposed ML architecture, metrics, and implementation phases
Key improvements proposed:
- Replace the O(n²) SessionState pipeline with vectorized feature extraction
- Train an XGBoost classifier on experimentId labels (ROC-AUC >0.90 target)
- Multi-task neural network for joint agent detection + purchase prediction
- Gymnasium-based synthetic pricing environment for safe experimentation
- Knowledge distillation to extract interpretable pricing heuristics
Addresses margin leakage concerns with learned pricing strategies instead of simple velocity thresholds.
Multi-Task Learning for Dynamic Pricing & Agent Detection
Comprehensive Gameplan & Architecture
Date: 2025-12-11
Objective: Transform raw interaction data into a multi-task learning system that (1) discriminates human vs agent behavior and (2) learns pricing heuristics to prevent margin leakage in dynamic pricing algorithms.
Executive Summary
Current State
- ✅ Rich event collection system capturing 15+ event types (views, carts, hovers, filters)
- ✅ experimentId-based labeling via the xp_human_only flag
- ✅ Kafka streaming to the user-interactions topic
- ✅ Airflow pipeline running every 15min with rule-based pricing (surge, elasticity)
- ❌ SessionState pipeline runs in O(n²) time and its feature extraction is messy
- ❌ No supervised ML for agent detection (only velocity threshold heuristics)
- ❌ No multi-task learning architecture
- ❌ Pricing models are analytical formulas, not learned from data
Proposed Solution
- Refactor SessionState Pipeline: Vectorized feature engineering with proper augmentation
- Supervised Agent Classifier: Binary classifier (human/agent) using session features
- Multi-Task Learning: Joint model learning both tasks:
  - Task A: Human/Agent Classification (cross-entropy loss)
  - Task B: Price Sensitivity Prediction (regression loss)
- Synthetic Pricing Simulator: Fast RL-style environment for testing pricing strategies
- Knowledge Distillation: Distill insights from classifier into pricing heuristics
I. DATA ANALYSIS
What We Actually Collect
Event Schema (web/src/lib/events.ts):
{
sessionId: string, // Session UUID (httpOnly cookie)
experimentId?: string, // Experiment UUID → LABEL SOURCE
storeMode: 'hotel' | 'airline',
ts: string, // ISO8601 timestamp
page: string, // URL path
eventName: EventName, // 15 event types
productId?: string, // Product interaction
metadata?: Record<string, unknown>, // Product details (price, amenities)
userAgent?: string
}
Event Types Breakdown:
- Navigation: page_view, view_item_page, learn_more_about_item
- Cart: add_item_to_cart, remove_item, checkout_start, purchase_complete
- Filters: search, filter_for_date, filter_for_amenities, filter_for_price, sort_change
- Dwell Signals: hover_over_title, hover_over_paragraph, hover_over_link, hover_over_button
- Session: session_start
Ground Truth Labels
Labeling Strategy:
-- Experiment table schema (Supabase)
CREATE TABLE experiments (
id UUID PRIMARY KEY, -- This becomes experimentId in events
subject_name TEXT,
xp_human_only BOOLEAN, -- KEY FIELD for labeling
xp_market_mode TEXT,
xp_task_id UUID
);
Label Assignment:
- Admin creates experiment with xp_human_only=true (human) or false (agent allowed)
- Generates start link: https://phantom-hotel.vercel.app/start-task?uuid={experimentId}
- All events from that session carry experimentId
- Join events with experiments table on experimentId → get xp_human_only label
Training Set Construction:
# Pseudocode
events_df = fetch_from_kafka('user-interactions')
experiments_df = fetch_from_supabase('experiments')
labeled_df = events_df.merge(
experiments_df[['id', 'xp_human_only']],
left_on='experimentId',
right_on='id',
how='inner' # Only keep sessions with known experimentId
)
# Label mapping
labeled_df['is_agent'] = ~labeled_df['xp_human_only'] # Flip boolean
What We're Missing
For Pricing Task:
- ❌ Price paid vs price shown (conversion price tracking)
- ❌ Product availability at interaction time
- ❌ Competitor prices (external data)
- ✅ Demand proxy (view counts, cart adds) - WE HAVE THIS
- ✅ Base prices from product metadata - WE HAVE THIS
For Agent Detection:
- ✅ Interaction velocity, dwell times, product view depth - WE HAVE THIS
- ✅ userAgent strings - WE HAVE THIS
- ❌ Mouse movement patterns (only have hover events, not trajectories)
- ❌ Typing speed in search fields
II. PHASE 1: Fix SessionState Pipeline
Current Issues (experiments/procesing/steps/session.py)
Problem 1: O(n²) Complexity
# Line 77-93: _apply_to_slice()
for idx, row in df.iterrows():  # For EVERY row
    session_df = df[df['sessionId'] == row['sessionId']]  # Scan ALL events
    current_time_events = session_df[session_df['ts'] <= row['ts']]
    features = _extract_features_for_session(current_time_events)
    # Each of the n rows triggers an O(n) scan of the full dataset, so the total is O(n²)
Problem 2: No Proper Feature Augmentation
- Features computed only at aggregate session level
- No rolling window statistics
- No temporal decay of signals
- Missing product-level features (price seen, amenity preferences)
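The "temporal decay" gap listed above can be illustrated with an exponentially decayed event count, in which each event's weight halves after a fixed interval. This is a minimal sketch, not part of the existing pipeline: the function name and the 60-second half-life are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta

def decayed_event_count(timestamps, now, half_life_sec=60.0):
    """Sum of per-event weights 0.5 ** (age / half_life); recent events count more."""
    total = 0.0
    for ts in timestamps:
        age = (now - ts).total_seconds()
        if age >= 0:  # skip events stamped later than `now`
            total += 0.5 ** (age / half_life_sec)
    return total

# Three events: just now, one half-life ago, two half-lives ago
now = datetime(2025, 12, 11, 12, 0, 0)
events = [now - timedelta(seconds=s) for s in (0, 60, 120)]
score = decayed_event_count(events, now)  # 1 + 0.5 + 0.25
```

Unlike a hard 5-minute window, a decayed count changes smoothly as events age, which may make burst detection less sensitive to where the window boundary falls.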
Proposed Solution: Vectorized Feature Engineering
New Pipeline Architecture:
Raw Events (Kafka)
↓
SessionGrouper (group by sessionId + experimentId)
↓
TemporalFeatureExtractor (vectorized time diffs, velocity)
↓
BehavioralFeatureExtractor (interaction counts, ratios)
↓
ProductFeatureExtractor (price sensitivity, preference patterns)
↓
SessionAggregator (final session-level features)
↓
LabelJoiner (merge with experiments table)
↓
Feature Matrix + Labels (ready for ML)
Feature Categories:
A. Temporal Features (Vectorized)
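import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class TemporalFeatureExtractor(BaseEstimator, TransformerMixin):
    def fit(self, events_df, y=None):
        return self  # Stateless: nothing is learned from the data

    def transform(self, events_df):
        df = events_df.copy()
        df['ts_dt'] = pd.to_datetime(df['ts'])
        # Sort by session and time; the fresh index keeps the rolling result aligned below
        df = df.sort_values(['sessionId', 'ts_dt']).reset_index(drop=True)
        # Vectorized time differences
        df['time_diff'] = df.groupby('sessionId')['ts_dt'].diff().dt.total_seconds()
        # Rolling statistics (vectorized with groupby): events in the trailing 5 minutes.
        # Rolling needs a numeric column, so sum a helper column of ones.
        df['_one'] = 1
        df['velocity_5min'] = (
            df.groupby('sessionId').rolling(window='5min', on='ts_dt')['_one'].sum().to_numpy()
        )
        df = df.drop(columns='_one')
        df['avg_time_between_events'] = df.groupby('sessionId')['time_diff'].transform('mean')
        df['std_time_between_events'] = df.groupby('sessionId')['time_diff'].transform('std')
        return df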
B. Behavioral Features
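class BehavioralFeatureExtractor:
    def transform(self, events_df):
        df = events_df.copy()
        df['ts_dt'] = pd.to_datetime(df['ts'])
        # One boolean indicator per event family; summing an indicator counts its events.
        # Mapping item_views to view_item_page events is an assumption.
        df['is_page_view'] = df['eventName'] == 'page_view'
        df['is_item_view'] = df['eventName'] == 'view_item_page'
        df['is_cart_add'] = df['eventName'] == 'add_item_to_cart'
        df['is_hover'] = df['eventName'].str.startswith('hover')
        session_agg = df.groupby('sessionId').agg(
            # Interaction counts
            total_interactions=('eventName', 'count'),
            unique_products_viewed=('productId', 'nunique'),
            # Event type breakdowns
            page_views=('is_page_view', 'sum'),
            item_views=('is_item_view', 'sum'),
            cart_adds=('is_cart_add', 'sum'),
            total_hovers=('is_hover', 'sum'),
            # Session duration
            session_duration_sec=('ts_dt', lambda x: (x.max() - x.min()).total_seconds()),
        )
        # Derived ratios (+1 avoids division by zero for sessions without item views)
        session_agg['cart_to_view_ratio'] = (
            session_agg['cart_adds'] / (session_agg['item_views'] + 1)
        )
        session_agg['hover_intensity'] = (
            session_agg['total_hovers'] / session_agg['total_interactions']
        )
        return session_agg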
C. Product Interaction Features
class ProductFeatureExtractor:
    def transform(self, events_df):
        df = events_df.copy()
        # Extract price from metadata when available (NaN otherwise)
        df['price_seen'] = pd.to_numeric(
            df['metadata'].apply(lambda x: x.get('base_price') if isinstance(x, dict) else None),
            errors='coerce'
        )
        # Named aggregation: a plain dict cannot hold two entries for 'productId'
        product_agg = df[df['productId'].notna()].groupby('sessionId').agg(
            # Price sensitivity
            avg_price_seen=('price_seen', 'mean'),
            min_price_seen=('price_seen', 'min'),
            max_price_seen=('price_seen', 'max'),
            std_price_seen=('price_seen', 'std'),
            # Product diversity
            unique_products_viewed=('productId', 'nunique'),
            # Max view depth (how many times most-viewed product was viewed)
            product_view_depth=('productId', lambda x: x.value_counts().iloc[0]),
        )
        # Filter usage (proxy for deliberate search); sessions without filter events get 0
        filter_events = df[df['eventName'].str.contains('filter|search', na=False)]
        filter_counts = filter_events.groupby('sessionId').size()
        product_agg['filter_usage'] = filter_counts.reindex(product_agg.index, fill_value=0)
        return product_agg
D. Final Feature Matrix
# Target schema (session-level)
features = pd.DataFrame({
# Identifiers
'sessionId': str,
'experimentId': str,
# Temporal (8 features)
'session_duration_sec': float,
'total_interactions': int,
'avg_time_between_events': float,
'std_time_between_events': float,
'interaction_velocity': float, # interactions/min
'max_velocity_5min': float,
'min_time_between_events': float,
'session_start_hour': int, # time of day (0-23)
# Behavioral (10 features)
'page_views': int,
'item_views': int,
'cart_adds': int,
'purchases': int,
'hover_events': int,
'filter_events': int,
'cart_to_view_ratio': float,
'conversion_rate': float,
'hover_intensity': float,
'unique_pages_visited': int,
# Product interaction (8 features)
'unique_products_viewed': int,
'product_view_depth': int, # max times single product viewed
'avg_price_seen': float,
'min_price_seen': float,
'max_price_seen': float,
'std_price_seen': float,
'price_range_explored': float, # max - min
'filter_usage_count': int,
# userAgent parsing (3 features)
'is_headless_browser': bool, # "HeadlessChrome" in UA
'is_automation_tool': bool, # "Selenium", "Playwright", etc.
'browser_family': str,
# LABELS
'is_agent': bool, # from experiments.xp_human_only
'purchased': bool,
'total_revenue': float # for pricing task
})
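The three userAgent features in the schema above could be derived with simple substring and regex checks. The sketch below is one possible shape for the UserAgentParser logic; the marker lists and the `parse_user_agent` name are illustrative assumptions, not the project's actual implementation, and real automation tools can spoof or omit these markers.

```python
import re

# Hypothetical marker lists; extend them from user agents actually observed in the data.
HEADLESS_MARKERS = ("HeadlessChrome", "PhantomJS")
AUTOMATION_MARKERS = ("Selenium", "Playwright", "Puppeteer", "python-requests", "curl")
# Order matters: Edge and Chrome user agents also contain "Safari/".
BROWSER_PATTERNS = [
    ("edge", re.compile(r"Edg/")),
    ("chrome", re.compile(r"Chrome/")),
    ("firefox", re.compile(r"Firefox/")),
    ("safari", re.compile(r"Safari/")),
]

def parse_user_agent(ua):
    """Return the three userAgent features for one raw UA string (or None)."""
    ua = ua or ""
    lowered = ua.lower()
    family = "other"
    for name, pattern in BROWSER_PATTERNS:
        if pattern.search(ua):
            family = name
            break
    return {
        "is_headless_browser": any(m.lower() in lowered for m in HEADLESS_MARKERS),
        "is_automation_tool": any(m.lower() in lowered for m in AUTOMATION_MARKERS),
        "browser_family": family,
    }

example = parse_user_agent(
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "HeadlessChrome/120.0.0.0 Safari/537.36"
)
```

Because browser_family is a string, it needs to be encoded (for example one-hot) before it reaches a model that expects numeric inputs.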
Implementation Files
Create new directory structure:
experiments/
ml/
__init__.py
features/
__init__.py
temporal.py # TemporalFeatureExtractor
behavioral.py # BehavioralFeatureExtractor
product.py # ProductFeatureExtractor
useragent.py # UserAgentParser
pipeline.py # Full feature pipeline (sklearn Pipeline)
datasets.py # Data loaders (Kafka → Pandas)
Replace messy session.py with:
# experiments/ml/pipeline.py
from sklearn.pipeline import Pipeline
def build_feature_pipeline():
return Pipeline([
('temporal', TemporalFeatureExtractor()),
('behavioral', BehavioralFeatureExtractor()),
('product', ProductFeatureExtractor()),
('useragent', UserAgentParser()),
('aggregator', SessionAggregator()),
('label_joiner', ExperimentLabelJoiner())
])
# Usage:
pipeline = build_feature_pipeline()
feature_matrix = pipeline.fit_transform(raw_events_df)
# Output: DataFrame with 29 features + 3 labels per session
III. PHASE 2: Supervised Agent Classifier
Model Architecture
Objective: Binary classification is_agent ∈ {0, 1} from session features
Model Choice: Gradient Boosting (XGBoost/LightGBM)
- Handles mixed feature types (continuous, categorical, boolean)
- Interpretable feature importance
- Strong baseline performance
- Fast inference for real-time detection
Alternative: Neural Network (if needed for multi-task later)
class AgentClassifier(nn.Module):
def __init__(self, n_features=29, hidden_dims=[128, 64, 32]):
super().__init__()
layers = []
in_dim = n_features
for hidden_dim in hidden_dims:
layers.extend([
nn.Linear(in_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(0.3)
])
in_dim = hidden_dim
layers.append(nn.Linear(in_dim, 1)) # Binary logit
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x) # Shape: [batch, 1]
Training Pipeline
File: experiments/ml/train_classifier.py
import pickle

import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score

def train_agent_classifier(feature_matrix_df):
    # Separate features and labels
    X = feature_matrix_df.drop(columns=['sessionId', 'experimentId', 'is_agent', 'purchased', 'total_revenue'])
    # One-hot encode the string column so XGBoost receives numeric inputs only
    # (assumption: the same encoding is applied at inference time)
    X = pd.get_dummies(X, columns=['browser_family'])
    y = feature_matrix_df['is_agent'].astype(int)
    # Train/test split, then a validation split taken from the training data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
    )
    # XGBoost with class imbalance handling
    scale_pos_weight = (y_fit == 0).sum() / (y_fit == 1).sum()
    model = xgb.XGBClassifier(
        n_estimators=200,
        max_depth=6,
        learning_rate=0.05,
        scale_pos_weight=scale_pos_weight,
        eval_metric='auc',
        early_stopping_rounds=20
    )
    # Early stopping monitors the validation set, so the test set stays unseen
    model.fit(
        X_fit, y_fit,
        eval_set=[(X_val, y_val)],
        verbose=10
    )
    # Evaluation
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    print(classification_report(y_test, y_pred))
    print(f"ROC-AUC: {roc_auc_score(y_test, y_prob):.4f}")
    # Feature importance
    importance_df = pd.DataFrame({
        'feature': X.columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    return model, importance_df

# Publish to ModelRegistry
def publish_classifier(model, registry):
    # joblib has no dumps(); pickle serializes the model to bytes for Redis
    model_bytes = pickle.dumps(model)
    registry.redis_client.set('classifier:agent_detector:latest', model_bytes)
Inference Integration
Update Pricing Provider (backend/provider/app.py):
import pickle
from lib.model_registry import ModelRegistry
registry = ModelRegistry()
# Load classifier on startup (joblib has no loads(); the stored bytes are unpickled directly)
classifier_bytes = registry.redis_client.get('classifier:agent_detector:latest')
agent_classifier = pickle.loads(classifier_bytes)
@app.get("/api/{mode}/price/{productId}")
def get_price(mode, productId, sessionId, experimentId):
# Fetch session features from real-time feature store
session_features = compute_session_features(sessionId) # New function
# Predict agent probability
agent_prob = agent_classifier.predict_proba([session_features])[0, 1]
# Apply dynamic markup based on agent probability
base_price = get_base_price(productId)
if agent_prob > 0.7: # High confidence agent
markup = 1.3
elif agent_prob > 0.4: # Uncertain
markup = 1.1
else: # Likely human
markup = 1.0
final_price = base_price * markup
return PriceResponse(
productId=productId,
price=final_price,
agent_probability=agent_prob,
markup=markup
)
Real-Time Feature Computation:
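# backend/provider/features.py
import json
import time

import numpy as np
import pandas as pd
from kafka import KafkaConsumer

def compute_session_features(sessionId: str) -> np.ndarray:
    """
    Compute features for active session from recent Kafka events
    Uses sliding window (last 5 minutes)
    """
    consumer = KafkaConsumer(
        'user-interactions',
        # Assumption: events are published as UTF-8 JSON
        value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
        consumer_timeout_ms=1000,  # End iteration when no message arrives for 1s
    )
    started = time.monotonic()
    # Fetch recent events for this session
    events = []
    for msg in consumer:
        event = msg.value
        if event['sessionId'] == sessionId:
            events.append(event)
        # Stop after collecting enough history or timeout
        time_elapsed = time.monotonic() - started
        if len(events) > 100 or time_elapsed > 300:
            break
    consumer.close()
    events_df = pd.DataFrame(events)
    # Apply feature pipeline (same as training)
    feature_pipeline = load_feature_pipeline()
    features = feature_pipeline.transform(events_df)
    return features.values[0]  # Return single row as array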
IV. PHASE 3: Multi-Task Learning Architecture
Conceptual Framework
Two Tasks:
- Task A: Agent Classification (what we just built)
  - Input: Session features
  - Output: P(is_agent)
  - Loss: Binary cross-entropy
- Task B: Price Sensitivity Prediction
  - Input: Session features + Product features + Current price
  - Output: P(purchase | price) or Willingness-to-Pay (WTP)
  - Loss: Binary cross-entropy (purchase) or MSE (WTP)
Shared Representation Hypothesis:
- Both tasks benefit from learning session behavioral patterns
- Agent sessions have different price sensitivity than human sessions
- Shared feature encoder can learn robust representations
Multi-Task Network Architecture
class MultiTaskPricingModel(nn.Module):
"""
Shared encoder for session features
Two task-specific heads
"""
def __init__(self, n_session_features=29, n_product_features=10):
super().__init__()
# Shared session encoder
self.session_encoder = nn.Sequential(
nn.Linear(n_session_features, 128),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(128, 64),
nn.BatchNorm1d(64),
nn.ReLU()
)
# Product encoder (processes product features + price)
self.product_encoder = nn.Sequential(
nn.Linear(n_product_features, 32),
nn.ReLU(),
nn.Linear(32, 16)
)
# Task A: Agent classification head
self.agent_head = nn.Sequential(
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1) # Logit for is_agent
)
# Task B: Purchase probability head (uses both session + product)
self.purchase_head = nn.Sequential(
nn.Linear(64 + 16, 32), # Concatenate session + product encodings
nn.ReLU(),
nn.Linear(32, 1) # Logit for purchase probability
)
def forward(self, session_features, product_features):
# Encode session (shared)
session_enc = self.session_encoder(session_features)
# Encode product
product_enc = self.product_encoder(product_features)
# Task A prediction
agent_logit = self.agent_head(session_enc)
# Task B prediction
combined = torch.cat([session_enc, product_enc], dim=1)
purchase_logit = self.purchase_head(combined)
return {
'agent_logit': agent_logit,
'purchase_logit': purchase_logit,
'session_embedding': session_enc # For knowledge distillation
}
Multi-Task Loss Function
import torch.nn.functional as F

def multi_task_loss(outputs, targets, task_weights=None):
    """
    Weighted combination of task losses
    Args:
        outputs: dict with 'agent_logit', 'purchase_logit'
        targets: dict with 'is_agent', 'purchased'
        task_weights: importance weights for each task
            (defaults to {'agent': 1.0, 'purchase': 2.0})
    """
    if task_weights is None:  # Avoid a shared mutable default argument
        task_weights = {'agent': 1.0, 'purchase': 2.0}
    # Task A: Agent classification
    loss_agent = F.binary_cross_entropy_with_logits(
        outputs['agent_logit'].squeeze(-1),  # [batch, 1] -> [batch], also for batch size 1
        targets['is_agent'].float()
    )
    # Task B: Purchase prediction
    loss_purchase = F.binary_cross_entropy_with_logits(
        outputs['purchase_logit'].squeeze(-1),
        targets['purchased'].float()
    )
    # Weighted sum
    total_loss = (
        task_weights['agent'] * loss_agent +
        task_weights['purchase'] * loss_purchase
    )
    return total_loss, {
        'loss_agent': loss_agent.item(),
        'loss_purchase': loss_purchase.item(),
        'loss_total': total_loss.item()
    }
Training Loop
File: experiments/ml/train_multitask.py
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
def train_multitask_model(feature_matrix_df, n_epochs=100):
# Prepare data
X_session = feature_matrix_df[SESSION_FEATURE_COLS].values
X_product = feature_matrix_df[PRODUCT_FEATURE_COLS].values
y_agent = feature_matrix_df['is_agent'].values
y_purchase = feature_matrix_df['purchased'].values
# Convert to tensors
dataset = TensorDataset(
torch.FloatTensor(X_session),
torch.FloatTensor(X_product),
torch.FloatTensor(y_agent),
torch.FloatTensor(y_purchase)
)
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)
# Initialize model
model = MultiTaskPricingModel(
n_session_features=X_session.shape[1],
n_product_features=X_product.shape[1]
)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)
# Training loop
for epoch in range(n_epochs):
model.train()
epoch_losses = {'agent': [], 'purchase': [], 'total': []}
for X_sess_batch, X_prod_batch, y_agent_batch, y_purch_batch in train_loader:
optimizer.zero_grad()
# Forward pass
outputs = model(X_sess_batch, X_prod_batch)
# Compute loss
loss, loss_dict = multi_task_loss(
outputs,
{'is_agent': y_agent_batch, 'purchased': y_purch_batch}
)
# Backward pass
loss.backward()
optimizer.step()
# Track losses
for k, v in loss_dict.items():
epoch_losses[k.replace('loss_', '')].append(v)
# Log epoch stats
avg_loss = np.mean(epoch_losses['total'])
print(f"Epoch {epoch}: Loss={avg_loss:.4f}, "
f"Agent={np.mean(epoch_losses['agent']):.4f}, "
f"Purchase={np.mean(epoch_losses['purchase']):.4f}")
scheduler.step(avg_loss)
return model
Knowledge Distillation for Pricing Heuristics
Goal: Extract interpretable pricing rules from the learned multi-task model
Method 1: Decision Tree Distillation
import numpy as np
import torch
from sklearn.tree import DecisionTreeRegressor, export_text

def distill_pricing_rules(multitask_model, feature_matrix_df):
    """
    Train interpretable decision tree to mimic multi-task model's pricing decisions
    """
    # Get embeddings and predictions from multi-task model
    X_session = torch.FloatTensor(feature_matrix_df[SESSION_FEATURE_COLS].values)
    X_product = torch.FloatTensor(feature_matrix_df[PRODUCT_FEATURE_COLS].values)
    multitask_model.eval()  # Use running BatchNorm statistics and disable dropout
    with torch.no_grad():
        outputs = multitask_model(X_session, X_product)
        session_embeddings = outputs['session_embedding'].numpy()  # Not used below
        agent_probs = torch.sigmoid(outputs['agent_logit']).squeeze(-1).numpy()
        purchase_probs = torch.sigmoid(outputs['purchase_logit']).squeeze(-1).numpy()
    # Compute target markup based on model predictions
    # High agent prob + low purchase prob → high markup
    optimal_markup = compute_markup_from_probs(agent_probs, purchase_probs)
    # Markup is a continuous target, so the student is a regression tree
    distilled_tree = DecisionTreeRegressor(max_depth=5, min_samples_leaf=50)
    distilled_tree.fit(feature_matrix_df[SESSION_FEATURE_COLS], optimal_markup)
    # Extract human-readable rules
    rules_text = export_text(distilled_tree, feature_names=list(SESSION_FEATURE_COLS))
    print("Distilled Pricing Rules:")
    print(rules_text)
    return distilled_tree, rules_text

def compute_markup_from_probs(agent_probs, purchase_probs):
    """
    Heuristic:
    - High agent prob → increase markup (exploit reconnaissance)
    - High purchase prob → decrease markup; low purchase prob leaves markup high
    """
    markup = 1.0 + 0.5 * agent_probs - 0.3 * purchase_probs
    return np.clip(markup, 0.7, 1.5)  # Bounds: [70% to 150%]
Method 2: SHAP Values for Feature Importance
import shap
import torch

def explain_pricing_model(multitask_model, feature_matrix_df):
    """Use SHAP to understand which features drive pricing decisions"""
    multitask_model.eval()  # Use running BatchNorm statistics and disable dropout
    # Product feature width, read from the first layer of the product encoder
    n_product_features = multitask_model.product_encoder[0].in_features
    # Wrap model for SHAP
    def model_predict(X_session):
        X_session_t = torch.FloatTensor(X_session)
        X_product_t = torch.zeros(len(X_session), n_product_features)  # Placeholder
        with torch.no_grad():
            outputs = multitask_model(X_session_t, X_product_t)
            agent_probs = torch.sigmoid(outputs['agent_logit']).squeeze(-1).numpy()
        return agent_probs
    # Draw each sample once so the SHAP values and the plotted rows are the same sessions
    background = feature_matrix_df[SESSION_FEATURE_COLS].sample(100, random_state=0)
    explain_rows = feature_matrix_df[SESSION_FEATURE_COLS].sample(200, random_state=0)
    # Compute SHAP values
    explainer = shap.KernelExplainer(model_predict, background)
    shap_values = explainer.shap_values(explain_rows)
    # Visualize
    shap.summary_plot(shap_values, explain_rows)
    return shap_values
V. PHASE 4: Synthetic Dynamic Pricing Simulator
Motivation
Problem: Real-world pricing experiments are:
- Slow (need to wait for user sessions)
- Expensive (margin leakage during exploration)
- Risky (can break revenue if pricing goes wrong)
Solution: Fast synthetic environment to:
- Simulate user/agent behavior
- Test pricing strategies in closed loop
- Train RL agents for dynamic pricing
- Validate multi-task model predictions
Simulator Architecture
File: experiments/ml/simulator/env.py
import gymnasium as gym
import numpy as np
from typing import Dict, Tuple
class DynamicPricingEnv(gym.Env):
"""
Synthetic environment for dynamic pricing with human/agent users
State: [current_demand, inventory_level, time_of_day, agent_fraction, avg_session_velocity]
Action: price_multiplier ∈ [0.7, 1.5] (continuous)
Reward: revenue - margin_leakage_penalty
"""
def __init__(self,
n_products=20,
n_timesteps=1000,
human_price_sensitivity=2.0, # Elasticity
agent_price_sensitivity=5.0, # Agents more price-aware
agent_fraction=0.3):
super().__init__()
self.n_products = n_products
self.n_timesteps = n_timesteps
self.human_sensitivity = human_price_sensitivity
self.agent_sensitivity = agent_price_sensitivity
self.agent_fraction = agent_fraction
# State space: [demand, inventory, hour, agent_frac, velocity]
self.observation_space = gym.spaces.Box(
low=np.array([0, 0, 0, 0, 0]),
high=np.array([100, 100, 24, 1, 20]),
dtype=np.float32
)
# Action space: price multiplier
self.action_space = gym.spaces.Box(
low=0.7, high=1.5, shape=(1,), dtype=np.float32
)
# Internal state
self.base_prices = np.random.uniform(100, 500, n_products)
self.inventory = np.full(n_products, 50)
self.timestep = 0
self.demand_history = []
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.inventory = np.full(self.n_products, 50)
self.timestep = 0
self.demand_history = []
return self._get_state(), {}
def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict]:
"""
Execute one pricing decision
Args:
action: price_multiplier
Returns:
state, reward, terminated, truncated, info
"""
price_multiplier = action[0]
# Simulate user arrivals (Poisson process)
hour = self.timestep % 24
arrival_rate = self._get_arrival_rate(hour)
n_arrivals = np.random.poisson(arrival_rate)
# Separate humans and agents
n_agents = np.random.binomial(n_arrivals, self.agent_fraction)
n_humans = n_arrivals - n_agents
# Simulate purchase decisions
current_price = self.base_prices[0] * price_multiplier # Simplified: 1 product
# Human purchase probability (logistic demand curve)
human_purchase_prob = self._purchase_probability(
current_price,
self.base_prices[0],
self.human_sensitivity
)
human_purchases = np.random.binomial(n_humans, human_purchase_prob)
# Agent purchase probability (more sensitive)
agent_purchase_prob = self._purchase_probability(
current_price,
self.base_prices[0],
self.agent_sensitivity
)
agent_purchases = np.random.binomial(n_agents, agent_purchase_prob)
# Compute revenue
revenue = current_price * (human_purchases + agent_purchases)
# Compute margin leakage (agents getting oracle prices)
oracle_price = self.base_prices[0] * 1.2 # Optimal price for agents
margin_leakage = (oracle_price - current_price) * agent_purchases
margin_leakage = max(0, margin_leakage) # Only count if we underpriced
# Reward = revenue - margin leakage penalty
reward = revenue - 0.5 * margin_leakage
# Update inventory
total_purchases = human_purchases + agent_purchases
self.inventory[0] -= total_purchases
# Track demand
self.demand_history.append(n_arrivals)
# Next state
self.timestep += 1
terminated = self.inventory[0] <= 0 # Sold out: the episode has reached a terminal state
truncated = self.timestep >= self.n_timesteps # Time limit reached
state = self._get_state()
info = {
'revenue': revenue,
'margin_leakage': margin_leakage,
'human_purchases': human_purchases,
'agent_purchases': agent_purchases,
'agent_fraction': n_agents / n_arrivals if n_arrivals > 0 else 0
}
return state, reward, terminated, truncated, info
def _get_state(self) -> np.ndarray:
"""Current state observation"""
demand = np.mean(self.demand_history[-10:]) if self.demand_history else 0
inventory = self.inventory[0]
hour = self.timestep % 24
agent_frac = self.agent_fraction # Simplified
velocity = np.random.uniform(1, 10) # Placeholder
return np.array([demand, inventory, hour, agent_frac, velocity], dtype=np.float32)
def _get_arrival_rate(self, hour: int) -> float:
"""Simulate time-of-day arrival patterns"""
# Higher demand during business hours (9am-5pm)
if 9 <= hour <= 17:
return 10.0
else:
return 3.0
def _purchase_probability(self, current_price, base_price, sensitivity):
"""
Logistic demand curve
P(purchase) = 1 / (1 + exp(sensitivity * (log(current_price) - log(base_price))))
"""
price_ratio = current_price / base_price
logit = sensitivity * (np.log(price_ratio))
prob = 1 / (1 + np.exp(logit))
return np.clip(prob, 0.01, 0.99)
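A worked example helps show what the two sensitivity defaults imply. The sketch below re-implements the same logistic curve with the standard library only (the standalone function name is chosen for illustration) and evaluates it at a 1.2× price multiplier.

```python
import math

def purchase_probability(price_multiplier, sensitivity):
    """P(purchase) = 1 / (1 + exp(sensitivity * log(price / base_price)))."""
    logit = sensitivity * math.log(price_multiplier)
    prob = 1.0 / (1.0 + math.exp(logit))
    return min(max(prob, 0.01), 0.99)  # same clipping as the environment

human = purchase_probability(1.2, sensitivity=2.0)    # 1 / (1 + 1.2**2), about 0.41
agent = purchase_probability(1.2, sensitivity=5.0)    # 1 / (1 + 1.2**5), about 0.29
at_base = purchase_probability(1.0, sensitivity=5.0)  # log(1) = 0, so 0.5 for any sensitivity
```

Two properties follow from the formula: every user type buys with probability 0.5 at the base price, and below the base price the more sensitive agents are more likely to buy than humans.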
Behavioral Models
File: experiments/ml/simulator/agents.py
class SyntheticUser:
"""Base class for simulated users"""
def __init__(self, price_sensitivity, session_velocity):
self.price_sensitivity = price_sensitivity
self.session_velocity = session_velocity # interactions/min
def generate_session(self) -> List[Event]:
"""Generate event sequence"""
raise NotImplementedError
class HumanUser(SyntheticUser):
"""Simulates human browsing behavior"""
def __init__(self):
super().__init__(
price_sensitivity=2.0, # Moderate
session_velocity=np.random.uniform(1, 3) # Slow browsing
)
def generate_session(self) -> List[Event]:
events = []
# Human pattern: slow, exploratory
n_page_views = np.random.poisson(5)
n_hovers = np.random.poisson(8)
n_products_viewed = np.random.randint(2, 6)
# Time between events (seconds)
inter_event_times = np.random.exponential(20, size=n_page_views)
for i in range(n_page_views):
events.append({
'eventName': 'page_view',
'timestamp': sum(inter_event_times[:i]),
'velocity': self.session_velocity
})
# Purchase decision (price-dependent)
purchase_prob = self._compute_purchase_prob(current_price)
if np.random.rand() < purchase_prob:
events.append({'eventName': 'purchase_complete', ...})
return events
class AgentUser(SyntheticUser):
"""Simulates agent/bot behavior"""
def __init__(self):
super().__init__(
price_sensitivity=5.0, # Highly price-aware
session_velocity=np.random.uniform(10, 20) # Fast reconnaissance
)
def generate_session(self) -> List[Event]:
events = []
# Agent pattern: fast, systematic
n_products_viewed = np.random.randint(15, 30) # Views many products
inter_event_times = np.random.exponential(2, size=n_products_viewed) # Fast
for i in range(n_products_viewed):
events.append({
'eventName': 'view_item_page',
'timestamp': sum(inter_event_times[:i]),
'velocity': self.session_velocity
})
# Agents rarely purchase (reconnaissance)
purchase_prob = 0.05
if np.random.rand() < purchase_prob:
events.append({'eventName': 'purchase_complete', ...})
return events
RL Training for Pricing Policy
File: experiments/ml/simulator/train_rl.py
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
def train_pricing_policy():
"""Train RL agent to learn optimal pricing strategy"""
# Create environment
env = DynamicPricingEnv(agent_fraction=0.3)
env = DummyVecEnv([lambda: env])
# Train PPO agent
model = PPO(
"MlpPolicy",
env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
verbose=1,
tensorboard_log="./logs/"
)
model.learn(total_timesteps=100_000)
# Evaluate
obs = env.reset()
total_reward = 0
for _ in range(1000):
action, _ = model.predict(obs, deterministic=True)
obs, reward, done, info = env.step(action)
total_reward += reward
if done:
print(f"Episode reward: {total_reward}")
break
return model
# Compare with baseline strategies
def benchmark_strategies(rl_model): # rl_model: trained policy, e.g. returned by train_pricing_policy()
strategies = {
'fixed': lambda state: np.array([1.0]), # No markup
'surge': lambda state: np.array([1.2]) if state[0] > 10 else np.array([0.9]),
'rl': lambda state: rl_model.predict(state)[0]
}
for name, policy in strategies.items():
env = DynamicPricingEnv()
total_reward = evaluate_policy(env, policy, n_episodes=100)
print(f"{name}: avg reward = {total_reward:.2f}")
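The benchmark above calls an `evaluate_policy(env, policy, n_episodes)` helper that is not defined in this document. One possible implementation is sketched below; it assumes a Gymnasium-style `reset()`/`step()` contract and a policy that is a plain callable from state to action. Note that Stable-Baselines3 ships a function with the same name but a different signature (model first, then env), so this helper would need a distinct import path or name. `ToyEnv` is a stand-in used only to make the sketch runnable.

```python
def evaluate_policy(env, policy, n_episodes=100):
    """Average undiscounted episode return of `policy` on a Gymnasium-style env."""
    returns = []
    for _ in range(n_episodes):
        state, _info = env.reset()
        done, episode_return = False, 0.0
        while not done:
            state, reward, terminated, truncated, _info = env.step(policy(state))
            episode_return += float(reward)
            done = terminated or truncated
        returns.append(episode_return)
    return sum(returns) / len(returns)

class ToyEnv:
    """Stand-in env: reward equals the chosen multiplier, episode ends after `horizon` steps."""
    def __init__(self, horizon=5):
        self.horizon = horizon
        self.t = 0

    def reset(self, seed=None, options=None):
        self.t = 0
        return [0.0], {}

    def step(self, action):
        self.t += 1
        truncated = self.t >= self.horizon
        return [float(self.t)], float(action[0]), False, truncated, {}

# A fixed 1.2x policy earns 1.2 per step for 5 steps in every episode
avg_return = evaluate_policy(ToyEnv(), lambda state: [1.2], n_episodes=3)
```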
Integration with Multi-Task Model
Use multi-task model predictions as state features:
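class EnhancedPricingEnv(DynamicPricingEnv):
    """Environment augmented with multi-task model predictions"""
    def __init__(self, multitask_model):
        super().__init__()
        self.multitask_model = multitask_model
        self.multitask_model.eval()  # BatchNorm cannot run in training mode on a single row
        # Product feature width, read from the first layer of the product encoder
        self.n_product_features = multitask_model.product_encoder[0].in_features
        # Two extra observation dimensions: agent_prob and purchase_prob, both in [0, 1]
        self.observation_space = gym.spaces.Box(
            low=np.append(self.observation_space.low, [0, 0]).astype(np.float32),
            high=np.append(self.observation_space.high, [1, 1]).astype(np.float32),
            dtype=np.float32
        )

    def _get_state(self):
        # Get base state
        base_state = super()._get_state()
        # Generate synthetic session features
        session_features = self._generate_session_features()
        # Get multi-task model predictions
        with torch.no_grad():
            outputs = self.multitask_model(
                torch.FloatTensor(session_features).unsqueeze(0),
                torch.zeros(1, self.n_product_features)
            )
            agent_prob = torch.sigmoid(outputs['agent_logit']).item()
            purchase_prob = torch.sigmoid(outputs['purchase_logit']).item()
        # Augment state with model predictions
        enhanced_state = np.concatenate([
            base_state,
            [agent_prob, purchase_prob]
        ]).astype(np.float32)
        return enhanced_state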
VI. IMPLEMENTATION ROADMAP
Phase 1: Foundation (Week 1-2)
1.1 Refactor Feature Pipeline ✓
- Create experiments/ml/features/ directory
- Implement TemporalFeatureExtractor (vectorized)
- Implement BehavioralFeatureExtractor
- Implement ProductFeatureExtractor
- Implement UserAgentParser
- Create end-to-end build_feature_pipeline()
- Write unit tests for each extractor
- Benchmark: ensure <5min processing for 100k events
1.2 Data Loading ✓
- Implement experiments/ml/datasets.py
- Add function load_events_from_kafka(topic, time_range)
- Add function load_experiments_from_supabase()
- Add function create_labeled_dataset() (joins events + experiments)
- Add data validation (check for missing experimentIds, etc.)
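The join and the validation step in the list above can be combined so that unlabeled events are counted rather than silently dropped by an inner join. The sketch below uses plain Python records to stay dependency-free; the report keys are illustrative assumptions, and in the pandas pipeline a merge with `indicator=True` could serve the same purpose.

```python
def create_labeled_dataset(events, experiments):
    """Join events to experiments and report events that cannot be labeled."""
    human_only = {exp["id"]: exp["xp_human_only"] for exp in experiments}
    labeled = []
    report = {"missing_experiment_id": 0, "unknown_experiment_id": 0}
    for event in events:
        exp_id = event.get("experimentId")
        if exp_id is None:
            report["missing_experiment_id"] += 1   # event carries no experimentId
        elif exp_id not in human_only:
            report["unknown_experiment_id"] += 1   # id absent from experiments table
        else:
            # Same label flip as the training-set construction: agent = not human-only
            labeled.append({**event, "is_agent": not human_only[exp_id]})
    return labeled, report

events = [
    {"sessionId": "s1", "experimentId": "e1"},
    {"sessionId": "s2", "experimentId": "e2"},
    {"sessionId": "s3"},
    {"sessionId": "s4", "experimentId": "not-in-table"},
]
experiments = [
    {"id": "e1", "xp_human_only": True},
    {"id": "e2", "xp_human_only": False},
]
labeled, report = create_labeled_dataset(events, experiments)
```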
1.3 Model Registry Updates ✓
- Extend lib/model_registry.py to store:
  - Pickled sklearn/XGBoost models
  - PyTorch model state_dicts
  - Feature column names (schema versioning)
- Add publish_classifier(model, metadata)
- Add publish_multitask_model(model, metadata)
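For the schema-versioning item above, one lightweight option is to store a fingerprint of the ordered feature column list next to each model and check it before inference. The function names below are hypothetical and not part of the existing ModelRegistry.

```python
import hashlib
import json

def feature_schema_version(feature_columns):
    """Short, order-sensitive fingerprint of the feature column list."""
    payload = json.dumps(list(feature_columns)).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:12]

def check_schema(expected_version, feature_columns):
    """Raise if the columns offered at inference differ from those used in training."""
    actual = feature_schema_version(feature_columns)
    if actual != expected_version:
        raise ValueError(
            f"feature schema mismatch: expected {expected_version}, got {actual}"
        )

train_columns = ["session_duration_sec", "total_interactions", "cart_adds"]
version = feature_schema_version(train_columns)
check_schema(version, train_columns)  # passes silently for identical columns
```

The fingerprint is order-sensitive on purpose: a model fed a NumPy array depends on column position, so a reordered list should count as a different schema.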
Phase 2: Supervised Classifier (Week 2-3)
2.1 Training Pipeline ✓
- Create experiments/ml/train_classifier.py
- Load labeled dataset with feature pipeline
- Train XGBoost classifier
- Evaluate: ROC-AUC, precision/recall, confusion matrix
- Generate feature importance report
- Publish model to registry
2.2 Inference Integration ✓
- Update backend/provider/app.py
- Implement compute_session_features(sessionId) for real-time inference
- Add /api/detect-agent/{sessionId} endpoint (debugging)
- Modify /api/{mode}/price/{productId} to:
  - Compute agent probability
  - Apply dynamic markup based on probability
  - Log (sessionId, agent_prob, markup) to Kafka
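The "dynamic markup based on probability" step is not specified further, so the following is a minimal sketch under stated assumptions: a linear ramp above a probability threshold, capped at the conservative 10% maximum mentioned under Risk 3. The function names, the 0.5 threshold and the ramp shape are all illustrative.

```python
def agent_markup(agent_prob, max_markup=0.10, threshold=0.5):
    """Map agent probability to a fractional markup in [0, max_markup].

    Below `threshold` no markup is applied; above it the markup ramps
    linearly and reaches `max_markup` at agent_prob == 1.0.
    """
    p = min(max(agent_prob, 0.0), 1.0)  # guard against out-of-range scores
    if p <= threshold:
        return 0.0
    return max_markup * (p - threshold) / (1.0 - threshold)


def price_for_session(base_price, agent_prob):
    """Return (final price, markup) for one session."""
    markup = agent_markup(agent_prob)
    return round(base_price * (1.0 + markup), 2), markup


for prob in (0.30, 0.75, 1.00):
    print(prob, price_for_session(200.0, prob))
```

A ramp rather than a hard cutoff keeps low-confidence sessions close to the human price, which limits the cost of false agent flags.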
2.3 Monitoring ✓
- Create Airflow DAG: agent_classifier_training_pipeline
  - Runs daily
  - Fetches last 7 days of data
  - Retrains classifier
  - Publishes to registry
- Add Grafana dashboard:
  - Agent detection rate over time
  - Precision/recall metrics
  - Markup distribution (human vs agent sessions)
Phase 3: Multi-Task Learning (Week 3-5)
3.1 Model Development ✓
- Create experiments/ml/models/multitask.py
- Implement MultiTaskPricingModel (PyTorch)
- Implement multi_task_loss()
- Create experiments/ml/train_multitask.py
3.2 Training ✓
- Prepare product-level dataset (not just session-level)
  - Each row: (session_features, product_features, price, is_agent, purchased)
  - Requires joining events with product catalog
- Train multi-task model
- Evaluate both tasks:
  - Agent classification: ROC-AUC
  - Purchase prediction: ROC-AUC, calibration plot
- Hyperparameter tuning (task weights, learning rate, architecture)
3.3 Knowledge Distillation ✓
- Implement distill_pricing_rules() (decision tree)
- Implement explain_pricing_model() (SHAP)
explain_pricing_model()(SHAP) - Generate interpretable pricing rule report
- Validate: compare distilled tree performance vs full model
3.4 Deployment ✓
- Serialize PyTorch model to ONNX for fast inference
- Update pricing provider to use multi-task model:
  - Predict agent probability
  - Predict purchase probability given current price
  - Compute optimal price (maximize expected revenue)
- A/B test: multi-task pricing vs baseline elasticity pricing
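The "compute optimal price (maximize expected revenue)" step can be illustrated with a small grid search over candidate prices, where expected revenue is price times purchase probability. In this sketch the logistic purchase_prob function is a hypothetical stand-in for the multi-task model's purchase head, and the 10% cap on the search range is borrowed from Risk 3.

```python
import math


def purchase_prob(price, reference_price, sensitivity):
    """Stand-in for the model's purchase head: logistic in relative price."""
    z = -sensitivity * (price - reference_price) / reference_price
    return 1.0 / (1.0 + math.exp(-z))


def optimal_price(base_price, sensitivity, max_markup=0.10, steps=21):
    """Grid-search the price in [base, base * (1 + max_markup)] that maximizes
    expected revenue = price * P(purchase | price)."""
    best_price, best_revenue = base_price, -1.0
    for i in range(steps):
        price = base_price * (1.0 + max_markup * i / (steps - 1))
        revenue = price * purchase_prob(price, base_price, sensitivity)
        if revenue > best_revenue:
            best_price, best_revenue = price, revenue
    return best_price, best_revenue


# A price-insensitive session tolerates the full markup; a sensitive one does not
p_low, r_low = optimal_price(100.0, sensitivity=1.0)
p_high, r_high = optimal_price(100.0, sensitivity=8.0)
print(round(p_low, 2), round(p_high, 2))
```

With the real model, purchase_prob would be replaced by a forward pass at each candidate price, so the grid size trades pricing precision against inference latency.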
Phase 4: Synthetic Simulator (Week 5-6)
4.1 Environment ✓
- Create experiments/ml/simulator/
- Implement DynamicPricingEnv (Gymnasium)
- Implement HumanUser and AgentUser behavioral models
- Validate environment:
  - Test human_sensitivity vs agent_sensitivity differentiation
  - Visualize demand curves for both user types
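The validation items above could start from a check like the one below, which tabulates demand curves for the two user types and asserts that they fall with price and separate from each other. The logistic demand form and both sensitivity values are placeholders chosen for illustration; this document does not state the actual parameters of HumanUser or AgentUser.

```python
import math


def purchase_probability(price, base_price, sensitivity):
    """Logistic demand: 0.5 at base_price, falling as price rises."""
    return 1.0 / (1.0 + math.exp(sensitivity * (price / base_price - 1.0)))


# Placeholder values; real ones would be calibrated from historical data
human_sensitivity = 3.0
agent_sensitivity = 8.0

base_price = 100.0
prices = [80, 90, 100, 110, 120, 130, 140]
human_curve = [purchase_probability(p, base_price, human_sensitivity) for p in prices]
agent_curve = [purchase_probability(p, base_price, agent_sensitivity) for p in prices]


def is_decreasing(values):
    """True if every value is strictly smaller than the one before it."""
    return all(a > b for a, b in zip(values, values[1:]))


# Validation checks: demand falls with price, and the two user types separate
assert is_decreasing(human_curve) and is_decreasing(agent_curve)
separation = max(abs(h - a) for h, a in zip(human_curve, agent_curve))
print(f"max human/agent demand gap: {separation:.2f}")
```

The same two lists can be passed straight to a plotting library for the visualization item.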
4.2 RL Training ✓
- Create experiments/ml/simulator/train_rl.py
- Train PPO agent
- Benchmark against baseline strategies (fixed, surge, elasticity)
- Visualize: reward curves, pricing decisions over time
4.3 Integration with Multi-Task Model ✓
- Implement EnhancedPricingEnv (uses multi-task model predictions)
- Train RL agent in enhanced environment
- Compare: RL with model predictions vs RL without
4.4 Validation ✓
- Run counterfactual simulations:
  - "What if agent fraction was 50%?"
  - "What if agents were less price-sensitive?"
- Generate risk analysis report for pricing strategies
Phase 5: Production Integration (Week 7-8)
5.1 Real-Time Feature Store ✓
- Implement session feature caching (Redis)
- Create Kafka Streams job:
  - Consumes user-interactions
  - Computes rolling session features
  - Publishes to Redis with TTL
- Update compute_session_features() to read from cache
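To make the rolling-features-with-TTL idea concrete, here is a small in-memory stand-in for the Redis cache. It is a sketch only: the class name, the 30-minute default TTL and the choice of features are assumptions, and a real deployment would write through a Redis client rather than a Python dict.

```python
import time


class SessionFeatureCache:
    """In-memory stand-in for the Redis feature store (TTL per session)."""

    def __init__(self, ttl_sec=1800, clock=time.time):
        self.ttl_sec = ttl_sec
        self.clock = clock
        self._store = {}  # sessionId -> (expires_at, features)

    def update(self, session_id, event_ts):
        """Fold one event (timestamp in seconds) into the session's features."""
        now = self.clock()
        entry = self._store.get(session_id)
        if entry is None or entry[0] <= now:
            # New or expired session: start from a clean slate
            features = {"total_interactions": 0, "first_ts": event_ts, "last_ts": event_ts}
        else:
            features = entry[1]
        features["total_interactions"] += 1
        features["last_ts"] = max(features["last_ts"], event_ts)
        duration = features["last_ts"] - features["first_ts"]
        features["session_duration_sec"] = duration
        # Events per minute; the epsilon mirrors the batch extractor in Appendix B
        features["interaction_velocity"] = features["total_interactions"] / (duration / 60 + 1e-6)
        self._store[session_id] = (now + self.ttl_sec, features)  # refresh TTL
        return features

    def get(self, session_id):
        """Return cached features, or None if the session is unknown or expired."""
        entry = self._store.get(session_id)
        if entry is None or entry[0] <= self.clock():
            return None
        return entry[1]


fake_now = [0.0]  # injectable clock so expiry can be exercised without sleeping
cache = SessionFeatureCache(ttl_sec=60, clock=lambda: fake_now[0])
for ts in (0.0, 10.0, 30.0):
    cache.update("s1", ts)
features = cache.get("s1")
fake_now[0] = 120.0
expired = cache.get("s1")
print(features["total_interactions"], expired)
```

Each update is O(1), which is the property the inference path needs; features that require the full event history (for example the standard deviation of inter-event times) would need running sums stored alongside.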
5.2 Pricing Service Refactor ✓
- Decouple pricing logic from provider service
- Create experiments/ml/inference/pricing_service.py
- Expose gRPC API for low-latency pricing
- Load model once on startup (not per-request)
5.3 Observability ✓
- Add OpenTelemetry tracing to pricing service
- Create Grafana dashboard:
  - Pricing latency (p50, p99)
  - Revenue metrics (human vs agent)
  - Model prediction distribution
- Set up alerts:
  - Agent detection rate spike
  - Margin leakage threshold exceeded
  - Model inference failures
5.4 Documentation ✓
- Write model cards for both models:
  - Intended use
  - Training data characteristics
  - Performance metrics
  - Limitations & biases
- Create runbook for retraining pipelines
- API documentation for pricing service
VII. TECHNICAL STACK
Data Engineering
- Streaming: Kafka (existing)
- Storage: Supabase (PostgreSQL) for metadata, Kafka for events
- Feature Store: Redis (for real-time session features)
- Orchestration: Airflow (existing)
Machine Learning
- Feature Engineering: Pandas, NumPy, Scikit-learn
- Classifier: XGBoost / LightGBM (for speed + interpretability)
- Multi-Task Model: PyTorch
- RL: Stable-Baselines3 (PPO)
- Explainability: SHAP, Scikit-learn decision trees
- Model Registry: Redis (lightweight) or MLflow (if scaling up)
Inference
- Serving: ONNX Runtime (for PyTorch models) or direct PyTorch
- API: FastAPI (existing backend)
- Caching: Redis
Monitoring
- Metrics: Prometheus
- Dashboards: Grafana
- Tracing: OpenTelemetry
VIII. KEY DECISIONS & TRADEOFFS
Decision 1: XGBoost vs Neural Network for Classifier
Choice: Start with XGBoost, migrate to NN if multi-task learning shows benefit
Rationale:
- XGBoost: Faster training, interpretable, strong baseline
- Neural Network: Required for multi-task learning with shared encoder
- Approach: Build both, use XGBoost for initial deployment
Decision 2: Real-Time vs Batch Feature Engineering
Choice: Hybrid approach
- Batch: Full feature pipeline in Airflow (daily retraining)
- Real-Time: Lightweight rolling features in Kafka Streams (for inference)
Tradeoff: Complexity vs latency
- Real-time adds complexity but enables dynamic pricing
- Batch is simpler but can't adapt mid-session
Decision 3: Separate Simulator vs Production Trace Replay
Choice: Separate simulator (not trace replay)
Rationale:
- Trace replay: More realistic but slow, requires large historical dataset
- Simulator: Fast iteration, can test edge cases (high agent fraction, etc.)
- Approach: Use simulator for development, validate on historical traces before production
Decision 4: Knowledge Distillation Method
Choice: Decision tree distillation + SHAP explanations
Rationale:
- Decision trees: Generate actionable rules for non-ML stakeholders
- SHAP: Understand feature importance for model debugging
- Both complement each other (global rules + local explanations)
IX. SUCCESS METRICS
Model Performance
- Agent Classifier:
  - ROC-AUC > 0.90
  - Precision > 0.85 at 80% recall (minimize false agent flags)
- Purchase Predictor:
  - ROC-AUC > 0.75
  - Calibration error < 0.1 (predicted probs match observed rates)
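The calibration target does not say which metric is meant. One common choice is expected calibration error (ECE) over equal-width probability bins, sketched below as an assumption; the bin count of 10 and the toy data are illustrative.

```python
def expected_calibration_error(y_true, y_prob, n_bins=10):
    """Weighted mean |observed rate - mean predicted prob| over equal-width bins."""
    bins = [[] for _ in range(n_bins)]
    for label, prob in zip(y_true, y_prob):
        index = min(int(prob * n_bins), n_bins - 1)  # prob == 1.0 goes in the last bin
        bins[index].append((label, prob))
    total = len(y_true)
    ece = 0.0
    for members in bins:
        if not members:
            continue
        observed = sum(label for label, _ in members) / len(members)
        predicted = sum(prob for _, prob in members) / len(members)
        ece += len(members) / total * abs(observed - predicted)
    return ece


y_true = [0, 0, 0, 1, 0, 1, 1, 1, 1, 1]
y_prob = [0.1, 0.1, 0.3, 0.3, 0.3, 0.7, 0.7, 0.7, 0.9, 0.9]
ece = expected_calibration_error(y_true, y_prob)
print(round(ece, 3), "meets target" if ece < 0.1 else "misses target")
```

On small evaluation sets ECE is noisy, so the threshold is best checked on a held-out set with enough samples per bin.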
Business Impact
- Margin Leakage Reduction: Target 30% reduction in L_agent
- Revenue: No degradation in human conversion rate
- Latency: p99 pricing latency < 100ms
Operational
- Retraining Frequency: Daily (automated)
- Model Drift Detection: Alert if agent detection rate changes >20% week-over-week
- Uptime: 99.9% availability for pricing service
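The drift rule in the list above can be expressed in a few lines. This sketch reads ">20% week-over-week" as a relative change in the detection rate (not percentage points), which is an assumption; the function name and the handling of a zero baseline are also illustrative.

```python
def detection_rate_drift(prev_week_rate, this_week_rate, threshold=0.20):
    """Return (relative_change, alert) for the week-over-week agent detection rate."""
    if prev_week_rate <= 0:
        # No baseline to compare against; alert only if agents appeared at all
        appeared = this_week_rate > 0
        return (float("inf") if appeared else 0.0), appeared
    change = (this_week_rate - prev_week_rate) / prev_week_rate
    return change, abs(change) > threshold


print(detection_rate_drift(0.10, 0.13))  # rate rose from 10% to 13% of sessions
print(detection_rate_drift(0.10, 0.11))
```

Alerting on the absolute value catches drops as well as spikes; a sudden drop can mean agents have learned to evade the classifier.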
X. RISKS & MITIGATION
Risk 1: Label Noise
Issue: xp_human_only flag may be incorrect (humans acting like agents, vice versa)
Mitigation:
- Use soft labels (agent probability) instead of hard classification
- Implement active learning: flag uncertain sessions for manual review
- Combine flag with behavioral heuristics (velocity > threshold)
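A rough sketch of how these mitigations might fit together: blend the experiment flag with a velocity heuristic into a soft label, and flag sessions where the two disagree for manual review. The blending weight, the velocity threshold and the function name are all hypothetical.

```python
def soft_agent_label(xp_human_only, interaction_velocity,
                     velocity_threshold=60.0, flag_trust=0.9):
    """Return (soft_label, needs_review) for one session.

    soft_label is in [0, 1]: `flag_trust` weight on the experiment flag and
    the remainder on the velocity heuristic (events per minute).
    """
    flag_label = 0.0 if xp_human_only else 1.0
    heuristic_label = 1.0 if interaction_velocity > velocity_threshold else 0.0
    soft_label = flag_trust * flag_label + (1.0 - flag_trust) * heuristic_label
    needs_review = flag_label != heuristic_label  # sources disagree
    return soft_label, needs_review


print(soft_agent_label(xp_human_only=True, interaction_velocity=12.0))
print(soft_agent_label(xp_human_only=True, interaction_velocity=150.0))
```

Soft labels of this kind can be used directly as targets for a binary cross-entropy loss, so no change to the classifier head is needed.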
Risk 2: Concept Drift
Issue: Agent behavior evolves to evade detection
Mitigation:
- Monitor model performance metrics daily
- Implement champion/challenger A/B testing for new models
- Retrain daily on a rolling 7-day window, matching the Phase 2.3 training DAG
Risk 3: Revenue Impact
Issue: Aggressive agent markup hurts conversion
Mitigation:
- Start with conservative markup (10% max)
- Run A/B test with 10% traffic
- Monitor revenue closely; roll back if degradation exceeds 5%
Risk 4: Simulator Misalignment
Issue: Simulator doesn't match real-world dynamics
Mitigation:
- Calibrate simulator parameters from historical data
- Validate RL policies on historical traces before production
- Run small-scale live experiments
XI. NEXT STEPS (Immediate Actions)
- Review this document with team for alignment
- Set up project structure:
    mkdir -p experiments/ml/{features,models,simulator,inference}
    mkdir -p experiments/ml/notebooks  # For EDA
- Create sample dataset:
  - Pull 1 week of events from Kafka
  - Join with experiments table
  - Validate label distribution (human/agent ratio)
- Prototype feature pipeline:
  - Start with TemporalFeatureExtractor
  - Benchmark on 10k events
  - Ensure output matches expected schema
- Kickoff Phase 1 tasks from roadmap
XII. REFERENCES & RESOURCES
Papers
- Multi-Task Learning Using Uncertainty to Weigh Losses - Kendall et al.
- Distilling the Knowledge in a Neural Network - Hinton et al.
- Deep Reinforcement Learning for Online Pricing
Code Examples
- Existing pipeline: /home/user/PHANTOM/experiments/procesing/pipelines.py
- Existing pricers: /home/user/PHANTOM/experiments/procesing/pricers/
- Agent runner: /home/user/PHANTOM/experiments/agents/agent.py
Internal Documentation
- Event schema: /home/user/PHANTOM/web/src/lib/events.ts
- Pricing provider API: /home/user/PHANTOM/backend/provider/app.py
- Airflow DAGs: /home/user/PHANTOM/experiments/airflow/dags/
APPENDIX A: Feature Schema
SESSION_FEATURE_COLS = [
    # Temporal (8)
    'session_duration_sec',
    'total_interactions',
    'avg_time_between_events',
    'std_time_between_events',
    'interaction_velocity',
    'max_velocity_5min',
    'min_time_between_events',
    'session_start_hour',
    # Behavioral (10)
    'page_views',
    'item_views',
    'cart_adds',
    'purchases',
    'hover_events',
    'filter_events',
    'cart_to_view_ratio',
    'conversion_rate',
    'hover_intensity',
    'unique_pages_visited',
    # Product (8)
    'unique_products_viewed',
    'product_view_depth',
    'avg_price_seen',
    'min_price_seen',
    'max_price_seen',
    'std_price_seen',
    'price_range_explored',
    'filter_usage_count',
    # UserAgent (3)
    'is_headless_browser',
    'is_automation_tool',
    'browser_family'
]

PRODUCT_FEATURE_COLS = [
    'base_price',
    'current_demand',
    'inventory_level',
    'avg_demand_last_hour',
    'price_elasticity',
    'n_amenities',
    'is_refundable',
    'days_until_date',
    'competitor_price_diff',  # If available
    'view_count_last_hour'
]

LABEL_COLS = [
    'is_agent',       # Binary: 0=human, 1=agent
    'purchased',      # Binary: 0=no purchase, 1=purchase
    'total_revenue'   # Float: revenue from this session
]
APPENDIX B: Sample Code Snippets
Data Loading
# experiments/ml/datasets.py
import json
import os

import pandas as pd
from kafka import KafkaConsumer
from supabase import create_client

# Assumed import path for the Phase 1.1 pipeline factory
from experiments.ml.features import build_feature_pipeline


def load_events_from_kafka(topic='user-interactions', hours=24):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        # Stop iterating after 10s without new messages; otherwise the loop never ends
        consumer_timeout_ms=10_000,
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    events = []
    # Compare in UTC so tz-aware and naive timestamps are handled uniformly
    cutoff_time = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=hours)
    for message in consumer:
        event = message.value
        if pd.to_datetime(event['ts'], utc=True) > cutoff_time:
            events.append(event)
    consumer.close()
    return pd.DataFrame(events)


def load_experiments_from_supabase():
    # Credentials are read from the environment (variable names are an assumption)
    client = create_client(os.environ['SUPABASE_URL'], os.environ['SUPABASE_KEY'])
    response = client.table('experiments').select('*').execute()
    return pd.DataFrame(response.data)


def create_labeled_dataset():
    events_df = load_events_from_kafka(hours=168)  # 1 week
    experiments_df = load_experiments_from_supabase()
    # Join events to their experiment; events without a match are dropped
    labeled_df = events_df.merge(
        experiments_df[['id', 'xp_human_only']],
        left_on='experimentId',
        right_on='id',
        how='inner'
    )
    # Create labels: 1 = agent, 0 = human
    labeled_df['is_agent'] = (~labeled_df['xp_human_only'].astype(bool)).astype(int)
    # Apply feature pipeline
    feature_pipeline = build_feature_pipeline()
    feature_matrix = feature_pipeline.fit_transform(labeled_df)
    return feature_matrix
Feature Extraction
# experiments/ml/features/temporal.py
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class TemporalFeatureExtractor(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, events_df):
        df = events_df.copy()
        df['ts_dt'] = pd.to_datetime(df['ts'])
        df = df.sort_values(['sessionId', 'ts_dt'])
        # Time diffs between consecutive events within a session (vectorized)
        df['time_diff'] = df.groupby('sessionId')['ts_dt'].diff().dt.total_seconds()
        # Session-level aggregates via named aggregation (yields flat column names)
        session_temporal = df.groupby('sessionId').agg(
            session_start=('ts_dt', 'min'),
            session_end=('ts_dt', 'max'),
            avg_time_between_events=('time_diff', 'mean'),
            std_time_between_events=('time_diff', 'std'),
            min_time_between_events=('time_diff', 'min'),
            total_interactions=('eventName', 'count'),
        )
        # Derive duration and start hour without per-group Python lambdas
        session_temporal['session_duration_sec'] = (
            session_temporal['session_end'] - session_temporal['session_start']
        ).dt.total_seconds()
        session_temporal['session_start_hour'] = session_temporal['session_start'].dt.hour
        session_temporal = session_temporal.drop(columns=['session_start', 'session_end'])
        # Compute velocity (events per minute; epsilon avoids division by zero)
        session_temporal['interaction_velocity'] = (
            session_temporal['total_interactions'] /
            (session_temporal['session_duration_sec'] / 60 + 1e-6)
        )
        return session_temporal.reset_index()
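The schema in Appendix A also lists max_velocity_5min, which the extractor above does not compute. Its exact definition is not given here, so the sketch below assumes it means the peak number of events per minute within any 5-minute window of a session, computed with a two-pointer sliding window over sorted timestamps. Both function names are hypothetical.

```python
def max_events_in_window(timestamps_sec, window_sec=300):
    """Largest number of events falling in any window of `window_sec` seconds."""
    ts = sorted(timestamps_sec)
    best = left = 0
    for right, current in enumerate(ts):
        # Advance the left edge until the window spans less than window_sec
        while current - ts[left] >= window_sec:
            left += 1
        best = max(best, right - left + 1)
    return best


def max_velocity_5min(timestamps_sec):
    """Peak events per minute over any 5-minute window (assumed definition)."""
    return max_events_in_window(timestamps_sec, window_sec=300) / 5.0


session_ts = [0, 10, 20, 400, 410, 420, 430, 1000]  # seconds since session start
print(max_events_in_window(session_ts), max_velocity_5min(session_ts))
```

The sliding window is linear in the number of events after sorting, so it can be applied per session without reintroducing the quadratic cost of the old pipeline.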
END OF GAMEPLAN