Files
PHANTOM/experiments/procesing/steps/session.py

160 lines
6.3 KiB
Python

"""
Session feature extraction for S_t component of state space.
Computes behavioral signals from interaction data already in pipeline.
"""
import pandas as pd
import numpy as np
from typing import Optional, Dict, Any
from collections import Counter
from procesing.steps.base import BaseContextStep
def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]:
    """Compute behavioral features for a single session.

    Args:
        session_df: interaction events for this session. The ``eventName`` and
            ``productId`` columns are read unconditionally; ``ts`` is optional
            and, when present, is expected to hold datetime values.
        session_timeout_sec: max gap between consecutive events that still
            counts towards the active session duration (default 900s = 15min).

    Returns:
        Dict mapping feature name to its value. The temporal features are
        always finite floats; they fall back to 0.0 whenever they cannot be
        computed (no ``ts`` column, fewer than two usable timestamps, or a
        single gap for the standard deviation).
    """
    features: Dict[str, Any] = {}

    # basic counts
    features['total_interactions'] = len(session_df)
    event_counts = session_df['eventName'].value_counts().to_dict()
    item_views = event_counts.get('view_item_page', 0)
    # item page views are counted as page views too
    features['page_views'] = event_counts.get('page_view', 0) + item_views
    features['item_views'] = item_views
    features['searches'] = event_counts.get('search', 0)
    features['cart_adds'] = event_counts.get('add_item_to_cart', 0)

    # hover events
    hover_events = ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button']
    features['hovers'] = sum(event_counts.get(ev, 0) for ev in hover_events)

    # product-level signals
    product_ids = session_df['productId'].dropna()
    features['unique_products_viewed'] = product_ids.nunique()
    # depth = number of events on the most frequently touched product
    features['product_view_depth'] = max(Counter(product_ids).values()) if len(product_ids) > 0 else 0

    # temporal features with session timeout logic
    duration = 0.0
    avg_gap = 0.0
    std_gap = 0.0
    if 'ts' in session_df.columns:
        # drop missing timestamps up front so they cannot produce empty/NaN gaps
        timestamps = session_df['ts'].dropna().sort_values()
        if len(timestamps) > 1:
            time_diffs = timestamps.diff().dropna().dt.total_seconds()
            # only count gaps shorter than timeout towards active session duration
            duration = float(time_diffs[time_diffs <= session_timeout_sec].sum())
            avg_gap = float(time_diffs.mean())
            # sample std needs at least two gaps; with one gap pandas yields NaN
            if len(time_diffs) > 1:
                std_gap = float(time_diffs.std())

    features['session_duration_sec'] = duration
    features['avg_time_between_events'] = avg_gap
    features['std_time_between_events'] = std_gap
    # interactions per minute of active time
    features['interaction_velocity'] = (
        (features['total_interactions'] / duration) * 60 if duration > 0 else 0.0
    )

    # cart/conversion signals
    features['cart_to_view_ratio'] = (
        features['cart_adds'] / features['item_views'] if features['item_views'] > 0 else 0.0
    )
    return features
def _apply_to_slice(df: pd.DataFrame, session_timeout_sec: float = 900) -> pd.DataFrame:
    """Apply feature extraction to a growing window of interactions.

    For every row position ``i`` the features are computed over rows
    ``0..i`` (inclusive) and stored on row ``i``, so each row carries the
    session state as of that event.

    Args:
        df: events of one session, already in the desired order. The frame is
            modified in place (feature columns are added or overwritten).
        session_timeout_sec: forwarded to ``_extract_features_for_session``.

    Returns:
        The same ``df`` object with the feature columns filled in as floats.
    """
    new_cols = ["total_interactions", "page_views", "item_views", "searches",
                "cart_adds", "hovers", "unique_products_viewed", "product_view_depth",
                "session_duration_sec", "interaction_velocity",
                "avg_time_between_events", "std_time_between_events",
                "cart_to_view_ratio"]

    rows = []
    for end in range(1, len(df) + 1):
        features = _extract_features_for_session(df.iloc[:end], session_timeout_sec)
        # undefined features (NaN) are reported as 0.0
        rows.append([0.0 if pd.isna(features[col]) else features[col] for col in new_cols])

    # Assign whole columns by position instead of writing cell-by-cell through
    # index labels: label-based writes would hit every row sharing a label when
    # the index is not unique.
    values = np.array(rows, dtype=float).reshape(len(df), len(new_cols))
    for pos, col in enumerate(new_cols):
        df[col] = values[:, pos]
    return df
class BuildStateSpaceStep(BaseContextStep):
    """
    Build state space representation S_t from session features.

    Input: session_features DataFrame
    Output: state_space_df DataFrame with S_t vectors
    """

    def transform(self, rich_dataset: pd.DataFrame) -> pd.DataFrame:
        """Validate that the session feature columns exist and return a copy.

        Args:
            rich_dataset: interactions with the session feature columns added.

        Returns:
            An empty ``pd.DataFrame()`` when the input has no rows, otherwise
            a copy of ``rich_dataset``.

        Raises:
            ValueError: if a non-empty input lacks any required feature column.
        """
        # Empty input is checked first: ExtractSessionFeaturesStep returns a
        # column-less pd.DataFrame() for empty input, which must pass through
        # here rather than fail the column validation below.
        if rich_dataset.empty:
            return pd.DataFrame()

        # check if features are present
        required_cols = ["total_interactions", "page_views", "item_views", "searches",
                         "cart_adds", "hovers", "unique_products_viewed", "product_view_depth",
                         "session_duration_sec", "interaction_velocity",
                         "avg_time_between_events", "std_time_between_events",
                         "cart_to_view_ratio"]
        missing = [col for col in required_cols if col not in rich_dataset.columns]
        if missing:
            raise ValueError(f"Missing required columns for feature extraction: {missing}")

        # For simplicity, we return as is
        return rich_dataset.copy()
class ExtractSessionFeaturesStep(BaseContextStep):
    """
    Extract session-level behavioral features from interaction logs.

    Input: interactions_df (user-interactions from earlier pipeline step)
    Output: interactions_df with added session feature columns
    """

    def transform(self, interactions_df: pd.DataFrame) -> pd.DataFrame:
        """Compute running session features for every interaction row.

        Rows are grouped by ``sessionId``; within each group they are ordered
        by ``ts`` (when that column exists) and passed to ``_apply_to_slice``.

        Args:
            interactions_df: interaction log; must contain ``sessionId``.
                ``ts`` is optional and is parsed with ``pd.to_datetime``.

        Returns:
            All sessions concatenated with a fresh index, or an empty
            ``pd.DataFrame()`` when there is nothing to process.
        """
        if interactions_df.empty:
            return pd.DataFrame()

        # ensure timestamp column
        has_ts = 'ts' in interactions_df.columns
        if has_ts:
            interactions_df = interactions_df.copy()
            interactions_df['ts'] = pd.to_datetime(interactions_df['ts'])

        # group by session and compute features
        session_features = []
        for _, session_df in interactions_df.groupby('sessionId'):
            if has_ts:
                # stable sort keeps the logged order of events with equal timestamps
                ordered = session_df.sort_values('ts', kind='mergesort')
            else:
                # no timestamps: keep the logged order; copy because the helper
                # adds columns to the frame it receives
                ordered = session_df.copy()
            session_features.append(_apply_to_slice(ordered))

        # groupby drops rows whose sessionId is null, so there may be no groups
        if not session_features:
            return pd.DataFrame()
        return pd.concat(session_features, ignore_index=True)
class FilterSessionInteractionsStep(BaseContextStep):
    """
    Restrict an interactions DataFrame to the rows of one session.

    Input: (interactions_df, session_id)
    Output: copy of interactions_df containing only rows of session_id
    """

    def transform(self, data: tuple) -> pd.DataFrame:
        """Return a copy of the rows whose ``sessionId`` equals the given id."""
        frame, wanted_session = data
        in_session = frame['sessionId'] == wanted_session
        selected = frame.loc[in_session]
        return selected.copy()