# ---------------------------------------------------------------------------
# File: experiments/procesing/metrics.py (new file)
# ---------------------------------------------------------------------------
"""
Revenue and KPI benchmark framework for pricing strategies.

Computes session-level and aggregate metrics to compare pricing functions:
  - Revenue: R_T = Σ P_t^T · Q_t
  - Conversion rate
  - Average order value (AOV)
  - Agent exploitation loss: L_agent = R_oracle - R_observed
"""
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass, field, asdict
import pandas as pd
import numpy as np


@dataclass
class SessionMetrics:
    """KPIs for a single session."""
    session_id: str
    experiment_id: Optional[str] = None

    # interaction metrics
    total_interactions: int = 0
    page_views: int = 0
    item_views: int = 0
    searches: int = 0
    cart_adds: int = 0

    # revenue metrics
    items_purchased: int = 0
    total_revenue: float = 0.0
    avg_item_price: float = 0.0
    conversion_rate: float = 0.0

    # pricing signals
    total_price_shown: float = 0.0  # sum of all prices displayed
    avg_markup: float = 0.0  # avg (price / base_price)

    # behavioral features (for agent detection)
    interaction_velocity: float = 0.0  # interactions per minute
    session_duration_sec: float = 0.0
    unique_products_viewed: int = 0

    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields (including ``metadata``) as a plain dict."""
        return asdict(self)


@dataclass
class AggregateMetrics:
    """Aggregate KPIs across sessions/experiments."""
    experiment_id: Optional[str] = None
    n_sessions: int = 0

    # revenue aggregates
    total_revenue: float = 0.0
    avg_revenue_per_session: float = 0.0
    median_revenue_per_session: float = 0.0

    # conversion aggregates
    total_conversions: int = 0
    conversion_rate: float = 0.0  # purchases / sessions

    # pricing aggregates
    avg_markup: float = 0.0
    median_markup: float = 0.0

    # agent exploitation metrics
    estimated_agent_sessions: int = 0  # sessions flagged as agent-driven
    agent_revenue: float = 0.0
    human_revenue: float = 0.0
    agent_loss: float = 0.0  # L_agent = R_oracle - R_observed (if available)

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict."""
        return asdict(self)


def _lift_pct(values: pd.Series, baseline: float) -> pd.Series:
    """Percentage lift of ``values`` over ``baseline``; NaN when baseline is 0."""
    if baseline == 0:
        # Relative lift is undefined against a zero baseline; report NaN
        # consistently instead of a mix of +/-inf and NaN.
        return pd.Series(np.nan, index=values.index, dtype=float)
    return (values - baseline) / baseline * 100


class MetricsComputer:
    """Compute session and aggregate metrics from interaction/price logs."""

    @staticmethod
    def compute_session_metrics(
        session_id: str,
        interactions: pd.DataFrame,
        price_logs: pd.DataFrame,
        purchases: Optional[pd.DataFrame] = None,
        experiment_id: Optional[str] = None
    ) -> SessionMetrics:
        """
        Compute metrics for a single session.

        Args:
            session_id: session identifier
            interactions: user-interaction events for this session; the
                columns 'eventName', 'productId' and 'ts' are used when present
            price_logs: price logs for this session; 'price' and (optionally)
                'base_price' columns are used when present
            purchases: purchase events (if available); 'price' is used when present
            experiment_id: experiment identifier

        Returns:
            SessionMetrics with native Python ints/floats. If ``interactions``
            is empty, an all-default SessionMetrics is returned and
            ``purchases`` / ``price_logs`` are not inspected.
        """
        metrics = SessionMetrics(session_id=session_id, experiment_id=experiment_id)

        if interactions.empty:
            return metrics

        metrics.total_interactions = len(interactions)

        # interaction counts (skipped when the event column is missing)
        if 'eventName' in interactions.columns:
            event_counts = interactions['eventName'].value_counts().to_dict()
            item_views = int(event_counts.get('view_item_page', 0))
            metrics.item_views = item_views
            # item page views are counted as page views as well
            metrics.page_views = int(event_counts.get('page_view', 0)) + item_views
            metrics.searches = int(event_counts.get('search', 0))
            metrics.cart_adds = int(event_counts.get('add_item_to_cart', 0))

        # unique products viewed
        if 'productId' in interactions.columns:
            metrics.unique_products_viewed = int(interactions['productId'].dropna().nunique())

        # session duration and interactions per minute
        if 'ts' in interactions.columns:
            timestamps = pd.to_datetime(interactions['ts'])
            duration = (timestamps.max() - timestamps.min()).total_seconds()
            # all-NaT timestamps yield NaN; keep the 0.0 default in that case
            if not pd.isna(duration):
                metrics.session_duration_sec = float(duration)
            if metrics.session_duration_sec > 0:
                metrics.interaction_velocity = (
                    metrics.total_interactions / metrics.session_duration_sec
                ) * 60

        # revenue from purchases
        if purchases is not None and not purchases.empty:
            metrics.items_purchased = len(purchases)
            if 'price' in purchases.columns:
                metrics.total_revenue = float(purchases['price'].sum())
            # items_purchased > 0 is guaranteed by the non-empty check above
            metrics.avg_item_price = metrics.total_revenue / metrics.items_purchased
            metrics.conversion_rate = 1.0

        # pricing metrics
        if price_logs is not None and not price_logs.empty and 'price' in price_logs.columns:
            metrics.total_price_shown = float(price_logs['price'].sum())
            # markup is only defined where a positive base price is logged
            if 'base_price' in price_logs.columns:
                valid_markup = price_logs[price_logs['base_price'] > 0]
                if not valid_markup.empty:
                    metrics.avg_markup = float(
                        (valid_markup['price'] / valid_markup['base_price']).mean()
                    )

        return metrics

    @staticmethod
    def compute_aggregate_metrics(
        session_metrics_list: List[SessionMetrics],
        experiment_id: Optional[str] = None,
        agent_detector_fn: Optional[Callable[[SessionMetrics], bool]] = None
    ) -> AggregateMetrics:
        """
        Aggregate metrics across sessions.

        Args:
            session_metrics_list: list of SessionMetrics
            experiment_id: experiment identifier
            agent_detector_fn: optional function to classify a session as
                agent-driven (returns bool)

        Returns:
            AggregateMetrics with native Python ints/floats. ``agent_loss`` is
            not computed here and keeps its default.
        """
        agg = AggregateMetrics(experiment_id=experiment_id)
        agg.n_sessions = len(session_metrics_list)

        if agg.n_sessions == 0:
            return agg

        # revenue aggregates
        revenue = pd.Series([m.total_revenue for m in session_metrics_list], dtype=float)
        agg.total_revenue = float(revenue.sum())
        agg.avg_revenue_per_session = float(revenue.mean())
        agg.median_revenue_per_session = float(revenue.median())

        # conversion aggregates: a session converts if it purchased anything
        agg.total_conversions = sum(1 for m in session_metrics_list if m.items_purchased > 0)
        agg.conversion_rate = agg.total_conversions / agg.n_sessions

        # pricing aggregates: sessions without a computed markup (0.0) are excluded
        markups = pd.Series([m.avg_markup for m in session_metrics_list], dtype=float)
        markups = markups[markups > 0]
        if not markups.empty:
            agg.avg_markup = float(markups.mean())
            agg.median_markup = float(markups.median())

        # agent detection (if detector provided)
        if agent_detector_fn is not None:
            agent_flags = [bool(agent_detector_fn(m)) for m in session_metrics_list]
            agg.estimated_agent_sessions = sum(agent_flags)
            agg.agent_revenue = float(sum(
                m.total_revenue for m, is_agent in zip(session_metrics_list, agent_flags) if is_agent
            ))
            agg.human_revenue = float(sum(
                m.total_revenue for m, is_agent in zip(session_metrics_list, agent_flags) if not is_agent
            ))

        return agg

    @staticmethod
    def compare_pricing_strategies(
        experiments: Dict[str, List[SessionMetrics]],
        baseline_experiment_id: Optional[str] = None,
        agent_detector_fn: Optional[Callable[[SessionMetrics], bool]] = None
    ) -> pd.DataFrame:
        """
        Compare multiple pricing strategies/experiments.

        Args:
            experiments: dict mapping experiment_id -> list of SessionMetrics
            baseline_experiment_id: experiment to use as baseline for comparison
            agent_detector_fn: optional session classifier forwarded to
                compute_aggregate_metrics so the agent columns are populated

        Returns:
            DataFrame with one row of aggregate metrics per experiment. When the
            baseline id matches a key of ``experiments``, the columns
            'revenue_lift_pct' and 'conversion_lift_pct' are added; a lift
            column is NaN when the corresponding baseline value is 0.
        """
        results = []
        baseline_agg = None

        for exp_id, session_metrics in experiments.items():
            agg = MetricsComputer.compute_aggregate_metrics(
                session_metrics,
                experiment_id=exp_id,
                agent_detector_fn=agent_detector_fn,
            )
            if exp_id == baseline_experiment_id:
                baseline_agg = agg
            results.append(agg.to_dict())

        df = pd.DataFrame(results)

        # add relative metrics if baseline exists
        if baseline_agg is not None:
            df['revenue_lift_pct'] = _lift_pct(df['total_revenue'], baseline_agg.total_revenue)
            df['conversion_lift_pct'] = _lift_pct(df['conversion_rate'], baseline_agg.conversion_rate)

        return df


def simple_agent_detector(
    session_metrics: SessionMetrics,
    velocity_threshold: float = 5.0,
    min_products_viewed: int = 10
) -> bool:
    """
    Simple heuristic agent detector based on interaction velocity.

    Args:
        session_metrics: SessionMetrics instance
        velocity_threshold: interactions per minute threshold (default: 5.0)
        min_products_viewed: a non-converting session that viewed more than
            this many unique products is flagged (default: 10)

    Returns:
        True if session likely agent-driven
    """
    # agents tend to have higher interaction velocity and lower session duration
    if session_metrics.interaction_velocity > velocity_threshold:
        return True
    # agents often view many products quickly without converting
    return (
        session_metrics.unique_products_viewed > min_products_viewed
        and session_metrics.conversion_rate == 0
    )


# ---------------------------------------------------------------------------
# File: experiments/procesing/pricers/session_aware.py (new file)
# ---------------------------------------------------------------------------
"""
Session-aware pricing functions that leverage behavioral features S_t.
These pricers aim to minimize L_agent = R_oracle - R_observed.
"""
import numpy as np
import pandas as pd
from procesing.pricers.base import PricingFunction
from procesing.pricers.elasticity import ElasticityBasedPricer

# Fallbacks used by fit() when the historical data lacks the column.
_DEFAULT_BASE_PRICE = 100.0
_DEFAULT_MEAN_DEMAND = 10.0


def _column_or_constant(frame: pd.DataFrame, column: str, constant: float) -> np.ndarray:
    """Return ``frame[column]`` as an array, or a constant array of equal length."""
    if column in frame.columns:
        return frame[column].values
    return np.full(len(frame), constant)


class SessionAwarePricer(PricingFunction):
    """
    Extends elasticity-based pricing with session behavioral signals.

    f(Q, P, S) = base_price * elasticity_factor * session_factor

    Where session_factor adjusts for:
      - interaction_velocity (agent detection proxy)
      - product_view_depth (interest signal)
      - cart_to_view_ratio (conversion intent)

    Strategy: charge higher prices to suspected agents (high velocity)
    to recover oracle revenue from reconnaissance sessions.
    """

    def __init__(self,
                 alpha: float = 0.1,
                 beta_velocity: float = 0.05,
                 beta_attention: float = 0.03,
                 agent_velocity_threshold: float = 5.0,
                 agent_markup: float = 1.2,
                 price_floor: float = 0.0,
                 price_ceil: float = np.inf,
                 cart_ratio_threshold: float = 0.1,
                 cart_boost: float = 0.02):
        """
        Args:
            alpha: elasticity sensitivity
            beta_velocity: interaction velocity weight (stored, but not
                currently used by predict())
            beta_attention: product attention weight
            agent_velocity_threshold: velocity above which to apply agent markup
            agent_markup: price multiplier for suspected agent sessions
            price_floor, price_ceil: price bounds
            cart_ratio_threshold: cart_to_view_ratio above which the cart
                boost is applied
            cart_boost: relative price increase applied for conversion intent
        """
        self.alpha = alpha
        self.beta_velocity = beta_velocity
        self.beta_attention = beta_attention
        self.agent_velocity_threshold = agent_velocity_threshold
        self.agent_markup = agent_markup
        self.price_floor = price_floor
        self.price_ceil = price_ceil
        self.cart_ratio_threshold = cart_ratio_threshold
        self.cart_boost = cart_boost

        # fitted parameters
        self.elasticity = None
        self.base_prices = None
        self.mean_demand = None

    def fit(self, historical_data: pd.DataFrame, **kwargs):
        """
        Calibrate from historical elasticity data.

        Reads 'elasticity' (required) plus 'base_price' and 'mean_demand'
        (optional; constant fallbacks of 100 and 10 are used when absent).

        Raises:
            ValueError: if 'elasticity' is not a column of ``historical_data``.
        """
        if 'elasticity' not in historical_data.columns:
            raise ValueError("historical_data must contain 'elasticity'")

        self.elasticity = historical_data['elasticity'].values
        self.base_prices = _column_or_constant(historical_data, 'base_price', _DEFAULT_BASE_PRICE)
        self.mean_demand = _column_or_constant(historical_data, 'mean_demand', _DEFAULT_MEAN_DEMAND)
        return self

    def _session_factor(self, session_features) -> float:
        """Scalar price multiplier derived from the first row of session features."""
        if session_features is None or session_features.empty:
            return 1.0

        sf = session_features.iloc[0]  # single session features
        factor = 1.0

        # agent detection via velocity
        velocity = sf.get('interaction_velocity', 0.0)
        if velocity > self.agent_velocity_threshold:
            # suspected agent: apply markup to recover oracle revenue
            factor *= self.agent_markup

        # attention signal: higher view depth -> user interested -> can charge more
        view_depth = sf.get('product_view_depth', 0)
        if view_depth > 0:
            factor *= 1 + self.beta_attention * np.log1p(view_depth)

        # cart presence: if user has items in cart, slightly increase prices
        cart_to_view = sf.get('cart_to_view_ratio', 0.0)
        if cart_to_view > self.cart_ratio_threshold:
            factor *= 1 + self.cart_boost

        return factor

    def predict(self, state_space) -> np.ndarray:
        """
        Generate prices with session awareness.

        Reads ``state_space.demand`` and, when present and non-empty,
        ``state_space.session_features``.

        Raises:
            ValueError: if fit() has not been called.
        """
        if self.elasticity is None:
            raise ValueError("Must call fit() before predict()")

        demand = np.asarray(state_space.demand)

        # base elasticity-driven pricing; epsilon avoids division by zero
        demand_dev = (demand - self.mean_demand) / (self.mean_demand + 1e-6)
        elasticity_factor = 1 + self.alpha * np.abs(self.elasticity) * demand_dev

        # session-aware adjustment (same multiplier for every product)
        session_factor = self._session_factor(getattr(state_space, 'session_features', None))

        prices = self.base_prices * elasticity_factor * session_factor
        return np.clip(prices, self.price_floor, self.price_ceil)


class ProductSpecificSessionPricer(PricingFunction):
    """
    Session-aware pricer intended to use product-specific demand signals.

    Planned strategy: products viewed multiple times = high interest -> price up.

    NOTE: the per-product boost is not implemented yet. ``view_boost`` and
    ``max_view_boost`` are stored but unused, and predict() currently returns
    the clipped elasticity-driven prices only.
    """

    def __init__(self,
                 alpha: float = 0.1,
                 view_boost: float = 0.02,
                 max_view_boost: float = 0.15,
                 price_floor: float = 0.0,
                 price_ceil: float = np.inf):
        """
        Args:
            alpha: elasticity sensitivity
            view_boost: reserved for the per-product view boost (unused)
            max_view_boost: reserved cap for the view boost (unused)
            price_floor, price_ceil: price bounds
        """
        self.alpha = alpha
        self.view_boost = view_boost
        self.max_view_boost = max_view_boost
        self.price_floor = price_floor
        self.price_ceil = price_ceil

        # fitted parameters
        self.elasticity = None
        self.base_prices = None
        self.mean_demand = None
        self.product_ids = None

    def fit(self, historical_data: pd.DataFrame, **kwargs):
        """
        Calibrate from historical elasticity data.

        Reads 'elasticity' and 'productId' (required) plus 'base_price' and
        'mean_demand' (optional; constant fallbacks of 100 and 10).

        Raises:
            ValueError: if 'elasticity' or 'productId' is missing.
        """
        if 'elasticity' not in historical_data.columns or 'productId' not in historical_data.columns:
            raise ValueError("historical_data must contain 'elasticity' and 'productId'")

        self.elasticity = historical_data['elasticity'].values
        self.base_prices = _column_or_constant(historical_data, 'base_price', _DEFAULT_BASE_PRICE)
        self.mean_demand = _column_or_constant(historical_data, 'mean_demand', _DEFAULT_MEAN_DEMAND)
        self.product_ids = historical_data['productId'].values
        return self

    def predict(self, state_space) -> np.ndarray:
        """
        Generate elasticity-driven prices clipped to the configured bounds.

        Raises:
            ValueError: if fit() has not been called.
        """
        if self.elasticity is None:
            raise ValueError("Must call fit() before predict()")

        demand = np.asarray(state_space.demand)

        # base pricing; epsilon avoids division by zero
        demand_dev = (demand - self.mean_demand) / (self.mean_demand + 1e-6)
        base_prices = self.base_prices * (1 + self.alpha * np.abs(self.elasticity) * demand_dev)

        # TODO: extend session feature extraction to include product-specific
        # counts, then apply view_boost (capped at max_view_boost) per product.

        return np.clip(base_prices, self.price_floor, self.price_ceil)