PHANTOM/docs/GAMEPLAN_MULTITASK_PRICING.md
Claude aab54ea7c0 docs: Add comprehensive multi-task learning architecture and gameplan
Created detailed documentation for implementing multi-task learning system
to improve agent detection and dynamic pricing:

- GAMEPLAN_MULTITASK_PRICING.md: Complete 50+ page technical specification
  including feature engineering, supervised learning, multi-task neural
  networks, synthetic simulator, and knowledge distillation approach

- ARCHITECTURE_OVERVIEW.md: Quick reference with visual diagrams comparing
  current rule-based system to proposed ML architecture, metrics, and
  implementation phases

Key improvements proposed:
- Replace O(n²) SessionState pipeline with vectorized feature extraction
- Train XGBoost classifier on experimentId labels (ROC-AUC >0.90 target)
- Multi-task neural network for joint agent detection + purchase prediction
- Gymnasium-based synthetic pricing environment for safe experimentation
- Knowledge distillation to extract interpretable pricing heuristics

Addresses margin leakage concerns with learned pricing strategies instead
of simple velocity thresholds.
2025-12-11 09:51:41 +00:00


# Multi-Task Learning for Dynamic Pricing & Agent Detection
## Comprehensive Gameplan & Architecture
**Date:** 2025-12-11
**Objective:** Transform raw interaction data into a multi-task learning system that (1) discriminates human vs agent behavior and (2) learns pricing heuristics to prevent margin leakage in dynamic pricing algorithms.
---
## Executive Summary
### Current State
- ✅ Rich event collection system capturing 15+ event types (views, carts, hovers, filters)
- ✅ experimentId-based labeling via `xp_human_only` flag
- ✅ Kafka streaming to `user-interactions` topic
- ✅ Airflow pipeline running every 15min with rule-based pricing (surge, elasticity)
- ❌ SessionState pipeline is O(n²) and inefficient; feature extraction is messy
- ❌ No supervised ML for agent detection (only velocity threshold heuristics)
- ❌ No multi-task learning architecture
- ❌ Pricing models are analytical formulas, not learned from data
### Proposed Solution
1. **Refactor SessionState Pipeline**: Vectorized feature engineering with proper augmentation
2. **Supervised Agent Classifier**: Binary classifier (human/agent) using session features
3. **Multi-Task Learning**: Joint model learning both tasks:
   - Task A: Human/Agent Classification (cross-entropy loss)
   - Task B: Price Sensitivity Prediction (regression loss)
4. **Synthetic Pricing Simulator**: Fast RL-style environment for testing pricing strategies
5. **Knowledge Distillation**: Distill insights from classifier into pricing heuristics
---
## I. DATA ANALYSIS
### What We Actually Collect
**Event Schema** (`web/src/lib/events.ts`):
```typescript
{
  sessionId: string,                  // Session UUID (httpOnly cookie)
  experimentId?: string,              // Experiment UUID → LABEL SOURCE
  storeMode: 'hotel' | 'airline',
  ts: string,                         // ISO8601 timestamp
  page: string,                       // URL path
  eventName: EventName,               // One of the event types listed below
  productId?: string,                 // Product interaction
  metadata?: Record<string, unknown>, // Product details (price, amenities)
  userAgent?: string
}
```
**Event Types Breakdown:**
- **Navigation:** `page_view`, `view_item_page`, `learn_more_about_item`
- **Cart:** `add_item_to_cart`, `remove_item`, `checkout_start`, `purchase_complete`
- **Filters:** `search`, `filter_for_date`, `filter_for_amenities`, `filter_for_price`, `sort_change`
- **Dwell Signals:** `hover_over_title`, `hover_over_paragraph`, `hover_over_link`, `hover_over_button`
- **Session:** `session_start`
### Ground Truth Labels
**Labeling Strategy:**
```sql
-- Experiment table schema (Supabase)
CREATE TABLE experiments (
  id UUID PRIMARY KEY,    -- This becomes experimentId in events
  subject_name TEXT,
  xp_human_only BOOLEAN,  -- KEY FIELD for labeling
  xp_market_mode TEXT,
  xp_task_id UUID
);
```
**Label Assignment:**
1. Admin creates experiment with `xp_human_only=true` (human) or `false` (agent allowed)
2. Generates start link: `https://phantom-hotel.vercel.app/start-task?uuid={experimentId}`
3. All events from that session carry `experimentId`
4. Join events with experiments table on `experimentId` → get `xp_human_only` label
**Training Set Construction:**
```python
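# Pseudocode: fetch_from_kafka / fetch_from_supabase stand in for the real loaders
events_df = fetch_from_kafka('user-interactions')
experiments_df = fetch_from_supabase('experiments')
labeled_df = events_df.merge(
    experiments_df[['id', 'xp_human_only']],
    left_on='experimentId',
    right_on='id',
    how='inner'  # Only keep events whose experimentId is known
)
# Label mapping: drop experiments with no flag set, then flip the boolean.
# xp_human_only=True → is_agent=False; xp_human_only=False (agents allowed) → is_agent=True
labeled_df = labeled_df.dropna(subset=['xp_human_only'])
labeled_df['is_agent'] = ~labeled_df['xp_human_only'].astype(bool)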
```
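`labeled_df` is still one row per event, while the classifier in Phase 2 works on one row per session. A minimal sketch of collapsing event labels to session labels, assuming a session should belong to exactly one experiment; `session_labels` is a hypothetical helper, and sessions seen under several experiments are dropped as ambiguous:

```python
import pandas as pd

def session_labels(labeled_df: pd.DataFrame) -> pd.DataFrame:
    """One label row per session; sessions tied to several experiments are dropped."""
    per_session = labeled_df.groupby('sessionId').agg(
        n_experiments=('experimentId', 'nunique'),
        experimentId=('experimentId', 'first'),
        is_agent=('is_agent', 'first'),
    )
    # A session linked to more than one experiment has an ambiguous label
    consistent = per_session[per_session['n_experiments'] == 1]
    return consistent.drop(columns='n_experiments').reset_index()

events = pd.DataFrame({
    'sessionId':    ['s1', 's1', 's2', 's3', 's3'],
    'experimentId': ['e1', 'e1', 'e2', 'e1', 'e2'],
    'is_agent':     [False, False, True, False, True],
})
print(session_labels(events))
```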
### What We're Missing
**For Pricing Task:**
- ❌ Price paid vs price shown (conversion price tracking)
- ❌ Product availability at interaction time
- ❌ Competitor prices (external data)
- ✅ Demand proxy (view counts, cart adds) - **WE HAVE THIS**
- ✅ Base prices from product metadata - **WE HAVE THIS**
**For Agent Detection:**
- ✅ Interaction velocity, dwell times, product view depth - **WE HAVE THIS**
- ✅ userAgent strings - **WE HAVE THIS**
- ❌ Mouse movement patterns (only have hover events, not trajectories)
- ❌ Typing speed in search fields
---
## II. PHASE 1: Fix SessionState Pipeline
### Current Issues (`experiments/procesing/steps/session.py`)
**Problem 1: O(n²) Complexity**
```python
# Line 77-93: _apply_to_slice()
for idx, row in df.iterrows():  # For EVERY row...
    session_df = df[df['sessionId'] == row['sessionId']]  # ...scan ALL events
    current_time_events = session_df[session_df['ts'] <= row['ts']]
    features = _extract_features_for_session(current_time_events)
# This scans the full dataset O(n) for each of n rows = O(n²)
```
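The loop recomputes each event's features from its full session history. For features that can be accumulated (counts, sums, running min/max), that history can be carried forward instead: after one sort, `groupby(...).cumcount()` and `cumsum()` give every row its "as of this event" value in a single pass, O(n log n) overall. A minimal sketch with two illustrative features (`events_so_far`, `cart_adds_so_far` are hypothetical, not the features in `session.py`), checked against a naive rescan; it assumes timestamps are unique within a session, and features such as rolling medians need a different approach:

```python
import pandas as pd

def as_of_features(df: pd.DataFrame) -> pd.DataFrame:
    """Per-event features from the session's events up to and including that event."""
    out = df.sort_values(['sessionId', 'ts']).copy()
    is_cart_add = (out['eventName'] == 'add_item_to_cart').astype(int)
    out['events_so_far'] = out.groupby('sessionId').cumcount() + 1
    out['cart_adds_so_far'] = is_cart_add.groupby(out['sessionId']).cumsum()
    return out

def as_of_features_naive(df: pd.DataFrame) -> pd.DataFrame:
    """Reference version with the per-row rescan used in session.py (O(n²))."""
    rows = []
    for _, row in df.iterrows():
        history = df[(df['sessionId'] == row['sessionId']) & (df['ts'] <= row['ts'])]
        rows.append({**row.to_dict(),
                     'events_so_far': len(history),
                     'cart_adds_so_far': int((history['eventName'] == 'add_item_to_cart').sum())})
    return pd.DataFrame(rows).sort_values(['sessionId', 'ts'])

events = pd.DataFrame({
    'sessionId': ['a', 'b', 'a', 'a', 'b'],
    'ts': ['2025-01-01T10:00:00', '2025-01-01T10:00:05', '2025-01-01T10:00:10',
           '2025-01-01T10:00:20', '2025-01-01T10:00:30'],
    'eventName': ['page_view', 'add_item_to_cart', 'add_item_to_cart', 'page_view', 'page_view'],
})
fast = as_of_features(events)
slow = as_of_features_naive(events)
print(fast[['sessionId', 'ts', 'events_so_far', 'cart_adds_so_far']])
```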
**Problem 2: No Proper Feature Augmentation**
- Features computed only at aggregate session level
- No rolling window statistics
- No temporal decay of signals
- Missing product-level features (price seen, amenity preferences)
### Proposed Solution: Vectorized Feature Engineering
**New Pipeline Architecture:**
```
Raw Events (Kafka)
  ↓
SessionGrouper (group by sessionId + experimentId)
  ↓
TemporalFeatureExtractor (vectorized time diffs, velocity)
  ↓
BehavioralFeatureExtractor (interaction counts, ratios)
  ↓
ProductFeatureExtractor (price sensitivity, preference patterns)
  ↓
SessionAggregator (final session-level features)
  ↓
LabelJoiner (merge with experiments table)
  ↓
Feature Matrix + Labels (ready for ML)
```
**Feature Categories:**
**A. Temporal Features (Vectorized)**
```python
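import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class TemporalFeatureExtractor(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self  # Stateless: nothing to learn from the training data

    def transform(self, events_df):
        df = events_df.copy()
        # Parse timestamps first so the sort is chronological, then sort by session and time
        df['ts_dt'] = pd.to_datetime(df['ts'], utc=True)
        df = df.sort_values(['sessionId', 'ts_dt']).reset_index(drop=True)
        # Vectorized time differences (NaN for each session's first event)
        df['time_diff'] = df.groupby('sessionId')['ts_dt'].diff().dt.total_seconds()
        # Rolling statistics: events in the trailing 5-minute window of each session.
        # A numeric helper column is summed (rolling over a string column fails);
        # because df is sorted by (sessionId, ts_dt), the grouped result lines up
        # row-for-row with df.
        df['_one'] = 1.0
        rolled = df.set_index('ts_dt').groupby('sessionId')['_one'].rolling('5min').sum()
        df['velocity_5min'] = rolled.to_numpy()
        df = df.drop(columns='_one')
        df['avg_time_between_events'] = df.groupby('sessionId')['time_diff'].transform('mean')
        df['std_time_between_events'] = df.groupby('sessionId')['time_diff'].transform('std')
        return df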
```
**B. Behavioral Features**
```python
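import pandas as pd

class BehavioralFeatureExtractor:
    def fit(self, X, y=None):
        return self

    def transform(self, events_df):
        # One boolean indicator column per event type of interest
        name = events_df['eventName']
        df = events_df.assign(
            ts_dt=pd.to_datetime(events_df['ts'], utc=True),
            is_page_view=name.eq('page_view'),
            is_item_view=name.eq('view_item_page'),
            is_cart_add=name.eq('add_item_to_cart'),
            is_hover=name.str.startswith('hover_over', na=False),
        )
        # Named aggregation: output column = (input column, aggregation).
        # Unique products are counted in ProductFeatureExtractor.
        session_agg = df.groupby('sessionId').agg(
            # Interaction counts
            total_interactions=('eventName', 'count'),
            # Event type breakdowns
            page_views=('is_page_view', 'sum'),
            item_views=('is_item_view', 'sum'),
            cart_adds=('is_cart_add', 'sum'),
            total_hovers=('is_hover', 'sum'),
            # Session duration
            session_duration_sec=('ts_dt', lambda x: (x.max() - x.min()).total_seconds()),
        )
        # Derived ratios (+1 avoids division by zero for sessions without item views)
        session_agg['cart_to_view_ratio'] = (
            session_agg['cart_adds'] / (session_agg['item_views'] + 1)
        )
        session_agg['hover_intensity'] = (
            session_agg['total_hovers'] / session_agg['total_interactions']
        )
        return session_agg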
```
**C. Product Interaction Features**
```python
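import pandas as pd

class ProductFeatureExtractor:
    def fit(self, X, y=None):
        return self

    def transform(self, events_df):
        df = events_df.copy()
        # Extract price from metadata when available (non-numeric values become NaN)
        df['price_seen'] = pd.to_numeric(
            df['metadata'].apply(lambda m: m.get('base_price') if isinstance(m, dict) else None),
            errors='coerce',
        )
        product_events = df[df['productId'].notna()]
        # Named aggregation avoids the duplicate 'productId' dict key, which
        # silently dropped one of the two productId aggregations
        product_agg = product_events.groupby('sessionId').agg(
            # Price sensitivity
            avg_price_seen=('price_seen', 'mean'),
            min_price_seen=('price_seen', 'min'),
            max_price_seen=('price_seen', 'max'),
            std_price_seen=('price_seen', 'std'),
            # Product diversity
            unique_products_viewed=('productId', 'nunique'),
            # Max view depth (how many times the most-viewed product was viewed)
            product_view_depth=('productId', lambda x: x.value_counts().iloc[0]),
        )
        product_agg['price_range_explored'] = (
            product_agg['max_price_seen'] - product_agg['min_price_seen']
        )
        # Filter usage (proxy for deliberate search); sessions without filters get 0
        is_filter = df['eventName'].str.contains('filter|search', na=False)
        filter_counts = df[is_filter].groupby('sessionId').size()
        product_agg['filter_usage_count'] = filter_counts.reindex(product_agg.index, fill_value=0)
        return product_agg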
```
**D. Final Feature Matrix**
```python
# Target schema (one row per session): column name → type
FEATURE_SCHEMA = {
    # Identifiers
    'sessionId': str,
    'experimentId': str,
    # Temporal (8 features)
    'session_duration_sec': float,
    'total_interactions': int,
    'avg_time_between_events': float,
    'std_time_between_events': float,
    'interaction_velocity': float,  # interactions/min
    'max_velocity_5min': float,
    'min_time_between_events': float,
    'session_start_hour': int,  # time of day (0-23)
    # Behavioral (10 features)
    'page_views': int,
    'item_views': int,
    'cart_adds': int,
    'purchases': int,
    'hover_events': int,
    'filter_events': int,
    'cart_to_view_ratio': float,
    'conversion_rate': float,
    'hover_intensity': float,
    'unique_pages_visited': int,
    # Product interaction (8 features)
    'unique_products_viewed': int,
    'product_view_depth': int,  # max times single product viewed
    'avg_price_seen': float,
    'min_price_seen': float,
    'max_price_seen': float,
    'std_price_seen': float,
    'price_range_explored': float,  # max - min
    'filter_usage_count': int,
    # userAgent parsing (3 features)
    'is_headless_browser': bool,  # "HeadlessChrome" in UA
    'is_automation_tool': bool,  # "Selenium", "Playwright", etc.
    'browser_family': str,
    # LABELS
    'is_agent': bool,  # not experiments.xp_human_only
    'purchased': bool,
    'total_revenue': float  # for pricing task
}
```
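`SessionAggregator` is referenced in the pipeline but not specified. One way to turn the per-event columns from `TemporalFeatureExtractor` into the remaining session-level temporal features is sketched below. It assumes the input already carries `ts_dt`, `time_diff` and `velocity_5min`, leaves `total_interactions` and `session_duration_sec` to `BehavioralFeatureExtractor` (so joined columns do not collide), and treats sessions shorter than one minute as lasting one minute when computing the per-minute rate; that floor is an illustrative choice:

```python
import pandas as pd

class SessionAggregator:
    """Hypothetical sketch: per-event temporal columns -> one row per session."""

    def fit(self, X, y=None):
        return self

    def transform(self, events_df: pd.DataFrame) -> pd.DataFrame:
        agg = events_df.groupby('sessionId').agg(
            avg_time_between_events=('time_diff', 'mean'),
            std_time_between_events=('time_diff', 'std'),
            min_time_between_events=('time_diff', 'min'),
            max_velocity_5min=('velocity_5min', 'max'),
            n_events=('ts_dt', 'count'),
            start=('ts_dt', 'min'),
            end=('ts_dt', 'max'),
        )
        minutes = (agg['end'] - agg['start']).dt.total_seconds() / 60
        # Interactions per minute; very short sessions are floored at one minute
        agg['interaction_velocity'] = agg['n_events'] / minutes.clip(lower=1.0)
        agg['session_start_hour'] = agg['start'].dt.hour
        return agg.drop(columns=['n_events', 'start', 'end'])

ts = pd.to_datetime(['2025-01-01T09:00:00Z', '2025-01-01T09:00:30Z', '2025-01-01T09:02:00Z',
                     '2025-01-01T14:00:00Z', '2025-01-01T14:00:10Z'], utc=True)
events = pd.DataFrame({
    'sessionId': ['h', 'h', 'h', 'a', 'a'],
    'ts_dt': ts,
    'time_diff': [None, 30.0, 90.0, None, 10.0],
    'velocity_5min': [1.0, 2.0, 3.0, 1.0, 2.0],
})
features = SessionAggregator().transform(events)
print(features)
```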
### Implementation Files
**Create new directory structure:**
```
experiments/
  ml/
    __init__.py
    features/
      __init__.py
      temporal.py      # TemporalFeatureExtractor
      behavioral.py    # BehavioralFeatureExtractor
      product.py       # ProductFeatureExtractor
      useragent.py     # UserAgentParser
    pipeline.py        # Full feature pipeline (sklearn Pipeline)
    datasets.py        # Data loaders (Kafka → Pandas)
```
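`useragent.py` only needs substring checks to produce the three userAgent features in the schema. A minimal sketch; the marker lists and the coarse browser families are assumptions to tune on real traffic, and a dedicated UA-parsing library could replace the family detection:

```python
import re
import pandas as pd

# Assumed markers; extend as new tools show up in the logs
HEADLESS_MARKERS = ('HeadlessChrome',)
AUTOMATION_MARKERS = ('Selenium', 'Playwright', 'Puppeteer', 'PhantomJS', 'webdriver')

# Checked in order: Edge UAs also contain "Chrome/", and Chrome UAs contain "Safari/"
_FAMILY_PATTERNS = [
    ('Edge', re.compile(r'Edg/')),
    ('Firefox', re.compile(r'Firefox/')),
    ('Chrome', re.compile(r'Chrome/')),  # also matches "HeadlessChrome/"
    ('Safari', re.compile(r'Safari/')),
]

def browser_family(ua: str) -> str:
    for family, pattern in _FAMILY_PATTERNS:
        if pattern.search(ua):
            return family
    return 'Other'

class UserAgentParser:
    """Hypothetical sketch: last non-null userAgent per session -> 3 features."""

    def fit(self, X, y=None):
        return self

    def transform(self, events_df: pd.DataFrame) -> pd.DataFrame:
        ua = events_df.groupby('sessionId')['userAgent'].last().fillna('')
        return pd.DataFrame({
            'is_headless_browser': ua.apply(lambda s: any(m in s for m in HEADLESS_MARKERS)),
            'is_automation_tool': ua.apply(
                lambda s: any(m.lower() in s.lower() for m in AUTOMATION_MARKERS)),
            'browser_family': ua.apply(browser_family),
        })

events = pd.DataFrame({
    'sessionId': ['s1', 's2', 's3'],
    'userAgent': [
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) '
        'HeadlessChrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
        None,
    ],
})
out = UserAgentParser().transform(events)
print(out)
```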
**Replace messy session.py with:**
```python
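# experiments/ml/pipeline.py
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

def _session_level_features(events_df: pd.DataFrame) -> pd.DataFrame:
    # A Pipeline feeds each step's output into the next step, so extractors
    # that all need the event-level frame cannot be chained. They run side by
    # side instead; each returns a frame indexed by sessionId, joined column-wise.
    extractors = [SessionAggregator(), BehavioralFeatureExtractor(),
                  ProductFeatureExtractor(), UserAgentParser()]
    return pd.concat([ex.transform(events_df) for ex in extractors], axis=1)

def build_feature_pipeline():
    return Pipeline([
        ('temporal', TemporalFeatureExtractor()),  # event-level → event-level
        ('session', FunctionTransformer(_session_level_features)),  # → one row per session
        ('label_joiner', ExperimentLabelJoiner())
    ])

# Usage:
pipeline = build_feature_pipeline()
feature_matrix = pipeline.fit_transform(raw_events_df)
# Output: DataFrame with 29 features + 3 labels per session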
```
---
## III. PHASE 2: Supervised Agent Classifier
### Model Architecture
**Objective:** Binary classification `is_agent ∈ {0, 1}` from session features
**Model Choice: Gradient Boosting (XGBoost/LightGBM)**
- Handles mixed feature types (continuous, categorical, boolean)
- Interpretable feature importance
- Strong baseline performance
- Fast inference for real-time detection
**Alternative: Neural Network (if needed for multi-task later)**
```python
import torch.nn as nn

class AgentClassifier(nn.Module):
    def __init__(self, n_features=29, hidden_dims=(128, 64, 32)):
        super().__init__()
        layers = []
        in_dim = n_features
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(in_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.3)
            ])
            in_dim = hidden_dim
        layers.append(nn.Linear(in_dim, 1))  # Binary logit (pair with BCEWithLogitsLoss)
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)  # Shape: [batch, 1]
```
### Training Pipeline
**File: `experiments/ml/train_classifier.py`**
```python
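import pickle

import pandas as pd
import xgboost as xgb
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.model_selection import train_test_split

NON_FEATURE_COLS = ['sessionId', 'experimentId', 'is_agent', 'purchased', 'total_revenue']

def train_agent_classifier(feature_matrix_df):
    # Separate features and labels
    X = feature_matrix_df.drop(columns=NON_FEATURE_COLS)
    # XGBoost needs numeric input: one-hot the string column and cast booleans.
    # Inference must apply the same encoding (store X.columns with the model).
    X = pd.get_dummies(X, columns=['browser_family'], dtype=int)
    X = X.astype({c: int for c in X.select_dtypes('bool').columns})
    y = feature_matrix_df['is_agent'].astype(int)
    # Train/validation/test split: early stopping watches the validation set,
    # so the test set stays untouched for the final metrics
    X_trval, X_test, y_trval, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_trval, y_trval, test_size=0.2, random_state=42, stratify=y_trval
    )
    # XGBoost with class imbalance handling (negatives / positives)
    scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum()
    model = xgb.XGBClassifier(
        n_estimators=200,
        max_depth=6,
        learning_rate=0.05,
        scale_pos_weight=scale_pos_weight,
        eval_metric='auc',
        early_stopping_rounds=20  # constructor argument in xgboost >= 1.6
    )
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        verbose=10
    )
    # Evaluation on the held-out test set
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    print(classification_report(y_test, y_pred))
    print(f"ROC-AUC: {roc_auc_score(y_test, y_prob):.4f}")
    # Feature importance
    importance_df = pd.DataFrame({
        'feature': X.columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    return model, importance_df

# Publish to ModelRegistry
def publish_classifier(model, registry):
    # joblib has no dumps(); pickle.dumps serializes the model to bytes for Redis
    model_bytes = pickle.dumps(model)
    registry.redis_client.set('classifier:agent_detector:latest', model_bytes)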
```
### Inference Integration
**Update Pricing Provider** (`backend/provider/app.py`):
```python
import pickle

from lib.model_registry import ModelRegistry

registry = ModelRegistry()
# Load classifier on startup. publish_classifier() stored it with pickle.dumps;
# only unpickle bytes from a trusted store, since unpickling can run code.
classifier_bytes = registry.redis_client.get('classifier:agent_detector:latest')
agent_classifier = pickle.loads(classifier_bytes)

@app.get("/api/{mode}/price/{productId}")  # `app`: the existing FastAPI instance in app.py
def get_price(mode, productId, sessionId, experimentId):
    # Fetch session features from real-time feature store
    session_features = compute_session_features(sessionId)  # New function
    # Predict agent probability (cast the numpy scalar to float for the JSON response)
    agent_prob = float(agent_classifier.predict_proba([session_features])[0, 1])
    # Apply dynamic markup based on agent probability
    base_price = get_base_price(productId)
    if agent_prob > 0.7:  # High confidence agent
        markup = 1.3
    elif agent_prob > 0.4:  # Uncertain
        markup = 1.1
    else:  # Likely human
        markup = 1.0
    final_price = base_price * markup
    return PriceResponse(
        productId=productId,
        price=final_price,
        agent_probability=agent_prob,
        markup=markup
    )
```
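The tiered markup in `get_price` is easier to unit-test and tune once it is pulled out into a pure function. A minimal sketch (`markup_for` and `MARKUP_TIERS` are hypothetical names) that keeps the thresholds and multipliers from the handler; note the strict `>`, so a probability of exactly 0.7 falls in the 1.1 tier:

```python
# (agent-probability threshold, markup), highest tier first; values from get_price
MARKUP_TIERS = ((0.7, 1.3), (0.4, 1.1))

def markup_for(agent_prob: float) -> float:
    """Markup multiplier for a session's agent probability (1.0 = base price)."""
    for threshold, markup in MARKUP_TIERS:
        if agent_prob > threshold:
            return markup
    return 1.0  # Likely human

for p in (0.95, 0.7, 0.5, 0.1):
    print(p, markup_for(p))
```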
**Real-Time Feature Computation:**
```python
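# backend/provider/features.py
import json
import time

import numpy as np
import pandas as pd
from kafka import KafkaConsumer  # kafka-python

def compute_session_features(sessionId: str, window_sec: int = 300,
                             max_events: int = 100, timeout_sec: float = 2.0) -> np.ndarray:
    """
    Compute features for active session from recent Kafka events.
    Uses a sliding window over event time (last `window_sec` seconds of the session).
    Scanning the topic per request is slow; a feature store keyed by sessionId
    is the better long-term source for these events.
    """
    consumer = KafkaConsumer(
        'user-interactions',
        value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
        consumer_timeout_ms=500,  # stop iterating after 0.5 s without new messages
    )
    events = []
    deadline = time.monotonic() + timeout_sec  # illustrative wall-clock budget
    try:
        for msg in consumer:
            event = msg.value
            if event.get('sessionId') == sessionId:
                events.append(event)
            # Stop after collecting enough history or when the time budget runs out
            if len(events) >= max_events or time.monotonic() > deadline:
                break
    finally:
        consumer.close()
    if not events:
        raise LookupError(f"No recent events for session {sessionId}")
    events_df = pd.DataFrame(events)
    # Keep only events inside the sliding window
    ts = pd.to_datetime(events_df['ts'], utc=True)
    events_df = events_df[ts >= ts.max() - pd.Timedelta(seconds=window_sec)]
    # Apply feature pipeline (same as training)
    feature_pipeline = load_feature_pipeline()
    features = feature_pipeline.transform(events_df)
    return features.values[0]  # Return single row as array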
```
---
## IV. PHASE 3: Multi-Task Learning Architecture
### Conceptual Framework
**Two Tasks:**
1. **Task A: Agent Classification** (what we just built)
   - Input: Session features
   - Output: P(is_agent)
   - Loss: Binary cross-entropy
2. **Task B: Price Sensitivity Prediction**
   - Input: Session features + Product features + Current price
   - Output: P(purchase | price) or Willingness-to-Pay (WTP)
   - Loss: Binary cross-entropy (purchase) or MSE (WTP)
**Shared Representation Hypothesis:**
- Both tasks benefit from learning session behavioral patterns
- Agent sessions have different price sensitivity than human sessions
- Shared feature encoder can learn robust representations
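Written out, the joint objective that the architecture and loss below implement is a weighted sum of two binary cross-entropies over a shared session encoder. Here $h_\theta$ is the shared session encoder, $g_\phi$ the product encoder, $f_A$ and $f_B$ the two heads, $\sigma$ the sigmoid, $[\cdot\,;\cdot]$ concatenation, and $\lambda_A = 1$, $\lambda_B = 2$ the default task weights in `multi_task_loss`:

$$
\mathcal{L}(\theta,\phi) = \frac{1}{N}\sum_{i=1}^{N}\Big[
\lambda_A\,\mathrm{BCE}\big(\sigma(f_A(h_\theta(x^{s}_i))),\,y^{\mathrm{agent}}_i\big)
+ \lambda_B\,\mathrm{BCE}\big(\sigma(f_B([h_\theta(x^{s}_i);\,g_\phi(x^{p}_i)])),\,y^{\mathrm{purchase}}_i\big)\Big],
\qquad
\mathrm{BCE}(p, y) = -\,y\log p - (1-y)\log(1-p)
$$

Because $h_\theta$ appears in both terms, gradients from the purchase loss also shape the representation the agent head sees, which is the shared-representation hypothesis in equation form.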
### Multi-Task Network Architecture
```python
import torch
import torch.nn as nn

class MultiTaskPricingModel(nn.Module):
    """
    Shared encoder for session features
    Two task-specific heads
    """

    def __init__(self, n_session_features=29, n_product_features=10):
        super().__init__()
        # Shared session encoder
        self.session_encoder = nn.Sequential(
            nn.Linear(n_session_features, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU()
        )
        # Product encoder (processes product features + price)
        self.product_encoder = nn.Sequential(
            nn.Linear(n_product_features, 32),
            nn.ReLU(),
            nn.Linear(32, 16)
        )
        # Task A: Agent classification head (session encoding only)
        self.agent_head = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)  # Logit for is_agent
        )
        # Task B: Purchase probability head (uses both session + product)
        self.purchase_head = nn.Sequential(
            nn.Linear(64 + 16, 32),  # Concatenate session + product encodings
            nn.ReLU(),
            nn.Linear(32, 1)  # Logit for purchase probability
        )

    def forward(self, session_features, product_features):
        # Encode session (shared)
        session_enc = self.session_encoder(session_features)
        # Encode product
        product_enc = self.product_encoder(product_features)
        # Task A prediction
        agent_logit = self.agent_head(session_enc)
        # Task B prediction
        combined = torch.cat([session_enc, product_enc], dim=1)
        purchase_logit = self.purchase_head(combined)
        return {
            'agent_logit': agent_logit,        # [batch, 1]
            'purchase_logit': purchase_logit,  # [batch, 1]
            'session_embedding': session_enc   # For knowledge distillation
        }
```
### Multi-Task Loss Function
```python
import torch.nn.functional as F

def multi_task_loss(outputs, targets, task_weights=None):
    """
    Weighted combination of task losses
    Args:
        outputs: dict with 'agent_logit', 'purchase_logit' (each [batch, 1])
        targets: dict with 'is_agent', 'purchased' (each [batch])
        task_weights: importance weights for each task
            (default {'agent': 1.0, 'purchase': 2.0})
    """
    if task_weights is None:  # avoid a shared mutable default argument
        task_weights = {'agent': 1.0, 'purchase': 2.0}
    # Task A: Agent classification. squeeze(-1) keeps the batch dimension even
    # for a batch of one, where a bare squeeze() would drop it.
    loss_agent = F.binary_cross_entropy_with_logits(
        outputs['agent_logit'].squeeze(-1),
        targets['is_agent'].float()
    )
    # Task B: Purchase prediction
    loss_purchase = F.binary_cross_entropy_with_logits(
        outputs['purchase_logit'].squeeze(-1),
        targets['purchased'].float()
    )
    # Weighted sum
    total_loss = (
        task_weights['agent'] * loss_agent +
        task_weights['purchase'] * loss_purchase
    )
    return total_loss, {
        'loss_agent': loss_agent.item(),
        'loss_purchase': loss_purchase.item(),
        'loss_total': total_loss.item()
    }
```
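A useful sanity check for this loss: at initialization both heads output logits near 0, so each BCE term is close to ln 2 ≈ 0.693 and, with the default weights, the total starts near (1.0 + 2.0) · ln 2 ≈ 2.079. A first-epoch loss far from that usually points to a label or shape problem. A small NumPy sketch of the numerically stable BCE-with-logits formula (the quantity `F.binary_cross_entropy_with_logits` computes with mean reduction) that reproduces this; `bce_with_logits` is a hypothetical helper:

```python
import numpy as np

def bce_with_logits(logits, targets) -> float:
    """Mean binary cross-entropy on raw logits, stable for large |logit|."""
    x = np.asarray(logits, dtype=np.float64)
    y = np.asarray(targets, dtype=np.float64)
    # max(x, 0) - x*y + log(1 + exp(-|x|)) equals -[y log σ(x) + (1-y) log(1-σ(x))]
    return float(np.mean(np.maximum(x, 0) - x * y + np.log1p(np.exp(-np.abs(x)))))

rng = np.random.default_rng(0)
y_agent = rng.integers(0, 2, size=256)
y_purchase = rng.integers(0, 2, size=256)
zero_logits = np.zeros(256)

total = 1.0 * bce_with_logits(zero_logits, y_agent) + 2.0 * bce_with_logits(zero_logits, y_purchase)
print(round(total, 3))  # 2.079
```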
### Training Loop
**File: `experiments/ml/train_multitask.py`**
```python
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

def train_multitask_model(feature_matrix_df, n_epochs=100):
    # Prepare data. SESSION_FEATURE_COLS / PRODUCT_FEATURE_COLS are assumed to be
    # defined with the feature schema; inputs should be numeric, NaN-free and scaled.
    X_session = feature_matrix_df[SESSION_FEATURE_COLS].to_numpy(dtype=np.float32)
    X_product = feature_matrix_df[PRODUCT_FEATURE_COLS].to_numpy(dtype=np.float32)
    y_agent = feature_matrix_df['is_agent'].to_numpy(dtype=np.float32)
    y_purchase = feature_matrix_df['purchased'].to_numpy(dtype=np.float32)
    # Convert to tensors
    dataset = TensorDataset(
        torch.from_numpy(X_session),
        torch.from_numpy(X_product),
        torch.from_numpy(y_agent),
        torch.from_numpy(y_purchase)
    )
    # drop_last avoids a final batch of one row, which BatchNorm1d rejects in training mode
    train_loader = DataLoader(dataset, batch_size=64, shuffle=True, drop_last=True)
    # Initialize model
    model = MultiTaskPricingModel(
        n_session_features=X_session.shape[1],
        n_product_features=X_product.shape[1]
    )
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)
    # Training loop
    for epoch in range(n_epochs):
        model.train()
        epoch_losses = {'agent': [], 'purchase': [], 'total': []}
        for X_sess_batch, X_prod_batch, y_agent_batch, y_purch_batch in train_loader:
            optimizer.zero_grad()
            # Forward pass
            outputs = model(X_sess_batch, X_prod_batch)
            # Compute loss
            loss, loss_dict = multi_task_loss(
                outputs,
                {'is_agent': y_agent_batch, 'purchased': y_purch_batch}
            )
            # Backward pass
            loss.backward()
            optimizer.step()
            # Track losses ('loss_agent' → 'agent', etc.)
            for k, v in loss_dict.items():
                epoch_losses[k.replace('loss_', '')].append(v)
        # Log epoch stats
        avg_loss = np.mean(epoch_losses['total'])
        print(f"Epoch {epoch}: Loss={avg_loss:.4f}, "
              f"Agent={np.mean(epoch_losses['agent']):.4f}, "
              f"Purchase={np.mean(epoch_losses['purchase']):.4f}")
        # Reduce the LR when the training loss plateaus (a validation loss is a better signal)
        scheduler.step(avg_loss)
    return model
```
### Knowledge Distillation for Pricing Heuristics
**Goal:** Extract interpretable pricing rules from the learned multi-task model
**Method 1: Decision Tree Distillation**
```python
import numpy as np
import torch
from sklearn.tree import DecisionTreeRegressor, export_text

def distill_pricing_rules(multitask_model, feature_matrix_df):
    """
    Train interpretable decision tree to mimic multi-task model's pricing decisions.
    The target (markup) is continuous, so a regression tree is used.
    """
    # Get predictions and embeddings from the multi-task model
    X_session = torch.as_tensor(feature_matrix_df[SESSION_FEATURE_COLS].to_numpy(dtype=np.float32))
    X_product = torch.as_tensor(feature_matrix_df[PRODUCT_FEATURE_COLS].to_numpy(dtype=np.float32))
    multitask_model.eval()  # Disable dropout, use BatchNorm running statistics
    with torch.no_grad():
        outputs = multitask_model(X_session, X_product)
    session_embeddings = outputs['session_embedding'].numpy()
    agent_probs = torch.sigmoid(outputs['agent_logit']).squeeze(-1).numpy()
    purchase_probs = torch.sigmoid(outputs['purchase_logit']).squeeze(-1).numpy()
    # Compute optimal markup based on model predictions
    # High agent prob + low purchase prob → high markup
    optimal_markup = compute_markup_from_probs(agent_probs, purchase_probs)
    # Train decision tree to predict optimal markup
    distilled_tree = DecisionTreeRegressor(max_depth=5, min_samples_leaf=50)
    distilled_tree.fit(feature_matrix_df[SESSION_FEATURE_COLS], optimal_markup)
    # Extract human-readable rules
    rules_text = export_text(distilled_tree, feature_names=list(SESSION_FEATURE_COLS))
    print("Distilled Pricing Rules:")
    print(rules_text)
    return distilled_tree, rules_text

def compute_markup_from_probs(agent_probs, purchase_probs):
    """
    Heuristic:
    - High agent prob → increase markup (exploit reconnaissance)
    - High purchase prob → decrease markup (avoid pricing out likely buyers)
    """
    markup = 1.0 + 0.5 * agent_probs - 0.3 * purchase_probs
    return np.clip(markup, 0.7, 1.5)  # Bounds: [70% to 150%]
```
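Because both probabilities lie in [0, 1], the linear heuristic already spans exactly the clip range, with the extremes at the corners of the probability square. A short check of those corners, re-stating `compute_markup_from_probs` so the snippet runs on its own:

```python
import numpy as np

def compute_markup_from_probs(agent_probs, purchase_probs):
    markup = 1.0 + 0.5 * np.asarray(agent_probs) - 0.3 * np.asarray(purchase_probs)
    return np.clip(markup, 0.7, 1.5)

agent = np.array([0.0, 0.0, 1.0, 1.0])
purchase = np.array([0.0, 1.0, 0.0, 1.0])
for a, p, m in zip(agent, purchase, compute_markup_from_probs(agent, purchase)):
    print(f"P(agent)={a:.0f}  P(purchase)={p:.0f}  markup={m:.2f}")
```

So for valid probabilities the clip never binds; it only starts to matter if the 0.5 / 0.3 coefficients are retuned.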
**Method 2: SHAP Values for Feature Importance**
```python
import numpy as np
import shap
import torch

def explain_pricing_model(multitask_model, feature_matrix_df):
    """Use SHAP to see which session features drive the agent probability used for pricing"""
    n_product_features = multitask_model.product_encoder[0].in_features
    multitask_model.eval()
    # Wrap model for SHAP: session features in, P(agent) out
    def model_predict(X_session):
        X_session_t = torch.as_tensor(np.asarray(X_session, dtype=np.float32))
        # Placeholder: the agent head ignores product features, so zeros are safe
        X_product_t = torch.zeros(len(X_session), n_product_features)
        with torch.no_grad():
            outputs = multitask_model(X_session_t, X_product_t)
        return torch.sigmoid(outputs['agent_logit']).squeeze(-1).numpy()
    X = feature_matrix_df[SESSION_FEATURE_COLS]
    # Compute SHAP values. Each sample is drawn once, so the plot shows the
    # same rows the values were computed for.
    background = X.sample(100, random_state=0)
    X_explain = X.sample(200, random_state=1)
    explainer = shap.KernelExplainer(model_predict, background)
    shap_values = explainer.shap_values(X_explain)
    # Visualize
    shap.summary_plot(shap_values, X_explain)
    return shap_values
```
---
## V. PHASE 4: Synthetic Dynamic Pricing Simulator
### Motivation
**Problem:** Real-world pricing experiments are:
- Slow (need to wait for user sessions)
- Expensive (margin leakage during exploration)
- Risky (can break revenue if pricing goes wrong)
**Solution:** Fast synthetic environment to:
1. Simulate user/agent behavior
2. Test pricing strategies in closed loop
3. Train RL agents for dynamic pricing
4. Validate multi-task model predictions
### Simulator Architecture
**File: `experiments/ml/simulator/env.py`**
```python
from typing import Any, Dict, Optional, Tuple

import gymnasium as gym
import numpy as np

class DynamicPricingEnv(gym.Env):
    """
    Synthetic environment for dynamic pricing with human/agent users
    State: [current_demand, inventory_level, time_of_day, agent_fraction, avg_session_velocity]
    Action: price_multiplier ∈ [0.7, 1.5] (continuous)
    Reward: revenue - margin_leakage_penalty
    """

    def __init__(self,
                 n_products=20,
                 n_timesteps=1000,
                 human_price_sensitivity=2.0,  # Elasticity
                 agent_price_sensitivity=5.0,  # Agents more price-aware
                 agent_fraction=0.3):
        super().__init__()
        self.n_products = n_products
        self.n_timesteps = n_timesteps
        self.human_sensitivity = human_price_sensitivity
        self.agent_sensitivity = agent_price_sensitivity
        self.agent_fraction = agent_fraction
        # State space: [demand, inventory, hour, agent_frac, velocity]
        self.observation_space = gym.spaces.Box(
            low=np.array([0, 0, 0, 0, 0], dtype=np.float32),
            high=np.array([100, 100, 24, 1, 20], dtype=np.float32),
            dtype=np.float32
        )
        # Action space: price multiplier
        self.action_space = gym.spaces.Box(
            low=0.7, high=1.5, shape=(1,), dtype=np.float32
        )
        # Internal state; all randomness goes through self.np_random so seeding works
        self.base_prices = self.np_random.uniform(100, 500, n_products)
        self.inventory = np.full(n_products, 50)
        self.timestep = 0
        self.demand_history = []

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)  # Re-seeds self.np_random when a seed is given
        self.inventory = np.full(self.n_products, 50)
        self.timestep = 0
        self.demand_history = []
        return self._get_state(), {}

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        """
        Execute one pricing decision
        Args:
            action: array of shape (1,) holding the price multiplier
        Returns:
            state, reward, terminated, truncated, info
        """
        price_multiplier = float(np.clip(action[0], 0.7, 1.5))
        # Simulate user arrivals (Poisson process)
        hour = self.timestep % 24
        arrival_rate = self._get_arrival_rate(hour)
        n_arrivals = int(self.np_random.poisson(arrival_rate))
        # Separate humans and agents
        n_agents = int(self.np_random.binomial(n_arrivals, self.agent_fraction))
        n_humans = n_arrivals - n_agents
        # Simulate purchase decisions
        base_price = self.base_prices[0]  # Simplified: 1 product
        current_price = base_price * price_multiplier
        # Human purchase probability (logistic demand curve)
        human_purchase_prob = self._purchase_probability(
            current_price, base_price, self.human_sensitivity
        )
        human_purchases = int(self.np_random.binomial(n_humans, human_purchase_prob))
        # Agent purchase probability (more sensitive)
        agent_purchase_prob = self._purchase_probability(
            current_price, base_price, self.agent_sensitivity
        )
        agent_purchases = int(self.np_random.binomial(n_agents, agent_purchase_prob))
        # Compute revenue
        revenue = current_price * (human_purchases + agent_purchases)
        # Compute margin leakage (agents getting oracle prices)
        oracle_price = base_price * 1.2  # Optimal price for agents
        # Only count if we underpriced
        margin_leakage = max(0.0, (oracle_price - current_price) * agent_purchases)
        # Reward = revenue - margin leakage penalty
        reward = float(revenue - 0.5 * margin_leakage)
        # Update inventory (floored at 0 so the observation stays in bounds)
        total_purchases = human_purchases + agent_purchases
        self.inventory[0] = max(0, self.inventory[0] - total_purchases)
        # Track demand
        self.demand_history.append(n_arrivals)
        # Next state. Selling out ends the episode (terminated); reaching the
        # step limit is a time-limit cut-off (truncated).
        self.timestep += 1
        terminated = bool(self.inventory[0] <= 0)
        truncated = self.timestep >= self.n_timesteps
        state = self._get_state()
        info = {
            'revenue': revenue,
            'margin_leakage': margin_leakage,
            'human_purchases': human_purchases,
            'agent_purchases': agent_purchases,
            'agent_fraction': n_agents / n_arrivals if n_arrivals > 0 else 0
        }
        return state, reward, terminated, truncated, info

    def _get_state(self) -> np.ndarray:
        """Current state observation"""
        demand = np.mean(self.demand_history[-10:]) if self.demand_history else 0.0
        inventory = self.inventory[0]
        hour = self.timestep % 24
        agent_frac = self.agent_fraction  # Simplified
        velocity = self.np_random.uniform(1, 10)  # Placeholder
        return np.array([demand, inventory, hour, agent_frac, velocity], dtype=np.float32)

    def _get_arrival_rate(self, hour: int) -> float:
        """Simulate time-of-day arrival patterns"""
        # Higher demand during business hours (9am-5pm)
        if 9 <= hour <= 17:
            return 10.0
        else:
            return 3.0

    def _purchase_probability(self, current_price, base_price, sensitivity):
        """
        Logistic demand curve
        P(purchase) = 1 / (1 + exp(sensitivity * (log(current_price) - log(base_price))))
        """
        price_ratio = current_price / base_price
        logit = sensitivity * np.log(price_ratio)
        prob = 1 / (1 + np.exp(logit))
        return np.clip(prob, 0.01, 0.99)
```
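The logistic curve makes per-arrival expected revenue easy to analyze. With multiplier m and sensitivity s, revenue per arrival is proportional to m / (1 + m^s); setting its derivative (1 − (s − 1)·m^s) / (1 + m^s)² to zero gives m^s = 1/(s − 1), i.e. m* = (s − 1)^(−1/s) for s > 1. Under the default parameters that is m* = 1 for humans (s = 2) and m* ≈ 0.758 for agents (s = 5), ignoring the 0.01/0.99 clip, inventory and the leakage penalty. These static optima give a baseline to compare learned policies against. A quick grid-search check (`best_static_multiplier` is a hypothetical helper):

```python
import numpy as np

def purchase_probability(multiplier, sensitivity):
    """Logistic demand curve from DynamicPricingEnv, with price_ratio = multiplier."""
    prob = 1 / (1 + np.exp(sensitivity * np.log(multiplier)))
    return np.clip(prob, 0.01, 0.99)

def best_static_multiplier(sensitivity, low=0.7, high=1.5, n=80_001):
    """Grid search for the multiplier maximizing expected revenue per arrival."""
    grid = np.linspace(low, high, n)
    revenue = grid * purchase_probability(grid, sensitivity)
    return float(grid[np.argmax(revenue)])

for s in (2.0, 5.0):
    closed_form = (s - 1) ** (-1 / s)
    print(f"s={s}: grid={best_static_multiplier(s):.4f}  closed form={closed_form:.4f}")
```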
### Behavioral Models
**File: `experiments/ml/simulator/agents.py`**
```python
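from typing import Any, Dict, List, Optional

import numpy as np

Event = Dict[str, Any]  # Simulated event: eventName, timestamp (s from session start), velocity


class SyntheticUser:
    """Base class for simulated users"""

    def __init__(self, price_sensitivity, session_velocity,
                 rng: Optional[np.random.Generator] = None):
        self.price_sensitivity = price_sensitivity
        self.session_velocity = session_velocity  # interactions/min
        self.rng = rng if rng is not None else np.random.default_rng()

    def generate_session(self, current_price: float, base_price: float) -> List[Event]:
        """Generate event sequence for one session at the given price"""
        raise NotImplementedError

    def _compute_purchase_prob(self, current_price: float, base_price: float) -> float:
        # Same logistic demand curve as DynamicPricingEnv._purchase_probability
        logit = self.price_sensitivity * np.log(current_price / base_price)
        return float(np.clip(1 / (1 + np.exp(logit)), 0.01, 0.99))

    def _events(self, names, mean_gap_sec: float) -> List[Event]:
        # Exponential gaps between events; the first event is at t=0
        gaps = self.rng.exponential(mean_gap_sec, size=len(names))
        timestamps = np.cumsum(gaps) - gaps
        return [{'eventName': str(n), 'timestamp': float(t), 'velocity': self.session_velocity}
                for n, t in zip(names, timestamps)]


class HumanUser(SyntheticUser):
    """Simulates human browsing behavior"""

    def __init__(self, rng: Optional[np.random.Generator] = None):
        rng = rng if rng is not None else np.random.default_rng()
        super().__init__(
            price_sensitivity=2.0,  # Moderate
            session_velocity=rng.uniform(1, 3),  # Slow browsing
            rng=rng,
        )

    def generate_session(self, current_price: float, base_price: float) -> List[Event]:
        # Human pattern: slow, exploratory (mean 20 s between events)
        n_page_views = self.rng.poisson(5)
        n_hovers = self.rng.poisson(8)
        n_products_viewed = self.rng.integers(2, 6)
        # Interleave the event types in random order (hover subtype chosen arbitrarily)
        names = self.rng.permutation(
            ['page_view'] * n_page_views
            + ['view_item_page'] * n_products_viewed
            + ['hover_over_paragraph'] * n_hovers
        )
        events = self._events(names, mean_gap_sec=20)
        # Purchase decision (price-dependent)
        if self.rng.random() < self._compute_purchase_prob(current_price, base_price):
            last_t = events[-1]['timestamp'] if events else 0.0
            events.append({'eventName': 'purchase_complete',
                           'timestamp': last_t + float(self.rng.exponential(20)),
                           'velocity': self.session_velocity})
        return events


class AgentUser(SyntheticUser):
    """Simulates agent/bot behavior"""

    def __init__(self, rng: Optional[np.random.Generator] = None):
        rng = rng if rng is not None else np.random.default_rng()
        super().__init__(
            price_sensitivity=5.0,  # Highly price-aware
            session_velocity=rng.uniform(10, 20),  # Fast reconnaissance
            rng=rng,
        )

    def generate_session(self, current_price: float, base_price: float) -> List[Event]:
        # Agent pattern: fast, systematic (mean 2 s between events)
        n_products_viewed = self.rng.integers(15, 30)  # Views many products
        events = self._events(['view_item_page'] * n_products_viewed, mean_gap_sec=2)
        # Agents rarely purchase (reconnaissance): a fixed 5%, independent of
        # price, so price_sensitivity is not used here yet
        if self.rng.random() < 0.05:
            events.append({'eventName': 'purchase_complete',
                           'timestamp': events[-1]['timestamp'] + float(self.rng.exponential(2)),
                           'velocity': self.session_velocity})
        return events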
```
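Simulated sessions use `timestamp` offsets in seconds, while the feature pipeline expects the event schema from Section I (`sessionId`, ISO8601 `ts`, `eventName`, ...). A small adapter lets synthetic sessions run through the same extractors as real data. A sketch under stated assumptions: `to_event_rows`, the placeholder `page` value, and the default `storeMode` are hypothetical, and millisecond precision mirrors JavaScript ISO timestamps so every `ts` string has the same format:

```python
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

def to_event_rows(sim_events: List[Dict[str, Any]], session_start: datetime,
                  experiment_id: str, store_mode: str = 'hotel') -> List[Dict[str, Any]]:
    """Convert simulated events (timestamp = seconds from start) to event-schema dicts."""
    session_id = str(uuid.uuid4())  # one fresh sessionId per simulated session
    rows = []
    for ev in sorted(sim_events, key=lambda e: e['timestamp']):
        ts = session_start + timedelta(seconds=float(ev['timestamp']))
        rows.append({
            'sessionId': session_id,
            'experimentId': experiment_id,
            'storeMode': store_mode,
            'ts': ts.isoformat(timespec='milliseconds'),
            'page': '/',  # placeholder: the simulator does not model URLs
            'eventName': ev['eventName'],
        })
    return rows

start = datetime(2025, 12, 11, 9, 0, tzinfo=timezone.utc)
sim = [{'eventName': 'view_item_page', 'timestamp': 1.5},
       {'eventName': 'view_item_page', 'timestamp': 0.0}]
rows = to_event_rows(sim, start, experiment_id='exp-123')
print(rows[1]['ts'])  # 2025-12-11T09:00:01.500+00:00
```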
### RL Training for Pricing Policy
**File: `experiments/ml/simulator/train_rl.py`**
```python
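import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv

def train_pricing_policy():
    """Train RL agent to learn optimal pricing strategy"""
    # Create environment (the factory builds a fresh env instead of capturing a rebound name)
    env = DummyVecEnv([lambda: DynamicPricingEnv(agent_fraction=0.3)])
    # Train PPO agent
    model = PPO(
        "MlpPolicy",
        env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        verbose=1,
        tensorboard_log="./logs/"
    )
    model.learn(total_timesteps=100_000)
    # Evaluate one episode (VecEnv API: reset() returns obs only, step() returns 4 values)
    obs = env.reset()
    total_reward = 0.0
    for _ in range(1000):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, info = env.step(action)
        total_reward += float(reward[0])
        if done[0]:
            print(f"Episode reward: {total_reward:.2f}")
            break
    return model

def evaluate_strategy(policy, n_episodes=100, seed=0):
    """Average episode reward of `policy` (observation → action).
    Named to avoid clashing with stable_baselines3's evaluate_policy."""
    env = DynamicPricingEnv()
    totals = []
    for episode in range(n_episodes):
        obs, _ = env.reset(seed=seed + episode)
        total, done = 0.0, False
        while not done:
            obs, reward, terminated, truncated, _ = env.step(policy(obs))
            total += reward
            done = terminated or truncated
        totals.append(total)
    return float(np.mean(totals))

# Compare with baseline strategies
def benchmark_strategies(rl_model):
    strategies = {
        'fixed': lambda state: np.array([1.0]),  # No markup
        'surge': lambda state: np.array([1.2]) if state[0] > 10 else np.array([0.9]),
        'rl': lambda state: rl_model.predict(state, deterministic=True)[0]
    }
    for name, policy in strategies.items():
        avg_reward = evaluate_strategy(policy, n_episodes=100)
        print(f"{name}: avg reward = {avg_reward:.2f}")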
```
### Integration with Multi-Task Model
**Use multi-task model predictions as state features:**
```python
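import gymnasium as gym
import numpy as np
import torch

class EnhancedPricingEnv(DynamicPricingEnv):
    """Environment augmented with multi-task model predictions"""

    def __init__(self, multitask_model, **env_kwargs):
        super().__init__(**env_kwargs)
        self.multitask_model = multitask_model.eval()  # inference mode (BatchNorm, Dropout)
        self.n_product_features = multitask_model.product_encoder[0].in_features
        # Two extra observation dimensions for P(agent) and P(purchase), each in [0, 1]
        base = self.observation_space
        self.observation_space = gym.spaces.Box(
            low=np.concatenate([base.low, [0.0, 0.0]]).astype(np.float32),
            high=np.concatenate([base.high, [1.0, 1.0]]).astype(np.float32),
            dtype=np.float32
        )

    def _generate_session_features(self) -> np.ndarray:
        """Synthetic session feature vector for the current step (not yet specified)."""
        raise NotImplementedError

    def _get_state(self):
        # Get base state
        base_state = super()._get_state()
        # Generate synthetic session features
        session_features = self._generate_session_features()
        # Get multi-task model predictions
        with torch.no_grad():
            outputs = self.multitask_model(
                torch.as_tensor(session_features, dtype=torch.float32).unsqueeze(0),
                torch.zeros(1, self.n_product_features)
            )
        agent_prob = torch.sigmoid(outputs['agent_logit']).item()
        purchase_prob = torch.sigmoid(outputs['purchase_logit']).item()
        # Augment state with model predictions
        enhanced_state = np.concatenate([
            base_state,
            [agent_prob, purchase_prob]
        ]).astype(np.float32)
        return enhanced_state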
```
---
## VI. IMPLEMENTATION ROADMAP
### Phase 1: Foundation (Week 1-2)
**1.1 Refactor Feature Pipeline**
- [ ] Create `experiments/ml/features/` directory
- [ ] Implement `TemporalFeatureExtractor` (vectorized)
- [ ] Implement `BehavioralFeatureExtractor`
- [ ] Implement `ProductFeatureExtractor`
- [ ] Implement `UserAgentParser`
- [ ] Create end-to-end `build_feature_pipeline()`
- [ ] Write unit tests for each extractor
- [ ] Benchmark: ensure <5min processing for 100k events

**1.2 Data Loading**
- [ ] Implement `experiments/ml/datasets.py`
- [ ] Add function `load_events_from_kafka(topic, hours)`
- [ ] Add function `load_experiments_from_supabase()`
- [ ] Add function `create_labeled_dataset()` (joins events with experiments)
- [ ] Add data validation (e.g. check for missing experimentIds)

**1.3 Model Registry Updates**
- [ ] Extend `lib/model_registry.py` to store:
  - Pickled sklearn/XGBoost models
  - PyTorch model state_dicts
  - Feature column names (schema versioning)
- [ ] Add `publish_classifier(model, metadata)`
- [ ] Add `publish_multitask_model(model, metadata)`
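
Storing feature column names only helps if the serving path can detect a mismatch. A minimal sketch, assuming the registry keeps a free-form metadata dict next to each model: hash the ordered column list into a short schema version at publish time and compare it before inference. `schema_version` and `check_schema` are hypothetical helpers, not existing functions in `lib/model_registry.py`.

```python
import hashlib
import json


def schema_version(feature_cols):
    """Stable short hash of the ordered feature column list (hypothetical helper)."""
    payload = json.dumps(list(feature_cols), separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:12]


def check_schema(model_metadata, feature_cols):
    """Raise if inference-time columns differ from the ones the model was trained on."""
    expected = model_metadata["schema_version"]
    actual = schema_version(feature_cols)
    if expected != actual:
        raise ValueError(f"feature schema mismatch: model={expected}, runtime={actual}")


# Usage: record the version when publishing, verify it before each prediction batch
cols = ["session_duration_sec", "total_interactions", "interaction_velocity"]
metadata = {"schema_version": schema_version(cols)}
check_schema(metadata, cols)  # passes silently
```

Because the hash covers column order, a reordered feature frame is caught as well as a missing or renamed column.
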
### Phase 2: Supervised Classifier (Week 2-3)
**2.1 Training Pipeline**
- [ ] Create `experiments/ml/train_classifier.py`
- [ ] Load labeled dataset with feature pipeline
- [ ] Train XGBoost classifier
- [ ] Evaluate: ROC-AUC, precision/recall, confusion matrix
- [ ] Generate feature importance report
- [ ] Publish model to registry

**2.2 Inference Integration**
- [ ] Update `backend/provider/app.py`
- [ ] Implement `compute_session_features(sessionId)` for real-time inference
- [ ] Add `/api/detect-agent/{sessionId}` endpoint (for debugging)
- [ ] Modify `/api/{mode}/price/{productId}` to:
  - Compute agent probability
  - Apply dynamic markup based on probability
  - Log (sessionId, agent_prob, markup) to Kafka
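
One way to turn the agent probability into a markup, sketched under assumptions: prices stay at the base price up to a probability threshold and ramp linearly to a capped markup above it. The 0.5 threshold is illustrative; the 10% cap mirrors the conservative starting markup in Risk 3. `markup_from_agent_prob` and `price_for_session` are hypothetical names.

```python
def markup_from_agent_prob(agent_prob, threshold=0.5, max_markup=0.10):
    """Map P(agent) to a price multiplier (hypothetical policy).

    At or below `threshold` the price is unchanged; above it the markup rises
    linearly from 0 to `max_markup` as agent_prob goes from threshold to 1.
    """
    if not 0.0 <= agent_prob <= 1.0:
        raise ValueError("agent_prob must be in [0, 1]")
    if agent_prob <= threshold:
        return 1.0
    ramp = (agent_prob - threshold) / (1.0 - threshold)
    return 1.0 + max_markup * ramp


def price_for_session(base_price, agent_prob):
    """Return (price, multiplier) so both can be logged alongside agent_prob."""
    multiplier = markup_from_agent_prob(agent_prob)
    return round(base_price * multiplier, 2), multiplier


# Usage
print(price_for_session(200.0, 0.2))   # likely human: base price
print(price_for_session(200.0, 0.95))  # likely agent: close to the 10% cap
```

A dead zone below the threshold keeps likely-human sessions at the base price, which limits the revenue cost of false agent flags while the classifier is still being validated.
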

**2.3 Monitoring**
- [ ] Create Airflow DAG: `agent_classifier_training_pipeline`
  - Runs daily
  - Fetches last 7 days of data
  - Retrains classifier
  - Publishes to registry
- [ ] Add Grafana dashboard:
  - Agent detection rate over time
  - Precision/recall metrics
  - Markup distribution (human vs agent sessions)
### Phase 3: Multi-Task Learning (Week 3-5)
**3.1 Model Development**
- [ ] Create `experiments/ml/models/multitask.py`
- [ ] Implement `MultiTaskPricingModel` (PyTorch)
- [ ] Implement `multi_task_loss()`
- [ ] Create `experiments/ml/train_multitask.py`

**3.2 Training**
- [ ] Prepare product-level dataset (not just session-level)
  - Each row: (session_features, product_features, price, is_agent, purchased)
  - Requires joining events with product catalog
- [ ] Train multi-task model
- [ ] Evaluate both tasks:
  - Agent classification: ROC-AUC
  - Purchase prediction: ROC-AUC, calibration plot
- [ ] Hyperparameter tuning (task weights, learning rate, architecture)

**3.3 Knowledge Distillation**
- [ ] Implement `distill_pricing_rules()` (decision tree)
- [ ] Implement `explain_pricing_model()` (SHAP)
- [ ] Generate interpretable pricing rule report
- [ ] Validate: compare distilled tree performance vs full model
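
When comparing the distilled tree with the full model, accuracy against the labels is not enough on its own: the tree should also agree with the model it is meant to explain. A minimal sketch of both measurements on a held-out set, assuming both models emit discrete predictions (e.g. agent class or price bucket); the function names are hypothetical.

```python
def fidelity(teacher_preds, student_preds):
    """Fraction of samples where the distilled tree agrees with the full model."""
    if len(teacher_preds) != len(student_preds) or not teacher_preds:
        raise ValueError("need two non-empty prediction lists of equal length")
    agree = sum(t == s for t, s in zip(teacher_preds, student_preds))
    return agree / len(teacher_preds)


def compare_models(y_true, teacher_preds, student_preds):
    """Accuracy of each model against ground truth, plus teacher/student fidelity."""
    n = len(y_true)
    return {
        "teacher_accuracy": sum(t == y for t, y in zip(teacher_preds, y_true)) / n,
        "student_accuracy": sum(s == y for s, y in zip(student_preds, y_true)) / n,
        "fidelity": fidelity(teacher_preds, student_preds),
    }
```

A tree with decent accuracy but low fidelity is learning its own shortcuts, so its rules should not be presented as an explanation of the full model.
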

**3.4 Deployment**
- [ ] Serialize PyTorch model to ONNX for fast inference
- [ ] Update pricing provider to use multi-task model:
  - Predict agent probability
  - Predict purchase probability given current price
  - Compute optimal price (maximize expected revenue, i.e. price × purchase probability)
- [ ] A/B test: multi-task pricing vs baseline elasticity pricing
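
The "maximize expected revenue" step can be sketched as a bounded grid search over price multipliers, scoring each candidate by price × P(purchase | price). Here a hypothetical logistic demand curve stands in for the multi-task model's purchase head; in the deployed service that head would be evaluated at each candidate price instead. The 0.8 to 1.2 multiplier bounds are illustrative guardrails, not values from this plan.

```python
import math


def purchase_prob(price, base_price, sensitivity):
    """Hypothetical logistic demand: 50% purchase probability at base_price,
    falling faster with price for more price-sensitive users."""
    return 1.0 / (1.0 + math.exp(sensitivity * (price / base_price - 1.0)))


def optimal_price(base_price, sensitivity, min_mult=0.8, max_mult=1.2, steps=41):
    """Grid-search the multiplier that maximizes price * P(purchase | price)."""
    best_price, best_revenue = base_price, -1.0
    for i in range(steps):
        mult = min_mult + (max_mult - min_mult) * i / (steps - 1)
        price = base_price * mult
        revenue = price * purchase_prob(price, base_price, sensitivity)
        if revenue > best_revenue:
            best_price, best_revenue = price, revenue
    return best_price, best_revenue
```

With this curve the base price is optimal exactly when `sensitivity == 2`; lower sensitivity moves the optimum above the base price and higher sensitivity below it, with the grid bounds capping how far it can move.
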
### Phase 4: Synthetic Simulator (Week 5-6)
**4.1 Environment**
- [ ] Create `experiments/ml/simulator/`
- [ ] Implement `DynamicPricingEnv` (Gymnasium)
- [ ] Implement `HumanUser` and `AgentUser` behavioral models
- [ ] Validate environment:
  - Test human_sensitivity vs agent_sensitivity differentiation
  - Visualize demand curves for both user types

**4.2 RL Training**
- [ ] Create `experiments/ml/simulator/train_rl.py`
- [ ] Train PPO agent
- [ ] Benchmark against baseline strategies (fixed, surge, elasticity)
- [ ] Visualize: reward curves, pricing decisions over time

**4.3 Integration with Multi-Task Model**
- [ ] Implement `EnhancedPricingEnv` (uses multi-task model predictions)
- [ ] Train RL agent in enhanced environment
- [ ] Compare: RL with model predictions vs RL without

**4.4 Validation**
- [ ] Run counterfactual simulations:
  - "What if the agent fraction were 50%?"
  - "What if agents were less price-sensitive?"
- [ ] Generate risk analysis report for pricing strategies
### Phase 5: Production Integration (Week 7-8)
**5.1 Real-Time Feature Store**
- [ ] Implement session feature caching (Redis)
- [ ] Create Kafka Streams job:
  - Consumes `user-interactions`
  - Computes rolling session features
  - Publishes to Redis with TTL
- [ ] Update `compute_session_features()` to read from cache
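
The cache read path can be sketched without Redis. The class below is a hypothetical in-memory stand-in that mimics a Redis `SET` with an `EX` expiry; `compute_session_features` is shown here as a hypothetical read-through variant that falls back to a recompute callable on a miss. The 30-minute default TTL is an assumption.

```python
import time


class SessionFeatureCache:
    """In-memory stand-in for the Redis session-feature cache (hypothetical).

    Mimics Redis SET with an EX expiry: an entry expires `ttl_sec` after its
    most recent write.
    """

    def __init__(self, ttl_sec=1800, clock=time.monotonic):
        self.ttl_sec = ttl_sec
        self._clock = clock
        self._store = {}  # session_id -> (expires_at, features)

    def put(self, session_id, features):
        self._store[session_id] = (self._clock() + self.ttl_sec, dict(features))

    def get(self, session_id):
        entry = self._store.get(session_id)
        if entry is None:
            return None
        expires_at, features = entry
        if self._clock() >= expires_at:
            del self._store[session_id]  # lazy expiry on read
            return None
        return features


def compute_session_features(session_id, cache, recompute):
    """Read-through lookup: serve cached features, else recompute and cache them."""
    features = cache.get(session_id)
    if features is None:
        features = recompute(session_id)
        cache.put(session_id, features)
    return features
```

In production, `put` corresponds to the streaming job's write and `get` to a Redis `GET`; the injectable clock exists only so expiry can be tested deterministically.
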

**5.2 Pricing Service Refactor**
- [ ] Decouple pricing logic from provider service
- [ ] Create `experiments/ml/inference/pricing_service.py`
- [ ] Expose gRPC API for low-latency pricing
- [ ] Load model once on startup (not per request)

**5.3 Observability**
- [ ] Add OpenTelemetry tracing to pricing service
- [ ] Create Grafana dashboard:
  - Pricing latency (p50, p99)
  - Revenue metrics (human vs agent)
  - Model prediction distribution
- [ ] Set up alerts:
  - Agent detection rate spike
  - Margin leakage threshold exceeded
  - Model inference failures

**5.4 Documentation**
- [ ] Write model cards for both models:
  - Intended use
  - Training data characteristics
  - Performance metrics
  - Limitations & biases
- [ ] Create runbook for retraining pipelines
- [ ] API documentation for pricing service
---
## VII. TECHNICAL STACK
### Data Engineering
- **Streaming:** Kafka (existing)
- **Storage:** Supabase (PostgreSQL) for metadata, Kafka for events
- **Feature Store:** Redis (for real-time session features)
- **Orchestration:** Airflow (existing)
### Machine Learning
- **Feature Engineering:** Pandas, NumPy, Scikit-learn
- **Classifier:** XGBoost / LightGBM (for speed + interpretability)
- **Multi-Task Model:** PyTorch
- **RL:** Stable-Baselines3 (PPO)
- **Explainability:** SHAP, Scikit-learn decision trees
- **Model Registry:** Redis (lightweight) or MLflow (if scaling up)
### Inference
- **Serving:** ONNX Runtime (for PyTorch models) or direct PyTorch
- **API:** FastAPI (existing backend)
- **Caching:** Redis
### Monitoring
- **Metrics:** Prometheus
- **Dashboards:** Grafana
- **Tracing:** OpenTelemetry
---
## VIII. KEY DECISIONS & TRADEOFFS
### Decision 1: XGBoost vs Neural Network for Classifier
**Choice:** Start with XGBoost, migrate to NN if multi-task learning shows benefit

**Rationale:**
- XGBoost: Faster training, interpretable, strong baseline
- Neural Network: Required for multi-task learning with shared encoder
- **Approach:** Build both, use XGBoost for initial deployment
### Decision 2: Real-Time vs Batch Feature Engineering
**Choice:** Hybrid approach
- **Batch:** Full feature pipeline in Airflow (daily retraining)
- **Real-Time:** Lightweight rolling features in Kafka Streams (for inference)

**Tradeoff:** Complexity vs latency
- Real-time adds complexity but lets prices react to behavior within the current session
- Batch is simpler but can't adapt mid-session
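
The streaming side cannot regroup a session's full history on every event, so rolling features need constant-time updates. A sketch using Welford's online algorithm, assuming events arrive in timestamp order per session: it maintains the same `avg_time_between_events`, `std_time_between_events`, and `min_time_between_events` that the batch extractor computes (sample standard deviation, matching the pandas default). `OnlineGapStats` is a hypothetical name, and the same update rule ports to whatever runtime hosts the streaming job.

```python
import math


class OnlineGapStats:
    """Incremental mean/std/min of inter-event gaps for one session (Welford's algorithm)."""

    def __init__(self):
        self.last_ts = None
        self.n = 0          # number of gaps seen
        self.mean = 0.0
        self._m2 = 0.0      # running sum of squared deviations from the mean
        self.min_gap = math.inf

    def update(self, ts):
        """Feed the next event timestamp in seconds; O(1) time and memory."""
        if self.last_ts is not None:
            gap = ts - self.last_ts
            self.n += 1
            delta = gap - self.mean
            self.mean += delta / self.n
            self._m2 += delta * (gap - self.mean)
            self.min_gap = min(self.min_gap, gap)
        self.last_ts = ts

    @property
    def std(self):
        # Sample std (ddof=1), matching pandas' default .std(); NaN with < 2 gaps
        return math.sqrt(self._m2 / (self.n - 1)) if self.n > 1 else float("nan")
```
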
### Decision 3: Separate Simulator vs Production Trace Replay
**Choice:** Separate simulator (not trace replay)

**Rationale:**
- **Trace replay:** More realistic but slow, requires large historical dataset
- **Simulator:** Fast iteration, can test edge cases (high agent fraction, etc.)
- **Approach:** Use simulator for development, validate on historical traces before production
### Decision 4: Knowledge Distillation Method
**Choice:** Decision tree distillation + SHAP explanations

**Rationale:**
- Decision trees: Generate actionable rules for non-ML stakeholders
- SHAP: Understand feature importance for model debugging
- Both complement each other (global rules + local explanations)
---
## IX. SUCCESS METRICS
### Model Performance
- **Agent Classifier:**
  - ROC-AUC > 0.90
  - Precision > 0.85 at 80% recall (minimize false agent flags)
- **Purchase Predictor:**
  - ROC-AUC > 0.75
  - Expected calibration error (ECE) < 0.1 (predicted probabilities match observed purchase rates)
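
Both the classifier and the calibration targets can be checked with short, dependency-free functions. A sketch, assuming 0/1 labels (1 = agent, or 1 = purchased) and probability scores: `precision_at_recall` sweeps the decision threshold over distinct scores and keeps the best precision among thresholds that reach the recall target, and `expected_calibration_error` uses 10 equal-width bins weighted by bin size. Both names are hypothetical; scikit-learn's `precision_recall_curve` provides the same threshold sweep.

```python
def precision_at_recall(y_true, scores, target_recall=0.80):
    """Best precision over all thresholds whose recall >= target_recall."""
    positives = sum(y_true)
    if positives == 0:
        raise ValueError("y_true has no positive labels")
    ranked = sorted(zip(scores, y_true), key=lambda pair: pair[0], reverse=True)
    best, tp = 0.0, 0
    for k, (score, label) in enumerate(ranked, start=1):
        tp += label
        # Only evaluate thresholds between distinct scores (tied scores move together)
        if k < len(ranked) and ranked[k][0] == score:
            continue
        if tp / positives >= target_recall:
            best = max(best, tp / k)
    return best


def expected_calibration_error(y_true, probs, n_bins=10):
    """Bin-size-weighted mean |observed rate - mean predicted prob| over equal-width bins."""
    bins = [[] for _ in range(n_bins)]
    for y, p in zip(y_true, probs):
        idx = min(int(p * n_bins), n_bins - 1)  # p == 1.0 falls in the last bin
        bins[idx].append((y, p))
    n = len(probs)
    ece = 0.0
    for b in bins:
        if b:
            observed = sum(y for y, _ in b) / len(b)
            predicted = sum(p for _, p in b) / len(b)
            ece += len(b) / n * abs(observed - predicted)
    return ece
```
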
### Business Impact
- **Margin Leakage Reduction:** Target 30% reduction in `L_agent`
- **Revenue:** No degradation in human conversion rate
- **Latency:** p99 pricing latency < 100ms
### Operational
- **Retraining Frequency:** Daily (automated)
- **Model Drift Detection:** Alert if agent detection rate changes >20% week-over-week
- **Uptime:** 99.9% availability for pricing service
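
The drift rule could mean a relative change or a change in percentage points. A sketch assuming a relative change (a rate moving from 10% to 12.5% is flagged, from 10% to 11% is not); the helper names are hypothetical.

```python
def detection_rate(flags):
    """Share of sessions flagged as agent (flags: iterable of 0/1)."""
    flags = list(flags)
    return sum(flags) / len(flags) if flags else 0.0


def drift_alert(last_week_rate, this_week_rate, max_rel_change=0.20):
    """True if the agent detection rate moved more than max_rel_change (relative) week-over-week."""
    if last_week_rate == 0:
        return this_week_rate > 0
    return abs(this_week_rate - last_week_rate) / last_week_rate > max_rel_change
```
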
---
## X. RISKS & MITIGATION
### Risk 1: Label Noise
**Issue:** The `xp_human_only` flag may be wrong (humans acting like agents, or vice versa)

**Mitigation:**
- Use soft labels (agent probability) instead of hard classification
- Implement active learning: flag uncertain sessions for manual review
- Combine flag with behavioral heuristics (velocity > threshold)
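
A sketch of how the first and third mitigations could combine, with every number an assumption: the experiment flag is label-smoothed (0/1 become 0.1/0.9), then pulled toward the velocity heuristic when the two disagree, and training uses binary cross-entropy against the fractional target. `soft_agent_label`, `soft_bce`, the 30 events/min threshold, and the 0.3 heuristic weight are all hypothetical.

```python
import math


def soft_agent_label(xp_human_only, velocity, velocity_threshold=30.0,
                     smoothing=0.1, heuristic_weight=0.3):
    """Hypothetical soft target in [0, 1] combining the flag and a velocity heuristic."""
    flag_label = 0.0 if xp_human_only else 1.0
    # Label smoothing: 0 -> smoothing, 1 -> 1 - smoothing
    label = flag_label * (1.0 - 2.0 * smoothing) + smoothing
    heuristic_label = 1.0 if velocity > velocity_threshold else 0.0
    if heuristic_label != flag_label:
        # Flag and behavior disagree: pull the target toward the heuristic
        label = (1.0 - heuristic_weight) * label + heuristic_weight * heuristic_label
    return label


def soft_bce(target, prob, eps=1e-7):
    """Binary cross-entropy with a soft (fractional) target."""
    prob = min(max(prob, eps), 1.0 - eps)
    return -(target * math.log(prob) + (1.0 - target) * math.log(1.0 - prob))
```

Soft cross-entropy is minimized when the predicted probability equals the target, so a session whose flag and behavior disagree trains the model toward an uncertain prediction rather than a confident wrong one; those sessions are also natural candidates for the manual-review queue.
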
### Risk 2: Concept Drift
**Issue:** Agent behavior evolves to evade detection

**Mitigation:**
- Monitor model performance metrics daily
- Implement champion/challenger A/B testing for new models
- Retrain daily on a rolling 7-day window (matching the Phase 2.3 DAG)
### Risk 3: Revenue Impact
**Issue:** Aggressive agent markup hurts conversion

**Mitigation:**
- Start with conservative markup (10% max)
- Run A/B test with 10% traffic
- Monitor revenue closely; roll back if revenue in the test arm drops by more than 5%
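
Both mitigations need a stable, reproducible rule. A sketch under assumptions: hash each `sessionId` with a salt so roughly 10% of sessions land in the treatment arm deterministically (a session always sees the same pricing arm), and roll back when treatment revenue per session falls more than 5% below control. The function names and salt are hypothetical, and a real rollback decision should also account for statistical noise, which this check ignores.

```python
import hashlib


def in_treatment(session_id, traffic_fraction=0.10, salt="agent-markup-v1"):
    """Deterministically assign ~traffic_fraction of sessions to the treatment arm."""
    digest = hashlib.sha256(f"{salt}:{session_id}".encode("utf-8")).digest()
    # 48 bits convert to float exactly, so the bucket is uniform in [0, 1)
    bucket = int.from_bytes(digest[:6], "big") / 2**48
    return bucket < traffic_fraction


def should_roll_back(control_revenue_per_session, treatment_revenue_per_session, max_drop=0.05):
    """True when treatment revenue per session is more than max_drop below control."""
    if control_revenue_per_session <= 0:
        return False
    drop = 1.0 - treatment_revenue_per_session / control_revenue_per_session
    return drop > max_drop
```

Changing the salt reshuffles assignments, which is useful when starting a new experiment so the same sessions are not always exposed to markup.
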
### Risk 4: Simulator Misalignment
**Issue:** Simulator doesn't match real-world dynamics

**Mitigation:**
- Calibrate simulator parameters from historical data
- Validate RL policies on historical traces before production
- Run small-scale live experiments
---
## XI. NEXT STEPS (Immediate Actions)
1. **Review this document** with the team for alignment
2. **Set up project structure:**
   ```bash
   mkdir -p experiments/ml/{features,models,simulator,inference}
   mkdir -p experiments/ml/notebooks  # For EDA
   ```
3. **Create sample dataset:**
   - Pull 1 week of events from Kafka
   - Join with experiments table
   - Validate label distribution (human/agent ratio)
4. **Prototype feature pipeline:**
   - Start with `TemporalFeatureExtractor`
   - Benchmark on 10k events
   - Ensure output matches expected schema
5. **Kick off Phase 1** tasks from the roadmap
---
## XII. REFERENCES & RESOURCES
### Papers
- [Multi-Task Learning Using Uncertainty to Weigh Losses for Scene Geometry and Semantics](https://arxiv.org/abs/1705.07115) - Kendall, Gal & Cipolla
- [Distilling the Knowledge in a Neural Network](https://arxiv.org/abs/1503.02531) - Hinton, Vinyals & Dean
- [Deep Reinforcement Learning for Online Pricing](https://arxiv.org/abs/1912.02572)
### Code Examples
- Existing pipeline: `/home/user/PHANTOM/experiments/procesing/pipelines.py`
- Existing pricers: `/home/user/PHANTOM/experiments/procesing/pricers/`
- Agent runner: `/home/user/PHANTOM/experiments/agents/agent.py`
### Internal Documentation
- Event schema: `/home/user/PHANTOM/web/src/lib/events.ts`
- Pricing provider API: `/home/user/PHANTOM/backend/provider/app.py`
- Airflow DAGs: `/home/user/PHANTOM/experiments/airflow/dags/`
---
## APPENDIX A: Feature Schema
```python
SESSION_FEATURE_COLS = [
    # Temporal (8)
    'session_duration_sec',
    'total_interactions',
    'avg_time_between_events',
    'std_time_between_events',
    'interaction_velocity',
    'max_velocity_5min',
    'min_time_between_events',
    'session_start_hour',
    # Behavioral (10)
    'page_views',
    'item_views',
    'cart_adds',
    'purchases',        # derived from the purchase outcome: exclude when predicting 'purchased' (target leakage)
    'hover_events',
    'filter_events',
    'cart_to_view_ratio',
    'conversion_rate',  # same leakage caveat as 'purchases'
    'hover_intensity',
    'unique_pages_visited',
    # Product (8)
    'unique_products_viewed',
    'product_view_depth',
    'avg_price_seen',
    'min_price_seen',
    'max_price_seen',
    'std_price_seen',
    'price_range_explored',
    'filter_usage_count',
    # UserAgent (3)
    'is_headless_browser',
    'is_automation_tool',
    'browser_family',   # categorical: encode (e.g. one-hot) before feeding numeric models
]

PRODUCT_FEATURE_COLS = [
    'base_price',
    'current_demand',
    'inventory_level',
    'avg_demand_last_hour',
    'price_elasticity',
    'n_amenities',
    'is_refundable',
    'days_until_date',
    'competitor_price_diff',  # If available
    'view_count_last_hour',
]

LABEL_COLS = [
    'is_agent',       # Binary: 0=human, 1=agent
    'purchased',      # Binary: 0=no purchase, 1=purchase
    'total_revenue',  # Float: revenue from this session
]
```
---
## APPENDIX B: Sample Code Snippets
### Data Loading
```python
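# experiments/ml/datasets.py
import json
import os

import pandas as pd
from kafka import KafkaConsumer
from supabase import create_client


def load_events_from_kafka(topic='user-interactions', hours=24):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        consumer_timeout_ms=10_000,  # stop iterating after 10s with no new messages
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    )
    cutoff_time = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=hours)
    events = []
    try:
        for message in consumer:
            event = message.value
            # Parse as UTC so naive and tz-aware timestamps compare safely
            if pd.to_datetime(event['ts'], utc=True) > cutoff_time:
                events.append(event)
    finally:
        consumer.close()
    return pd.DataFrame(events)


def load_experiments_from_supabase():
    # Credentials come from environment variables (variable names are an assumption)
    client = create_client(os.environ['SUPABASE_URL'], os.environ['SUPABASE_KEY'])
    response = client.table('experiments').select('*').execute()
    return pd.DataFrame(response.data)


def create_labeled_dataset():
    events_df = load_events_from_kafka(hours=168)  # 1 week
    experiments_df = load_experiments_from_supabase()
    # Join: attach each event's experiment flag
    labeled_df = events_df.merge(
        experiments_df[['id', 'xp_human_only']].rename(columns={'id': 'experimentId'}),
        on='experimentId',
        how='inner',
    )
    # Drop experiments with no flag rather than guessing a label
    labeled_df = labeled_df.dropna(subset=['xp_human_only'])
    # Create labels: 1 = agent, 0 = human
    labeled_df['is_agent'] = (~labeled_df['xp_human_only'].astype(bool)).astype(int)
    # Apply feature pipeline (aggregates events to one row per session;
    # assumes build_feature_pipeline() returns a DataFrame with a sessionId column)
    feature_pipeline = build_feature_pipeline()
    feature_matrix = feature_pipeline.fit_transform(labeled_df)
    # Re-attach the session-level label lost during aggregation
    session_labels = labeled_df.groupby('sessionId', as_index=False)['is_agent'].first()
    return feature_matrix.merge(session_labels, on='sessionId', how='left')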
```
### Feature Extraction
```python
# experiments/ml/features/temporal.py
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class TemporalFeatureExtractor(BaseEstimator, TransformerMixin):
    """Aggregates raw events (one row per event) into per-session temporal features."""

    def fit(self, X, y=None):
        return self

    def transform(self, events_df):
        df = events_df.copy()
        df['ts_dt'] = pd.to_datetime(df['ts'], utc=True)
        df = df.sort_values(['sessionId', 'ts_dt'])
        # Seconds since the previous event in the same session (vectorized; NaN for the first event)
        df['time_diff'] = df.groupby('sessionId')['ts_dt'].diff().dt.total_seconds()
        # Session-level aggregates via named aggregation (produces flat column names)
        session_temporal = df.groupby('sessionId').agg(
            session_duration_sec=('ts_dt', lambda x: (x.max() - x.min()).total_seconds()),
            session_start_hour=('ts_dt', lambda x: x.min().hour),
            avg_time_between_events=('time_diff', 'mean'),
            std_time_between_events=('time_diff', 'std'),
            min_time_between_events=('time_diff', 'min'),
            total_interactions=('eventName', 'count'),
        )
        # Interactions per minute; floor the duration at 1s so single-event
        # sessions don't produce near-infinite velocities
        duration_min = np.maximum(session_temporal['session_duration_sec'], 1.0) / 60
        session_temporal['interaction_velocity'] = (
            session_temporal['total_interactions'] / duration_min
        )
        return session_temporal.reset_index()
```
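
`max_velocity_5min` appears in `SESSION_FEATURE_COLS` but is not computed by the extractor above. A dependency-free sketch, assuming the feature means the peak number of events inside any 5-minute window, expressed per minute, with timestamps as epoch seconds: a two-pointer sliding window finds the peak per session after a sort. The definition is an assumption; only the column name comes from the schema.

```python
from collections import defaultdict


def max_velocity_5min(events, window_sec=300.0):
    """Peak events-per-minute inside any `window_sec` window, per session.

    events: iterable of (session_id, epoch_seconds) pairs, in any order.
    Returns {session_id: peak events per minute}.
    """
    times_by_session = defaultdict(list)
    for session_id, ts in events:
        times_by_session[session_id].append(float(ts))

    result = {}
    for session_id, times in times_by_session.items():
        times.sort()
        peak, left = 0, 0
        for right, t in enumerate(times):
            # Advance the left edge until the window spans at most window_sec
            while t - times[left] > window_sec:
                left += 1
            peak = max(peak, right - left + 1)
        result[session_id] = peak / (window_sec / 60.0)
    return result
```
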
---
**END OF GAMEPLAN**