From 8b429b7a8e5a7c1e671c62afa34b013358be4208 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Mon, 12 Jan 2026 10:09:55 +0100 Subject: [PATCH] chore: refactor to better map end to end --- backend/provider/app.py | 71 ++++++++++++------------- experiments/procesing/pricers/simple.py | 59 +++++++++++++++++--- experiments/procesing/steps/session.py | 1 + lib/model_registry.py | 46 ++++++++++++++++ web/src/app/api/pricing/route.ts | 37 +++++++------ 5 files changed, 153 insertions(+), 61 deletions(-) diff --git a/backend/provider/app.py b/backend/provider/app.py index fb72a9d..6f9a55d 100644 --- a/backend/provider/app.py +++ b/backend/provider/app.py @@ -47,53 +47,52 @@ def health() -> dict: @app.get("/api/{mode}/price/{productId}", response_model=PriceResponse) def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Optional[str] = Query(None), experimentId: Optional[str] = Query(None)): + """ + THIS is the fast lookup service (mechanism). + Priority: session-keyed price > global optimal price > base price + """ product = supabase.table(f'{mode}_products').select("metadata").eq('id', productId).execute().data[0] if not product: raise HTTPException(404, f"Product {productId} not found") metadata = product['metadata'] base_price = metadata.get('base_price', 100.0) - # fetch pre-computed prices from registry + # PRIORITY 1: session-aware price (computed by Airflow worker) + if sessionId: + session_price = registry.get_session_price(sessionId, productId) + if session_price is not None: + return PriceResponse( + productId=productId, + price=session_price, + base_price=base_price, + markup=session_price/base_price, + elasticity=None, + model_version='session-aware' + ) + + # PRIORITY 2: global pre-computed prices (surge pricing) prices_df = registry.get_prices('latest') - elasticity_df = registry.get_elasticity('latest') - - if prices_df is None: - # fallback: no pre-computed prices available - return PriceResponse( - productId=productId, - price=base_price, - base_price=base_price, - markup=1.0, - elasticity=None - ) - - # lookup pre-computed price for this product - product_price_row = prices_df[prices_df['productId'] == productId] - if product_price_row.empty: - # product not in pre-computed prices, fallback to base - return PriceResponse( - productId=productId, - price=base_price, - base_price=base_price, - markup=1.0, - elasticity=None - ) - - optimal_price = float(product_price_row['optimal_price'].iloc[0]) # TODO: use optimal_price everywhere as aresult - - # get elasticity if available - product_elasticity = None - if elasticity_df is not None: - product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId] - if not product_elasticity_row.empty: - product_elasticity = float(product_elasticity_row['elasticity'].iloc[0]) + if prices_df is not None: + product_price_row = prices_df[prices_df['productId'] == productId] + if not product_price_row.empty: + optimal_price = float(product_price_row['optimal_price'].iloc[0]) + return PriceResponse( + productId=productId, + price=optimal_price, + base_price=base_price, + markup=optimal_price/base_price, + elasticity=None, + model_version='surge' + ) + # PRIORITY 3: fallback to base price return PriceResponse( productId=productId, - price=optimal_price, + price=base_price, base_price=base_price, - markup=optimal_price/base_price, - elasticity=product_elasticity + markup=1.0, + elasticity=None, + model_version='base' ) @app.get("/models") diff --git a/experiments/procesing/pricers/simple.py b/experiments/procesing/pricers/simple.py index 39be37a..6bdd1ca 100644 --- a/experiments/procesing/pricers/simple.py +++ b/experiments/procesing/pricers/simple.py @@ -3,6 +3,46 @@ import pandas as pd from procesing.pricers.base import PricingFunction +def session_features_to_demand(session_features: pd.DataFrame) -> float: + """ + Map session behavioral features to demand proxy. + THIS is the critical θ̂ → D transformation for rule-based pricing. + + Logic: + - High velocity → agent behavior → price up (revenue recovery) + - High cart ratio → purchase intent → price up + - Low activity → discount to convert + + Returns: demand proxy score (0-20 range, higher = more demand) + """ + if session_features.empty: + return 1.0 + + feat = session_features.iloc[0] if len(session_features) > 0 else {} + + velocity = feat.get('interaction_velocity', 0) + cart_ratio = feat.get('cart_to_view_ratio', 0) + item_views = feat.get('item_views', 0) + cart_adds = feat.get('cart_adds', 0) + + # baseline demand + demand = 1.0 + + # agent detection: high velocity → treat as high "demand" to price up + if velocity > 2.0: + demand += 10.0 # strong agent signal + + # conversion intent: cart interaction → price up + if cart_ratio > 0.1 or cart_adds > 0: + demand += 5.0 + + # browsing depth: many views → interest signal + if item_views > 3: + demand += min(item_views, 5.0) + + return min(demand, 20.0) # cap at 20 + + class StaticPricer(PricingFunction): """Static pricing: always return fixed base prices""" @@ -67,21 +107,24 @@ class SimpleSurgePricer(PricingFunction): self.surge_multiplier = surge_multiplier self.discount_multiplier = discount_multiplier - def fit(self, market_data : pd.DataFrame): + def fit(self, market_data: pd.DataFrame): """Extract base prices from product catalog or historical averages""" self.base_prices = market_data['base_price'].to_numpy() if 'base_price' in market_data.columns else market_data['price'].values - self.demand_history = market_data['demand'].to_numpy() if 'demand' in market_data.columns else np.zeros_like(self.base_prices) + return self - def predict(self) -> np.ndarray: + def predict(self, state_space) -> np.ndarray: """ Adjust prices based on current demand using surge rules. - state_space.demand: demand counts per product - state_space.prices: current prices (fallback if base_prices not set) + state_space.demand: demand proxy per product (from session features) + state_space.prices: base prices """ - current_prices = self.base_prices if self.base_prices is not None else np.ones_like(demand_vector) * 99.99 - demand = self.demand_history if self.demand_history is not None else np.zeros_like(current_prices) - new_prices = current_prices.copy() + demand = np.asarray(state_space.demand) if state_space and hasattr(state_space, 'demand') else np.array([0]) + base = np.asarray(state_space.prices) if state_space and hasattr(state_space, 'prices') else self.base_prices + if base is None: + base = np.ones(len(demand)) * 99.99 + + new_prices = base.copy() high_mask = demand >= self.high_threshold new_prices[high_mask] *= self.surge_multiplier diff --git a/experiments/procesing/steps/session.py b/experiments/procesing/steps/session.py index 4b950aa..ec6f27c 100644 --- a/experiments/procesing/steps/session.py +++ b/experiments/procesing/steps/session.py @@ -135,6 +135,7 @@ class ExtractSessionFeaturesStep(BaseContextStep): Vectorized session feature extraction - replaces O(n^2) per-row loop. Input: interactions_df Output: session-level feature matrix + THIS is our main mapping from tau (trajectory) to some features vector theta - we need to do this very well. This is what will go into demand esimation. """ def transform(self, X: pd.DataFrame) -> pd.DataFrame: diff --git a/lib/model_registry.py b/lib/model_registry.py index 92d7934..e833a1a 100755 --- a/lib/model_registry.py +++ b/lib/model_registry.py @@ -178,3 +178,49 @@ class ModelRegistry: return True except: return False + + def set_session_prices(self, session_id: str, prices: Dict[str, float], ttl: int = 1800): + """ + Store prices for a specific session. + THIS is the write path for session-aware pricing. + + Args: + session_id: session identifier + prices: dict of {productId: price} + ttl: time-to-live in seconds (default 30min) + """ + if not prices: + return + + key = f"session:{session_id}:prices" + # use Redis hash for O(1) lookup per product + self.redis_client.hset(key, mapping={k: str(v) for k, v in prices.items()}) + self.redis_client.expire(key, ttl) + + def get_session_price(self, session_id: str, product_id: str) -> Optional[float]: + """ + Lookup price for (sessionId, productId). + THIS is the read path for fast provider lookup. + + Returns: price or None if not found + """ + key = f"session:{session_id}:prices" + price_str = self.redis_client.hget(key, product_id) + + if price_str is None: + return None + + return float(price_str.decode('utf-8') if isinstance(price_str, bytes) else price_str) + + def get_session_all_prices(self, session_id: str) -> Dict[str, float]: + """Get all prices for a session.""" + key = f"session:{session_id}:prices" + prices_raw = self.redis_client.hgetall(key) + + if not prices_raw: + return {} + + return { + (k.decode('utf-8') if isinstance(k, bytes) else k): float(v.decode('utf-8') if isinstance(v, bytes) else v) + for k, v in prices_raw.items() + } diff --git a/web/src/app/api/pricing/route.ts b/web/src/app/api/pricing/route.ts index 1aec75b..6532131 100644 --- a/web/src/app/api/pricing/route.ts +++ b/web/src/app/api/pricing/route.ts @@ -30,6 +30,8 @@ export async function GET(req: NextRequest) { const providerUrl = process.env.PRICING_PROVIDER_URL || 'http://localhost:5001'; try { const queryParams = new URLSearchParams(); + // THIS is our entry point into the dynamic pricing where we reference the context of the sesion and experiment and ask for a price to assign to the trajectory which is expressed + // The whole pipeline gets triggered from here. if (sessionId) queryParams.append('sessionId', sessionId); if (experimentId) queryParams.append('experimentId', experimentId); @@ -55,25 +57,26 @@ export async function GET(req: NextRequest) { price = Math.round(randomBase * 100) / 100; } - // log price to kafka for elasticity computation + // log price to kafka asynchronously (non-blocking) if (sessionId) { const backendUrl = process.env.BACKEND_URL || 'http://localhost:5000'; - try { - await fetch(`${backendUrl}/api/kafka/price-log`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - productId, - price, - sessionId, - experimentId: experimentId || undefined, - storeMode, - ts: timestamp, - }), - }); - } catch (err) { - console.error('[price-log-error]', err); - } + // fire and forget - don't await to avoid blocking response + fetch(`${backendUrl}/api/kafka/price-log`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + productId, + price, + sessionId, + experimentId: experimentId || undefined, + storeMode, + ts: timestamp, + }), + }).catch(err => { + if (process.env.NODE_ENV === 'development') { + console.error('[price-log-error]', err); + } + }); } if (process.env.NODE_ENV === 'development') {