feature: experimental session pricer and metrics (vibe)

This commit is contained in:
2025-11-29 17:42:42 +01:00
parent 5d5795b212
commit dd33f83e10
2 changed files with 417 additions and 0 deletions

View File

@@ -0,0 +1,245 @@
"""
Revenue and KPI benchmark framework for pricing strategies.
Computes session-level and aggregate metrics to compare pricing functions:
- Revenue: R_T = Σ P_t^T · Q_t
- Conversion rate
- Average order value (AOV)
- Agent exploitation loss: L_agent = R_oracle - R_observed
"""
from dataclasses import asdict, dataclass, field
from typing import Any, Callable, Dict, List, Optional

import numpy as np
import pandas as pd
@dataclass
class SessionMetrics:
    """Per-session KPI record.

    Only ``session_id`` is required; every other field starts at a neutral
    default (``None``, ``0``, ``0.0`` or an empty dict) and is meant to be
    filled in by whoever computes the metrics.
    """

    session_id: str
    experiment_id: Optional[str] = None

    # --- interaction metrics -------------------------------------------
    total_interactions: int = 0
    page_views: int = 0
    item_views: int = 0
    searches: int = 0
    cart_adds: int = 0

    # --- revenue metrics -----------------------------------------------
    items_purchased: int = 0
    total_revenue: float = 0.0
    avg_item_price: float = 0.0
    conversion_rate: float = 0.0

    # --- pricing signals -----------------------------------------------
    total_price_shown: float = 0.0  # sum of all prices displayed
    avg_markup: float = 0.0  # avg (price / base_price)

    # --- behavioral features (for agent detection) -----------------------
    interaction_velocity: float = 0.0  # interactions per minute
    session_duration_sec: float = 0.0
    unique_products_viewed: int = 0

    # free-form extras; a fresh dict is created for every instance
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (via ``dataclasses.asdict``)."""
        as_mapping = asdict(self)
        return as_mapping
@dataclass
class AggregateMetrics:
    """KPI roll-up over a collection of sessions (typically one experiment).

    All fields are optional and default to ``None`` / zero.
    """

    experiment_id: Optional[str] = None
    n_sessions: int = 0

    # --- revenue aggregates --------------------------------------------
    total_revenue: float = 0.0
    avg_revenue_per_session: float = 0.0
    median_revenue_per_session: float = 0.0

    # --- conversion aggregates -----------------------------------------
    total_conversions: int = 0
    conversion_rate: float = 0.0  # purchases / sessions

    # --- pricing aggregates --------------------------------------------
    avg_markup: float = 0.0
    median_markup: float = 0.0

    # --- agent exploitation metrics ------------------------------------
    estimated_agent_sessions: int = 0  # sessions flagged as agent-driven
    agent_revenue: float = 0.0
    human_revenue: float = 0.0
    agent_loss: float = 0.0  # L_agent = R_oracle - R_observed (if available)

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (via ``dataclasses.asdict``)."""
        as_mapping = asdict(self)
        return as_mapping
class MetricsComputer:
    """Compute session and aggregate metrics from interaction/price logs.

    All methods are static; the class only serves as a namespace.
    Numeric results are converted to built-in ``int``/``float`` before being
    stored, because numpy integer scalars are not accepted by ``json.dumps``.
    """

    @staticmethod
    def compute_session_metrics(
        session_id: str,
        interactions: pd.DataFrame,
        price_logs: pd.DataFrame,
        purchases: Optional[pd.DataFrame] = None,
        experiment_id: Optional[str] = None,
    ) -> SessionMetrics:
        """
        Compute metrics for single session.

        Args:
            session_id: session identifier
            interactions: user-interaction events for this session; must have
                an 'eventName' column, 'productId' and 'ts' are used if present
            price_logs: price-log rows for this session; 'price' and
                'base_price' are used if present (None is treated as empty)
            purchases: purchase events (if available); each row counts as one
                purchased item, 'price' is summed if present
            experiment_id: experiment identifier
        Returns:
            SessionMetrics. When `interactions` is empty only the identifiers
            are filled in; purchases and price logs are not inspected.
        """
        metrics = SessionMetrics(session_id=session_id, experiment_id=experiment_id)
        if interactions.empty:
            return metrics

        # interaction counts
        event_counts = interactions['eventName'].value_counts()

        def count(event_name: str) -> int:
            return int(event_counts.get(event_name, 0))

        metrics.total_interactions = len(interactions)
        metrics.item_views = count('view_item_page')
        # an item page view is also counted as a page view
        metrics.page_views = count('page_view') + metrics.item_views
        metrics.searches = count('search')
        metrics.cart_adds = count('add_item_to_cart')

        # unique products viewed (nunique ignores missing ids)
        if 'productId' in interactions.columns:
            metrics.unique_products_viewed = int(interactions['productId'].nunique())

        # session duration and interaction velocity
        # NOTE(review): numeric 'ts' values are parsed by pd.to_datetime with
        # its default unit - confirm that matches the log format.
        if 'ts' in interactions.columns:
            timestamps = pd.to_datetime(interactions['ts'])
            span = timestamps.max() - timestamps.min()
            # span is NaT when no timestamp could be read; keep the 0.0 default
            if pd.notna(span):
                metrics.session_duration_sec = float(span.total_seconds())
            if metrics.session_duration_sec > 0:
                per_second = metrics.total_interactions / metrics.session_duration_sec
                metrics.interaction_velocity = per_second * 60

        # revenue from purchases
        if purchases is not None and not purchases.empty:
            metrics.items_purchased = len(purchases)
            if 'price' in purchases.columns:
                metrics.total_revenue = float(purchases['price'].sum())
            metrics.avg_item_price = metrics.total_revenue / metrics.items_purchased
            metrics.conversion_rate = 1.0

        # pricing metrics
        if price_logs is not None and not price_logs.empty and 'price' in price_logs.columns:
            metrics.total_price_shown = float(price_logs['price'].sum())
            if 'base_price' in price_logs.columns:
                # rows without a positive base price cannot yield a markup ratio
                valid_markup = price_logs[price_logs['base_price'] > 0]
                if not valid_markup.empty:
                    ratios = valid_markup['price'] / valid_markup['base_price']
                    metrics.avg_markup = float(ratios.mean())
        return metrics

    @staticmethod
    def compute_aggregate_metrics(
        session_metrics_list: List[SessionMetrics],
        experiment_id: Optional[str] = None,
        agent_detector_fn: Optional[Callable[[SessionMetrics], bool]] = None,
    ) -> AggregateMetrics:
        """
        Aggregate metrics across sessions.

        Args:
            session_metrics_list: list of SessionMetrics
            experiment_id: experiment identifier
            agent_detector_fn: optional function to classify session as agent
                (returns bool); when omitted the agent fields stay at zero
        Returns:
            AggregateMetrics; only `n_sessions` (0) and `experiment_id` are
            set when the list is empty. `agent_loss` is never computed here.
        """
        agg = AggregateMetrics(experiment_id=experiment_id)
        agg.n_sessions = len(session_metrics_list)
        if agg.n_sessions == 0:
            return agg

        # revenue aggregates
        revenue = pd.Series([m.total_revenue for m in session_metrics_list], dtype=float)
        agg.total_revenue = float(revenue.sum())
        agg.avg_revenue_per_session = float(revenue.mean())
        agg.median_revenue_per_session = float(revenue.median())

        # conversion aggregates: a session converts if it bought anything
        agg.total_conversions = sum(1 for m in session_metrics_list if m.items_purchased > 0)
        agg.conversion_rate = agg.total_conversions / agg.n_sessions

        # pricing aggregates: a markup of 0 means "no markup data" and is skipped
        markup = pd.Series([m.avg_markup for m in session_metrics_list], dtype=float)
        valid_markups = markup[markup > 0]
        if not valid_markups.empty:
            agg.avg_markup = float(valid_markups.mean())
            agg.median_markup = float(valid_markups.median())

        # agent detection (if detector provided); each session is classified once
        if agent_detector_fn is not None:
            agent_sessions = 0
            agent_revenue = 0.0
            human_revenue = 0.0
            for m in session_metrics_list:
                if agent_detector_fn(m):
                    agent_sessions += 1
                    agent_revenue += m.total_revenue
                else:
                    human_revenue += m.total_revenue
            agg.estimated_agent_sessions = agent_sessions
            agg.agent_revenue = float(agent_revenue)
            agg.human_revenue = float(human_revenue)
        return agg

    @staticmethod
    def _relative_lift_pct(values: pd.Series, baseline_value: float) -> pd.Series:
        """Percent change of `values` vs. `baseline_value`; NaN if baseline is 0/NaN."""
        if pd.isna(baseline_value) or baseline_value == 0:
            # lift relative to a zero baseline is undefined
            return pd.Series(np.nan, index=values.index, dtype=float)
        return (values - baseline_value) / baseline_value * 100

    @staticmethod
    def compare_pricing_strategies(
        experiments: Dict[str, List[SessionMetrics]],
        baseline_experiment_id: Optional[str] = None,
        agent_detector_fn: Optional[Callable[[SessionMetrics], bool]] = None,
    ) -> pd.DataFrame:
        """
        Compare multiple pricing strategies/experiments.

        Args:
            experiments: dict mapping experiment_id -> list of SessionMetrics
            baseline_experiment_id: experiment to use as baseline for comparison
            agent_detector_fn: optional agent classifier forwarded to
                compute_aggregate_metrics for every experiment
        Returns:
            DataFrame with one row of aggregate metrics per experiment, in the
            iteration order of `experiments`. If the baseline id is one of the
            keys, 'revenue_lift_pct' and 'conversion_lift_pct' columns are
            added; a lift is NaN when the baseline value is zero.
        """
        aggregates = {
            exp_id: MetricsComputer.compute_aggregate_metrics(
                session_metrics,
                experiment_id=exp_id,
                agent_detector_fn=agent_detector_fn,
            )
            for exp_id, session_metrics in experiments.items()
        }
        df = pd.DataFrame([agg.to_dict() for agg in aggregates.values()])

        # add relative metrics if baseline exists
        baseline_agg = aggregates.get(baseline_experiment_id)
        if baseline_agg is not None:
            df['revenue_lift_pct'] = MetricsComputer._relative_lift_pct(
                df['total_revenue'], baseline_agg.total_revenue
            )
            df['conversion_lift_pct'] = MetricsComputer._relative_lift_pct(
                df['conversion_rate'], baseline_agg.conversion_rate
            )
        return df
def simple_agent_detector(
    session_metrics: "SessionMetrics",
    velocity_threshold: float = 5.0,
    product_view_threshold: int = 10,
) -> bool:
    """
    Simple heuristic agent detector based on interaction velocity.

    A session is flagged when either rule holds:
      1. its interaction velocity is strictly above `velocity_threshold`;
      2. it viewed strictly more than `product_view_threshold` distinct
         products and did not convert (conversion_rate == 0).

    Args:
        session_metrics: SessionMetrics instance (only interaction_velocity,
            unique_products_viewed and conversion_rate are read)
        velocity_threshold: interactions per minute threshold (default: 5.0)
        product_view_threshold: distinct-products threshold for the
            browse-without-buying rule (default: 10)
    Returns:
        True if session likely agent-driven
    """
    # agents tend to have higher interaction velocity and lower session duration
    too_fast = session_metrics.interaction_velocity > velocity_threshold
    # agents often view many products quickly without converting
    browses_without_buying = (
        session_metrics.unique_products_viewed > product_view_threshold
        and session_metrics.conversion_rate == 0
    )
    # bool() keeps the return type strict even if the fields hold numpy scalars
    return bool(too_fast or browses_without_buying)

View File

@@ -0,0 +1,172 @@
"""
Session-aware pricing functions that leverage behavioral features S_t.
These pricers aim to minimize L_agent = R_oracle - R_observed.
"""
import numpy as np
import pandas as pd
from procesing.pricers.base import PricingFunction
from procesing.pricers.elasticity import ElasticityBasedPricer
class SessionAwarePricer(PricingFunction):
    """
    Extends elasticity-based pricing with session behavioral signals.

    f(Q, P, S) = base_price * elasticity_factor * session_factor

    Where session_factor adjusts for:
    - interaction_velocity (agent detection proxy)
    - product_view_depth (interest signal)
    - cart_to_view_ratio (conversion intent)

    Strategy: charge higher prices to suspected agents (high velocity)
    to recover oracle revenue from reconnaissance sessions.

    NOTE(review): `beta_velocity` is stored but predict() does not use it;
    velocity currently only acts through the threshold/markup rule.
    """

    def __init__(
        self,
        alpha: float = 0.1,
        beta_velocity: float = 0.05,
        beta_attention: float = 0.03,
        agent_velocity_threshold: float = 5.0,
        agent_markup: float = 1.2,
        price_floor: float = 0.0,
        price_ceil: float = np.inf,
        cart_ratio_threshold: float = 0.1,
        cart_boost: float = 0.02,
    ):
        """
        Args:
            alpha: elasticity sensitivity
            beta_velocity: interaction velocity weight (currently unused)
            beta_attention: product attention weight
            agent_velocity_threshold: velocity above which to apply agent markup
            agent_markup: price multiplier for suspected agent sessions
            price_floor, price_ceil: price bounds
            cart_ratio_threshold: cart_to_view_ratio above which the
                conversion-intent boost applies
            cart_boost: relative price increase for conversion intent
        """
        self.alpha = alpha
        self.beta_velocity = beta_velocity
        self.beta_attention = beta_attention
        self.agent_velocity_threshold = agent_velocity_threshold
        self.agent_markup = agent_markup
        self.price_floor = price_floor
        self.price_ceil = price_ceil
        self.cart_ratio_threshold = cart_ratio_threshold
        self.cart_boost = cart_boost
        # fitted parameters
        self.elasticity = None
        self.base_prices = None
        self.mean_demand = None

    def fit(self, historical_data: pd.DataFrame, **kwargs):
        """Calibrate from historical elasticity data.

        Args:
            historical_data: one row per product; requires 'elasticity'.
                'base_price' falls back to 100 and 'mean_demand' to 10 for
                every row when the column is absent.
        Returns:
            self
        Raises:
            ValueError: if 'elasticity' is missing.
        """
        if 'elasticity' not in historical_data.columns:
            raise ValueError("historical_data must contain 'elasticity'")
        n_rows = len(historical_data)
        self.elasticity = historical_data['elasticity'].values
        self.base_prices = (historical_data['base_price'].values
                            if 'base_price' in historical_data.columns
                            else np.ones(n_rows) * 100)
        self.mean_demand = (historical_data['mean_demand'].values
                            if 'mean_demand' in historical_data.columns
                            else np.ones(n_rows) * 10)
        return self

    @staticmethod
    def _numeric_feature(row, name: str) -> float:
        """Read `name` from a feature row as float; missing/None/NaN -> 0.0."""
        value = row.get(name, 0.0)
        try:
            number = float(value)
        except (TypeError, ValueError):
            # None or a non-numeric entry: treat the signal as absent
            return 0.0
        return 0.0 if np.isnan(number) else number

    def _session_factor(self, state_space) -> float:
        """Multiplier derived from the first row of state_space.session_features."""
        features = getattr(state_space, 'session_features', None)
        if features is None or features.empty:
            return 1.0
        sf = features.iloc[0]  # single session features
        factor = 1.0

        # agent detection via velocity
        velocity = self._numeric_feature(sf, 'interaction_velocity')
        if velocity > self.agent_velocity_threshold:
            # suspected agent: apply markup to recover oracle revenue
            factor *= self.agent_markup

        # attention signal: higher view depth -> user interested -> can charge more
        view_depth = self._numeric_feature(sf, 'product_view_depth')
        if view_depth > 0:
            factor *= 1 + self.beta_attention * np.log1p(view_depth)

        # cart presence: if user has items in cart, slightly increase prices
        cart_to_view = self._numeric_feature(sf, 'cart_to_view_ratio')
        if cart_to_view > self.cart_ratio_threshold:
            factor *= 1 + self.cart_boost
        return factor

    def predict(self, state_space) -> np.ndarray:
        """Generate prices with session awareness.

        Args:
            state_space: object exposing `demand` (array-like, aligned with
                the rows passed to fit()) and optionally `session_features`
                (DataFrame whose first row describes the current session).
        Returns:
            Prices clipped to [price_floor, price_ceil].
        Raises:
            ValueError: if fit() has not been called.
        """
        if self.elasticity is None:
            raise ValueError("Must call fit() before predict()")
        demand = np.asarray(state_space.demand)

        # base elasticity-driven pricing; 1e-6 avoids division by a zero mean
        demand_dev = (demand - self.mean_demand) / (self.mean_demand + 1e-6)
        elasticity_factor = 1 + self.alpha * np.abs(self.elasticity) * demand_dev

        # the session factor is the same for every product, so a scalar suffices
        session_factor = self._session_factor(state_space)

        prices = self.base_prices * elasticity_factor * session_factor
        return np.clip(prices, self.price_floor, self.price_ceil)
class ProductSpecificSessionPricer(PricingFunction):
    """
    Session-aware pricer with product-specific demand signals.

    Intended to use S_t to extract per-product interaction counts and adjust
    pricing for products the user has already viewed/hovered.

    Strategy: products viewed multiple times = high interest -> price up

    NOTE: the per-product boost is not implemented yet. `view_boost` and
    `max_view_boost` are stored but predict() currently returns plain
    elasticity-adjusted prices.
    """

    def __init__(
        self,
        alpha: float = 0.1,
        view_boost: float = 0.02,
        max_view_boost: float = 0.15,
        price_floor: float = 0.0,
        price_ceil: float = np.inf,
    ):
        """
        Args:
            alpha: elasticity sensitivity
            view_boost: per-view price boost (not applied yet)
            max_view_boost: cap on the view boost (not applied yet)
            price_floor, price_ceil: price bounds
        """
        self.alpha = alpha
        self.view_boost = view_boost
        self.max_view_boost = max_view_boost
        self.price_floor = price_floor
        self.price_ceil = price_ceil
        # fitted parameters
        self.elasticity = None
        self.base_prices = None
        self.mean_demand = None
        self.product_ids = None

    def fit(self, historical_data: pd.DataFrame, **kwargs):
        """Calibrate from historical per-product data.

        Args:
            historical_data: one row per product; requires 'elasticity' and
                'productId'. 'base_price' falls back to 100 and 'mean_demand'
                to 10 for every row when the column is absent.
        Returns:
            self
        Raises:
            ValueError: if 'elasticity' or 'productId' is missing.
        """
        columns = historical_data.columns
        if 'elasticity' not in columns or 'productId' not in columns:
            raise ValueError("historical_data must contain 'elasticity' and 'productId'")
        n_rows = len(historical_data)
        self.elasticity = historical_data['elasticity'].values
        self.base_prices = (historical_data['base_price'].values
                            if 'base_price' in columns
                            else np.ones(n_rows) * 100)
        self.mean_demand = (historical_data['mean_demand'].values
                            if 'mean_demand' in columns
                            else np.ones(n_rows) * 10)
        self.product_ids = historical_data['productId'].values
        return self

    def predict(self, state_space) -> np.ndarray:
        """Generate elasticity-adjusted prices clipped to the price bounds.

        Args:
            state_space: object exposing `demand` (array-like, aligned with
                the rows passed to fit()).
        Returns:
            Prices clipped to [price_floor, price_ceil].
        Raises:
            ValueError: if fit() has not been called.
        """
        if self.elasticity is None:
            raise ValueError("Must call fit() before predict()")
        demand = np.asarray(state_space.demand)

        # base pricing; 1e-6 avoids division by a zero mean
        demand_dev = (demand - self.mean_demand) / (self.mean_demand + 1e-6)
        base_prices = self.base_prices * (1 + self.alpha * np.abs(self.elasticity) * demand_dev)

        # TODO: extend session feature extraction to include product-specific
        # counts, then apply view_boost (capped at max_view_boost) to the
        # products in state_space.product_ids that the session has viewed.
        # Until then no session adjustment is made, so session_features and
        # product_ids are deliberately not read here.
        return np.clip(base_prices, self.price_floor, self.price_ceil)