mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
261 lines
11 KiB
Python
261 lines
11 KiB
Python
"""
Session feature extraction for ML training pipeline.
"""
|
|
import pandas as pd
|
|
import numpy as np
|
|
import re
|
|
from typing import Dict, Any
|
|
from procesing.steps.base import BaseContextStep
|
|
|
|
# Maps a feature category to the raw ``eventName`` values that belong to it.
# BehavioralFeatureStep derives one boolean ``is_<category>`` column per key.
EVENT_CATS = {
    'page_view': ['page_view'],
    'item_view': ['view_item_page', 'learn_more_about_item'],
    'cart_add': ['add_item_to_cart'],
    'purchase': ['purchase', 'checkout_complete'],
    'hover': ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button'],
    # 'filter': ['filter', 'search', 'apply_filter'],
}

# Case-insensitive user-agent markers for headless browsers and automation tooling.
HEADLESS_RE = re.compile(r'HeadlessChrome|Headless|PhantomJS', re.I)
AUTOMATION_RE = re.compile(r'Selenium|Playwright|Puppeteer|WebDriver|chromedriver|geckodriver', re.I)

# Ordered (family, regex) pairs; consumers take the FIRST pattern that matches.
# Order matters: Chromium-based Edge user agents also contain "Chrome/..." and
# "Safari/...", and Chrome user agents also contain "Safari/...", so the most
# specific token must be tested first (Edge -> Chrome -> Firefox -> Safari).
BROWSER_PATTERNS = [
    ('Edge', r'Edg/[\d.]+'),
    ('Chrome', r'Chrome/[\d.]+'),
    ('Firefox', r'Firefox/[\d.]+'),
    ('Safari', r'Safari/[\d.]+'),
]
|
|
|
|
|
|
class TemporalFeatureStep(BaseContextStep):
    """Per-session timing features computed from event timestamps.

    Emits one row per ``sessionId`` with the active duration, event count,
    inter-event gap statistics, start hour, events-per-minute velocity and the
    busiest resampling window.
    """

    def __init__(self, context, timeout_sec: float = 900, velocity_window: str = '5min'):
        """Keep the idle timeout (seconds) and the resample rule used for peak velocity."""
        super().__init__(context)
        self.velocity_window = velocity_window
        self.timeout_sec = timeout_sec

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Aggregate interaction rows into per-session temporal features.

        Args:
            X: interaction rows; must carry ``sessionId`` and ``ts`` columns.

        Returns:
            One row per session. When ``X`` is empty or has no ``ts`` column an
            empty frame with only a ``sessionId`` column is returned.
        """
        events = X.copy()
        if events.empty or 'ts' not in events.columns:
            return pd.DataFrame(columns=pd.Series(['sessionId']))

        events['ts_dt'] = pd.to_datetime(events['ts'])
        events = events.sort_values(['sessionId', 'ts_dt'])

        # Seconds elapsed since the previous event of the same session
        # (NaN for the first event of each session).
        gaps = events.groupby('sessionId')['ts_dt'].diff().dt.total_seconds()
        events['time_diff'] = gaps
        # Gaps above the timeout (and the NaN first gap) add nothing to the
        # active duration.
        events['active_diff'] = gaps.where(gaps <= self.timeout_sec, 0)

        named_aggs = {
            'session_duration_sec': ('active_diff', 'sum'),
            'total_interactions': ('sessionId', 'count'),
            'avg_time_between_events': ('time_diff', 'mean'),
            'std_time_between_events': ('time_diff', 'std'),
            'min_time_between_events': ('time_diff', 'min'),
            'session_start_hour': ('ts_dt', lambda stamps: stamps.min().hour),
        }
        summary = events.groupby('sessionId').agg(**named_aggs).reset_index()
        summary['std_time_between_events'] = summary['std_time_between_events'].fillna(0)

        # Events per minute of active time; 0 when the session has no active time.
        duration = summary['session_duration_sec']
        per_minute = (summary['total_interactions'] / duration) * 60
        summary['interaction_velocity'] = np.where(duration > 0, per_minute, 0)

        # Largest number of events falling in any single resampling window.
        # The output column keeps the "5min" name whatever velocity_window is.
        window_counts = (
            events.set_index('ts_dt')
            .groupby('sessionId')
            .resample(self.velocity_window, include_groups=False)
            .size()
        )
        peak = window_counts.groupby('sessionId').max().rename('max_velocity_5min')
        summary = summary.merge(peak, on='sessionId', how='left')
        summary['max_velocity_5min'] = summary['max_velocity_5min'].fillna(0)
        return summary
|
|
|
|
|
|
class BehavioralFeatureStep(BaseContextStep):
    """Per-session event counts by category plus derived ratios."""

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Count categorised events per session and derive ratio features.

        Args:
            X: interaction rows; the code reads ``sessionId``, ``eventName``
                and ``page``.

        Returns:
            One row per session. When ``X`` is empty or has no ``eventName``
            column an empty frame with only a ``sessionId`` column is returned.
        """
        events = X.copy()
        if events.empty or 'eventName' not in events.columns:
            return pd.DataFrame(columns=pd.Series(['sessionId']))

        names = events['eventName']
        for category, members in EVENT_CATS.items():
            events[f'is_{category}'] = names.isin(members)
        # Any event whose name starts with "hover_over_" counts as a hover,
        # not only the names listed explicitly in EVENT_CATS.
        events['is_hover'] = events['is_hover'] | names.str.startswith('hover_over_')

        counts = events.groupby('sessionId').agg(
            total_events=('eventName', 'count'),
            unique_pages=('page', 'nunique'),
            page_views=('is_page_view', 'sum'),
            item_views=('is_item_view', 'sum'),
            cart_adds=('is_cart_add', 'sum'),
            purchases=('is_purchase', 'sum'),
            hover_events=('is_hover', 'sum'),
            # filter_events=('is_filter', 'sum'),
        ).reset_index()

        # Ratios fall back to 0 when their denominator is 0.
        item_views = counts['item_views']
        has_item_views = item_views > 0
        counts['cart_to_view_ratio'] = np.where(has_item_views, counts['cart_adds'] / item_views, 0)
        counts['conversion_rate'] = np.where(has_item_views, counts['purchases'] / item_views, 0)

        total = counts['total_events']
        counts['hover_intensity'] = np.where(total > 0, counts['hover_events'] / total, 0)
        return counts
|
|
|
|
|
|
class ProductFeatureStep(BaseContextStep):
    """Vectorized product interaction features: diversity, depth, price sensitivity."""

    # Schema of the frame returned when no row references a product.
    OUTPUT_COLUMNS = ['sessionId', 'unique_products_viewed', 'product_view_depth',
                      'avg_price_seen', 'min_price_seen', 'max_price_seen', 'price_range']

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Aggregate product-related rows into per-session features.

        Args:
            X: interaction rows. ``sessionId`` is required. ``productId`` and
                one of ``metadata_base_price`` / ``metadata_price`` /
                ``base_price`` are used when present.

        Returns:
            One row per session that touched at least one product, with the
            number of distinct products, the highest repeat count of a single
            product, and mean/min/max/range of the prices seen. An empty frame
            is returned when ``X`` is empty (``sessionId`` column only) or when
            no row carries a product id (full ``OUTPUT_COLUMNS`` schema).
        """
        df = X.copy()
        if df.empty:
            return pd.DataFrame(columns=pd.Series(['sessionId']))

        # Like the sibling steps, tolerate a missing input column instead of
        # raising KeyError: no productId column means no product rows.
        if 'productId' not in df.columns:
            return pd.DataFrame(columns=pd.Series(self.OUTPUT_COLUMNS))

        # Use the first price column that exists; non-numeric values become NaN.
        price_col = next(
            (c for c in ['metadata_base_price', 'metadata_price', 'base_price'] if c in df.columns),
            None,
        )
        df['price_seen'] = pd.to_numeric(df[price_col], errors='coerce') if price_col else np.nan

        prod_df = df[df['productId'].notna()]
        if prod_df.empty:
            return pd.DataFrame(columns=pd.Series(self.OUTPUT_COLUMNS))

        agg = prod_df.groupby('sessionId').agg(
            unique_products_viewed=('productId', 'nunique'),
            # Count of the most frequently seen product within the session.
            product_view_depth=('productId', lambda x: x.value_counts().iloc[0] if len(x) > 0 else 0),
            avg_price_seen=('price_seen', 'mean'),
            min_price_seen=('price_seen', 'min'),
            max_price_seen=('price_seen', 'max'),
        ).reset_index()
        # Sessions without any usable price get a range of 0 rather than NaN.
        agg['price_range'] = (agg['max_price_seen'] - agg['min_price_seen']).fillna(0)
        return agg
|
|
|
|
|
|
class UserAgentFeatureStep(BaseContextStep):
    """Derive bot-detection signals from each session's user-agent string."""

    def transform(self, X: pd.DataFrame) -> pd.DataFrame|pd.Series:
        """Flag headless/automation user agents and label the browser family.

        Args:
            X: interaction rows with ``sessionId`` and ``userAgent`` columns.

        Returns:
            A frame with ``sessionId``, ``is_headless``, ``is_automation`` and
            ``browser_family``. When ``X`` is empty or has no ``userAgent``
            column an empty frame with only a ``sessionId`` column is returned.
        """
        events = X.copy()
        if events.empty or 'userAgent' not in events.columns:
            return pd.DataFrame(columns=pd.Series(['sessionId']))

        def classify_browser(agent):
            # Missing agent -> 'Unknown'; otherwise the first BROWSER_PATTERNS
            # entry that matches, or 'Other' when none does.
            if pd.isna(agent):
                return 'Unknown'
            matches = (family for family, pattern in BROWSER_PATTERNS if re.search(pattern, agent))
            return next(matches, 'Other')

        # groupby(...).first() keeps the first non-null user agent per session.
        per_session = events.groupby('sessionId')['userAgent'].first().reset_index()
        agents = per_session['userAgent']
        per_session['is_headless'] = agents.str.contains(HEADLESS_RE, na=False)
        per_session['is_automation'] = agents.str.contains(AUTOMATION_RE, na=False)
        per_session['browser_family'] = agents.apply(classify_browser)
        return per_session[['sessionId', 'is_headless', 'is_automation', 'browser_family']]
|
|
|
|
|
|
class ExtractSessionFeaturesStep(BaseContextStep):
    """
    Vectorized session feature extraction - replaces O(n^2) per-row loop.
    Input: interactions_df
    Output: session-level feature matrix
    """

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Run every feature step and left-join the results on ``sessionId``.

        The temporal features are the base table; behavioral, product and
        user-agent features are joined onto it when they are non-empty. If the
        input has an ``experimentId`` column, each session's first non-null
        value is carried along. An empty input yields an empty frame.
        """
        if X.empty:
            return pd.DataFrame()
        events = X.copy()

        step_types = (
            TemporalFeatureStep,
            BehavioralFeatureStep,
            ProductFeatureStep,
            UserAgentFeatureStep,
        )
        features, *extras = [step(self.context).transform(events) for step in step_types]

        for extra in extras:
            # Steps signal "nothing to contribute" with an empty frame.
            if extra.empty or 'sessionId' not in extra.columns:
                continue
            features = features.merge(extra, on='sessionId', how='left')

        # carry forward experimentId for label joining
        if 'experimentId' in events.columns:
            experiment_by_session = events.groupby('sessionId')['experimentId'].first()
            features = features.merge(experiment_by_session, on='sessionId', how='left')

        return features
|
|
|
|
|
|
class JoinLabelsStep(BaseContextStep):
    """
    Join experiment labels to session features.
    Input: (features_df, experiments_df) or features_df (fetches experiments)
    Output: labeled feature matrix with is_agent column
    """

    def transform(self, X: tuple) -> pd.DataFrame:
        """Attach experiment-level labels to the session feature matrix.

        Args:
            X: either a ``(features_df, experiments_df)`` tuple, or just
                ``features_df``; in the latter case the experiments referenced
                by its ``experimentId`` column are fetched through
                ``self.context.provider.fetch_experiments``.

        Returns:
            ``features_df`` left-joined with ``is_agent`` / ``xp_human_only`` /
            ``xp_market_mode`` (whichever exist) on ``experimentId``. The input
            is returned as-is when it is empty, or when it was passed alone and
            has no ``experimentId`` column. When no experiments are available a
            copy with an all-NaN ``is_agent`` column is returned; the caller's
            frame is never modified.
        """
        if isinstance(X, tuple):
            features_df, experiments_df = X
        else:
            features_df = X
            if 'experimentId' not in features_df.columns:
                return features_df
            exp_ids = features_df['experimentId'].dropna().unique().tolist()
            experiments_df = self.context.provider.fetch_experiments(exp_ids) if exp_ids else pd.DataFrame()

        if features_df.empty:
            return features_df
        if experiments_df.empty:
            # assign() builds a new frame, so the caller's DataFrame is not
            # mutated as a side effect of labelling.
            return features_df.assign(is_agent=np.nan)

        exp = experiments_df.copy()
        if 'id' in exp.columns:
            exp = exp.rename(columns={'id': 'experimentId'})
        if 'xp_human_only' in exp.columns:
            # NOTE(review): `~` is a logical NOT only for boolean dtypes; confirm
            # the provider never returns this column as object dtype.
            exp['is_agent'] = ~exp['xp_human_only']

        cols = ['experimentId'] + [c for c in ['is_agent', 'xp_human_only', 'xp_market_mode'] if c in exp.columns]
        return features_df.merge(exp[cols].drop_duplicates(), on='experimentId', how='left')
|
|
|
|
|
|
class ValidateDataStep(BaseContextStep):
    """
    Data quality checks before training.
    Input: df
    Output: a copy of df with the same content; the validation report is
    stored on the context under 'validation_report'
    """
    REQUIRED = ['sessionId', 'eventName', 'ts']

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Build a validation report, cache it on the context, return the data.

        The report always holds ``status`` ('valid', 'empty' or 'invalid'),
        ``rows`` and ``sessions``. For non-empty input it also holds
        ``null_sessions``, ``missing_cols`` when a REQUIRED column is absent,
        and ``null_experiments`` when an ``experimentId`` column exists.
        """
        frame = X.copy()
        report = {'status': 'valid', 'rows': len(frame), 'sessions': 0}

        if frame.empty:
            report['status'] = 'empty'
        else:
            absent = [col for col in self.REQUIRED if col not in frame.columns]
            if absent:
                report.update(status='invalid', missing_cols=absent)

            has_session = 'sessionId' in frame.columns
            report['sessions'] = frame['sessionId'].nunique() if has_session else 0
            report['null_sessions'] = int(frame['sessionId'].isna().sum()) if has_session else 0
            if 'experimentId' in frame.columns:
                report['null_experiments'] = int(frame['experimentId'].isna().sum())

        self.context.cache('validation_report', report)
        return frame
|
|
|
|
|
|
# legacy compat - kept for backwards compatibility with existing code
|
|
def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]:
    """Single-session feature extraction (legacy interface).

    Runs the temporal, behavioral and product steps on ``session_df`` and
    flattens the first row of each result into one dict, renaming a few
    columns to their legacy names.

    Args:
        session_df: interaction rows of one session. A placeholder
            ``sessionId`` is added when the column is missing.
        session_timeout_sec: idle timeout forwarded to ``TemporalFeatureStep``.

    Returns:
        Feature name -> value. Every legacy key is always present: keys that
        no step produced (e.g. product features for a session without product
        rows) keep their default of 0.
    """
    defaults = {k: 0 for k in ['total_interactions', 'page_views', 'item_views', 'searches',
                               'cart_adds', 'hovers', 'unique_products_viewed', 'product_view_depth',
                               'session_duration_sec', 'interaction_velocity',
                               'avg_time_between_events', 'std_time_between_events', 'cart_to_view_ratio']}
    if session_df.empty:
        return defaults

    session_df = session_df.copy()
    if 'sessionId' not in session_df.columns:
        session_df['sessionId'] = 'tmp'

    # use a dummy context for the steps
    class DummyCtx:
        config = {}

    ctx = DummyCtx()

    frames = [
        TemporalFeatureStep(ctx, timeout_sec=session_timeout_sec).transform(session_df),
        BehavioralFeatureStep(ctx).transform(session_df),
        ProductFeatureStep(ctx).transform(session_df),
    ]

    # Start from the defaults so a step that yields nothing does not remove
    # keys that legacy callers expect.
    result = dict(defaults)
    for frame in frames:
        if frame.empty:
            continue
        for col in frame.columns:
            if col != 'sessionId':
                result[col] = frame[col].iloc[0]

    # Translate current column names to the names the legacy interface used.
    remap = {'hover_events': 'hovers', 'filter_events': 'searches', 'unique_pages': 'unique_pages_visited'}
    for old, new in remap.items():
        if old in result:
            result[new] = result.pop(old)
    return result
|