From aab54ea7c04abf2a4489226fb427e0fb6ae75de6 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 11 Dec 2025 09:51:41 +0000 Subject: [PATCH] docs: Add comprehensive multi-task learning architecture and gameplan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Created detailed documentation for implementing multi-task learning system to improve agent detection and dynamic pricing: - GAMEPLAN_MULTITASK_PRICING.md: Complete 50+ page technical specification including feature engineering, supervised learning, multi-task neural networks, synthetic simulator, and knowledge distillation approach - ARCHITECTURE_OVERVIEW.md: Quick reference with visual diagrams comparing current rule-based system to proposed ML architecture, metrics, and implementation phases Key improvements proposed: - Replace O(n²) SessionState pipeline with vectorized feature extraction - Train XGBoost classifier on experimentId labels (ROC-AUC >0.90 target) - Multi-task neural network for joint agent detection + purchase prediction - Gymnasium-based synthetic pricing environment for safe experimentation - Knowledge distillation to extract interpretable pricing heuristics Addresses margin leakage concerns with learned pricing strategies instead of simple velocity thresholds. --- docs/ARCHITECTURE_OVERVIEW.md | 403 +++++++ docs/GAMEPLAN_MULTITASK_PRICING.md | 1596 ++++++++++++++++++++++++++++ 2 files changed, 1999 insertions(+) create mode 100644 docs/ARCHITECTURE_OVERVIEW.md create mode 100644 docs/GAMEPLAN_MULTITASK_PRICING.md diff --git a/docs/ARCHITECTURE_OVERVIEW.md b/docs/ARCHITECTURE_OVERVIEW.md new file mode 100644 index 0000000..c16b820 --- /dev/null +++ b/docs/ARCHITECTURE_OVERVIEW.md @@ -0,0 +1,403 @@ +# Multi-Task Learning Architecture - Quick Reference + +## Current System (Baseline) + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ CURRENT STATE │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ Browser Events → Next.js → FastAPI → Kafka (user-interactions) │ +│ ↓ │ +│ Airflow (every 15min) │ +│ ↓ │ +│ [Messy SessionState Pipeline] │ +│ ↓ │ +│ Simple Rule-Based Pricing: │ +│ - Surge (if demand > 10) │ +│ - Elasticity formula │ +│ - Velocity threshold for agents │ +│ ↓ │ +│ Redis (prices) │ +│ ↓ │ +│ Pricing Provider API │ +│ │ +│ ISSUES: │ +│ ✗ O(n²) feature extraction │ +│ ✗ No supervised ML for agent detection │ +│ ✗ Simple heuristics (velocity > 5 → agent) │ +│ ✗ No learning from data │ +│ ✗ Margin leakage not effectively addressed │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Proposed System (Multi-Task Learning) + +``` +┌──────────────────────────────────────────────────────────────────────────┐ +│ PHASE 1: DATA PIPELINE │ +├──────────────────────────────────────────────────────────────────────────┤ +│ │ +│ Kafka (user-interactions) │ +│ ↓ │ +│ ┌─────────────────────────────────────┐ │ +│ │ VECTORIZED FEATURE PIPELINE │ │ +│ ├─────────────────────────────────────┤ │ +│ │ 1. TemporalFeatureExtractor │ → 8 features (velocity, etc.) │ +│ │ 2. BehavioralFeatureExtractor │ → 10 features (carts, hovers) │ +│ │ 3. ProductFeatureExtractor │ → 8 features (prices, depth) │ +│ │ 4. UserAgentParser │ → 3 features (browser type) │ +│ │ 5. SessionAggregator │ → Session-level matrix │ +│ │ 6. ExperimentLabelJoiner │ → Join with xp_human_only │ +│ └─────────────────────────────────────┘ │ +│ ↓ │ +│ Feature Matrix: [sessionId, 29 features, 3 labels] │ +│ │ +└──────────────────────────────────────────────────────────────────────────┘ + +┌──────────────────────────────────────────────────────────────────────────┐ +│ PHASE 2: SUPERVISED AGENT CLASSIFIER │ +├──────────────────────────────────────────────────────────────────────────┤ +│ │ +│ Feature Matrix (29 features) │ +│ ↓ │ +│ ┌────────────────────┐ │ +│ │ XGBoost Model │ │ +│ ├────────────────────┤ │ +│ │ Input: 29 dims │ │ +│ │ Output: P(agent) │ │ +│ │ Loss: BCE │ │ +│ └────────────────────┘ │ +│ ↓ │ +│ Target: ROC-AUC > 0.90 │ +│ │ +│ DEPLOYMENT: │ +│ - Real-time inference in Pricing Provider │ +│ - Dynamic markup: P(agent) > 0.7 → 1.3x price │ +│ - Retrain daily via Airflow │ +│ │ +└──────────────────────────────────────────────────────────────────────────┘ + +┌──────────────────────────────────────────────────────────────────────────┐ +│ PHASE 3: MULTI-TASK LEARNING MODEL │ +├──────────────────────────────────────────────────────────────────────────┤ +│ │ +│ Input: Session Features (29) + Product Features (10) + Current Price │ +│ ↓ │ +│ ┌───────────────────────────────────────────────────────────┐ │ +│ │ MULTI-TASK NEURAL NETWORK │ │ +│ ├───────────────────────────────────────────────────────────┤ │ +│ │ │ │ +│ │ ┌──────────────────────┐ │ │ +│ │ │ Session Encoder │ (Shared) │ │ +│ │ │ [29] → [128] → [64] │ │ │ +│ │ └──────────┬───────────┘ │ │ +│ │ │ │ │ +│ │ ├────────────┬───────────────┐ │ │ +│ │ ↓ ↓ ↓ │ │ +│ │ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │ │ +│ │ │ Task A │ │ Product │ │ Task B │ │ │ +│ │ │ Agent │ │ Encoder │ │ Purchase │ │ │ +│ │ │ Head │ │ [10]→16 │ │ Prob Head │ │ │ +│ │ └────┬────┘ └────┬────┘ └──────┬──────┘ │ │ +│ │ ↓ └────┬────────────┘ │ │ +│ │ P(agent) ↓ │ │ +│ │ P(purchase|price) │ │ +│ │ │ │ +│ │ Loss = α·BCE(agent) + β·BCE(purchase) │ │ +│ │ α=1.0, β=2.0 (tune these weights) │ │ +│ └───────────────────────────────────────────────────────────┘ │ +│ ↓ │ +│ OUTPUTS: │ +│ 1. Agent probability (like Phase 2) │ +│ 2. Purchase probability given price │ +│ 3. Session embedding (for knowledge distillation) │ +│ │ +│ USE CASE: │ +│ Optimal Price = argmax_p [ p · P(purchase|p) · (1 + λ·P(agent)) ] │ +│ │ +└──────────────────────────────────────────────────────────────────────────┘ + +┌──────────────────────────────────────────────────────────────────────────┐ +│ KNOWLEDGE DISTILLATION BRANCH │ +├──────────────────────────────────────────────────────────────────────────┤ +│ │ +│ Multi-Task Model (teacher) │ +│ ↓ │ +│ Generate predictions on validation set │ +│ ↓ │ +│ ┌──────────────────────────────────────┐ │ +│ │ Distill to Decision Tree (student) │ │ +│ ├──────────────────────────────────────┤ │ +│ │ Input: 29 session features │ │ +│ │ Output: Optimal markup multiplier │ │ +│ │ Max depth: 5 (interpretable) │ │ +│ └──────────────────────────────────────┘ │ +│ ↓ │ +│ Extract Human-Readable Rules: │ +│ │ +│ IF interaction_velocity > 10 AND cart_to_view_ratio < 0.1: │ +│ markup = 1.3 (likely agent reconnaissance) │ +│ ELIF unique_products_viewed < 3 AND session_duration > 300: │ +│ markup = 0.9 (engaged human, offer discount) │ +│ ELSE: │ +│ markup = 1.0 (baseline) │ +│ │ +│ Also: SHAP values for feature importance analysis │ +│ │ +└──────────────────────────────────────────────────────────────────────────┘ + +┌──────────────────────────────────────────────────────────────────────────┐ +│ PHASE 4: SYNTHETIC DYNAMIC PRICING SIMULATOR │ +├──────────────────────────────────────────────────────────────────────────┤ +│ │ +│ PURPOSE: Fast experimentation without real users │ +│ │ +│ ┌────────────────────────────────────────────────────┐ │ +│ │ DynamicPricingEnv (Gymnasium) │ │ +│ ├────────────────────────────────────────────────────┤ │ +│ │ │ │ +│ │ State: [demand, inventory, hour, agent_frac, │ │ +│ │ avg_velocity] │ │ +│ │ │ │ +│ │ Action: price_multiplier ∈ [0.7, 1.5] │ │ +│ │ │ │ +│ │ Dynamics: │ │ +│ │ - Simulate user arrivals (Poisson) │ │ +│ │ - Split into humans (30%) vs agents (70%) │ │ +│ │ - Purchase probability: │ │ +│ │ P_human(buy) = logistic(price, sensitivity=2) │ │ +│ │ P_agent(buy) = logistic(price, sensitivity=5) │ │ +│ │ │ │ +│ │ Reward: revenue - 0.5 * margin_leakage │ │ +│ │ where margin_leakage = (oracle_price - │ │ +│ │ actual_price) × │ │ +│ │ agent_purchases │ │ +│ └────────────────────────────────────────────────────┘ │ +│ ↓ │ +│ ┌────────────────────────────────────────┐ │ +│ │ Train RL Agent (PPO) │ │ +│ ├────────────────────────────────────────┤ │ +│ │ Learn policy: State → Optimal Price │ │ +│ │ 100k timesteps training │ │ +│ └────────────────────────────────────────┘ │ +│ ↓ │ +│ BENCHMARK vs Baselines: │ +│ - Fixed pricing: 1.0x always │ +│ - Simple surge: 1.2x if demand > 10, else 0.9x │ +│ - Elasticity-based: formula │ +│ - RL policy: learned │ +│ - Multi-task + RL: Use MT model predictions as state features │ +│ │ +│ VALIDATION: │ +│ - Calibrate simulator from historical data │ +│ - Run counterfactuals ("what if agent_frac=0.8?") │ +│ - A/B test winner on real traffic │ +│ │ +└──────────────────────────────────────────────────────────────────────────┘ +``` + +## Data Flow (Production) + +``` +┌─────────────┐ +│ Browser │ +│ (User/Agent)│ +└──────┬──────┘ + │ POST /api/ingest (events + experimentId) + ↓ +┌──────────────┐ +│ Next.js API │ +└──────┬───────┘ + │ Forward events + ↓ +┌──────────────┐ +│ FastAPI │ +│ /api/kafka │ +│ /ingest │ +└──────┬───────┘ + │ Publish + ↓ +┌─────────────────────────┐ +│ Kafka │ +│ Topic: user-interactions│ +└──────┬──────────────────┘ + │ + ├──────────────────┬──────────────────┐ + ↓ ↓ ↓ +┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ +│ Airflow │ │ Real-Time │ │ Kafka Streams │ +│ (Batch) │ │ Inference │ │ (Feature Cache) │ +│ │ │ │ │ │ +│ Daily: │ │ On Price │ │ Rolling window │ +│ - Retrain │ │ Request: │ │ compute session │ +│ classifier │ │ - Get session│ │ features, push │ +│ - Retrain MT │ │ features │ │ to Redis │ +│ model │ │ - Predict │ │ │ +│ - Publish to │ │ P(agent) │ │ TTL: 1 hour │ +│ registry │ │ - Predict │ │ │ +│ │ │ P(purchase)│ │ │ +│ │ │ - Compute │ │ │ +│ │ │ optimal_p │ │ │ +└──────┬───────┘ └──────┬───────┘ └────────┬─────────┘ + │ │ │ + ↓ ↓ ↓ +┌──────────────────────────────────────────────┐ +│ Redis (Model Registry) │ +├──────────────────────────────────────────────┤ +│ Keys: │ +│ - classifier:agent_detector:latest (pickle) │ +│ - multitask_model:latest (state_dict) │ +│ - session_features:{sessionId} (json, TTL) │ +│ - prices:latest (DataFrame) │ +│ - elasticity:latest (DataFrame) │ +└──────────────────┬───────────────────────────┘ + │ + ↓ + ┌─────────────────────┐ + │ Pricing Provider │ + │ /api/{mode}/price/ │ + │ {productId} │ + │ │ + │ GET sessionId │ + │ → Load features │ + │ → Load models │ + │ → Predict │ + │ → Return price │ + └─────────┬───────────┘ + │ + ↓ + ┌─────────────────────┐ + │ Frontend │ + │ (Display price) │ + └─────────────────────┘ +``` + +## Key Metrics + +### Model Performance +| Metric | Target | Current | Phase | +|--------|--------|---------|-------| +| Agent Classifier ROC-AUC | >0.90 | N/A (rule-based) | Phase 2 | +| Purchase Predictor ROC-AUC | >0.75 | N/A | Phase 3 | +| Pricing Latency (p99) | <100ms | ~50ms | All | +| Retraining Frequency | Daily | Every 15min (rules) | Phase 2+ | + +### Business Impact +| Metric | Target | Current | Phase | +|--------|--------|---------|-------| +| Margin Leakage Reduction | -30% | Baseline | Phase 2-4 | +| Human Conversion Rate | No change | Baseline | All | +| Agent Detection Rate | >85% precision | ~60% (velocity) | Phase 2 | +| Revenue Uplift | +10% | Baseline | Phase 3-4 | + +## File Structure (New) + +``` +experiments/ + ml/ + __init__.py + + # Phase 1: Features + features/ + __init__.py + temporal.py # TemporalFeatureExtractor + behavioral.py # BehavioralFeatureExtractor + product.py # ProductFeatureExtractor + useragent.py # UserAgentParser + aggregator.py # SessionAggregator + + pipeline.py # build_feature_pipeline() + datasets.py # load_events_from_kafka(), etc. + + # Phase 2: Classifier + train_classifier.py # XGBoost training script + + # Phase 3: Multi-Task + models/ + __init__.py + multitask.py # MultiTaskPricingModel (PyTorch) + + train_multitask.py # Multi-task training script + distill.py # Knowledge distillation + + # Phase 4: Simulator + simulator/ + __init__.py + env.py # DynamicPricingEnv (Gymnasium) + agents.py # HumanUser, AgentUser + train_rl.py # PPO training + + # Inference + inference/ + __init__.py + pricing_service.py # gRPC service (optional) + feature_cache.py # Redis feature store client + + # Notebooks + notebooks/ + 01_eda.ipynb + 02_feature_analysis.ipynb + 03_model_evaluation.ipynb + 04_simulator_calibration.ipynb +``` + +## Critical Code Changes + +### 1. Replace Messy SessionState +**Before:** `experiments/procesing/steps/session.py` (O(n²) loops) +**After:** `experiments/ml/pipeline.py` (vectorized pipeline) + +### 2. Upgrade Pricing Provider +**Before:** Simple velocity threshold +**After:** ML model inference with agent probability + +### 3. Add Real-Time Feature Store +**Before:** No feature caching +**After:** Kafka Streams → Redis (session features) + +### 4. Airflow DAG Upgrades +**Before:** `surge_pricing_pipeline` (rule-based) +**After:** Add `agent_classifier_training_pipeline` (daily retrain) + +## Next Actions (Start Here) + +1. ✅ **Read gameplan**: See `/home/user/PHANTOM/docs/GAMEPLAN_MULTITASK_PRICING.md` + +2. **Create directory structure**: + ```bash + mkdir -p experiments/ml/{features,models,simulator,inference,notebooks} + ``` + +3. **Pull sample data**: + ```python + # experiments/ml/notebooks/01_eda.ipynb + from kafka import KafkaConsumer + # Pull 1 week of events, join with experiments table + # Analyze label distribution, feature correlations + ``` + +4. **Prototype first feature extractor**: + ```python + # experiments/ml/features/temporal.py + # Start with TemporalFeatureExtractor + # Test on 10k events, validate output schema + ``` + +5. **Review with team**: Discuss tradeoffs, priorities, timeline + +## Questions to Resolve + +1. **Label Quality**: How confident are we in `xp_human_only` labels? Should we add manual verification? + +2. **Compute Budget**: Do we have GPU access for PyTorch training? (Phase 3) + +3. **Latency Requirements**: Is 100ms p99 acceptable for pricing API? + +4. **A/B Testing**: Do we have infrastructure for traffic splitting? (Deployment) + +5. **Monitoring**: Who owns the Grafana dashboards? What alerting thresholds? + +--- + +**For detailed implementation, see:** `/home/user/PHANTOM/docs/GAMEPLAN_MULTITASK_PRICING.md` diff --git a/docs/GAMEPLAN_MULTITASK_PRICING.md b/docs/GAMEPLAN_MULTITASK_PRICING.md new file mode 100644 index 0000000..4f4d387 --- /dev/null +++ b/docs/GAMEPLAN_MULTITASK_PRICING.md @@ -0,0 +1,1596 @@ +# Multi-Task Learning for Dynamic Pricing & Agent Detection +## Comprehensive Gameplan & Architecture + +**Date:** 2025-12-11 +**Objective:** Transform raw interaction data into a multi-task learning system that (1) discriminates human vs agent behavior and (2) learns pricing heuristics to prevent margin leakage in dynamic pricing algorithms. + +--- + +## Executive Summary + +### Current State +- ✅ Rich event collection system capturing 15+ event types (views, carts, hovers, filters) +- ✅ experimentId-based labeling via `xp_human_only` flag +- ✅ Kafka streaming to `user-interactions` topic +- ✅ Airflow pipeline running every 15min with rule-based pricing (surge, elasticity) +- ❌ SessionState pipeline is O(n²) inefficient, messy feature extraction +- ❌ No supervised ML for agent detection (only velocity threshold heuristics) +- ❌ No multi-task learning architecture +- ❌ Pricing models are analytical formulas, not learned from data + +### Proposed Solution +1. **Refactor SessionState Pipeline**: Vectorized feature engineering with proper augmentation +2. **Supervised Agent Classifier**: Binary classifier (human/agent) using session features +3. **Multi-Task Learning**: Joint model learning both tasks: + - Task A: Human/Agent Classification (cross-entropy loss) + - Task B: Price Sensitivity Prediction (regression loss) +4. **Synthetic Pricing Simulator**: Fast RL-style environment for testing pricing strategies +5. **Knowledge Distillation**: Distill insights from classifier into pricing heuristics + +--- + +## I. DATA ANALYSIS + +### What We Actually Collect + +**Event Schema** (`web/src/lib/events.ts`): +```typescript +{ + sessionId: string, // Session UUID (httpOnly cookie) + experimentId?: string, // Experiment UUID → LABEL SOURCE + storeMode: 'hotel' | 'airline', + ts: string, // ISO8601 timestamp + page: string, // URL path + eventName: EventName, // 15 event types + productId?: string, // Product interaction + metadata?: Record, // Product details (price, amenities) + userAgent?: string +} +``` + +**Event Types Breakdown:** +- **Navigation:** `page_view`, `view_item_page`, `learn_more_about_item` +- **Cart:** `add_item_to_cart`, `remove_item`, `checkout_start`, `purchase_complete` +- **Filters:** `search`, `filter_for_date`, `filter_for_amenities`, `filter_for_price`, `sort_change` +- **Dwell Signals:** `hover_over_title`, `hover_over_paragraph`, `hover_over_link`, `hover_over_button` +- **Session:** `session_start` + +### Ground Truth Labels + +**Labeling Strategy:** +```sql +-- Experiment table schema (Supabase) +CREATE TABLE experiments ( + id UUID PRIMARY KEY, -- This becomes experimentId in events + subject_name TEXT, + xp_human_only BOOLEAN, -- KEY FIELD for labeling + xp_market_mode TEXT, + xp_task_id UUID +); +``` + +**Label Assignment:** +1. Admin creates experiment with `xp_human_only=true` (human) or `false` (agent allowed) +2. Generates start link: `https://phantom-hotel.vercel.app/start-task?uuid={experimentId}` +3. All events from that session carry `experimentId` +4. Join events with experiments table on `experimentId` → get `xp_human_only` label + +**Training Set Construction:** +```python +# Pseudocode +events_df = fetch_from_kafka('user-interactions') +experiments_df = fetch_from_supabase('experiments') + +labeled_df = events_df.merge( + experiments_df[['id', 'xp_human_only']], + left_on='experimentId', + right_on='id', + how='inner' # Only keep sessions with known experimentId +) + +# Label mapping +labeled_df['is_agent'] = ~labeled_df['xp_human_only'] # Flip boolean +``` + +### What We're Missing + +**For Pricing Task:** +- ❌ Price paid vs price shown (conversion price tracking) +- ❌ Product availability at interaction time +- ❌ Competitor prices (external data) +- ✅ Demand proxy (view counts, cart adds) - **WE HAVE THIS** +- ✅ Base prices from product metadata - **WE HAVE THIS** + +**For Agent Detection:** +- ✅ Interaction velocity, dwell times, product view depth - **WE HAVE THIS** +- ✅ userAgent strings - **WE HAVE THIS** +- ❌ Mouse movement patterns (only have hover events, not trajectories) +- ❌ Typing speed in search fields + +--- + +## II. PHASE 1: Fix SessionState Pipeline + +### Current Issues (`experiments/procesing/steps/session.py`) + +**Problem 1: O(n²) Complexity** +```python +# Line 77-93: _apply_to_slice() +for idx, row in df.iterrows(): # For EVERY row + session_df = df[df['sessionId'] == row['sessionId']] # Scan ALL events + current_time_events = session_df[session_df['ts'] <= row['ts']] + features = _extract_features_for_session(current_time_events) + # This scans the full dataset O(n) for each of n rows = O(n²) +``` + +**Problem 2: No Proper Feature Augmentation** +- Features computed only at aggregate session level +- No rolling window statistics +- No temporal decay of signals +- Missing product-level features (price seen, amenity preferences) + +### Proposed Solution: Vectorized Feature Engineering + +**New Pipeline Architecture:** +``` +Raw Events (Kafka) + ↓ +SessionGrouper (group by sessionId + experimentId) + ↓ +TemporalFeatureExtractor (vectorized time diffs, velocity) + ↓ +BehavioralFeatureExtractor (interaction counts, ratios) + ↓ +ProductFeatureExtractor (price sensitivity, preference patterns) + ↓ +SessionAggregator (final session-level features) + ↓ +LabelJoiner (merge with experiments table) + ↓ +Feature Matrix + Labels (ready for ML) +``` + +**Feature Categories:** + +**A. Temporal Features (Vectorized)** +```python +class TemporalFeatureExtractor(BaseEstimator, TransformerMixin): + def transform(self, events_df): + # Sort by session and time + df = events_df.sort_values(['sessionId', 'ts']) + + # Vectorized time differences + df['ts_dt'] = pd.to_datetime(df['ts']) + df['time_diff'] = df.groupby('sessionId')['ts_dt'].diff().dt.total_seconds() + + # Rolling statistics (vectorized with groupby) + df['velocity_5min'] = df.groupby('sessionId').rolling( + window='5min', on='ts_dt' + )['eventName'].count().reset_index(drop=True) + + df['avg_time_between_events'] = df.groupby('sessionId')['time_diff'].transform('mean') + df['std_time_between_events'] = df.groupby('sessionId')['time_diff'].transform('std') + + return df +``` + +**B. Behavioral Features** +```python +class BehavioralFeatureExtractor: + def transform(self, events_df): + session_agg = events_df.groupby('sessionId').agg({ + # Interaction counts + 'eventName': 'count', + 'productId': lambda x: x.nunique(), + + # Event type breakdowns + 'page_view': lambda x: (x == 'page_view').sum(), + 'add_item_to_cart': lambda x: (x == 'add_item_to_cart').sum(), + 'hover_over_*': lambda x: x.str.startswith('hover').sum(), + + # Session duration + 'ts': lambda x: (x.max() - x.min()).total_seconds() + }).rename(columns={...}) + + # Derived ratios + session_agg['cart_to_view_ratio'] = ( + session_agg['cart_adds'] / (session_agg['item_views'] + 1) + ) + session_agg['hover_intensity'] = ( + session_agg['total_hovers'] / session_agg['total_interactions'] + ) + + return session_agg +``` + +**C. Product Interaction Features** +```python +class ProductFeatureExtractor: + def transform(self, events_df): + # Extract price from metadata when available + events_df['price_seen'] = events_df['metadata'].apply( + lambda x: x.get('base_price') if isinstance(x, dict) else None + ) + + product_agg = events_df[events_df['productId'].notna()].groupby('sessionId').agg({ + # Price sensitivity + 'price_seen': ['mean', 'min', 'max', 'std'], + + # Product diversity + 'productId': lambda x: x.nunique(), + + # Max view depth (how many times most-viewed product was viewed) + 'productId': lambda x: x.value_counts().iloc[0] if len(x) > 0 else 0 + }) + + # Filter usage (proxy for deliberate search) + filter_events = events_df[events_df['eventName'].str.contains('filter|search')] + filter_counts = filter_events.groupby('sessionId').size() + + product_agg['filter_usage'] = filter_counts + + return product_agg +``` + +**D. Final Feature Matrix** +```python +# Target schema (session-level) +features = pd.DataFrame({ + # Identifiers + 'sessionId': str, + 'experimentId': str, + + # Temporal (8 features) + 'session_duration_sec': float, + 'total_interactions': int, + 'avg_time_between_events': float, + 'std_time_between_events': float, + 'interaction_velocity': float, # interactions/min + 'max_velocity_5min': float, + 'min_time_between_events': float, + 'session_start_hour': int, # time of day (0-23) + + # Behavioral (10 features) + 'page_views': int, + 'item_views': int, + 'cart_adds': int, + 'purchases': int, + 'hover_events': int, + 'filter_events': int, + 'cart_to_view_ratio': float, + 'conversion_rate': float, + 'hover_intensity': float, + 'unique_pages_visited': int, + + # Product interaction (8 features) + 'unique_products_viewed': int, + 'product_view_depth': int, # max times single product viewed + 'avg_price_seen': float, + 'min_price_seen': float, + 'max_price_seen': float, + 'std_price_seen': float, + 'price_range_explored': float, # max - min + 'filter_usage_count': int, + + # userAgent parsing (3 features) + 'is_headless_browser': bool, # "HeadlessChrome" in UA + 'is_automation_tool': bool, # "Selenium", "Playwright", etc. + 'browser_family': str, + + # LABELS + 'is_agent': bool, # from experiments.xp_human_only + 'purchased': bool, + 'total_revenue': float # for pricing task +}) +``` + +### Implementation Files + +**Create new directory structure:** +``` +experiments/ + ml/ + __init__.py + features/ + __init__.py + temporal.py # TemporalFeatureExtractor + behavioral.py # BehavioralFeatureExtractor + product.py # ProductFeatureExtractor + useragent.py # UserAgentParser + pipeline.py # Full feature pipeline (sklearn Pipeline) + datasets.py # Data loaders (Kafka → Pandas) +``` + +**Replace messy session.py with:** +```python +# experiments/ml/pipeline.py +from sklearn.pipeline import Pipeline + +def build_feature_pipeline(): + return Pipeline([ + ('temporal', TemporalFeatureExtractor()), + ('behavioral', BehavioralFeatureExtractor()), + ('product', ProductFeatureExtractor()), + ('useragent', UserAgentParser()), + ('aggregator', SessionAggregator()), + ('label_joiner', ExperimentLabelJoiner()) + ]) + +# Usage: +pipeline = build_feature_pipeline() +feature_matrix = pipeline.fit_transform(raw_events_df) +# Output: DataFrame with 29 features + 3 labels per session +``` + +--- + +## III. PHASE 2: Supervised Agent Classifier + +### Model Architecture + +**Objective:** Binary classification `is_agent ∈ {0, 1}` from session features + +**Model Choice: Gradient Boosting (XGBoost/LightGBM)** +- Handles mixed feature types (continuous, categorical, boolean) +- Interpretable feature importance +- Strong baseline performance +- Fast inference for real-time detection + +**Alternative: Neural Network (if needed for multi-task later)** +```python +class AgentClassifier(nn.Module): + def __init__(self, n_features=29, hidden_dims=[128, 64, 32]): + super().__init__() + + layers = [] + in_dim = n_features + for hidden_dim in hidden_dims: + layers.extend([ + nn.Linear(in_dim, hidden_dim), + nn.BatchNorm1d(hidden_dim), + nn.ReLU(), + nn.Dropout(0.3) + ]) + in_dim = hidden_dim + + layers.append(nn.Linear(in_dim, 1)) # Binary logit + self.network = nn.Sequential(*layers) + + def forward(self, x): + return self.network(x) # Shape: [batch, 1] +``` + +### Training Pipeline + +**File: `experiments/ml/train_classifier.py`** + +```python +import xgboost as xgb +from sklearn.model_selection import train_test_split +from sklearn.metrics import classification_report, roc_auc_score + +def train_agent_classifier(feature_matrix_df): + # Separate features and labels + X = feature_matrix_df.drop(columns=['sessionId', 'experimentId', 'is_agent', 'purchased', 'total_revenue']) + y = feature_matrix_df['is_agent'].astype(int) + + # Train/test split + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42, stratify=y + ) + + # XGBoost with class imbalance handling + scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum() + + model = xgb.XGBClassifier( + n_estimators=200, + max_depth=6, + learning_rate=0.05, + scale_pos_weight=scale_pos_weight, + eval_metric='auc', + early_stopping_rounds=20 + ) + + model.fit( + X_train, y_train, + eval_set=[(X_test, y_test)], + verbose=10 + ) + + # Evaluation + y_pred = model.predict(X_test) + y_prob = model.predict_proba(X_test)[:, 1] + + print(classification_report(y_test, y_pred)) + print(f"ROC-AUC: {roc_auc_score(y_test, y_prob):.4f}") + + # Feature importance + importance_df = pd.DataFrame({ + 'feature': X.columns, + 'importance': model.feature_importances_ + }).sort_values('importance', ascending=False) + + return model, importance_df + +# Publish to ModelRegistry +def publish_classifier(model, registry): + import joblib + model_bytes = joblib.dumps(model) + registry.redis_client.set('classifier:agent_detector:latest', model_bytes) +``` + +### Inference Integration + +**Update Pricing Provider** (`backend/provider/app.py`): +```python +from lib.model_registry import ModelRegistry +import joblib + +registry = ModelRegistry() + +# Load classifier on startup +classifier_bytes = registry.redis_client.get('classifier:agent_detector:latest') +agent_classifier = joblib.loads(classifier_bytes) + +@app.get("/api/{mode}/price/{productId}") +def get_price(mode, productId, sessionId, experimentId): + # Fetch session features from real-time feature store + session_features = compute_session_features(sessionId) # New function + + # Predict agent probability + agent_prob = agent_classifier.predict_proba([session_features])[0, 1] + + # Apply dynamic markup based on agent probability + base_price = get_base_price(productId) + + if agent_prob > 0.7: # High confidence agent + markup = 1.3 + elif agent_prob > 0.4: # Uncertain + markup = 1.1 + else: # Likely human + markup = 1.0 + + final_price = base_price * markup + + return PriceResponse( + productId=productId, + price=final_price, + agent_probability=agent_prob, + markup=markup + ) +``` + +**Real-Time Feature Computation:** +```python +# backend/provider/features.py +def compute_session_features(sessionId: str) -> np.ndarray: + """ + Compute features for active session from recent Kafka events + Uses sliding window (last 5 minutes) + """ + consumer = KafkaConsumer('user-interactions') + + # Fetch recent events for this session + events = [] + for msg in consumer: + event = msg.value + if event['sessionId'] == sessionId: + events.append(event) + + # Stop after collecting enough history or timeout + if len(events) > 100 or time_elapsed > 300: + break + + events_df = pd.DataFrame(events) + + # Apply feature pipeline (same as training) + feature_pipeline = load_feature_pipeline() + features = feature_pipeline.transform(events_df) + + return features.values[0] # Return single row as array +``` + +--- + +## IV. PHASE 3: Multi-Task Learning Architecture + +### Conceptual Framework + +**Two Tasks:** +1. **Task A: Agent Classification** (what we just built) + - Input: Session features + - Output: P(is_agent) + - Loss: Binary cross-entropy + +2. **Task B: Price Sensitivity Prediction** + - Input: Session features + Product features + Current price + - Output: P(purchase | price) or Willingness-to-Pay (WTP) + - Loss: Binary cross-entropy (purchase) or MSE (WTP) + +**Shared Representation Hypothesis:** +- Both tasks benefit from learning session behavioral patterns +- Agent sessions have different price sensitivity than human sessions +- Shared feature encoder can learn robust representations + +### Multi-Task Network Architecture + +```python +class MultiTaskPricingModel(nn.Module): + """ + Shared encoder for session features + Two task-specific heads + """ + def __init__(self, n_session_features=29, n_product_features=10): + super().__init__() + + # Shared session encoder + self.session_encoder = nn.Sequential( + nn.Linear(n_session_features, 128), + nn.BatchNorm1d(128), + nn.ReLU(), + nn.Dropout(0.3), + nn.Linear(128, 64), + nn.BatchNorm1d(64), + nn.ReLU() + ) + + # Product encoder (processes product features + price) + self.product_encoder = nn.Sequential( + nn.Linear(n_product_features, 32), + nn.ReLU(), + nn.Linear(32, 16) + ) + + # Task A: Agent classification head + self.agent_head = nn.Sequential( + nn.Linear(64, 32), + nn.ReLU(), + nn.Linear(32, 1) # Logit for is_agent + ) + + # Task B: Purchase probability head (uses both session + product) + self.purchase_head = nn.Sequential( + nn.Linear(64 + 16, 32), # Concatenate session + product encodings + nn.ReLU(), + nn.Linear(32, 1) # Logit for purchase probability + ) + + def forward(self, session_features, product_features): + # Encode session (shared) + session_enc = self.session_encoder(session_features) + + # Encode product + product_enc = self.product_encoder(product_features) + + # Task A prediction + agent_logit = self.agent_head(session_enc) + + # Task B prediction + combined = torch.cat([session_enc, product_enc], dim=1) + purchase_logit = self.purchase_head(combined) + + return { + 'agent_logit': agent_logit, + 'purchase_logit': purchase_logit, + 'session_embedding': session_enc # For knowledge distillation + } +``` + +### Multi-Task Loss Function + +```python +def multi_task_loss(outputs, targets, task_weights={'agent': 1.0, 'purchase': 2.0}): + """ + Weighted combination of task losses + + Args: + outputs: dict with 'agent_logit', 'purchase_logit' + targets: dict with 'is_agent', 'purchased' + task_weights: importance weights for each task + """ + # Task A: Agent classification + loss_agent = F.binary_cross_entropy_with_logits( + outputs['agent_logit'].squeeze(), + targets['is_agent'].float() + ) + + # Task B: Purchase prediction + loss_purchase = F.binary_cross_entropy_with_logits( + outputs['purchase_logit'].squeeze(), + targets['purchased'].float() + ) + + # Weighted sum + total_loss = ( + task_weights['agent'] * loss_agent + + task_weights['purchase'] * loss_purchase + ) + + return total_loss, { + 'loss_agent': loss_agent.item(), + 'loss_purchase': loss_purchase.item(), + 'loss_total': total_loss.item() + } +``` + +### Training Loop + +**File: `experiments/ml/train_multitask.py`** + +```python +import torch +from torch.utils.data import DataLoader, TensorDataset + +def train_multitask_model(feature_matrix_df, n_epochs=100): + # Prepare data + X_session = feature_matrix_df[SESSION_FEATURE_COLS].values + X_product = feature_matrix_df[PRODUCT_FEATURE_COLS].values + y_agent = feature_matrix_df['is_agent'].values + y_purchase = feature_matrix_df['purchased'].values + + # Convert to tensors + dataset = TensorDataset( + torch.FloatTensor(X_session), + torch.FloatTensor(X_product), + torch.FloatTensor(y_agent), + torch.FloatTensor(y_purchase) + ) + + train_loader = DataLoader(dataset, batch_size=64, shuffle=True) + + # Initialize model + model = MultiTaskPricingModel( + n_session_features=X_session.shape[1], + n_product_features=X_product.shape[1] + ) + + optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4) + scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5) + + # Training loop + for epoch in range(n_epochs): + model.train() + epoch_losses = {'agent': [], 'purchase': [], 'total': []} + + for X_sess_batch, X_prod_batch, y_agent_batch, y_purch_batch in train_loader: + optimizer.zero_grad() + + # Forward pass + outputs = model(X_sess_batch, X_prod_batch) + + # Compute loss + loss, loss_dict = multi_task_loss( + outputs, + {'is_agent': y_agent_batch, 'purchased': y_purch_batch} + ) + + # Backward pass + loss.backward() + optimizer.step() + + # Track losses + for k, v in loss_dict.items(): + epoch_losses[k.replace('loss_', '')].append(v) + + # Log epoch stats + avg_loss = np.mean(epoch_losses['total']) + print(f"Epoch {epoch}: Loss={avg_loss:.4f}, " + f"Agent={np.mean(epoch_losses['agent']):.4f}, " + f"Purchase={np.mean(epoch_losses['purchase']):.4f}") + + scheduler.step(avg_loss) + + return model +``` + +### Knowledge Distillation for Pricing Heuristics + +**Goal:** Extract interpretable pricing rules from the learned multi-task model + +**Method 1: Decision Tree Distillation** +```python +from sklearn.tree import DecisionTreeClassifier, export_text + +def distill_pricing_rules(multitask_model, feature_matrix_df): + """ + Train interpretable decision tree to mimic multi-task model's pricing decisions + """ + # Get embeddings from multi-task model + X_session = torch.FloatTensor(feature_matrix_df[SESSION_FEATURE_COLS].values) + X_product = torch.FloatTensor(feature_matrix_df[PRODUCT_FEATURE_COLS].values) + + with torch.no_grad(): + outputs = multitask_model(X_session, X_product) + session_embeddings = outputs['session_embedding'].numpy() + agent_probs = torch.sigmoid(outputs['agent_logit']).numpy() + purchase_probs = torch.sigmoid(outputs['purchase_logit']).numpy() + + # Compute optimal markup based on model predictions + # High agent prob + low purchase prob → high markup + optimal_markup = compute_markup_from_probs(agent_probs, purchase_probs) + + # Train decision tree to predict optimal markup + distilled_tree = DecisionTreeClassifier(max_depth=5, min_samples_leaf=50) + distilled_tree.fit(feature_matrix_df[SESSION_FEATURE_COLS], optimal_markup) + + # Extract human-readable rules + rules_text = export_text(distilled_tree, feature_names=SESSION_FEATURE_COLS) + + print("Distilled Pricing Rules:") + print(rules_text) + + return distilled_tree, rules_text + +def compute_markup_from_probs(agent_probs, purchase_probs): + """ + Heuristic: + - High agent prob → increase markup (exploit reconnaissance) + - Low purchase prob → decrease markup (improve conversion) + """ + markup = 1.0 + 0.5 * agent_probs - 0.3 * purchase_probs + return np.clip(markup, 0.7, 1.5) # Bounds: [70% to 150%] +``` + +**Method 2: SHAP Values for Feature Importance** +```python +import shap + +def explain_pricing_model(multitask_model, feature_matrix_df): + """Use SHAP to understand which features drive pricing decisions""" + + # Wrap model for SHAP + def model_predict(X_session): + X_session_t = torch.FloatTensor(X_session) + X_product_t = torch.zeros(len(X_session), n_product_features) # Placeholder + + with torch.no_grad(): + outputs = multitask_model(X_session_t, X_product_t) + agent_probs = torch.sigmoid(outputs['agent_logit']).numpy() + + return agent_probs + + # Compute SHAP values + explainer = shap.KernelExplainer(model_predict, feature_matrix_df[SESSION_FEATURE_COLS].sample(100)) + shap_values = explainer.shap_values(feature_matrix_df[SESSION_FEATURE_COLS].sample(200)) + + # Visualize + shap.summary_plot(shap_values, feature_matrix_df[SESSION_FEATURE_COLS].sample(200)) + + return shap_values +``` + +--- + +## V. PHASE 4: Synthetic Dynamic Pricing Simulator + +### Motivation + +**Problem:** Real-world pricing experiments are: +- Slow (need to wait for user sessions) +- Expensive (margin leakage during exploration) +- Risky (can break revenue if pricing goes wrong) + +**Solution:** Fast synthetic environment to: +1. Simulate user/agent behavior +2. Test pricing strategies in closed loop +3. Train RL agents for dynamic pricing +4. Validate multi-task model predictions + +### Simulator Architecture + +**File: `experiments/ml/simulator/env.py`** + +```python +import gymnasium as gym +import numpy as np +from typing import Dict, Tuple + +class DynamicPricingEnv(gym.Env): + """ + Synthetic environment for dynamic pricing with human/agent users + + State: [current_demand, inventory_level, time_of_day, agent_fraction, avg_session_velocity] + Action: price_multiplier ∈ [0.7, 1.5] (continuous) + Reward: revenue - margin_leakage_penalty + """ + + def __init__(self, + n_products=20, + n_timesteps=1000, + human_price_sensitivity=2.0, # Elasticity + agent_price_sensitivity=5.0, # Agents more price-aware + agent_fraction=0.3): + super().__init__() + + self.n_products = n_products + self.n_timesteps = n_timesteps + self.human_sensitivity = human_price_sensitivity + self.agent_sensitivity = agent_price_sensitivity + self.agent_fraction = agent_fraction + + # State space: [demand, inventory, hour, agent_frac, velocity] + self.observation_space = gym.spaces.Box( + low=np.array([0, 0, 0, 0, 0]), + high=np.array([100, 100, 24, 1, 20]), + dtype=np.float32 + ) + + # Action space: price multiplier + self.action_space = gym.spaces.Box( + low=0.7, high=1.5, shape=(1,), dtype=np.float32 + ) + + # Internal state + self.base_prices = np.random.uniform(100, 500, n_products) + self.inventory = np.full(n_products, 50) + self.timestep = 0 + self.demand_history = [] + + def reset(self, seed=None): + super().reset(seed=seed) + self.inventory = np.full(self.n_products, 50) + self.timestep = 0 + self.demand_history = [] + return self._get_state(), {} + + def step(self, action: float) -> Tuple[np.ndarray, float, bool, bool, Dict]: + """ + Execute one pricing decision + + Args: + action: price_multiplier + + Returns: + state, reward, terminated, truncated, info + """ + price_multiplier = action[0] + + # Simulate user arrivals (Poisson process) + hour = self.timestep % 24 + arrival_rate = self._get_arrival_rate(hour) + n_arrivals = np.random.poisson(arrival_rate) + + # Separate humans and agents + n_agents = np.random.binomial(n_arrivals, self.agent_fraction) + n_humans = n_arrivals - n_agents + + # Simulate purchase decisions + current_price = self.base_prices[0] * price_multiplier # Simplified: 1 product + + # Human purchase probability (logistic demand curve) + human_purchase_prob = self._purchase_probability( + current_price, + self.base_prices[0], + self.human_sensitivity + ) + human_purchases = np.random.binomial(n_humans, human_purchase_prob) + + # Agent purchase probability (more sensitive) + agent_purchase_prob = self._purchase_probability( + current_price, + self.base_prices[0], + self.agent_sensitivity + ) + agent_purchases = np.random.binomial(n_agents, agent_purchase_prob) + + # Compute revenue + revenue = current_price * (human_purchases + agent_purchases) + + # Compute margin leakage (agents getting oracle prices) + oracle_price = self.base_prices[0] * 1.2 # Optimal price for agents + margin_leakage = (oracle_price - current_price) * agent_purchases + margin_leakage = max(0, margin_leakage) # Only count if we underpriced + + # Reward = revenue - margin leakage penalty + reward = revenue - 0.5 * margin_leakage + + # Update inventory + total_purchases = human_purchases + agent_purchases + self.inventory[0] -= total_purchases + + # Track demand + self.demand_history.append(n_arrivals) + + # Next state + self.timestep += 1 + terminated = self.timestep >= self.n_timesteps + truncated = self.inventory[0] <= 0 + + state = self._get_state() + info = { + 'revenue': revenue, + 'margin_leakage': margin_leakage, + 'human_purchases': human_purchases, + 'agent_purchases': agent_purchases, + 'agent_fraction': n_agents / n_arrivals if n_arrivals > 0 else 0 + } + + return state, reward, terminated, truncated, info + + def _get_state(self) -> np.ndarray: + """Current state observation""" + demand = np.mean(self.demand_history[-10:]) if self.demand_history else 0 + inventory = self.inventory[0] + hour = self.timestep % 24 + agent_frac = self.agent_fraction # Simplified + velocity = np.random.uniform(1, 10) # Placeholder + + return np.array([demand, inventory, hour, agent_frac, velocity], dtype=np.float32) + + def _get_arrival_rate(self, hour: int) -> float: + """Simulate time-of-day arrival patterns""" + # Higher demand during business hours (9am-5pm) + if 9 <= hour <= 17: + return 10.0 + else: + return 3.0 + + def _purchase_probability(self, current_price, base_price, sensitivity): + """ + Logistic demand curve + P(purchase) = 1 / (1 + exp(sensitivity * (log(current_price) - log(base_price)))) + """ + price_ratio = current_price / base_price + logit = sensitivity * (np.log(price_ratio)) + prob = 1 / (1 + np.exp(logit)) + return np.clip(prob, 0.01, 0.99) +``` + +### Behavioral Models + +**File: `experiments/ml/simulator/agents.py`** + +```python +class SyntheticUser: + """Base class for simulated users""" + def __init__(self, price_sensitivity, session_velocity): + self.price_sensitivity = price_sensitivity + self.session_velocity = session_velocity # interactions/min + + def generate_session(self) -> List[Event]: + """Generate event sequence""" + raise NotImplementedError + +class HumanUser(SyntheticUser): + """Simulates human browsing behavior""" + def __init__(self): + super().__init__( + price_sensitivity=2.0, # Moderate + session_velocity=np.random.uniform(1, 3) # Slow browsing + ) + + def generate_session(self) -> List[Event]: + events = [] + + # Human pattern: slow, exploratory + n_page_views = np.random.poisson(5) + n_hovers = np.random.poisson(8) + n_products_viewed = np.random.randint(2, 6) + + # Time between events (seconds) + inter_event_times = np.random.exponential(20, size=n_page_views) + + for i in range(n_page_views): + events.append({ + 'eventName': 'page_view', + 'timestamp': sum(inter_event_times[:i]), + 'velocity': self.session_velocity + }) + + # Purchase decision (price-dependent) + purchase_prob = self._compute_purchase_prob(current_price) + if np.random.rand() < purchase_prob: + events.append({'eventName': 'purchase_complete', ...}) + + return events + +class AgentUser(SyntheticUser): + """Simulates agent/bot behavior""" + def __init__(self): + super().__init__( + price_sensitivity=5.0, # Highly price-aware + session_velocity=np.random.uniform(10, 20) # Fast reconnaissance + ) + + def generate_session(self) -> List[Event]: + events = [] + + # Agent pattern: fast, systematic + n_products_viewed = np.random.randint(15, 30) # Views many products + inter_event_times = np.random.exponential(2, size=n_products_viewed) # Fast + + for i in range(n_products_viewed): + events.append({ + 'eventName': 'view_item_page', + 'timestamp': sum(inter_event_times[:i]), + 'velocity': self.session_velocity + }) + + # Agents rarely purchase (reconnaissance) + purchase_prob = 0.05 + if np.random.rand() < purchase_prob: + events.append({'eventName': 'purchase_complete', ...}) + + return events +``` + +### RL Training for Pricing Policy + +**File: `experiments/ml/simulator/train_rl.py`** + +```python +from stable_baselines3 import PPO +from stable_baselines3.common.vec_env import DummyVecEnv + +def train_pricing_policy(): + """Train RL agent to learn optimal pricing strategy""" + + # Create environment + env = DynamicPricingEnv(agent_fraction=0.3) + env = DummyVecEnv([lambda: env]) + + # Train PPO agent + model = PPO( + "MlpPolicy", + env, + learning_rate=3e-4, + n_steps=2048, + batch_size=64, + n_epochs=10, + verbose=1, + tensorboard_log="./logs/" + ) + + model.learn(total_timesteps=100_000) + + # Evaluate + obs = env.reset() + total_reward = 0 + for _ in range(1000): + action, _ = model.predict(obs, deterministic=True) + obs, reward, done, info = env.step(action) + total_reward += reward + + if done: + print(f"Episode reward: {total_reward}") + break + + return model + +# Compare with baseline strategies +def benchmark_strategies(): + strategies = { + 'fixed': lambda state: np.array([1.0]), # No markup + 'surge': lambda state: np.array([1.2]) if state[0] > 10 else np.array([0.9]), + 'rl': lambda state: rl_model.predict(state)[0] + } + + for name, policy in strategies.items(): + env = DynamicPricingEnv() + total_reward = evaluate_policy(env, policy, n_episodes=100) + print(f"{name}: avg reward = {total_reward:.2f}") +``` + +### Integration with Multi-Task Model + +**Use multi-task model predictions as state features:** + +```python +class EnhancedPricingEnv(DynamicPricingEnv): + """Environment augmented with multi-task model predictions""" + + def __init__(self, multitask_model): + super().__init__() + self.multitask_model = multitask_model + + def _get_state(self): + # Get base state + base_state = super()._get_state() + + # Generate synthetic session features + session_features = self._generate_session_features() + + # Get multi-task model predictions + with torch.no_grad(): + outputs = self.multitask_model( + torch.FloatTensor(session_features).unsqueeze(0), + torch.zeros(1, n_product_features) + ) + agent_prob = torch.sigmoid(outputs['agent_logit']).item() + purchase_prob = torch.sigmoid(outputs['purchase_logit']).item() + + # Augment state with model predictions + enhanced_state = np.concatenate([ + base_state, + [agent_prob, purchase_prob] + ]) + + return enhanced_state +``` + +--- + +## VI. IMPLEMENTATION ROADMAP + +### Phase 1: Foundation (Week 1-2) + +**1.1 Refactor Feature Pipeline** ✓ +- [ ] Create `experiments/ml/features/` directory +- [ ] Implement `TemporalFeatureExtractor` (vectorized) +- [ ] Implement `BehavioralFeatureExtractor` +- [ ] Implement `ProductFeatureExtractor` +- [ ] Implement `UserAgentParser` +- [ ] Create end-to-end `build_feature_pipeline()` +- [ ] Write unit tests for each extractor +- [ ] Benchmark: ensure <5min processing for 100k events + +**1.2 Data Loading** ✓ +- [ ] Implement `experiments/ml/datasets.py` +- [ ] Add function `load_events_from_kafka(topic, time_range)` +- [ ] Add function `load_experiments_from_supabase()` +- [ ] Add function `create_labeled_dataset()` (joins events + experiments) +- [ ] Add data validation (check for missing experimentIds, etc.) + +**1.3 Model Registry Updates** ✓ +- [ ] Extend `lib/model_registry.py` to store: + - Pickled sklearn/XGBoost models + - PyTorch model state_dicts + - Feature column names (schema versioning) +- [ ] Add `publish_classifier(model, metadata)` +- [ ] Add `publish_multitask_model(model, metadata)` + +### Phase 2: Supervised Classifier (Week 2-3) + +**2.1 Training Pipeline** ✓ +- [ ] Create `experiments/ml/train_classifier.py` +- [ ] Load labeled dataset with feature pipeline +- [ ] Train XGBoost classifier +- [ ] Evaluate: ROC-AUC, precision/recall, confusion matrix +- [ ] Generate feature importance report +- [ ] Publish model to registry + +**2.2 Inference Integration** ✓ +- [ ] Update `backend/provider/app.py` +- [ ] Implement `compute_session_features(sessionId)` for real-time inference +- [ ] Add `/api/detect-agent/{sessionId}` endpoint (debugging) +- [ ] Modify `/api/{mode}/price/{productId}` to: + - Compute agent probability + - Apply dynamic markup based on probability + - Log (sessionId, agent_prob, markup) to Kafka + +**2.3 Monitoring** ✓ +- [ ] Create Airflow DAG: `agent_classifier_training_pipeline` + - Runs daily + - Fetches last 7 days of data + - Retrains classifier + - Publishes to registry +- [ ] Add Grafana dashboard: + - Agent detection rate over time + - Precision/recall metrics + - Markup distribution (human vs agent sessions) + +### Phase 3: Multi-Task Learning (Week 3-5) + +**3.1 Model Development** ✓ +- [ ] Create `experiments/ml/models/multitask.py` +- [ ] Implement `MultiTaskPricingModel` (PyTorch) +- [ ] Implement `multi_task_loss()` +- [ ] Create `experiments/ml/train_multitask.py` + +**3.2 Training** ✓ +- [ ] Prepare product-level dataset (not just session-level) + - Each row: (session_features, product_features, price, is_agent, purchased) + - Requires joining events with product catalog +- [ ] Train multi-task model +- [ ] Evaluate both tasks: + - Agent classification: ROC-AUC + - Purchase prediction: ROC-AUC, calibration plot +- [ ] Hyperparameter tuning (task weights, learning rate, architecture) + +**3.3 Knowledge Distillation** ✓ +- [ ] Implement `distill_pricing_rules()` (decision tree) +- [ ] Implement `explain_pricing_model()` (SHAP) +- [ ] Generate interpretable pricing rule report +- [ ] Validate: compare distilled tree performance vs full model + +**3.4 Deployment** ✓ +- [ ] Serialize PyTorch model to ONNX for fast inference +- [ ] Update pricing provider to use multi-task model: + - Predict agent probability + - Predict purchase probability given current price + - Compute optimal price (maximize expected revenue) +- [ ] A/B test: multi-task pricing vs baseline elasticity pricing + +### Phase 4: Synthetic Simulator (Week 5-6) + +**4.1 Environment** ✓ +- [ ] Create `experiments/ml/simulator/` +- [ ] Implement `DynamicPricingEnv` (Gymnasium) +- [ ] Implement `HumanUser` and `AgentUser` behavioral models +- [ ] Validate environment: + - Test human_sensitivity vs agent_sensitivity differentiation + - Visualize demand curves for both user types + +**4.2 RL Training** ✓ +- [ ] Create `experiments/ml/simulator/train_rl.py` +- [ ] Train PPO agent +- [ ] Benchmark against baseline strategies (fixed, surge, elasticity) +- [ ] Visualize: reward curves, pricing decisions over time + +**4.3 Integration with Multi-Task Model** ✓ +- [ ] Implement `EnhancedPricingEnv` (uses multi-task model predictions) +- [ ] Train RL agent in enhanced environment +- [ ] Compare: RL with model predictions vs RL without + +**4.4 Validation** ✓ +- [ ] Run counterfactual simulations: + - "What if agent fraction was 50%?" + - "What if agents were less price-sensitive?" +- [ ] Generate risk analysis report for pricing strategies + +### Phase 5: Production Integration (Week 7-8) + +**5.1 Real-Time Feature Store** ✓ +- [ ] Implement session feature caching (Redis) +- [ ] Create Kafka Streams job: + - Consumes `user-interactions` + - Computes rolling session features + - Publishes to Redis with TTL +- [ ] Update `compute_session_features()` to read from cache + +**5.2 Pricing Service Refactor** ✓ +- [ ] Decouple pricing logic from provider service +- [ ] Create `experiments/ml/inference/pricing_service.py` +- [ ] Expose gRPC API for low-latency pricing +- [ ] Load model once on startup (not per-request) + +**5.3 Observability** ✓ +- [ ] Add OpenTelemetry tracing to pricing service +- [ ] Create Grafana dashboard: + - Pricing latency (p50, p99) + - Revenue metrics (human vs agent) + - Model prediction distribution +- [ ] Set up alerts: + - Agent detection rate spike + - Margin leakage threshold exceeded + - Model inference failures + +**5.4 Documentation** ✓ +- [ ] Write model cards for both models: + - Intended use + - Training data characteristics + - Performance metrics + - Limitations & biases +- [ ] Create runbook for retraining pipelines +- [ ] API documentation for pricing service + +--- + +## VII. TECHNICAL STACK + +### Data Engineering +- **Streaming:** Kafka (existing) +- **Storage:** Supabase (PostgreSQL) for metadata, Kafka for events +- **Feature Store:** Redis (for real-time session features) +- **Orchestration:** Airflow (existing) + +### Machine Learning +- **Feature Engineering:** Pandas, NumPy, Scikit-learn +- **Classifier:** XGBoost / LightGBM (for speed + interpretability) +- **Multi-Task Model:** PyTorch +- **RL:** Stable-Baselines3 (PPO) +- **Explainability:** SHAP, Scikit-learn decision trees +- **Model Registry:** Redis (lightweight) or MLflow (if scaling up) + +### Inference +- **Serving:** ONNX Runtime (for PyTorch models) or direct PyTorch +- **API:** FastAPI (existing backend) +- **Caching:** Redis + +### Monitoring +- **Metrics:** Prometheus +- **Dashboards:** Grafana +- **Tracing:** OpenTelemetry + +--- + +## VIII. KEY DECISIONS & TRADEOFFS + +### Decision 1: XGBoost vs Neural Network for Classifier +**Choice:** Start with XGBoost, migrate to NN if multi-task learning shows benefit + +**Rationale:** +- XGBoost: Faster training, interpretable, strong baseline +- Neural Network: Required for multi-task learning with shared encoder +- **Approach:** Build both, use XGBoost for initial deployment + +### Decision 2: Real-Time vs Batch Feature Engineering +**Choice:** Hybrid approach +- **Batch:** Full feature pipeline in Airflow (daily retraining) +- **Real-Time:** Lightweight rolling features in Kafka Streams (for inference) + +**Tradeoff:** Complexity vs latency +- Real-time adds complexity but enables dynamic pricing +- Batch is simpler but can't adapt mid-session + +### Decision 3: Separate Simulator vs Production Trace Replay +**Choice:** Separate simulator (not trace replay) + +**Rationale:** +- **Trace replay:** More realistic but slow, requires large historical dataset +- **Simulator:** Fast iteration, can test edge cases (high agent fraction, etc.) +- **Approach:** Use simulator for development, validate on historical traces before production + +### Decision 4: Knowledge Distillation Method +**Choice:** Decision tree distillation + SHAP explanations + +**Rationale:** +- Decision trees: Generate actionable rules for non-ML stakeholders +- SHAP: Understand feature importance for model debugging +- Both complement each other (global rules + local explanations) + +--- + +## IX. SUCCESS METRICS + +### Model Performance +- **Agent Classifier:** + - ROC-AUC > 0.90 + - Precision > 0.85 at 80% recall (minimize false agent flags) +- **Purchase Predictor:** + - ROC-AUC > 0.75 + - Calibration error < 0.1 (predicted probs match observed rates) + +### Business Impact +- **Margin Leakage Reduction:** Target 30% reduction in `L_agent` +- **Revenue:** No degradation in human conversion rate +- **Latency:** p99 pricing latency < 100ms + +### Operational +- **Retraining Frequency:** Daily (automated) +- **Model Drift Detection:** Alert if agent detection rate changes >20% week-over-week +- **Uptime:** 99.9% availability for pricing service + +--- + +## X. RISKS & MITIGATION + +### Risk 1: Label Noise +**Issue:** `xp_human_only` flag may be incorrect (humans acting like agents, vice versa) + +**Mitigation:** +- Use soft labels (agent probability) instead of hard classification +- Implement active learning: flag uncertain sessions for manual review +- Combine flag with behavioral heuristics (velocity > threshold) + +### Risk 2: Concept Drift +**Issue:** Agent behavior evolves to evade detection + +**Mitigation:** +- Monitor model performance metrics daily +- Implement champion/challenger A/B testing for new models +- Retrain weekly with latest data + +### Risk 3: Revenue Impact +**Issue:** Aggressive agent markup hurts conversion + +**Mitigation:** +- Start with conservative markup (10% max) +- Run A/B test with 10% traffic +- Monitor revenue closely, rollback if <5% degradation + +### Risk 4: Simulator Misalignment +**Issue:** Simulator doesn't match real-world dynamics + +**Mitigation:** +- Calibrate simulator parameters from historical data +- Validate RL policies on historical traces before production +- Run small-scale live experiments + +--- + +## XI. NEXT STEPS (Immediate Actions) + +1. **Review this document** with team for alignment +2. **Set up project structure:** + ```bash + mkdir -p experiments/ml/{features,models,simulator,inference} + mkdir -p experiments/ml/notebooks # For EDA + ``` +3. **Create sample dataset:** + - Pull 1 week of events from Kafka + - Join with experiments table + - Validate label distribution (human/agent ratio) +4. **Prototype feature pipeline:** + - Start with `TemporalFeatureExtractor` + - Benchmark on 10k events + - Ensure output matches expected schema +5. **Kickoff Phase 1** tasks from roadmap + +--- + +## XII. REFERENCES & RESOURCES + +### Papers +- [Multi-Task Learning Using Uncertainty to Weigh Losses](https://arxiv.org/abs/1705.07115) - Kendall et al. +- [Distilling the Knowledge in a Neural Network](https://arxiv.org/abs/1503.02531) - Hinton et al. +- [Deep Reinforcement Learning for Online Pricing](https://arxiv.org/abs/1912.02572) + +### Code Examples +- Existing pipeline: `/home/user/PHANTOM/experiments/procesing/pipelines.py` +- Existing pricers: `/home/user/PHANTOM/experiments/procesing/pricers/` +- Agent runner: `/home/user/PHANTOM/experiments/agents/agent.py` + +### Internal Documentation +- Event schema: `/home/user/PHANTOM/web/src/lib/events.ts` +- Pricing provider API: `/home/user/PHANTOM/backend/provider/app.py` +- Airflow DAGs: `/home/user/PHANTOM/experiments/airflow/dags/` + +--- + +## APPENDIX A: Feature Schema + +```python +SESSION_FEATURE_COLS = [ + # Temporal (8) + 'session_duration_sec', + 'total_interactions', + 'avg_time_between_events', + 'std_time_between_events', + 'interaction_velocity', + 'max_velocity_5min', + 'min_time_between_events', + 'session_start_hour', + + # Behavioral (10) + 'page_views', + 'item_views', + 'cart_adds', + 'purchases', + 'hover_events', + 'filter_events', + 'cart_to_view_ratio', + 'conversion_rate', + 'hover_intensity', + 'unique_pages_visited', + + # Product (8) + 'unique_products_viewed', + 'product_view_depth', + 'avg_price_seen', + 'min_price_seen', + 'max_price_seen', + 'std_price_seen', + 'price_range_explored', + 'filter_usage_count', + + # UserAgent (3) + 'is_headless_browser', + 'is_automation_tool', + 'browser_family' +] + +PRODUCT_FEATURE_COLS = [ + 'base_price', + 'current_demand', + 'inventory_level', + 'avg_demand_last_hour', + 'price_elasticity', + 'n_amenities', + 'is_refundable', + 'days_until_date', + 'competitor_price_diff', # If available + 'view_count_last_hour' +] + +LABEL_COLS = [ + 'is_agent', # Binary: 0=human, 1=agent + 'purchased', # Binary: 0=no purchase, 1=purchase + 'total_revenue' # Float: revenue from this session +] +``` + +--- + +## APPENDIX B: Sample Code Snippets + +### Data Loading +```python +# experiments/ml/datasets.py +from kafka import KafkaConsumer +import pandas as pd +from supabase import create_client + +def load_events_from_kafka(topic='user-interactions', hours=24): + consumer = KafkaConsumer( + topic, + bootstrap_servers='localhost:9092', + auto_offset_reset='earliest', + value_deserializer=lambda m: json.loads(m.decode('utf-8')) + ) + + events = [] + cutoff_time = datetime.now() - timedelta(hours=hours) + + for message in consumer: + event = message.value + if pd.to_datetime(event['ts']) > cutoff_time: + events.append(event) + + return pd.DataFrame(events) + +def load_experiments_from_supabase(): + client = create_client(SUPABASE_URL, SUPABASE_KEY) + response = client.table('experiments').select('*').execute() + return pd.DataFrame(response.data) + +def create_labeled_dataset(): + events_df = load_events_from_kafka(hours=168) # 1 week + experiments_df = load_experiments_from_supabase() + + # Join + labeled_df = events_df.merge( + experiments_df[['id', 'xp_human_only']], + left_on='experimentId', + right_on='id', + how='inner' + ) + + # Create labels + labeled_df['is_agent'] = ~labeled_df['xp_human_only'] + + # Apply feature pipeline + feature_pipeline = build_feature_pipeline() + feature_matrix = feature_pipeline.fit_transform(labeled_df) + + return feature_matrix +``` + +### Feature Extraction +```python +# experiments/ml/features/temporal.py +from sklearn.base import BaseEstimator, TransformerMixin + +class TemporalFeatureExtractor(BaseEstimator, TransformerMixin): + def fit(self, X, y=None): + return self + + def transform(self, events_df): + df = events_df.copy() + df['ts_dt'] = pd.to_datetime(df['ts']) + df = df.sort_values(['sessionId', 'ts_dt']) + + # Time diffs (vectorized) + df['time_diff'] = df.groupby('sessionId')['ts_dt'].diff().dt.total_seconds() + + # Session-level aggregates + session_temporal = df.groupby('sessionId').agg({ + 'ts_dt': [ + ('session_duration_sec', lambda x: (x.max() - x.min()).total_seconds()), + ('session_start_hour', lambda x: x.min().hour) + ], + 'time_diff': [ + ('avg_time_between_events', 'mean'), + ('std_time_between_events', 'std'), + ('min_time_between_events', 'min') + ], + 'eventName': [ + ('total_interactions', 'count') + ] + }) + + # Flatten multi-index columns + session_temporal.columns = [col[1] if col[1] else col[0] for col in session_temporal.columns] + + # Compute velocity + session_temporal['interaction_velocity'] = ( + session_temporal['total_interactions'] / + (session_temporal['session_duration_sec'] / 60 + 1e-6) # per minute + ) + + return session_temporal.reset_index() +``` + +--- + +**END OF GAMEPLAN**