feat: introduced cumulative features step for state definition

This commit is contained in:
2025-11-29 22:28:40 +01:00
parent d654bbf4b4
commit 955102090d
6 changed files with 135 additions and 181 deletions

View File

@@ -13,8 +13,8 @@ from procesing.steps import (
ComputeDemandForChunksStep, ComputeDemandForChunksStep,
AggregatePriceLogsStep, AggregatePriceLogsStep,
ComputeElasticityStep, ComputeElasticityStep,
StateSpace, # StateSpace,
BuildStateSpaceStep, # BuildStateSpaceStep,
FitPricingFunctionStep, FitPricingFunctionStep,
PredictPricesStep, PredictPricesStep,
) )
@@ -43,8 +43,8 @@ __all__ = [
'ComputeDemandForChunksStep', 'ComputeDemandForChunksStep',
'AggregatePriceLogsStep', 'AggregatePriceLogsStep',
'ComputeElasticityStep', 'ComputeElasticityStep',
'StateSpace', # 'StateSpace',
'BuildStateSpaceStep', # 'BuildStateSpaceStep',
'FitPricingFunctionStep', 'FitPricingFunctionStep',
'PredictPricesStep', 'PredictPricesStep',
'interaction_extraction_pipeline', 'interaction_extraction_pipeline',

View File

@@ -14,7 +14,7 @@ from procesing.steps import (
ComputeDemandForChunksStep, ComputeDemandForChunksStep,
AggregatePriceLogsStep, AggregatePriceLogsStep,
ComputeElasticityStep, ComputeElasticityStep,
BuildStateSpaceStep, # BuildStateSpaceStep,
FitPricingFunctionStep, FitPricingFunctionStep,
PredictPricesStep, PredictPricesStep,
) )

View File

@@ -5,7 +5,9 @@ from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesSte
from procesing.steps.chunk import ChunkByTimeWindowStep from procesing.steps.chunk import ChunkByTimeWindowStep
from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep
from procesing.steps.elasticity import AggregatePriceLogsStep, ComputeElasticityStep from procesing.steps.elasticity import AggregatePriceLogsStep, ComputeElasticityStep
from procesing.steps.pricing import StateSpace, BuildStateSpaceStep, FitPricingFunctionStep, PredictPricesStep from procesing.steps.pricing import FitPricingFunctionStep, PredictPricesStep
from procesing.steps.session import ExtractSessionFeaturesStep, _extract_features_for_session
# StateSpace, BuildStateSpaceStep,
__all__ = [ __all__ = [
'BaseContextStep', 'BaseContextStep',
@@ -20,8 +22,8 @@ __all__ = [
'ComputeDemandForChunksStep', 'ComputeDemandForChunksStep',
'AggregatePriceLogsStep', 'AggregatePriceLogsStep',
'ComputeElasticityStep', 'ComputeElasticityStep',
'StateSpace',
'BuildStateSpaceStep',
'FitPricingFunctionStep', 'FitPricingFunctionStep',
'PredictPricesStep', 'PredictPricesStep',
'ExtractSessionFeaturesStep',
'_extract_features_for_session',
] ]

View File

@@ -17,6 +17,8 @@ class FetchInteractionsStep(BaseContextStep):
) )
df = df.dropna(subset=['eventName']) df = df.dropna(subset=['eventName'])
# drop all where page has /admin/
df = df[~df['page'].str.contains('/admin/', na=False)]
# Remap dateIndex if present # Remap dateIndex if present
if 'metadata_dateIndex' in df.columns: if 'metadata_dateIndex' in df.columns:

View File

@@ -2,114 +2,19 @@ import numpy as np
import pandas as pd import pandas as pd
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field from dataclasses import dataclass, field
from experiments.procesing.pricers.simple import StaticPricer from procesing.pricers.simple import StaticPricer
from procesing.steps.base import BaseContextStep from procesing.steps.base import BaseContextStep
from procesing.pricers import ElasticityBasedPricer from procesing.pricers import ElasticityBasedPricer
class State:
    """Per-event state record (new side of the diff).

    NOTE(review): the constructor accepts its arguments but does not store
    them -- the body is ``pass``. Presumably a stub to be completed; confirm
    before relying on any attribute of an instance.
    """

    def __init__(self,
                 last_action: str,
                 last_productId: str,
                 last_price: float,
                 session_features: np.ndarray
                 ):
        pass


@dataclass
class StateSpace:
    """
    State representation for pricing functions.

    Components:
        Q_t: demand ∈ R^n (current demand signal per product)
        P_t: prices ∈ R^n (current/base prices)
        S_t: session_features (behavioral signals, interaction data)
        H_t: history = {Q_{t-k}, P_{t-k}, S_{t-k}} for k in [1, history_length]

    Additionally stores:
        - product_ids: product identifiers (n,)
        - elasticity: price elasticity per product (n,)
        - metadata: arbitrary context (experiment_id, timestamp, etc.)
    """
    demand: np.ndarray  # Q_t ∈ R^n
    prices: np.ndarray  # P_t ∈ R^n
    session_features: pd.DataFrame = field(default_factory=pd.DataFrame)  # S_t

    # augmented state components
    product_ids: Optional[np.ndarray] = None
    elasticity: Optional[np.ndarray] = None

    # historical trajectory H_t = {(Q_{t-k}, P_{t-k}, S_{t-k})}
    history: List[Dict[str, Any]] = field(default_factory=list)

    # metadata for context
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Validate that every per-product vector has the same length as ``demand``.

        NOTE(review): validation uses ``assert`` and is therefore skipped when
        Python runs with ``-O``; kept as-is so the raised exception type
        (AssertionError) does not change for existing callers.
        """
        n = len(self.demand)
        assert len(self.prices) == n, "demand and prices must have same dimension"
        if self.elasticity is not None:
            assert len(self.elasticity) == n, "elasticity must match dimension"
        if self.product_ids is not None:
            assert len(self.product_ids) == n, "product_ids must match dimension"

    @property
    def n_products(self) -> int:
        """Number of products in state space."""
        return len(self.demand)

    def add_history(self, q: np.ndarray, p: np.ndarray, s: pd.DataFrame, max_length: int = 10):
        """Append one historical state to H_t and keep at most ``max_length`` entries.

        Args:
            q: demand vector of the state being archived.
            p: price vector of the state being archived.
            s: session features of the state being archived.
            max_length: maximum number of entries retained; oldest are dropped first.
        """
        self.history.append({'demand': q, 'prices': p, 'session_features': s})
        # Drop *all* surplus entries, not just one: a caller may pass a smaller
        # max_length than on previous calls.
        surplus = len(self.history) - max_length
        if surplus > 0:
            del self.history[:surplus]

    def get_history_window(self, k: int = 5) -> List[Dict[str, Any]]:
        """Return the last ``k`` historical states (fewer if the history is shorter).

        Always returns a new list, so mutating the result never alters
        ``self.history``. ``k <= 0`` yields an empty list (``history[-0:]``
        would otherwise be the whole history).
        """
        if k <= 0:
            return []
        return self.history[-k:]
class BuildStateSpaceStep(BaseContextStep):
    """
    Assemble a StateSpace from product, elasticity and (optionally) demand data.

    Input: elasticity_df [productId, elasticity, ...], optional demand_df
    Output: StateSpace instance with Q_t, P_t, elasticity, product_ids
    """

    def transform(self, elasticity_df: pd.DataFrame, demand_df: Optional[pd.DataFrame] = None):
        catalog = self.context.products.copy()

        # the base price is read from each product's metadata dict when that column exists
        if 'metadata' in catalog.columns:
            def _base_price(meta):
                if isinstance(meta, dict):
                    return meta.get('base_price', 0)
                return 0

            catalog['base_price'] = catalog['metadata'].apply(_base_price)
        else:
            catalog['base_price'] = 0

        # one row per product: price joined with elasticity, gaps filled with zeros
        price_table = catalog[['id', 'base_price']].rename(columns={'id': 'productId'})
        elasticity_table = elasticity_df[['productId', 'elasticity']]
        state_table = price_table.merge(elasticity_table, on='productId', how='left')
        state_table = state_table.fillna({'elasticity': 0.0, 'base_price': 0.0})

        # demand comes from demand_df when it carries a 'demand' column,
        # otherwise every product gets the same constant demand of 10.0
        has_demand = demand_df is not None and 'demand' in demand_df.columns
        if has_demand:
            demand_table = demand_df[['productId', 'demand']]
            state_table = state_table.merge(demand_table, on='productId', how='left')
            state_table = state_table.fillna({'demand': 0.0})
            demand_vector = state_table['demand'].values
        else:
            demand_vector = np.ones(len(state_table)) * 10.0

        return StateSpace(
            demand=demand_vector,
            prices=state_table['base_price'].values,
            session_features=pd.DataFrame(),
            product_ids=state_table['productId'].values,
            elasticity=state_table['elasticity'].values,
            metadata={'timestamp': pd.Timestamp.now().isoformat()}
        )
class FitPricingFunctionStep(BaseContextStep): class FitPricingFunctionStep(BaseContextStep):

View File

@@ -8,45 +8,14 @@ from typing import Optional, Dict, Any
from collections import Counter from collections import Counter
from procesing.steps.base import BaseContextStep from procesing.steps.base import BaseContextStep
def _extract_features_for_session(session_df: pd.DataFrame, session_timeout_sec: float = 900) -> Dict[str, Any]:
"""Compute features for single session.
class ExtractSessionFeaturesStep(BaseContextStep): Args:
session_df: interaction events for this session
session_timeout_sec: max gap between events before resetting duration (default 900s = 15min)
""" """
Extract session-level behavioral features from interaction logs. features = {}
Input: interactions_df (user-interactions from earlier pipeline step)
Output: session_features DataFrame [sessionId, feature_1, feature_2, ...]
Features computed:
- total_interactions: count of all events
- page_views, item_views, searches, cart_adds: event type counts
- hovers: hover event counts
- unique_products_viewed: distinct product IDs
- interaction_velocity: events per minute
- session_duration_sec: time span of session
- avg_time_between_events: mean inter-event time
- product_view_depth: max views for single product (attention signal)
"""
def transform(self, interactions_df: pd.DataFrame) -> pd.DataFrame:
if interactions_df.empty:
return pd.DataFrame()
# ensure timestamp column
if 'ts' in interactions_df.columns:
interactions_df = interactions_df.copy()
interactions_df['ts'] = pd.to_datetime(interactions_df['ts'])
# group by session and compute features
session_features = []
for session_id, session_df in interactions_df.groupby('sessionId'):
features = self._extract_features_for_session(session_id, session_df)
session_features.append(features)
return pd.DataFrame(session_features)
def _extract_features_for_session(self, session_id: str, session_df: pd.DataFrame) -> Dict[str, Any]:
"""Compute features for single session."""
features = {'sessionId': session_id}
# basic counts # basic counts
features['total_interactions'] = len(session_df) features['total_interactions'] = len(session_df)
@@ -71,24 +40,28 @@ class ExtractSessionFeaturesStep(BaseContextStep):
else: else:
features['product_view_depth'] = 0 features['product_view_depth'] = 0
# temporal features # temporal features with session timeout logic
if 'ts' in session_df.columns: if 'ts' in session_df.columns:
timestamps = session_df['ts'].sort_values() timestamps = session_df['ts'].sort_values()
features['session_duration_sec'] = (timestamps.max() - timestamps.min()).total_seconds()
# compute active duration considering timeout gaps
if len(timestamps) > 1:
time_diffs = timestamps.diff().dropna().dt.total_seconds()
# only count gaps shorter than timeout towards active session duration
active_diffs = time_diffs[time_diffs <= session_timeout_sec]
features['session_duration_sec'] = active_diffs.sum() if len(active_diffs) > 0 else 0.0
features['avg_time_between_events'] = time_diffs.mean()
features['std_time_between_events'] = time_diffs.std()
else:
features['session_duration_sec'] = 0.0
features['avg_time_between_events'] = 0.0
features['std_time_between_events'] = 0.0
if features['session_duration_sec'] > 0: if features['session_duration_sec'] > 0:
features['interaction_velocity'] = (features['total_interactions'] / features['session_duration_sec']) * 60 features['interaction_velocity'] = (features['total_interactions'] / features['session_duration_sec']) * 60
else: else:
features['interaction_velocity'] = 0.0 features['interaction_velocity'] = 0.0
# inter-event timing
if len(timestamps) > 1:
time_diffs = timestamps.diff().dropna().dt.total_seconds()
features['avg_time_between_events'] = time_diffs.mean()
features['std_time_between_events'] = time_diffs.std()
else:
features['avg_time_between_events'] = 0.0
features['std_time_between_events'] = 0.0
else: else:
features['session_duration_sec'] = 0.0 features['session_duration_sec'] = 0.0
features['interaction_velocity'] = 0.0 features['interaction_velocity'] = 0.0
@@ -101,6 +74,78 @@ class ExtractSessionFeaturesStep(BaseContextStep):
return features return features
def _apply_to_slice(
    df: pd.DataFrame,
    session_timeout_sec: float = 900,
    feature_fn=None,
) -> pd.DataFrame:
    """Attach cumulative session features to every row of one session.

    Row ``i`` (0-based, in the order given) receives the features computed
    over rows ``0..i``, i.e. over the growing prefix of the session. The
    prefix is re-evaluated for every row, so the cost is quadratic in the
    number of rows.

    Args:
        df: events of a single session, already in the desired order. The
            feature columns are added to this frame in place.
        session_timeout_sec: forwarded to the feature extractor as its
            second positional argument (default 900, the extractor's own
            default).
        feature_fn: callable ``(prefix_df, session_timeout_sec) -> dict``
            returning at least the feature names listed below. Defaults to
            ``_extract_features_for_session``.

    Returns:
        The same ``df`` object with the feature columns filled; missing
        (NaN/None) feature values are written as 0.0.

    Raises:
        KeyError: if the extractor omits one of the expected features.
    """
    if feature_fn is None:
        feature_fn = _extract_features_for_session

    new_cols = ["total_interactions", "page_views", "item_views", "searches",
                "cart_adds", "hovers", "unique_products_viewed", "product_view_depth",
                "session_duration_sec", "interaction_velocity",
                "avg_time_between_events", "std_time_between_events",
                "cart_to_view_ratio"]

    # Create the columns up front so every prefix handed to the extractor has
    # the same schema as before.
    for col in new_cols:
        df[col] = np.nan

    n_rows = len(df)
    values = np.full((n_rows, len(new_cols)), np.nan)
    for end in range(1, n_rows + 1):
        features = feature_fn(df.iloc[:end], session_timeout_sec)
        for j, col in enumerate(new_cols):
            value = features[col]
            # e.g. std of a single gap is NaN; treat "undefined" as 0.0
            values[end - 1, j] = 0.0 if pd.isna(value) else value

    # Write by position, one column at a time: label-based ``df.at`` writes
    # hit every row sharing a label when the index is not unique.
    for j, col in enumerate(new_cols):
        df[col] = values[:, j]
    return df
class BuildStateSpaceStep(BaseContextStep):
    """
    Build state space representation S_t from a feature-enriched dataset.

    Input: rich_dataset DataFrame that already carries the cumulative
           session feature columns listed in ``transform``
    Output: state_space_df DataFrame with S_t vectors (currently a plain
            copy of the input)
    """

    def transform(self, rich_dataset: pd.DataFrame) -> pd.DataFrame:
        """Validate the feature columns and return a copy of the dataset.

        Args:
            rich_dataset: interactions with the session feature columns added.

        Returns:
            An empty DataFrame when ``rich_dataset`` is empty, otherwise a
            copy of ``rich_dataset``.

        Raises:
            ValueError: if a non-empty input lacks any required feature column.
        """
        # Empty input passes through first: an upstream step that returns a
        # bare ``pd.DataFrame()`` for "no data" has no columns at all and must
        # not be reported as a schema error.
        if rich_dataset.empty:
            return pd.DataFrame()

        required_cols = ["total_interactions", "page_views", "item_views", "searches",
                         "cart_adds", "hovers", "unique_products_viewed", "product_view_depth",
                         "session_duration_sec", "interaction_velocity",
                         "avg_time_between_events", "std_time_between_events",
                         "cart_to_view_ratio"]
        present = set(rich_dataset.columns)
        if any(col not in present for col in required_cols):
            raise ValueError("Missing required columns for feature extraction.")

        # For simplicity, we return as is
        return rich_dataset.copy()
class ExtractSessionFeaturesStep(BaseContextStep):
    """
    Extract cumulative session-level behavioral features from interaction logs.

    Input: interactions_df (user-interactions from earlier pipeline step)
    Output: interactions_df with added session feature columns
    """

    def transform(self, interactions_df: pd.DataFrame) -> pd.DataFrame:
        """Enrich every interaction row with the features of its session so far.

        Rows are grouped by ``sessionId``; within each session they are put in
        timestamp order (when a ``ts`` column exists) and handed to
        ``_apply_to_slice``. The per-session results are concatenated with a
        fresh 0..n-1 index.

        Args:
            interactions_df: interaction events; must contain ``sessionId``.
                The caller's frame is not modified.

        Returns:
            The enriched interactions, or an empty DataFrame when the input
            is empty or no session group exists.

        NOTE(review): ``groupby`` is called with its defaults, which in pandas
        drop rows whose ``sessionId`` is missing -- confirm that is intended.
        """
        if interactions_df.empty:
            return pd.DataFrame()

        # ensure timestamp column
        has_ts = 'ts' in interactions_df.columns
        if has_ts:
            interactions_df = interactions_df.copy()
            interactions_df['ts'] = pd.to_datetime(interactions_df['ts'])

        # group by session and compute features
        enriched = []
        for _, session_df in interactions_df.groupby('sessionId'):
            if has_ts:
                # stable sort keeps the logged order of events sharing a timestamp
                ordered = session_df.sort_values('ts', kind='stable')
            else:
                # no timestamps: keep logged order; copy because the helper
                # writes columns into the frame it receives
                ordered = session_df.copy()
            enriched.append(_apply_to_slice(ordered))

        # pd.concat raises on an empty list (e.g. every sessionId missing)
        if not enriched:
            return pd.DataFrame()
        return pd.concat(enriched, ignore_index=True)
class FilterSessionInteractionsStep(BaseContextStep): class FilterSessionInteractionsStep(BaseContextStep):
""" """
Filter interactions DataFrame to specific session. Filter interactions DataFrame to specific session.