# Files
# PHANTOM/experiments/procesing/steps/session.py
#
# 259 lines
# 11 KiB
# Python

"""
Session feature extraction for ML training pipeline.
"""
import pandas as pd
import numpy as np
import re
from typing import Dict, Any
from procesing.steps.base import BaseContextStep
# Maps a feature category to the raw ``eventName`` values that count towards it.
EVENT_CATS = {
    'page_view': ['page_view'],
    'item_view': ['view_item_page', 'learn_more_about_item'],
    'cart_add': ['add_item_to_cart'],
    'purchase': ['purchase', 'checkout_complete'],
    'hover': ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button'],
    # NOTE: the 'filter' category is currently disabled:
    # 'filter': ['filter', 'search', 'apply_filter'],
}

# User-agent fragments that indicate a headless browser (case-insensitive).
HEADLESS_RE = re.compile(r'HeadlessChrome|Headless|PhantomJS', re.I)

# User-agent fragments that indicate a browser-automation framework (case-insensitive).
AUTOMATION_RE = re.compile(r'Selenium|Playwright|Puppeteer|WebDriver|chromedriver|geckodriver', re.I)

# (family, regex) pairs evaluated in order; the first pattern that matches wins.
# Order matters: an Edge user agent also contains "Chrome/..." and "Safari/...",
# and a Chrome user agent also contains "Safari/...", so the most specific
# token has to be tested first.
BROWSER_PATTERNS = [
    ('Edge', r'Edg/[\d.]+'),
    ('Chrome', r'Chrome/[\d.]+'),
    ('Firefox', r'Firefox/[\d.]+'),
    ('Safari', r'Safari/[\d.]+'),
]
class TemporalFeatureStep(BaseContextStep):
    """Per-session timing features: active duration, inter-event gaps, event rates.

    ``timeout_sec`` is the largest gap between consecutive events that still
    counts towards the session duration; ``velocity_window`` is the resample
    rule used to bucket events when computing the peak event count.
    """

    def __init__(self, context, timeout_sec: float = 900, velocity_window: str = '5min'):
        super().__init__(context)
        self.velocity_window = velocity_window
        self.timeout_sec = timeout_sec

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Aggregate event rows in ``X`` into one row of timing features per ``sessionId``.

        Returns an empty frame with only a ``sessionId`` column when ``X`` has
        no rows or no ``ts`` column.
        """
        events = X.copy()
        if 'ts' not in events.columns or events.empty:
            return pd.DataFrame(columns=pd.Series(['sessionId']))

        events['ts_dt'] = pd.to_datetime(events['ts'])
        events = events.sort_values(['sessionId', 'ts_dt'])

        # Seconds since the previous event of the same session (NaN for the first one).
        gaps = events.groupby('sessionId')['ts_dt'].diff().dt.total_seconds()
        events['time_diff'] = gaps

        # Gaps above the timeout (and the leading NaN) contribute 0 to the duration.
        within_timeout = events['time_diff'] <= self.timeout_sec
        events['active_diff'] = events['time_diff'].where(within_timeout, 0)

        features = events.groupby('sessionId').agg(
            session_duration_sec=('active_diff', 'sum'),
            total_interactions=('sessionId', 'count'),
            avg_time_between_events=('time_diff', 'mean'),
            std_time_between_events=('time_diff', 'std'),
            min_time_between_events=('time_diff', 'min'),
            session_start_hour=('ts_dt', lambda stamps: stamps.min().hour),
        ).reset_index()
        features['std_time_between_events'] = features['std_time_between_events'].fillna(0)

        # Events per minute of active time; 0 when the session has no active time.
        duration = features['session_duration_sec']
        per_minute = (features['total_interactions'] / duration) * 60
        features['interaction_velocity'] = np.where(duration > 0, per_minute, 0)

        # Largest number of events falling into a single velocity_window bucket.
        bucket_sizes = (
            events.set_index('ts_dt')
            .groupby('sessionId')
            .resample(self.velocity_window, include_groups=False)
            .size()
        )
        busiest = bucket_sizes.groupby('sessionId').max().rename('max_velocity_5min')
        features = features.merge(busiest, on='sessionId', how='left')
        return features.fillna({'max_velocity_5min': 0})
class BehavioralFeatureStep(BaseContextStep):
    """Vectorized event counts and ratios per session."""

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Count events per category for every ``sessionId`` and derive ratios.

        Returns an empty frame with only a ``sessionId`` column when ``X`` has
        no rows or no ``eventName`` column. A missing ``page`` column is
        tolerated and yields ``unique_pages == 0``.
        """
        if X.empty or 'eventName' not in X.columns:
            return pd.DataFrame(columns=pd.Series(['sessionId']))
        df = X.copy()

        for cat, events in EVENT_CATS.items():
            df[f'is_{cat}'] = df['eventName'].isin(events)

        # Any "hover_over_*" event counts as a hover, not only the listed ones.
        # na=False keeps the mask strictly boolean when eventName has nulls.
        df['is_hover'] = df['is_hover'] | df['eventName'].str.startswith('hover_over_', na=False)

        # 'page' is optional; an all-NaN column makes nunique() report 0 per
        # session instead of raising KeyError in the aggregation below.
        if 'page' not in df.columns:
            df['page'] = np.nan

        agg = df.groupby('sessionId').agg(
            total_events=('eventName', 'count'),
            unique_pages=('page', 'nunique'),
            page_views=('is_page_view', 'sum'),
            item_views=('is_item_view', 'sum'),
            cart_adds=('is_cart_add', 'sum'),
            purchases=('is_purchase', 'sum'),
            hover_events=('is_hover', 'sum'),
            # filter_events=('is_filter', 'sum'),  # disabled together with the 'filter' category
        ).reset_index()

        # Ratios fall back to 0 when the denominator is 0.
        has_item_views = agg['item_views'] > 0
        agg['cart_to_view_ratio'] = np.where(has_item_views, agg['cart_adds'] / agg['item_views'], 0)
        agg['conversion_rate'] = np.where(has_item_views, agg['purchases'] / agg['item_views'], 0)
        agg['hover_intensity'] = np.where(agg['total_events'] > 0, agg['hover_events'] / agg['total_events'], 0)
        return agg
class ProductFeatureStep(BaseContextStep):
    """Vectorized product interaction features: diversity, depth, price sensitivity."""

    # Columns of the frame returned when there are no product rows to aggregate.
    _FEATURE_COLUMNS = ['sessionId', 'unique_products_viewed', 'product_view_depth',
                        'avg_price_seen', 'min_price_seen', 'max_price_seen', 'price_range']

    # Candidate price columns, in priority order; the first one present is used.
    _PRICE_COLUMNS = ['metadata_base_price', 'metadata_price', 'base_price']

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Aggregate rows that carry a ``productId`` into per-session product features.

        Returns an empty frame with only a ``sessionId`` column when ``X`` has
        no rows, and an empty frame with all feature columns when ``X`` has no
        ``productId`` column or no row with a non-null ``productId``.
        """
        if X.empty:
            return pd.DataFrame(columns=pd.Series(['sessionId']))
        # A missing productId column is treated like "no product rows"
        # instead of raising KeyError.
        if 'productId' not in X.columns:
            return pd.DataFrame(columns=pd.Series(self._FEATURE_COLUMNS))

        df = X.copy()
        price_col = next((c for c in self._PRICE_COLUMNS if c in df.columns), None)
        # Non-numeric prices become NaN; without a price column every price is NaN.
        df['price_seen'] = pd.to_numeric(df[price_col], errors='coerce') if price_col else np.nan

        prod_df = df[df['productId'].notna()]
        if prod_df.empty:
            return pd.DataFrame(columns=pd.Series(self._FEATURE_COLUMNS))

        agg = prod_df.groupby('sessionId').agg(
            unique_products_viewed=('productId', 'nunique'),
            # Number of rows for the most frequently seen product in the session.
            product_view_depth=('productId', lambda x: x.value_counts().iloc[0] if len(x) > 0 else 0),
            avg_price_seen=('price_seen', 'mean'),
            min_price_seen=('price_seen', 'min'),
            max_price_seen=('price_seen', 'max'),
        ).reset_index()
        # No usable price in the session -> range of 0 rather than NaN.
        agg['price_range'] = (agg['max_price_seen'] - agg['min_price_seen']).fillna(0)
        return agg
class UserAgentFeatureStep(BaseContextStep):
    """Derive bot-detection signals from each session's user-agent string."""

    def transform(self, X: pd.DataFrame) -> pd.DataFrame|pd.Series:
        """Return ``sessionId``, ``is_headless``, ``is_automation`` and ``browser_family``.

        The user agent of a session is the first non-null ``userAgent`` value
        in its rows. Returns an empty frame with only a ``sessionId`` column
        when ``X`` has no rows or no ``userAgent`` column.
        """
        events = X.copy()
        if 'userAgent' not in events.columns or events.empty:
            return pd.DataFrame(columns=pd.Series(['sessionId']))

        def classify(agent):
            # Missing user agent -> 'Unknown'; otherwise the first family in
            # BROWSER_PATTERNS whose regex matches, or 'Other' if none does.
            if pd.isna(agent):
                return 'Unknown'
            matches = (family for family, pattern in BROWSER_PATTERNS if re.search(pattern, agent))
            return next(matches, 'Other')

        per_session = events.groupby('sessionId')['userAgent'].first().reset_index()
        agents = per_session['userAgent']
        per_session['is_headless'] = agents.str.contains(HEADLESS_RE, na=False)
        per_session['is_automation'] = agents.str.contains(AUTOMATION_RE, na=False)
        per_session['browser_family'] = agents.apply(classify)

        wanted = ['sessionId', 'is_headless', 'is_automation', 'browser_family']
        return per_session[wanted]
class ExtractSessionFeaturesStep(BaseContextStep):
    """
    Build the session-level feature matrix from raw interaction rows.

    Input: interactions_df
    Output: one row per sessionId with the temporal, behavioral, product and
    user-agent features joined together
    """

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Run every feature step on ``X`` and left-join the results on ``sessionId``."""
        if X.empty:
            return pd.DataFrame()

        interactions = X.copy()
        step_types = (TemporalFeatureStep, BehavioralFeatureStep,
                      ProductFeatureStep, UserAgentFeatureStep)
        frames = [step(self.context).transform(interactions) for step in step_types]

        # The temporal features are the base table; the rest are joined onto it.
        features, *extras = frames
        for extra in extras:
            # Steps signal "nothing to contribute" with an empty frame.
            if extra.empty or 'sessionId' not in extra.columns:
                continue
            features = features.merge(extra, on='sessionId', how='left')

        # Keep experimentId (first non-null value per session) for label joining.
        if 'experimentId' in interactions.columns:
            experiment_of = interactions.groupby('sessionId')['experimentId'].first()
            features = features.merge(experiment_of, on='sessionId', how='left')
        return features
class JoinLabelsStep(BaseContextStep):
    """
    Join experiment labels to session features.

    Input: (features_df, experiments_df) or features_df (fetches experiments)
    Output: labeled feature matrix with is_agent column
    """

    def transform(self, X: tuple | pd.DataFrame) -> pd.DataFrame:
        """Left-join experiment metadata onto the feature rows via ``experimentId``.

        When ``X`` is a bare DataFrame the experiments are fetched through
        ``self.context.provider.fetch_experiments``; if that frame has no
        ``experimentId`` column it is returned unchanged. ``is_agent`` is the
        negation of ``xp_human_only`` and is NaN when no experiments are
        available. The input frames are never modified.
        """
        if isinstance(X, tuple):
            features_df, experiments_df = X
        else:
            features_df = X
            if 'experimentId' not in features_df.columns:
                return features_df
            exp_ids = features_df['experimentId'].dropna().unique().tolist()
            experiments_df = self.context.provider.fetch_experiments(exp_ids) if exp_ids else pd.DataFrame()

        if features_df.empty:
            return features_df
        if experiments_df.empty:
            # assign() returns a new frame, so the caller's features_df is left untouched.
            return features_df.assign(is_agent=np.nan)

        exp = experiments_df.copy()
        if 'id' in exp.columns:
            exp = exp.rename(columns={'id': 'experimentId'})
        if 'xp_human_only' in exp.columns:
            human_only = exp['xp_human_only']
            # `~` is only a logical NOT on boolean dtypes: on an object column it
            # turns True/False into -2/-1 and raises on None. Convert anything
            # that is not plain bool to the nullable boolean dtype first so
            # missing flags stay missing.
            if human_only.dtype != bool:
                human_only = human_only.astype('boolean')
            exp['is_agent'] = ~human_only

        label_cols = [c for c in ['is_agent', 'xp_human_only', 'xp_market_mode'] if c in exp.columns]
        cols = ['experimentId'] + label_cols
        return features_df.merge(exp[cols].drop_duplicates(), on='experimentId', how='left')
class ValidateDataStep(BaseContextStep):
    """
    Data quality checks before training.

    Input: df
    Output: a copy of df; the validation report is stored on the context
    under the key 'validation_report'
    """
    REQUIRED = ['sessionId', 'eventName', 'ts']

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Cache a validation report for ``X`` and return a copy of ``X``."""
        frame = X.copy()
        self.context.cache('validation_report', self._summarize(frame))
        return frame

    def _summarize(self, frame: pd.DataFrame) -> dict:
        """Build the report: status, row/session counts, missing columns, null counts."""
        summary = {'status': 'valid', 'rows': len(frame), 'sessions': 0}
        if frame.empty:
            summary['status'] = 'empty'
            return summary

        absent = [name for name in self.REQUIRED if name not in frame.columns]
        if absent:
            summary['status'] = 'invalid'
            summary['missing_cols'] = absent

        # Session statistics are reported as 0 when there is no sessionId column.
        if 'sessionId' in frame.columns:
            summary['sessions'] = frame['sessionId'].nunique()
            summary['null_sessions'] = int(frame['sessionId'].isna().sum())
        else:
            summary['sessions'] = 0
            summary['null_sessions'] = 0

        if 'experimentId' in frame.columns:
            summary['null_experiments'] = int(frame['experimentId'].isna().sum())
        return summary
# legacy compat - kept for backwards compatibility with existing code
def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]:
"""Single-session feature extraction (legacy interface)."""
defaults = {k: 0 for k in ['total_interactions', 'page_views', 'item_views', 'searches',
'cart_adds', 'hovers', 'unique_products_viewed', 'product_view_depth',
'session_duration_sec', 'interaction_velocity',
'avg_time_between_events', 'std_time_between_events', 'cart_to_view_ratio']}
if session_df.empty:
return defaults
session_df = session_df.copy()
if 'sessionId' not in session_df.columns:
session_df['sessionId'] = 'tmp'
# use a dummy context for the steps
class DummyCtx: config = {} # should maybe inherit but whatever
ctx = DummyCtx()
t = TemporalFeatureStep(ctx, timeout_sec=session_timeout_sec).transform(session_df)
b = BehavioralFeatureStep(ctx).transform(session_df)
p = ProductFeatureStep(ctx).transform(session_df)
result = {}
for df in [t, b, p]:
if not df.empty:
for col in df.columns:
if col != 'sessionId':
result[col] = df[col].iloc[0] if len(df) > 0 else 0
remap = {'hover_events': 'hovers', 'filter_events': 'searches', 'unique_pages': 'unique_pages_visited'}
for old, new in remap.items():
if old in result:
result[new] = result.pop(old)
return result