32 refine data pipeline training data construction (#37)

* feature: modularized feature engineering for ml setup (new pipeline)

* chore: updating imports properly

* test: updating fixtures with ua and meta

* chore: migrating code ignore groups

* chore: syntax cleaning and code quality

* chore: fixing pipeline data compatability

* Update experiments/procesing/steps/session.py

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* chore: refactoring and dixing path joining

* chore: refactoring function definition to avoid reinit

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
Daniel Alves Rösel
2025-12-12 12:15:15 +01:00
committed by GitHub
parent a2a443c027
commit a1916c966c
6 changed files with 316 additions and 159 deletions

View File

@@ -2,6 +2,7 @@ from sklearn.pipeline import Pipeline
import pandas as pd
from procesing.context import PipelineContext
from procesing.providers import SupabaseProvider, BackendAPIProvider
import os
from procesing.steps import (
FetchInteractionsStep,
FetchPriceLogsStep,
@@ -12,11 +13,13 @@ from procesing.steps import (
ChunkByTimeWindowStep,
ComputeDemandForChunksStep,
AggregatePriceLogsStep,
# BuildStateSpaceStep,
FitPricingFunctionStep,
PredictPricesStep,
ComputeDemandStep,
JoinProductFeaturesStep
JoinProductFeaturesStep,
ExtractSessionFeaturesStep,
JoinLabelsStep,
ValidateDataStep,
)
from procesing.pricers import SimpleSurgePricer
@@ -106,33 +109,64 @@ def full_pipeline(context: PipelineContext,
return product_features_df, optimal_prices_df
def ml_training_pipeline(context: PipelineContext) -> pd.DataFrame:
"""
Build labeled session-level feature matrix for ML model training.
Pipeline: fetch -> validate -> extract features -> join labels
Returns:
DataFrame with ~25 features per session + is_agent label
Columns: sessionId, experimentId, temporal/behavioral/product/ua features, is_agent
"""
# fetch raw interactions
interactions_df = FetchInteractionsStep(context).transform(None)
# validate data quality (report cached in context)
interactions_df = ValidateDataStep(context).transform(interactions_df)
if interactions_df.empty:
return pd.DataFrame()
# extract vectorized session features
features_df = ExtractSessionFeaturesStep(context).transform(interactions_df)
if features_df.empty:
return pd.DataFrame()
# join experiment labels (is_agent = ~xp_human_only)
labeled_df = JoinLabelsStep(context).transform(features_df)
return labeled_df
if __name__ == '__main__':
class Provider(SupabaseProvider, BackendAPIProvider):
def __init__(self, backend_url: str):
SupabaseProvider.__init__(self)
BackendAPIProvider.__init__(self, backend_url=backend_url)
class HistoricalProvider(SupabaseProvider, BackendAPIProvider):
class ExperimentsProvider(SupabaseProvider, BackendAPIProvider):
def fetch_kafka_topic(self, topic: str) -> pd.DataFrame:
path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/858c61ab-0a7f-4595-ae49-33f4365517b9/"
interactions_file = "messages(2).json"
prices_file = "messages(3).json"
base_path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/" # os.path.join(os.path.dirname(__file__), "collected_data")
if not os.path.isdir(base_path):
return pd.DataFrame()
data = pd.read_json(path + (interactions_file if topic == "user-interactions" else prices_file))
data = [r['payload'] for r in data['value'].to_list()]
data = pd.DataFrame(data)
return data
files = {"user-interactions": "int.json", "price-logs": "price.json"}
file_to_read = files.get(topic, files["user-interactions"])
frames = []
for d in os.listdir(base_path):
full_path = os.path.join(base_path, d, file_to_read)
if not os.path.isfile(full_path):
continue
try:
data = pd.read_json(full_path)
payloads = pd.DataFrame([r['payload'] for r in data['value'].to_list()])
frames.append(payloads)
except Exception as e:
print(f"Warning: Could not process {full_path}: {e}")
# example run
context = PipelineContext(
provider=HistoricalProvider(),
store_mode='airline',
)
return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
product_features, prices = full_pipeline(context)
print(prices.to_string())
# demo: run ML training pipeline
context = PipelineContext(provider=ExperimentsProvider(), store_mode='hotel')
features = ml_training_pipeline(context)
print(f"Feature matrix: {features.shape}")
print(features.head())
print(features.info())

View File

@@ -6,7 +6,11 @@ from procesing.steps.chunk import ChunkByTimeWindowStep
from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep
from procesing.steps.elasticity import AggregatePriceLogsStep
from procesing.steps.pricing import FitPricingFunctionStep, PredictPricesStep
from procesing.steps.session import ExtractSessionFeaturesStep, _extract_features_for_session
from procesing.steps.session import (
ExtractSessionFeaturesStep, JoinLabelsStep, ValidateDataStep,
TemporalFeatureStep, BehavioralFeatureStep, ProductFeatureStep, UserAgentFeatureStep,
_extract_features_for_session
)
__all__ = [
'BaseContextStep',
@@ -25,5 +29,11 @@ __all__ = [
'FitPricingFunctionStep',
'PredictPricesStep',
'ExtractSessionFeaturesStep',
'JoinLabelsStep',
'ValidateDataStep',
'TemporalFeatureStep',
'BehavioralFeatureStep',
'ProductFeatureStep',
'UserAgentFeatureStep',
'_extract_features_for_session',
]

View File

@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from sklearn.base import BaseEstimator, TransformerMixin
from procesing.context import PipelineContext
from typing import Any
class BaseContextStep(BaseEstimator, TransformerMixin, ABC):
"""
@@ -16,7 +17,7 @@ class BaseContextStep(BaseEstimator, TransformerMixin, ABC):
return self
@abstractmethod
def transform(self, X):
def transform(self, X) -> Any:
"""Transform input using context. Must be implemented by subclass."""
pass

View File

@@ -7,12 +7,12 @@ class AggregatePriceLogsStep(BaseContextStep):
"""
Aggregate price logs into time windows using VECTORIZED operations.
Input: price_logs_df
Output: list of price chunks with [productId, price]
Output: DataFrame with columns [productId, price]
"""
def transform(self, price_logs_df: pd.DataFrame):
if price_logs_df.empty:
return []
return pd.DataFrame(columns=['productId', 'price'])
df = price_logs_df.copy()
ts_col = self.context.config.get('ts_col', 'ts')

View File

@@ -1,159 +1,261 @@
"""
Session feature extraction for S_t component of state space.
Computes behavioral signals from interaction data already in pipeline.
Session feature extraction for ML training pipeline.
"""
import pandas as pd
import numpy as np
from typing import Optional, Dict, Any
from collections import Counter
import re
from typing import Dict, Any
from procesing.steps.base import BaseContextStep
def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]:
"""Compute features for single session.
Args:
session_df: interaction events for this session
session_timeout_sec: max gap between events before resetting duration (default 900s = 15min)
"""
features = {}
# basic counts
features['total_interactions'] = len(session_df)
event_counts = session_df['eventName'].value_counts().to_dict()
features['page_views'] = event_counts.get('page_view', 0) + event_counts.get('view_item_page', 0)
features['item_views'] = event_counts.get('view_item_page', 0)
features['searches'] = event_counts.get('search', 0)
features['cart_adds'] = event_counts.get('add_item_to_cart', 0)
# hover events
hover_events = ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button']
features['hovers'] = sum(event_counts.get(ev, 0) for ev in hover_events)
# product-level signals
product_ids = session_df['productId'].dropna()
features['unique_products_viewed'] = product_ids.nunique()
if len(product_ids) > 0:
product_view_counts = Counter(product_ids)
features['product_view_depth'] = max(product_view_counts.values())
else:
features['product_view_depth'] = 0
# temporal features with session timeout logic
if 'ts' in session_df.columns:
timestamps = session_df['ts'].sort_values()
# compute active duration considering timeout gaps
if len(timestamps) > 1:
time_diffs = timestamps.diff().dropna().dt.total_seconds()
# only count gaps shorter than timeout towards active session duration
active_diffs = time_diffs[time_diffs <= session_timeout_sec]
features['session_duration_sec'] = active_diffs.sum() if len(active_diffs) > 0 else 0.0
features['avg_time_between_events'] = time_diffs.mean()
features['std_time_between_events'] = time_diffs.std()
else:
features['session_duration_sec'] = 0.0
features['avg_time_between_events'] = 0.0
features['std_time_between_events'] = 0.0
if features['session_duration_sec'] > 0:
features['interaction_velocity'] = (features['total_interactions'] / features['session_duration_sec']) * 60
else:
features['interaction_velocity'] = 0.0
else:
features['session_duration_sec'] = 0.0
features['interaction_velocity'] = 0.0
features['avg_time_between_events'] = 0.0
features['std_time_between_events'] = 0.0
# cart/conversion signals
features['cart_to_view_ratio'] = features['cart_adds'] / features['item_views'] if features['item_views'] > 0 else 0.0
return features
EVENT_CATS = {
'page_view': ['page_view'],
'item_view': ['view_item_page', 'learn_more_about_item'],
'cart_add': ['add_item_to_cart'],
'purchase': ['purchase', 'checkout_complete'],
'hover': ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button'],
# 'filter': ['filter', 'search', 'apply_filter'],
}
HEADLESS_RE = re.compile(r'HeadlessChrome|Headless|PhantomJS', re.I)
AUTOMATION_RE = re.compile(r'Selenium|Playwright|Puppeteer|WebDriver|chromedriver|geckodriver', re.I)
BROWSER_PATTERNS = [('Chrome', r'Chrome/[\d.]+'), ('Firefox', r'Firefox/[\d.]+'),
('Safari', r'Safari/[\d.]+'), ('Edge', r'Edg/[\d.]+')]
def _apply_to_slice(df: pd.DataFrame) -> pd.DataFrame:
"""Apply feature extraction to sliding window of interactions."""
# add columns of all features at each step
new_cols = ["total_interactions", "page_views", "item_views", "searches",
"cart_adds", "hovers", "unique_products_viewed", "product_view_depth",
"session_duration_sec", "interaction_velocity",
"avg_time_between_events", "std_time_between_events",
"cart_to_view_ratio"]
for col in new_cols: df[col] = np.nan
for idx in range(1, len(df) + 1):
features = _extract_features_for_session(df.iloc[:idx])
# fillna kinda meh
features = { k: (v if not pd.isna(v) else 0.0) for k, v in features.items() }
for col in new_cols:
df.at[df.index[idx - 1], col] = features[col]
#print(f"Processed {idx}/{len(df)} events for session {df['sessionId'].iloc[0]}")
return df
class BuildStateSpaceStep(BaseContextStep):
"""
Build state space representation S_t from session features.
Input: session_features DataFrame
Output: state_space_df DataFrame with S_t vectors
"""
def transform(self, rich_dataset: pd.DataFrame) -> pd.DataFrame:
# check if features are present
required_cols = ["total_interactions", "page_views", "item_views", "searches",
"cart_adds", "hovers", "unique_products_viewed", "product_view_depth",
"session_duration_sec", "interaction_velocity",
"avg_time_between_events", "std_time_between_events",
"cart_to_view_ratio"]
if not all(col in rich_dataset.columns for col in required_cols):
raise ValueError("Missing required columns for feature extraction.")
if rich_dataset.empty:
return pd.DataFrame()
def _get_browser(s: str) -> str:
if pd.isna(s): return 'Unknown'
for name, pat in BROWSER_PATTERNS:
if re.search(pat, s): return name
return 'Other'
# For simplicity, we return as is
return rich_dataset.copy()
class TemporalFeatureStep(BaseContextStep):
"""Vectorized time-based features: durations, velocities, gaps."""
def __init__(self, context, timeout_sec: float = 900, velocity_window: str = '5min'):
super().__init__(context)
self.timeout_sec = timeout_sec
self.velocity_window = velocity_window
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
df = X.copy()
if df.empty or 'ts' not in df.columns:
return pd.DataFrame(columns=pd.Series(['sessionId']))
df['ts_dt'] = pd.to_datetime(df['ts'])
df = df.sort_values(['sessionId', 'ts_dt'])
df['time_diff'] = df.groupby('sessionId')['ts_dt'].diff().dt.total_seconds()
df['active_diff'] = df['time_diff'].where(df['time_diff'] <= self.timeout_sec, 0)
agg = df.groupby('sessionId').agg(
session_duration_sec=('active_diff', 'sum'),
total_interactions=('sessionId', 'count'),
avg_time_between_events=('time_diff', 'mean'),
std_time_between_events=('time_diff', 'std'),
min_time_between_events=('time_diff', 'min'),
session_start_hour=('ts_dt', lambda x: x.min().hour),
).reset_index()
agg['std_time_between_events'] = agg['std_time_between_events'].fillna(0)
agg['interaction_velocity'] = np.where(
agg['session_duration_sec'] > 0,
(agg['total_interactions'] / agg['session_duration_sec']) * 60, 0)
vel = df.set_index('ts_dt').groupby('sessionId').resample(self.velocity_window, include_groups=False).size()
max_velocity = vel.groupby('sessionId').max().rename('max_velocity_5min')
agg = agg.merge(max_velocity, on='sessionId', how='left')
agg['max_velocity_5min'] = agg['max_velocity_5min'].fillna(0)
return agg
class BehavioralFeatureStep(BaseContextStep):
"""Vectorized event counts and ratios per session."""
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
df = X.copy()
if df.empty or 'eventName' not in df.columns:
return pd.DataFrame(columns=pd.Series(['sessionId']))
for cat, events in EVENT_CATS.items():
df[f'is_{cat}'] = df['eventName'].isin(events)
df['is_hover'] = df['is_hover'] | df['eventName'].str.startswith('hover_over_')
agg = df.groupby('sessionId').agg(
total_events=('eventName', 'count'), unique_pages=('page', 'nunique'),
page_views=('is_page_view', 'sum'), item_views=('is_item_view', 'sum'),
cart_adds=('is_cart_add', 'sum'), purchases=('is_purchase', 'sum'),
hover_events=('is_hover', 'sum'),
# filter_events=('is_filter', 'sum'),
).reset_index()
agg['cart_to_view_ratio'] = np.where(agg['item_views'] > 0, agg['cart_adds'] / agg['item_views'], 0)
agg['conversion_rate'] = np.where(agg['item_views'] > 0, agg['purchases'] / agg['item_views'], 0)
agg['hover_intensity'] = np.where(agg['total_events'] > 0, agg['hover_events'] / agg['total_events'], 0)
return agg
class ProductFeatureStep(BaseContextStep):
"""Vectorized product interaction features: diversity, depth, price sensitivity."""
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
df = X.copy()
if df.empty:
return pd.DataFrame(columns=pd.Series(['sessionId']))
price_col = next((c for c in ['metadata_base_price', 'metadata_price', 'base_price'] if c in df.columns), None)
df['price_seen'] = pd.to_numeric(df[price_col], errors='coerce') if price_col else np.nan
prod_df = df[df['productId'].notna()]
if prod_df.empty:
return pd.DataFrame(columns=pd.Series(['sessionId', 'unique_products_viewed', 'product_view_depth', 'avg_price_seen', 'min_price_seen', 'max_price_seen', 'price_range']))
agg = prod_df.groupby('sessionId').agg(
unique_products_viewed=('productId', 'nunique'),
product_view_depth=('productId', lambda x: x.value_counts().iloc[0] if len(x) > 0 else 0),
avg_price_seen=('price_seen', 'mean'), min_price_seen=('price_seen', 'min'),
max_price_seen=('price_seen', 'max'),
).reset_index()
agg['price_range'] = (agg['max_price_seen'] - agg['min_price_seen']).fillna(0)
return agg
class UserAgentFeatureStep(BaseContextStep):
"""Parse userAgent into bot-detection signals."""
def transform(self, X: pd.DataFrame) -> pd.DataFrame|pd.Series:
df = X.copy()
if df.empty or 'userAgent' not in df.columns:
return pd.DataFrame(columns=pd.Series(['sessionId']))
ua = df.groupby('sessionId')['userAgent'].first().reset_index()
ua['is_headless'] = ua['userAgent'].str.contains(HEADLESS_RE, na=False)
ua['is_automation'] = ua['userAgent'].str.contains(AUTOMATION_RE, na=False)
ua['browser_family'] = ua['userAgent'].apply(_get_browser)
return ua[['sessionId', 'is_headless', 'is_automation', 'browser_family']]
class ExtractSessionFeaturesStep(BaseContextStep):
"""
Extract session-level behavioral features from interaction logs.
Input: interactions_df (user-interactions from earlier pipeline step)
Output: interactions_df with added session feature columns
Vectorized session feature extraction - replaces O(n^2) per-row loop.
Input: interactions_df
Output: session-level feature matrix
"""
def transform(self, interactions_df: pd.DataFrame) -> pd.DataFrame:
if interactions_df.empty:
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
if X.empty:
return pd.DataFrame()
df = X.copy()
# ensure timestamp column
if 'ts' in interactions_df.columns:
interactions_df = interactions_df.copy()
interactions_df['ts'] = pd.to_datetime(interactions_df['ts'])
# run all feature steps and merge on sessionId
temporal = TemporalFeatureStep(self.context).transform(df)
behavioral = BehavioralFeatureStep(self.context).transform(df)
product = ProductFeatureStep(self.context).transform(df)
ua = UserAgentFeatureStep(self.context).transform(df)
# group by session and compute features
session_features = []
for session_id, session_df in interactions_df.groupby('sessionId'):
new_slice = _apply_to_slice(session_df.sort_values('ts'))
session_features.append(new_slice)
result = temporal
for other in [behavioral, product, ua]:
if not other.empty and 'sessionId' in other.columns:
result = result.merge(other, on='sessionId', how='left')
return pd.concat(session_features, ignore_index=True)
# carry forward experimentId for label joining
if 'experimentId' in df.columns:
exp_map = df.groupby('sessionId')['experimentId'].first()
result = result.merge(exp_map, on='sessionId', how='left')
return result
class FilterSessionInteractionsStep(BaseContextStep):
class JoinLabelsStep(BaseContextStep):
"""
Filter interactions DataFrame to specific session.
Input: (interactions_df, session_id)
Output: interactions_df filtered to session_id
Join experiment labels to session features.
Input: (features_df, experiments_df) or features_df (fetches experiments)
Output: labeled feature matrix with is_agent column
"""
def transform(self, data: tuple) -> pd.DataFrame:
interactions_df, session_id = data
return interactions_df[interactions_df['sessionId'] == session_id].copy()
def transform(self, X : tuple) -> pd.DataFrame:
data = X;
if isinstance(data, tuple):
features_df, experiments_df = data
else:
features_df = data
if 'experimentId' not in features_df.columns:
return features_df
exp_ids = features_df['experimentId'].dropna().unique().tolist()
experiments_df = self.context.provider.fetch_experiments(exp_ids) if exp_ids else pd.DataFrame()
if features_df.empty:
return features_df
if experiments_df.empty:
features_df['is_agent'] = np.nan
return features_df
exp = experiments_df.copy()
if 'id' in exp.columns:
exp = exp.rename(columns={'id': 'experimentId'})
if 'xp_human_only' in exp.columns:
exp['is_agent'] = ~exp['xp_human_only']
cols = ['experimentId'] + [c for c in ['is_agent', 'xp_human_only', 'xp_market_mode'] if c in exp.columns]
return features_df.merge(exp[cols].drop_duplicates(), on='experimentId', how='left')
class ValidateDataStep(BaseContextStep):
"""
Data quality checks before training.
Input: df
Output: df (unchanged, but logs validation report to context)
"""
REQUIRED = ['sessionId', 'eventName', 'ts']
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
df = X.copy()
report = {'status': 'valid', 'rows': len(df), 'sessions': 0}
if df.empty:
report['status'] = 'empty'
self.context.cache('validation_report', report)
return df
missing = [c for c in self.REQUIRED if c not in df.columns]
if missing:
report['status'] = 'invalid'
report['missing_cols'] = missing
report['sessions'] = df['sessionId'].nunique() if 'sessionId' in df.columns else 0
report['null_sessions'] = int(df['sessionId'].isna().sum()) if 'sessionId' in df.columns else 0
if 'experimentId' in df.columns:
report['null_experiments'] = int(df['experimentId'].isna().sum())
self.context.cache('validation_report', report)
return df
# legacy compat - kept for backwards compatibility with existing code
def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]:
"""Single-session feature extraction (legacy interface)."""
defaults = {k: 0 for k in ['total_interactions', 'page_views', 'item_views', 'searches',
'cart_adds', 'hovers', 'unique_products_viewed', 'product_view_depth',
'session_duration_sec', 'interaction_velocity',
'avg_time_between_events', 'std_time_between_events', 'cart_to_view_ratio']}
if session_df.empty:
return defaults
session_df = session_df.copy()
if 'sessionId' not in session_df.columns:
session_df['sessionId'] = 'tmp'
# use a dummy context for the steps
class DummyCtx: config = {} # should maybe inherit but whatever
ctx = DummyCtx()
t = TemporalFeatureStep(ctx, timeout_sec=session_timeout_sec).transform(session_df)
b = BehavioralFeatureStep(ctx).transform(session_df)
p = ProductFeatureStep(ctx).transform(session_df)
result = {}
for df in [t, b, p]:
if not df.empty:
for col in df.columns:
if col != 'sessionId':
result[col] = df[col].iloc[0] if len(df) > 0 else 0
remap = {'hover_events': 'hovers', 'filter_events': 'searches', 'unique_pages': 'unique_pages_visited'}
for old, new in remap.items():
if old in result:
result[new] = result.pop(old)
return result

View File

@@ -269,3 +269,13 @@ def empty_context(empty_provider):
store_mode='hotel',
window_size='30s'
)
@pytest.fixture
def session_interactions(mock_interactions):
"""Enriched interaction data for session feature extraction tests"""
df = mock_interactions.copy()
df['userAgent'] = ['Mozilla/5.0 Chrome/120', 'Mozilla/5.0 Chrome/120',
'HeadlessChrome/120', 'HeadlessChrome/120', 'HeadlessChrome/120']
df['metadata_base_price'] = [None, None, 150.0, 150.0, 200.0]
return df