From 3ed473ae8b5d3ef16ec7ca1eb68438add5bf284d Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Fri, 12 Dec 2025 11:29:52 +0100 Subject: [PATCH] feature: modularized feature engineering for ml setup (new pipeline) --- experiments/procesing/pipelines.py | 76 ++++-- experiments/procesing/steps/session.py | 355 ++++++++++++++++--------- 2 files changed, 276 insertions(+), 155 deletions(-) diff --git a/experiments/procesing/pipelines.py b/experiments/procesing/pipelines.py index 6837be9..4a98f5b 100644 --- a/experiments/procesing/pipelines.py +++ b/experiments/procesing/pipelines.py @@ -2,6 +2,7 @@ from sklearn.pipeline import Pipeline import pandas as pd from procesing.context import PipelineContext from procesing.providers import SupabaseProvider, BackendAPIProvider +import os from procesing.steps import ( FetchInteractionsStep, FetchPriceLogsStep, @@ -12,11 +13,13 @@ from procesing.steps import ( ChunkByTimeWindowStep, ComputeDemandForChunksStep, AggregatePriceLogsStep, - # BuildStateSpaceStep, FitPricingFunctionStep, PredictPricesStep, ComputeDemandStep, - JoinProductFeaturesStep + JoinProductFeaturesStep, + ExtractSessionFeaturesStep, + JoinLabelsStep, + ValidateDataStep, ) from procesing.pricers import SimpleSurgePricer @@ -106,33 +109,54 @@ def full_pipeline(context: PipelineContext, return product_features_df, optimal_prices_df +def ml_training_pipeline(context: PipelineContext) -> pd.DataFrame: + """ + Build labeled session-level feature matrix for ML model training. + Pipeline: fetch -> validate -> extract features -> join labels + + Returns: + DataFrame with ~25 features per session + is_agent label + Columns: sessionId, experimentId, temporal/behavioral/product/ua features, is_agent + """ + # fetch raw interactions + interactions_df = FetchInteractionsStep(context).transform(None) + + # validate data quality (report cached in context) + interactions_df = ValidateDataStep(context).transform(interactions_df) + if interactions_df.empty: + return pd.DataFrame() + + # extract vectorized session features + features_df = ExtractSessionFeaturesStep(context).transform(interactions_df) + if features_df.empty: + return pd.DataFrame() + + # join experiment labels (is_agent = ~xp_human_only) + labeled_df = JoinLabelsStep(context).transform(features_df) + + return labeled_df + + if __name__ == '__main__': - class Provider(SupabaseProvider, BackendAPIProvider): - def __init__(self, backend_url: str): - SupabaseProvider.__init__(self) - BackendAPIProvider.__init__(self, backend_url=backend_url) - - - class HistoricalProvider(SupabaseProvider, BackendAPIProvider): + class ExperimentsProvider(SupabaseProvider, BackendAPIProvider): def fetch_kafka_topic(self, topic: str) -> pd.DataFrame: - path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/858c61ab-0a7f-4595-ae49-33f4365517b9/" - interactions_file = "messages(2).json" - prices_file = "messages(3).json" + path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/" + subdirs = os.listdir(path) + full_df = pd.DataFrame() + files = {"user-interactions": "int.json", "price-logs": "price.json"} + for d in subdirs: + path += d + "/" + data = pd.read_json(path + files.get(topic, files["user-interactions"])) + data = pd.DataFrame([r['payload'] for r in data['value'].to_list()]) + full_df = pd.concat([full_df, data], ignore_index=True) + return full_df - data = pd.read_json(path + (interactions_file if topic == "user-interactions" else prices_file)) - data = [r['payload'] for r in data['value'].to_list()] - data = pd.DataFrame(data) - return data - - - # example run - context = PipelineContext( - provider=HistoricalProvider(), - store_mode='airline', - ) - - product_features, prices = full_pipeline(context) - print(prices.to_string()) + # demo: run ML training pipeline + context = PipelineContext(provider=ExperimentsProvider(), store_mode='hotel') + features = ml_training_pipeline(context) + print(f"Feature matrix: {features.shape}") + print(features.head()) + print(features.info()) diff --git a/experiments/procesing/steps/session.py b/experiments/procesing/steps/session.py index 1dd5992..f445dea 100644 --- a/experiments/procesing/steps/session.py +++ b/experiments/procesing/steps/session.py @@ -1,159 +1,256 @@ """ -Session feature extraction for S_t component of state space. -Computes behavioral signals from interaction data already in pipeline. +Session feature extraction for ML training pipeline. """ import pandas as pd import numpy as np -from typing import Optional, Dict, Any -from collections import Counter +import re +from typing import Dict, Any from procesing.steps.base import BaseContextStep -def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]: - """Compute features for single session. - - Args: - session_df: interaction events for this session - session_timeout_sec: max gap between events before resetting duration (default 900s = 15min) - """ - features = {} - - # basic counts - features['total_interactions'] = len(session_df) - - event_counts = session_df['eventName'].value_counts().to_dict() - features['page_views'] = event_counts.get('page_view', 0) + event_counts.get('view_item_page', 0) - features['item_views'] = event_counts.get('view_item_page', 0) - features['searches'] = event_counts.get('search', 0) - features['cart_adds'] = event_counts.get('add_item_to_cart', 0) - - # hover events - hover_events = ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button'] - features['hovers'] = sum(event_counts.get(ev, 0) for ev in hover_events) - - # product-level signals - product_ids = session_df['productId'].dropna() - features['unique_products_viewed'] = product_ids.nunique() - - if len(product_ids) > 0: - product_view_counts = Counter(product_ids) - features['product_view_depth'] = max(product_view_counts.values()) - else: - features['product_view_depth'] = 0 - - # temporal features with session timeout logic - if 'ts' in session_df.columns: - timestamps = session_df['ts'].sort_values() - - # compute active duration considering timeout gaps - if len(timestamps) > 1: - time_diffs = timestamps.diff().dropna().dt.total_seconds() - # only count gaps shorter than timeout towards active session duration - active_diffs = time_diffs[time_diffs <= session_timeout_sec] - features['session_duration_sec'] = active_diffs.sum() if len(active_diffs) > 0 else 0.0 - - features['avg_time_between_events'] = time_diffs.mean() - features['std_time_between_events'] = time_diffs.std() - else: - features['session_duration_sec'] = 0.0 - features['avg_time_between_events'] = 0.0 - features['std_time_between_events'] = 0.0 - - if features['session_duration_sec'] > 0: - features['interaction_velocity'] = (features['total_interactions'] / features['session_duration_sec']) * 60 - else: - features['interaction_velocity'] = 0.0 - else: - features['session_duration_sec'] = 0.0 - features['interaction_velocity'] = 0.0 - features['avg_time_between_events'] = 0.0 - features['std_time_between_events'] = 0.0 - - # cart/conversion signals - features['cart_to_view_ratio'] = features['cart_adds'] / features['item_views'] if features['item_views'] > 0 else 0.0 - - return features +EVENT_CATS = { + 'page_view': ['page_view'], + 'item_view': ['view_item_page', 'learn_more_about_item'], + 'cart_add': ['add_item_to_cart'], + 'purchase': ['purchase', 'checkout_complete'], + 'hover': ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button'], + # 'filter': ['filter', 'search', 'apply_filter'], +} +HEADLESS_RE = re.compile(r'HeadlessChrome|Headless|PhantomJS', re.I) +AUTOMATION_RE = re.compile(r'Selenium|Playwright|Puppeteer|WebDriver|chromedriver|geckodriver', re.I) +BROWSER_PATTERNS = [('Chrome', r'Chrome/[\d.]+'), ('Firefox', r'Firefox/[\d.]+'), + ('Safari', r'Safari/[\d.]+'), ('Edge', r'Edg/[\d.]+')] -def _apply_to_slice(df: pd.DataFrame) -> pd.DataFrame: - """Apply feature extraction to sliding window of interactions.""" - # add columns of all features at each step - new_cols = ["total_interactions", "page_views", "item_views", "searches", - "cart_adds", "hovers", "unique_products_viewed", "product_view_depth", - "session_duration_sec", "interaction_velocity", - "avg_time_between_events", "std_time_between_events", - "cart_to_view_ratio"] - for col in new_cols: df[col] = np.nan - for idx in range(1, len(df) + 1): - features = _extract_features_for_session(df.iloc[:idx]) - # fillna kinda meh - features = { k: (v if not pd.isna(v) else 0.0) for k, v in features.items() } - for col in new_cols: - df.at[df.index[idx - 1], col] = features[col] - #print(f"Processed {idx}/{len(df)} events for session {df['sessionId'].iloc[0]}") - return df +class TemporalFeatureStep(BaseContextStep): + """Vectorized time-based features: durations, velocities, gaps.""" -class BuildStateSpaceStep(BaseContextStep): - """ - Build state space representation S_t from session features. + def __init__(self, context, timeout_sec: float = 900, velocity_window: str = '5min'): + super().__init__(context) + self.timeout_sec = timeout_sec + self.velocity_window = velocity_window - Input: session_features DataFrame - Output: state_space_df DataFrame with S_t vectors - """ + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + if df.empty or 'ts' not in df.columns: + return pd.DataFrame(columns=['sessionId']) - def transform(self, rich_dataset: pd.DataFrame) -> pd.DataFrame: - # check if features are present - required_cols = ["total_interactions", "page_views", "item_views", "searches", - "cart_adds", "hovers", "unique_products_viewed", "product_view_depth", - "session_duration_sec", "interaction_velocity", - "avg_time_between_events", "std_time_between_events", - "cart_to_view_ratio"] - if not all(col in rich_dataset.columns for col in required_cols): - raise ValueError("Missing required columns for feature extraction.") - if rich_dataset.empty: - return pd.DataFrame() + df = df.copy() + df['ts_dt'] = pd.to_datetime(df['ts']) + df = df.sort_values(['sessionId', 'ts_dt']) + df['time_diff'] = df.groupby('sessionId')['ts_dt'].diff().dt.total_seconds() + df['active_diff'] = df['time_diff'].where(df['time_diff'] <= self.timeout_sec, 0) + + agg = df.groupby('sessionId').agg( + session_duration_sec=('active_diff', 'sum'), + total_interactions=('sessionId', 'count'), + avg_time_between_events=('time_diff', 'mean'), + std_time_between_events=('time_diff', 'std'), + min_time_between_events=('time_diff', 'min'), + session_start_hour=('ts_dt', lambda x: x.min().hour), + ).reset_index() + agg['std_time_between_events'] = agg['std_time_between_events'].fillna(0) + agg['interaction_velocity'] = np.where( + agg['session_duration_sec'] > 0, + (agg['total_interactions'] / agg['session_duration_sec']) * 60, 0) + + vel = df.set_index('ts_dt').groupby('sessionId').resample(self.velocity_window).size() + agg = agg.merge(vel.groupby('sessionId').max().rename('max_velocity_5min'), + on='sessionId', how='left').fillna({'max_velocity_5min': 0}) + return agg - # For simplicity, we return as is - return rich_dataset.copy() +class BehavioralFeatureStep(BaseContextStep): + """Vectorized event counts and ratios per session.""" + + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + if df.empty or 'eventName' not in df.columns: + return pd.DataFrame(columns=['sessionId']) + + df = df.copy() + for cat, events in EVENT_CATS.items(): + df[f'is_{cat}'] = df['eventName'].isin(events) + df['is_hover'] = df['is_hover'] | df['eventName'].str.startswith('hover_over_') + + agg = df.groupby('sessionId').agg( + total_events=('eventName', 'count'), unique_pages=('page', 'nunique'), + page_views=('is_page_view', 'sum'), item_views=('is_item_view', 'sum'), + cart_adds=('is_cart_add', 'sum'), purchases=('is_purchase', 'sum'), + hover_events=('is_hover', 'sum'), + # filter_events=('is_filter', 'sum'), + ).reset_index() + agg['cart_to_view_ratio'] = np.where(agg['item_views'] > 0, agg['cart_adds'] / agg['item_views'], 0) + agg['conversion_rate'] = np.where(agg['item_views'] > 0, agg['purchases'] / agg['item_views'], 0) + agg['hover_intensity'] = np.where(agg['total_events'] > 0, agg['hover_events'] / agg['total_events'], 0) + return agg +class ProductFeatureStep(BaseContextStep): + """Vectorized product interaction features: diversity, depth, price sensitivity.""" + + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + if df.empty: + return pd.DataFrame(columns=['sessionId']) + + df = df.copy() + price_col = next((c for c in ['metadata_base_price', 'metadata_price', 'base_price'] if c in df.columns), None) + df['price_seen'] = pd.to_numeric(df[price_col], errors='coerce') if price_col else np.nan + + prod_df = df[df['productId'].notna()] + if prod_df.empty: + return pd.DataFrame(columns=pd.Series(['sessionId', 'unique_products_viewed', 'product_view_depth', 'avg_price_seen', 'min_price_seen', 'max_price_seen', 'price_range'])) + + agg = prod_df.groupby('sessionId').agg( + unique_products_viewed=('productId', 'nunique'), + product_view_depth=('productId', lambda x: x.value_counts().iloc[0] if len(x) > 0 else 0), + avg_price_seen=('price_seen', 'mean'), min_price_seen=('price_seen', 'min'), + max_price_seen=('price_seen', 'max'), + ).reset_index() + agg['price_range'] = (agg['max_price_seen'] - agg['min_price_seen']).fillna(0) + return agg + + +class UserAgentFeatureStep(BaseContextStep): + """Parse userAgent into bot-detection signals.""" + + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + if df.empty or 'userAgent' not in df.columns: + return pd.DataFrame(columns=['sessionId']) + + ua = df.groupby('sessionId')['userAgent'].first().reset_index() + ua['is_headless'] = ua['userAgent'].str.contains(HEADLESS_RE, na=False) + ua['is_automation'] = ua['userAgent'].str.contains(AUTOMATION_RE, na=False) + + def get_browser(s): + if pd.isna(s): return 'Unknown' + for name, pat in BROWSER_PATTERNS: + if re.search(pat, s): return name + return 'Other' + ua['browser_family'] = ua['userAgent'].apply(get_browser) + return ua[['sessionId', 'is_headless', 'is_automation', 'browser_family']] class ExtractSessionFeaturesStep(BaseContextStep): """ - Extract session-level behavioral features from interaction logs. - - Input: interactions_df (user-interactions from earlier pipeline step) - Output: interactions_df with added session feature columns + Vectorized session feature extraction - replaces O(n^2) per-row loop. + Input: interactions_df + Output: session-level feature matrix """ - def transform(self, interactions_df: pd.DataFrame) -> pd.DataFrame: - if interactions_df.empty: + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + if df.empty: return pd.DataFrame() - # ensure timestamp column - if 'ts' in interactions_df.columns: - interactions_df = interactions_df.copy() - interactions_df['ts'] = pd.to_datetime(interactions_df['ts']) + # run all feature steps and merge on sessionId + temporal = TemporalFeatureStep(self.context).transform(df) + behavioral = BehavioralFeatureStep(self.context).transform(df) + product = ProductFeatureStep(self.context).transform(df) + ua = UserAgentFeatureStep(self.context).transform(df) - # group by session and compute features - session_features = [] - for session_id, session_df in interactions_df.groupby('sessionId'): - new_slice = _apply_to_slice(session_df.sort_values('ts')) - session_features.append(new_slice) + result = temporal + for other in [behavioral, product, ua]: + if not other.empty and 'sessionId' in other.columns: + result = result.merge(other, on='sessionId', how='left') - return pd.concat(session_features, ignore_index=True) + # carry forward experimentId for label joining + if 'experimentId' in df.columns: + exp_map = df.groupby('sessionId')['experimentId'].first() + result = result.merge(exp_map, on='sessionId', how='left') + + return result - -class FilterSessionInteractionsStep(BaseContextStep): +class JoinLabelsStep(BaseContextStep): """ - Filter interactions DataFrame to specific session. - - Input: (interactions_df, session_id) - Output: interactions_df filtered to session_id + Join experiment labels to session features. + Input: (features_df, experiments_df) or features_df (fetches experiments) + Output: labeled feature matrix with is_agent column """ - def transform(self, data: tuple) -> pd.DataFrame: - interactions_df, session_id = data - return interactions_df[interactions_df['sessionId'] == session_id].copy() + def transform(self, data) -> pd.DataFrame: + if isinstance(data, tuple): + features_df, experiments_df = data + else: + features_df = data + if 'experimentId' not in features_df.columns: + return features_df + exp_ids = features_df['experimentId'].dropna().unique().tolist() + experiments_df = self.context.provider.fetch_experiments(exp_ids) if exp_ids else pd.DataFrame() + + if features_df.empty: + return features_df + if experiments_df.empty: + features_df['is_agent'] = np.nan + return features_df + + exp = experiments_df.copy() + if 'id' in exp.columns: + exp = exp.rename(columns={'id': 'experimentId'}) + if 'xp_human_only' in exp.columns: + exp['is_agent'] = ~exp['xp_human_only'] + + cols = ['experimentId'] + [c for c in ['is_agent', 'xp_human_only', 'xp_market_mode'] if c in exp.columns] + return features_df.merge(exp[cols].drop_duplicates(), on='experimentId', how='left') + + +class ValidateDataStep(BaseContextStep): + """ + Data quality checks before training. + Input: df + Output: df (unchanged, but logs validation report to context) + """ + REQUIRED = ['sessionId', 'eventName', 'ts'] + + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + report = {'status': 'valid', 'rows': len(df), 'sessions': 0} + if df.empty: + report['status'] = 'empty' + self.context.cache('validation_report', report) + return df + + missing = [c for c in self.REQUIRED if c not in df.columns] + if missing: + report['status'] = 'invalid' + report['missing_cols'] = missing + + report['sessions'] = df['sessionId'].nunique() if 'sessionId' in df.columns else 0 + report['null_sessions'] = int(df['sessionId'].isna().sum()) if 'sessionId' in df.columns else 0 + if 'experimentId' in df.columns: + report['null_experiments'] = int(df['experimentId'].isna().sum()) + + self.context.cache('validation_report', report) + return df + + +# legacy compat - kept for backwards compatibility with existing code +def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]: + """Single-session feature extraction (legacy interface).""" + defaults = {k: 0 for k in ['total_interactions', 'page_views', 'item_views', 'searches', + 'cart_adds', 'hovers', 'unique_products_viewed', 'product_view_depth', + 'session_duration_sec', 'interaction_velocity', + 'avg_time_between_events', 'std_time_between_events', 'cart_to_view_ratio']} + if session_df.empty: + return defaults + + session_df = session_df.copy() + if 'sessionId' not in session_df.columns: + session_df['sessionId'] = 'tmp' + + # use a dummy context for the steps + class DummyCtx: config = {} + ctx = DummyCtx() + + t = TemporalFeatureStep(ctx, timeout_sec=session_timeout_sec).transform(session_df) + b = BehavioralFeatureStep(ctx).transform(session_df) + p = ProductFeatureStep(ctx).transform(session_df) + + result = {} + for df in [t, b, p]: + if not df.empty: + for col in df.columns: + if col != 'sessionId': + result[col] = df[col].iloc[0] if len(df) > 0 else 0 + + remap = {'hover_events': 'hovers', 'filter_events': 'searches', 'unique_pages': 'unique_pages_visited'} + for old, new in remap.items(): + if old in result: + result[new] = result.pop(old) + return result