From 955102090df9a7a077234b8d651c54b10c1bae17 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Sat, 29 Nov 2025 22:28:40 +0100 Subject: [PATCH] feat: introduced cumulative features step for state definition --- experiments/procesing/__init__.py | 8 +- experiments/procesing/pipelines.py | 2 +- experiments/procesing/steps/__init__.py | 8 +- experiments/procesing/steps/fetch.py | 2 + experiments/procesing/steps/pricing.py | 113 ++------------- experiments/procesing/steps/session.py | 183 +++++++++++++++--------- 6 files changed, 135 insertions(+), 181 deletions(-) diff --git a/experiments/procesing/__init__.py b/experiments/procesing/__init__.py index 2ea5d56..c5f4051 100644 --- a/experiments/procesing/__init__.py +++ b/experiments/procesing/__init__.py @@ -13,8 +13,8 @@ from procesing.steps import ( ComputeDemandForChunksStep, AggregatePriceLogsStep, ComputeElasticityStep, - StateSpace, - BuildStateSpaceStep, + # StateSpace, + # BuildStateSpaceStep, FitPricingFunctionStep, PredictPricesStep, ) @@ -43,8 +43,8 @@ __all__ = [ 'ComputeDemandForChunksStep', 'AggregatePriceLogsStep', 'ComputeElasticityStep', - 'StateSpace', - 'BuildStateSpaceStep', + # 'StateSpace', + # 'BuildStateSpaceStep', 'FitPricingFunctionStep', 'PredictPricesStep', 'interaction_extraction_pipeline', diff --git a/experiments/procesing/pipelines.py b/experiments/procesing/pipelines.py index cae639f..3b86654 100644 --- a/experiments/procesing/pipelines.py +++ b/experiments/procesing/pipelines.py @@ -14,7 +14,7 @@ from procesing.steps import ( ComputeDemandForChunksStep, AggregatePriceLogsStep, ComputeElasticityStep, - BuildStateSpaceStep, + # BuildStateSpaceStep, FitPricingFunctionStep, PredictPricesStep, ) diff --git a/experiments/procesing/steps/__init__.py b/experiments/procesing/steps/__init__.py index 6fa8779..d982269 100755 --- a/experiments/procesing/steps/__init__.py +++ b/experiments/procesing/steps/__init__.py @@ -5,7 +5,9 @@ from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesSte from procesing.steps.chunk import ChunkByTimeWindowStep from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep from procesing.steps.elasticity import AggregatePriceLogsStep, ComputeElasticityStep -from procesing.steps.pricing import StateSpace, BuildStateSpaceStep, FitPricingFunctionStep, PredictPricesStep +from procesing.steps.pricing import FitPricingFunctionStep, PredictPricesStep +from procesing.steps.session import ExtractSessionFeaturesStep, _extract_features_for_session +# StateSpace, BuildStateSpaceStep, __all__ = [ 'BaseContextStep', @@ -20,8 +22,8 @@ __all__ = [ 'ComputeDemandForChunksStep', 'AggregatePriceLogsStep', 'ComputeElasticityStep', - 'StateSpace', - 'BuildStateSpaceStep', 'FitPricingFunctionStep', 'PredictPricesStep', + 'ExtractSessionFeaturesStep', + '_extract_features_for_session', ] diff --git a/experiments/procesing/steps/fetch.py b/experiments/procesing/steps/fetch.py index cde2b55..8d9c311 100755 --- a/experiments/procesing/steps/fetch.py +++ b/experiments/procesing/steps/fetch.py @@ -17,6 +17,8 @@ class FetchInteractionsStep(BaseContextStep): ) df = df.dropna(subset=['eventName']) + # drop all where page has /admin/ + df = df[~df['page'].str.contains('/admin/', na=False)] # Remap dateIndex if present if 'metadata_dateIndex' in df.columns: diff --git a/experiments/procesing/steps/pricing.py b/experiments/procesing/steps/pricing.py index 6439474..3d2fa5d 100755 --- a/experiments/procesing/steps/pricing.py +++ b/experiments/procesing/steps/pricing.py @@ -2,114 +2,19 @@ import numpy as np import pandas as pd from typing import Optional, List, Dict, Any from dataclasses import dataclass, field -from experiments.procesing.pricers.simple import StaticPricer +from procesing.pricers.simple import StaticPricer from procesing.steps.base import BaseContextStep from procesing.pricers import ElasticityBasedPricer -@dataclass -class StateSpace: - """ - State representation for pricing functions. +class State: + def __init__(self, + last_action : str, + last_productId : str, + last_price : float, + session_features : np.ndarray + ): + pass - Components: - Q_t: demand ∈ R^n (current demand signal per product) - P_t: prices ∈ R^n (current/base prices) - S_t: session_features (behavioral signals, interaction data) - H_t: history = {Q_{t-k}, P_{t-k}, S_{t-k}} for k in [1, history_length] - - Additionally stores: - - product_ids: product identifiers (n,) - - elasticity: price elasticity per product (n,) - - metadata: arbitrary context (experiment_id, timestamp, etc.) - """ - demand: np.ndarray # Q_t ∈ R^n - prices: np.ndarray # P_t ∈ R^n - session_features: pd.DataFrame = field(default_factory=pd.DataFrame) # S_t - - # augmented state components - product_ids: Optional[np.ndarray] = None - elasticity: Optional[np.ndarray] = None - - # historical trajectory H_t = {(Q_{t-k}, P_{t-k}, S_{t-k})} - history: List[Dict[str, Any]] = field(default_factory=list) - - # metadata for context - metadata: Dict[str, Any] = field(default_factory=dict) - - def __post_init__(self): - """Validate dimensions.""" - n = len(self.demand) - assert len(self.prices) == n, "demand and prices must have same dimension" - if self.elasticity is not None: - assert len(self.elasticity) == n, "elasticity must match dimension" - if self.product_ids is not None: - assert len(self.product_ids) == n, "product_ids must match dimension" - - @property - def n_products(self) -> int: - """Number of products in state space.""" - return len(self.demand) - - def add_history(self, q: np.ndarray, p: np.ndarray, s: pd.DataFrame, max_length: int = 10): - """Append historical state to trajectory H_t.""" - self.history.append({'demand': q, 'prices': p, 'session_features': s}) - if len(self.history) > max_length: - self.history.pop(0) - - def get_history_window(self, k: int = 5) -> List[Dict[str, Any]]: - """Retrieve last k historical states.""" - return self.history[-k:] if len(self.history) >= k else self.history - - -class BuildStateSpaceStep(BaseContextStep): - """ - Build state space from elasticity, demand, and price data. - - Input: elasticity_df [productId, elasticity, ...], optional demand_df - Output: StateSpace instance with Q_t, P_t, elasticity, product_ids - """ - - def transform(self, elasticity_df: pd.DataFrame, demand_df: Optional[pd.DataFrame] = None): - products = self.context.products - - # extract base prices from product metadata - products_with_prices = products.copy() - if 'metadata' in products_with_prices.columns: - products_with_prices['base_price'] = products_with_prices['metadata'].apply( - lambda m: m.get('base_price', 0) if isinstance(m, dict) else 0 - ) - else: - products_with_prices['base_price'] = 0 - - # merge with elasticity - merged = products_with_prices[['id', 'base_price']].rename( - columns={'id': 'productId'} - ).merge( - elasticity_df[['productId', 'elasticity']], - on='productId', - how='left' - ).fillna({'elasticity': 0.0, 'base_price': 0.0}) - - # merge with demand if provided, else use default - if demand_df is not None and 'demand' in demand_df.columns: - merged = merged.merge( - demand_df[['productId', 'demand']], - on='productId', - how='left' - ).fillna({'demand': 0.0}) - demand_vector = merged['demand'].values - else: - # default: uniform demand or use elasticity as proxy - demand_vector = np.ones(len(merged)) * 10.0 - - return StateSpace( - demand=demand_vector, - prices=merged['base_price'].values, - session_features=pd.DataFrame(), - product_ids=merged['productId'].values, - elasticity=merged['elasticity'].values, - metadata={'timestamp': pd.Timestamp.now().isoformat()} - ) class FitPricingFunctionStep(BaseContextStep): diff --git a/experiments/procesing/steps/session.py b/experiments/procesing/steps/session.py index 4329651..1dd5992 100644 --- a/experiments/procesing/steps/session.py +++ b/experiments/procesing/steps/session.py @@ -8,23 +8,123 @@ from typing import Optional, Dict, Any from collections import Counter from procesing.steps.base import BaseContextStep +def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]: + """Compute features for single session. + + Args: + session_df: interaction events for this session + session_timeout_sec: max gap between events before resetting duration (default 900s = 15min) + """ + features = {} + + # basic counts + features['total_interactions'] = len(session_df) + + event_counts = session_df['eventName'].value_counts().to_dict() + features['page_views'] = event_counts.get('page_view', 0) + event_counts.get('view_item_page', 0) + features['item_views'] = event_counts.get('view_item_page', 0) + features['searches'] = event_counts.get('search', 0) + features['cart_adds'] = event_counts.get('add_item_to_cart', 0) + + # hover events + hover_events = ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button'] + features['hovers'] = sum(event_counts.get(ev, 0) for ev in hover_events) + + # product-level signals + product_ids = session_df['productId'].dropna() + features['unique_products_viewed'] = product_ids.nunique() + + if len(product_ids) > 0: + product_view_counts = Counter(product_ids) + features['product_view_depth'] = max(product_view_counts.values()) + else: + features['product_view_depth'] = 0 + + # temporal features with session timeout logic + if 'ts' in session_df.columns: + timestamps = session_df['ts'].sort_values() + + # compute active duration considering timeout gaps + if len(timestamps) > 1: + time_diffs = timestamps.diff().dropna().dt.total_seconds() + # only count gaps shorter than timeout towards active session duration + active_diffs = time_diffs[time_diffs <= session_timeout_sec] + features['session_duration_sec'] = active_diffs.sum() if len(active_diffs) > 0 else 0.0 + + features['avg_time_between_events'] = time_diffs.mean() + features['std_time_between_events'] = time_diffs.std() + else: + features['session_duration_sec'] = 0.0 + features['avg_time_between_events'] = 0.0 + features['std_time_between_events'] = 0.0 + + if features['session_duration_sec'] > 0: + features['interaction_velocity'] = (features['total_interactions'] / features['session_duration_sec']) * 60 + else: + features['interaction_velocity'] = 0.0 + else: + features['session_duration_sec'] = 0.0 + features['interaction_velocity'] = 0.0 + features['avg_time_between_events'] = 0.0 + features['std_time_between_events'] = 0.0 + + # cart/conversion signals + features['cart_to_view_ratio'] = features['cart_adds'] / features['item_views'] if features['item_views'] > 0 else 0.0 + + return features + + +def _apply_to_slice(df: pd.DataFrame) -> pd.DataFrame: + """Apply feature extraction to sliding window of interactions.""" + # add columns of all features at each step + new_cols = ["total_interactions", "page_views", "item_views", "searches", + "cart_adds", "hovers", "unique_products_viewed", "product_view_depth", + "session_duration_sec", "interaction_velocity", + "avg_time_between_events", "std_time_between_events", + "cart_to_view_ratio"] + for col in new_cols: df[col] = np.nan + for idx in range(1, len(df) + 1): + features = _extract_features_for_session(df.iloc[:idx]) + # fillna kinda meh + features = { k: (v if not pd.isna(v) else 0.0) for k, v in features.items() } + for col in new_cols: + df.at[df.index[idx - 1], col] = features[col] + #print(f"Processed {idx}/{len(df)} events for session {df['sessionId'].iloc[0]}") + return df + +class BuildStateSpaceStep(BaseContextStep): + """ + Build state space representation S_t from session features. + + Input: session_features DataFrame + Output: state_space_df DataFrame with S_t vectors + """ + + def transform(self, rich_dataset: pd.DataFrame) -> pd.DataFrame: + # check if features are present + required_cols = ["total_interactions", "page_views", "item_views", "searches", + "cart_adds", "hovers", "unique_products_viewed", "product_view_depth", + "session_duration_sec", "interaction_velocity", + "avg_time_between_events", "std_time_between_events", + "cart_to_view_ratio"] + if not all(col in rich_dataset.columns for col in required_cols): + raise ValueError("Missing required columns for feature extraction.") + if rich_dataset.empty: + return pd.DataFrame() + + + # For simplicity, we return as is + return rich_dataset.copy() + + + class ExtractSessionFeaturesStep(BaseContextStep): """ Extract session-level behavioral features from interaction logs. Input: interactions_df (user-interactions from earlier pipeline step) - Output: session_features DataFrame [sessionId, feature_1, feature_2, ...] - - Features computed: - - total_interactions: count of all events - - page_views, item_views, searches, cart_adds: event type counts - - hovers: hover event counts - - unique_products_viewed: distinct product IDs - - interaction_velocity: events per minute - - session_duration_sec: time span of session - - avg_time_between_events: mean inter-event time - - product_view_depth: max views for single product (attention signal) + Output: interactions_df with added session feature columns """ def transform(self, interactions_df: pd.DataFrame) -> pd.DataFrame: @@ -39,66 +139,11 @@ class ExtractSessionFeaturesStep(BaseContextStep): # group by session and compute features session_features = [] for session_id, session_df in interactions_df.groupby('sessionId'): - features = self._extract_features_for_session(session_id, session_df) - session_features.append(features) + new_slice = _apply_to_slice(session_df.sort_values('ts')) + session_features.append(new_slice) - return pd.DataFrame(session_features) + return pd.concat(session_features, ignore_index=True) - def _extract_features_for_session(self, session_id: str, session_df: pd.DataFrame) -> Dict[str, Any]: - """Compute features for single session.""" - features = {'sessionId': session_id} - - # basic counts - features['total_interactions'] = len(session_df) - - event_counts = session_df['eventName'].value_counts().to_dict() - features['page_views'] = event_counts.get('page_view', 0) + event_counts.get('view_item_page', 0) - features['item_views'] = event_counts.get('view_item_page', 0) - features['searches'] = event_counts.get('search', 0) - features['cart_adds'] = event_counts.get('add_item_to_cart', 0) - - # hover events - hover_events = ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button'] - features['hovers'] = sum(event_counts.get(ev, 0) for ev in hover_events) - - # product-level signals - product_ids = session_df['productId'].dropna() - features['unique_products_viewed'] = product_ids.nunique() - - if len(product_ids) > 0: - product_view_counts = Counter(product_ids) - features['product_view_depth'] = max(product_view_counts.values()) - else: - features['product_view_depth'] = 0 - - # temporal features - if 'ts' in session_df.columns: - timestamps = session_df['ts'].sort_values() - features['session_duration_sec'] = (timestamps.max() - timestamps.min()).total_seconds() - - if features['session_duration_sec'] > 0: - features['interaction_velocity'] = (features['total_interactions'] / features['session_duration_sec']) * 60 - else: - features['interaction_velocity'] = 0.0 - - # inter-event timing - if len(timestamps) > 1: - time_diffs = timestamps.diff().dropna().dt.total_seconds() - features['avg_time_between_events'] = time_diffs.mean() - features['std_time_between_events'] = time_diffs.std() - else: - features['avg_time_between_events'] = 0.0 - features['std_time_between_events'] = 0.0 - else: - features['session_duration_sec'] = 0.0 - features['interaction_velocity'] = 0.0 - features['avg_time_between_events'] = 0.0 - features['std_time_between_events'] = 0.0 - - # cart/conversion signals - features['cart_to_view_ratio'] = features['cart_adds'] / features['item_views'] if features['item_views'] > 0 else 0.0 - - return features class FilterSessionInteractionsStep(BaseContextStep):