chore: refactor to better map end to end

This commit is contained in:
2026-01-12 10:09:55 +01:00
parent f9bf3de71e
commit 8b429b7a8e
5 changed files with 153 additions and 61 deletions

View File

@@ -47,53 +47,52 @@ def health() -> dict:
@app.get("/api/{mode}/price/{productId}", response_model=PriceResponse) @app.get("/api/{mode}/price/{productId}", response_model=PriceResponse)
def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Optional[str] = Query(None), experimentId: Optional[str] = Query(None)): def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Optional[str] = Query(None), experimentId: Optional[str] = Query(None)):
"""
THIS is the fast lookup service (mechanism).
Priority: session-keyed price > global optimal price > base price
"""
product = supabase.table(f'{mode}_products').select("metadata").eq('id', productId).execute().data[0] product = supabase.table(f'{mode}_products').select("metadata").eq('id', productId).execute().data[0]
if not product: raise HTTPException(404, f"Product {productId} not found") if not product: raise HTTPException(404, f"Product {productId} not found")
metadata = product['metadata'] metadata = product['metadata']
base_price = metadata.get('base_price', 100.0) base_price = metadata.get('base_price', 100.0)
# fetch pre-computed prices from registry # PRIORITY 1: session-aware price (computed by Airflow worker)
if sessionId:
session_price = registry.get_session_price(sessionId, productId)
if session_price is not None:
return PriceResponse(
productId=productId,
price=session_price,
base_price=base_price,
markup=session_price/base_price,
elasticity=None,
model_version='session-aware'
)
# PRIORITY 2: global pre-computed prices (surge pricing)
prices_df = registry.get_prices('latest') prices_df = registry.get_prices('latest')
elasticity_df = registry.get_elasticity('latest') if prices_df is not None:
if prices_df is None:
# fallback: no pre-computed prices available
return PriceResponse(
productId=productId,
price=base_price,
base_price=base_price,
markup=1.0,
elasticity=None
)
# lookup pre-computed price for this product
product_price_row = prices_df[prices_df['productId'] == productId] product_price_row = prices_df[prices_df['productId'] == productId]
if product_price_row.empty: if not product_price_row.empty:
# product not in pre-computed prices, fallback to base optimal_price = float(product_price_row['optimal_price'].iloc[0])
return PriceResponse(
productId=productId,
price=base_price,
base_price=base_price,
markup=1.0,
elasticity=None
)
optimal_price = float(product_price_row['optimal_price'].iloc[0]) # TODO: use optimal_price everywhere as aresult
# get elasticity if available
product_elasticity = None
if elasticity_df is not None:
product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId]
if not product_elasticity_row.empty:
product_elasticity = float(product_elasticity_row['elasticity'].iloc[0])
return PriceResponse( return PriceResponse(
productId=productId, productId=productId,
price=optimal_price, price=optimal_price,
base_price=base_price, base_price=base_price,
markup=optimal_price/base_price, markup=optimal_price/base_price,
elasticity=product_elasticity elasticity=None,
model_version='surge'
)
# PRIORITY 3: fallback to base price
return PriceResponse(
productId=productId,
price=base_price,
base_price=base_price,
markup=1.0,
elasticity=None,
model_version='base'
) )
@app.get("/models") @app.get("/models")

View File

@@ -3,6 +3,46 @@ import pandas as pd
from procesing.pricers.base import PricingFunction from procesing.pricers.base import PricingFunction
def session_features_to_demand(session_features: pd.DataFrame) -> float:
    """
    Map session behavioral features to a demand proxy.

    THIS is the critical θ̂ → D transformation for rule-based pricing.

    Only the first row of ``session_features`` is read. Each feature is
    coerced to ``float``; a missing column, ``None``, NaN, or a value that
    cannot be parsed as a number counts as 0 (i.e. "no signal") instead of
    raising inside a comparison.

    Logic:
    - High velocity (> 2.0) → agent behavior → +10 (revenue recovery)
    - Cart interaction (ratio > 0.1 or any cart add) → purchase intent → +5
    - Browsing depth (> 3 item views) → + min(item_views, 5)
    - No signal → baseline of 1.0 (this function never goes below baseline;
      any discounting has to happen downstream)

    Args:
        session_features: session-level feature frame; columns read are
            'interaction_velocity', 'cart_to_view_ratio', 'item_views'
            and 'cart_adds'.

    Returns:
        Demand proxy score in the range 1.0-20.0 (higher = more demand).
    """
    if session_features.empty:
        return 1.0

    row = session_features.iloc[0]

    def _signal(name: str) -> float:
        # Coerce one feature to float; anything unusable is "no signal".
        try:
            value = float(row.get(name, 0))
        except (TypeError, ValueError):
            return 0.0
        # NaN never satisfies a threshold, so normalise it to 0 explicitly.
        return 0.0 if value != value else value

    velocity = _signal('interaction_velocity')
    cart_ratio = _signal('cart_to_view_ratio')
    item_views = _signal('item_views')
    cart_adds = _signal('cart_adds')

    # baseline demand
    demand = 1.0

    # agent detection: high velocity → treat as high "demand" to price up
    if velocity > 2.0:
        demand += 10.0

    # conversion intent: cart interaction → price up
    if cart_ratio > 0.1 or cart_adds > 0:
        demand += 5.0

    # browsing depth: many views → interest signal, contribution capped at 5
    if item_views > 3:
        demand += min(item_views, 5.0)

    # all three signals together give 21, so the cap is reachable
    return min(demand, 20.0)
class StaticPricer(PricingFunction): class StaticPricer(PricingFunction):
"""Static pricing: always return fixed base prices""" """Static pricing: always return fixed base prices"""
@@ -67,21 +107,24 @@ class SimpleSurgePricer(PricingFunction):
self.surge_multiplier = surge_multiplier self.surge_multiplier = surge_multiplier
self.discount_multiplier = discount_multiplier self.discount_multiplier = discount_multiplier
def fit(self, market_data : pd.DataFrame): def fit(self, market_data: pd.DataFrame):
"""Extract base prices from product catalog or historical averages""" """Extract base prices from product catalog or historical averages"""
self.base_prices = market_data['base_price'].to_numpy() if 'base_price' in market_data.columns else market_data['price'].values self.base_prices = market_data['base_price'].to_numpy() if 'base_price' in market_data.columns else market_data['price'].values
self.demand_history = market_data['demand'].to_numpy() if 'demand' in market_data.columns else np.zeros_like(self.base_prices) return self
def predict(self) -> np.ndarray: def predict(self, state_space) -> np.ndarray:
""" """
Adjust prices based on current demand using surge rules. Adjust prices based on current demand using surge rules.
state_space.demand: demand counts per product state_space.demand: demand proxy per product (from session features)
state_space.prices: current prices (fallback if base_prices not set) state_space.prices: base prices
""" """
current_prices = self.base_prices if self.base_prices is not None else np.ones_like(demand_vector) * 99.99 demand = np.asarray(state_space.demand) if state_space and hasattr(state_space, 'demand') else np.array([0])
demand = self.demand_history if self.demand_history is not None else np.zeros_like(current_prices) base = np.asarray(state_space.prices) if state_space and hasattr(state_space, 'prices') else self.base_prices
new_prices = current_prices.copy()
if base is None:
base = np.ones(len(demand)) * 99.99
new_prices = base.copy()
high_mask = demand >= self.high_threshold high_mask = demand >= self.high_threshold
new_prices[high_mask] *= self.surge_multiplier new_prices[high_mask] *= self.surge_multiplier

View File

@@ -135,6 +135,7 @@ class ExtractSessionFeaturesStep(BaseContextStep):
Vectorized session feature extraction - replaces O(n^2) per-row loop. Vectorized session feature extraction - replaces O(n^2) per-row loop.
Input: interactions_df Input: interactions_df
Output: session-level feature matrix Output: session-level feature matrix
THIS is our main mapping from tau (trajectory) to a feature vector theta - we need to do this very well. This is what will go into demand estimation.
""" """
def transform(self, X: pd.DataFrame) -> pd.DataFrame: def transform(self, X: pd.DataFrame) -> pd.DataFrame:

View File

@@ -178,3 +178,49 @@ class ModelRegistry:
return True return True
except: except:
return False return False
def set_session_prices(self, session_id: str, prices: Dict[str, float], ttl: int = 1800):
    """
    Store prices for a specific session.

    THIS is the write path for session-aware pricing.

    The hash write and the TTL are queued on one pipeline and sent as a
    single MULTI/EXEC transaction, so a failure between the two commands
    cannot leave a session hash behind that never expires.
    NOTE(review): assumes ``self.redis_client`` is a redis-py compatible
    client exposing ``pipeline()`` - confirm against the registry setup.

    Existing fields of the session hash that are not present in ``prices``
    are left untouched (HSET merges); the TTL is refreshed on every call.

    Args:
        session_id: session identifier
        prices: dict of {productId: price}; an empty dict is a no-op
        ttl: time-to-live in seconds (default 30min)
    """
    if not prices:
        return

    key = f"session:{session_id}:prices"
    # Redis hash gives O(1) lookup per product; values are stored as strings
    encoded = {product_id: str(price) for product_id, price in prices.items()}

    with self.redis_client.pipeline(transaction=True) as pipe:
        pipe.hset(key, mapping=encoded)
        pipe.expire(key, ttl)
        pipe.execute()
def get_session_price(self, session_id: str, product_id: str) -> Optional[float]:
    """
    Lookup price for (sessionId, productId).

    THIS is the read path for fast provider lookup.

    The stored value may come back as ``bytes`` or ``str`` depending on how
    the client is configured; both are handled. A value that cannot be
    parsed as a finite number is logged and reported as a miss so the caller
    can fall through to its next pricing source instead of failing the
    request or serving a NaN/inf price.

    Args:
        session_id: session identifier
        product_id: product whose session price is requested

    Returns:
        The price as a float, or None if no usable price is stored.
    """
    import logging
    import math

    key = f"session:{session_id}:prices"
    raw = self.redis_client.hget(key, product_id)
    if raw is None:
        return None

    try:
        price = float(raw.decode('utf-8') if isinstance(raw, bytes) else raw)
    except (TypeError, ValueError):
        # UnicodeDecodeError is a ValueError, so undecodable bytes land here too
        price = float('nan')

    if not math.isfinite(price):
        logging.getLogger(__name__).warning(
            "ignoring malformed session price for key=%s product=%s", key, product_id
        )
        return None
    return price
def get_session_all_prices(self, session_id: str) -> Dict[str, float]:
    """
    Get all prices for a session.

    Keys and values may come back as ``bytes`` or ``str`` depending on how
    the client is configured; both are handled. Entries whose field name
    cannot be decoded, or whose value is not a finite number, are skipped so
    that one corrupt entry does not discard every valid price in the hash.

    Args:
        session_id: session identifier

    Returns:
        Dict of {productId: price}; empty when the session has no stored
        (or no usable) prices.
    """
    import math

    key = f"session:{session_id}:prices"
    prices_raw = self.redis_client.hgetall(key)
    if not prices_raw:
        return {}

    def _text(item):
        # normalise bytes → str, leave already-decoded values alone
        return item.decode('utf-8') if isinstance(item, bytes) else item

    prices = {}
    for raw_field, raw_value in prices_raw.items():
        try:
            field = _text(raw_field)
            price = float(_text(raw_value))
        except (TypeError, ValueError):
            # unparsable or undecodable entry: drop it, keep the rest
            continue
        if math.isfinite(price):
            prices[field] = price
    return prices

View File

@@ -30,6 +30,8 @@ export async function GET(req: NextRequest) {
const providerUrl = process.env.PRICING_PROVIDER_URL || 'http://localhost:5001'; const providerUrl = process.env.PRICING_PROVIDER_URL || 'http://localhost:5001';
try { try {
const queryParams = new URLSearchParams(); const queryParams = new URLSearchParams();
// THIS is our entry point into dynamic pricing: we pass the session and experiment context and ask for a price to assign to the trajectory they express.
// The whole pipeline gets triggered from here.
if (sessionId) queryParams.append('sessionId', sessionId); if (sessionId) queryParams.append('sessionId', sessionId);
if (experimentId) queryParams.append('experimentId', experimentId); if (experimentId) queryParams.append('experimentId', experimentId);
@@ -55,11 +57,11 @@ export async function GET(req: NextRequest) {
price = Math.round(randomBase * 100) / 100; price = Math.round(randomBase * 100) / 100;
} }
// log price to kafka for elasticity computation // log price to kafka asynchronously (non-blocking)
if (sessionId) { if (sessionId) {
const backendUrl = process.env.BACKEND_URL || 'http://localhost:5000'; const backendUrl = process.env.BACKEND_URL || 'http://localhost:5000';
try { // fire and forget - don't await to avoid blocking response
await fetch(`${backendUrl}/api/kafka/price-log`, { fetch(`${backendUrl}/api/kafka/price-log`, {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json' }, headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ body: JSON.stringify({
@@ -70,10 +72,11 @@ export async function GET(req: NextRequest) {
storeMode, storeMode,
ts: timestamp, ts: timestamp,
}), }),
}); }).catch(err => {
} catch (err) { if (process.env.NODE_ENV === 'development') {
console.error('[price-log-error]', err); console.error('[price-log-error]', err);
} }
});
} }
if (process.env.NODE_ENV === 'development') { if (process.env.NODE_ENV === 'development') {