mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
chore: refactor to better map end to end
This commit is contained in:
@@ -47,53 +47,52 @@ def health() -> dict:
|
|||||||
|
|
||||||
@app.get("/api/{mode}/price/{productId}", response_model=PriceResponse)
|
@app.get("/api/{mode}/price/{productId}", response_model=PriceResponse)
|
||||||
def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Optional[str] = Query(None), experimentId: Optional[str] = Query(None)):
|
def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Optional[str] = Query(None), experimentId: Optional[str] = Query(None)):
|
||||||
|
"""
|
||||||
|
THIS is the fast lookup service (mechanism).
|
||||||
|
Priority: session-keyed price > global optimal price > base price
|
||||||
|
"""
|
||||||
product = supabase.table(f'{mode}_products').select("metadata").eq('id', productId).execute().data[0]
|
product = supabase.table(f'{mode}_products').select("metadata").eq('id', productId).execute().data[0]
|
||||||
if not product: raise HTTPException(404, f"Product {productId} not found")
|
if not product: raise HTTPException(404, f"Product {productId} not found")
|
||||||
|
|
||||||
metadata = product['metadata']
|
metadata = product['metadata']
|
||||||
base_price = metadata.get('base_price', 100.0)
|
base_price = metadata.get('base_price', 100.0)
|
||||||
|
|
||||||
# fetch pre-computed prices from registry
|
# PRIORITY 1: session-aware price (computed by Airflow worker)
|
||||||
|
if sessionId:
|
||||||
|
session_price = registry.get_session_price(sessionId, productId)
|
||||||
|
if session_price is not None:
|
||||||
|
return PriceResponse(
|
||||||
|
productId=productId,
|
||||||
|
price=session_price,
|
||||||
|
base_price=base_price,
|
||||||
|
markup=session_price/base_price,
|
||||||
|
elasticity=None,
|
||||||
|
model_version='session-aware'
|
||||||
|
)
|
||||||
|
|
||||||
|
# PRIORITY 2: global pre-computed prices (surge pricing)
|
||||||
prices_df = registry.get_prices('latest')
|
prices_df = registry.get_prices('latest')
|
||||||
elasticity_df = registry.get_elasticity('latest')
|
if prices_df is not None:
|
||||||
|
|
||||||
if prices_df is None:
|
|
||||||
# fallback: no pre-computed prices available
|
|
||||||
return PriceResponse(
|
|
||||||
productId=productId,
|
|
||||||
price=base_price,
|
|
||||||
base_price=base_price,
|
|
||||||
markup=1.0,
|
|
||||||
elasticity=None
|
|
||||||
)
|
|
||||||
|
|
||||||
# lookup pre-computed price for this product
|
|
||||||
product_price_row = prices_df[prices_df['productId'] == productId]
|
product_price_row = prices_df[prices_df['productId'] == productId]
|
||||||
if product_price_row.empty:
|
if not product_price_row.empty:
|
||||||
# product not in pre-computed prices, fallback to base
|
optimal_price = float(product_price_row['optimal_price'].iloc[0])
|
||||||
return PriceResponse(
|
|
||||||
productId=productId,
|
|
||||||
price=base_price,
|
|
||||||
base_price=base_price,
|
|
||||||
markup=1.0,
|
|
||||||
elasticity=None
|
|
||||||
)
|
|
||||||
|
|
||||||
optimal_price = float(product_price_row['optimal_price'].iloc[0]) # TODO: use optimal_price everywhere as aresult
|
|
||||||
|
|
||||||
# get elasticity if available
|
|
||||||
product_elasticity = None
|
|
||||||
if elasticity_df is not None:
|
|
||||||
product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId]
|
|
||||||
if not product_elasticity_row.empty:
|
|
||||||
product_elasticity = float(product_elasticity_row['elasticity'].iloc[0])
|
|
||||||
|
|
||||||
return PriceResponse(
|
return PriceResponse(
|
||||||
productId=productId,
|
productId=productId,
|
||||||
price=optimal_price,
|
price=optimal_price,
|
||||||
base_price=base_price,
|
base_price=base_price,
|
||||||
markup=optimal_price/base_price,
|
markup=optimal_price/base_price,
|
||||||
elasticity=product_elasticity
|
elasticity=None,
|
||||||
|
model_version='surge'
|
||||||
|
)
|
||||||
|
|
||||||
|
# PRIORITY 3: fallback to base price
|
||||||
|
return PriceResponse(
|
||||||
|
productId=productId,
|
||||||
|
price=base_price,
|
||||||
|
base_price=base_price,
|
||||||
|
markup=1.0,
|
||||||
|
elasticity=None,
|
||||||
|
model_version='base'
|
||||||
)
|
)
|
||||||
|
|
||||||
@app.get("/models")
|
@app.get("/models")
|
||||||
|
|||||||
@@ -3,6 +3,46 @@ import pandas as pd
|
|||||||
from procesing.pricers.base import PricingFunction
|
from procesing.pricers.base import PricingFunction
|
||||||
|
|
||||||
|
|
||||||
|
def session_features_to_demand(session_features: pd.DataFrame) -> float:
|
||||||
|
"""
|
||||||
|
Map session behavioral features to demand proxy.
|
||||||
|
THIS is the critical θ̂ → D transformation for rule-based pricing.
|
||||||
|
|
||||||
|
Logic:
|
||||||
|
- High velocity → agent behavior → price up (revenue recovery)
|
||||||
|
- High cart ratio → purchase intent → price up
|
||||||
|
- Low activity → discount to convert
|
||||||
|
|
||||||
|
Returns: demand proxy score (0-20 range, higher = more demand)
|
||||||
|
"""
|
||||||
|
if session_features.empty:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
feat = session_features.iloc[0] if len(session_features) > 0 else {}
|
||||||
|
|
||||||
|
velocity = feat.get('interaction_velocity', 0)
|
||||||
|
cart_ratio = feat.get('cart_to_view_ratio', 0)
|
||||||
|
item_views = feat.get('item_views', 0)
|
||||||
|
cart_adds = feat.get('cart_adds', 0)
|
||||||
|
|
||||||
|
# baseline demand
|
||||||
|
demand = 1.0
|
||||||
|
|
||||||
|
# agent detection: high velocity → treat as high "demand" to price up
|
||||||
|
if velocity > 2.0:
|
||||||
|
demand += 10.0 # strong agent signal
|
||||||
|
|
||||||
|
# conversion intent: cart interaction → price up
|
||||||
|
if cart_ratio > 0.1 or cart_adds > 0:
|
||||||
|
demand += 5.0
|
||||||
|
|
||||||
|
# browsing depth: many views → interest signal
|
||||||
|
if item_views > 3:
|
||||||
|
demand += min(item_views, 5.0)
|
||||||
|
|
||||||
|
return min(demand, 20.0) # cap at 20
|
||||||
|
|
||||||
|
|
||||||
class StaticPricer(PricingFunction):
|
class StaticPricer(PricingFunction):
|
||||||
"""Static pricing: always return fixed base prices"""
|
"""Static pricing: always return fixed base prices"""
|
||||||
|
|
||||||
@@ -70,18 +110,21 @@ class SimpleSurgePricer(PricingFunction):
|
|||||||
def fit(self, market_data: pd.DataFrame):
|
def fit(self, market_data: pd.DataFrame):
|
||||||
"""Extract base prices from product catalog or historical averages"""
|
"""Extract base prices from product catalog or historical averages"""
|
||||||
self.base_prices = market_data['base_price'].to_numpy() if 'base_price' in market_data.columns else market_data['price'].values
|
self.base_prices = market_data['base_price'].to_numpy() if 'base_price' in market_data.columns else market_data['price'].values
|
||||||
self.demand_history = market_data['demand'].to_numpy() if 'demand' in market_data.columns else np.zeros_like(self.base_prices)
|
return self
|
||||||
|
|
||||||
def predict(self) -> np.ndarray:
|
def predict(self, state_space) -> np.ndarray:
|
||||||
"""
|
"""
|
||||||
Adjust prices based on current demand using surge rules.
|
Adjust prices based on current demand using surge rules.
|
||||||
state_space.demand: demand counts per product
|
state_space.demand: demand proxy per product (from session features)
|
||||||
state_space.prices: current prices (fallback if base_prices not set)
|
state_space.prices: base prices
|
||||||
"""
|
"""
|
||||||
current_prices = self.base_prices if self.base_prices is not None else np.ones_like(demand_vector) * 99.99
|
demand = np.asarray(state_space.demand) if state_space and hasattr(state_space, 'demand') else np.array([0])
|
||||||
demand = self.demand_history if self.demand_history is not None else np.zeros_like(current_prices)
|
base = np.asarray(state_space.prices) if state_space and hasattr(state_space, 'prices') else self.base_prices
|
||||||
new_prices = current_prices.copy()
|
|
||||||
|
|
||||||
|
if base is None:
|
||||||
|
base = np.ones(len(demand)) * 99.99
|
||||||
|
|
||||||
|
new_prices = base.copy()
|
||||||
high_mask = demand >= self.high_threshold
|
high_mask = demand >= self.high_threshold
|
||||||
new_prices[high_mask] *= self.surge_multiplier
|
new_prices[high_mask] *= self.surge_multiplier
|
||||||
|
|
||||||
|
|||||||
@@ -135,6 +135,7 @@ class ExtractSessionFeaturesStep(BaseContextStep):
|
|||||||
Vectorized session feature extraction - replaces O(n^2) per-row loop.
|
Vectorized session feature extraction - replaces O(n^2) per-row loop.
|
||||||
Input: interactions_df
|
Input: interactions_df
|
||||||
Output: session-level feature matrix
|
Output: session-level feature matrix
|
||||||
|
THIS is our main mapping from tau (trajectory) to some features vector theta - we need to do this very well. This is what will go into demand esimation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
|
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
|
||||||
|
|||||||
@@ -178,3 +178,49 @@ class ModelRegistry:
|
|||||||
return True
|
return True
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def set_session_prices(self, session_id: str, prices: Dict[str, float], ttl: int = 1800):
|
||||||
|
"""
|
||||||
|
Store prices for a specific session.
|
||||||
|
THIS is the write path for session-aware pricing.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session_id: session identifier
|
||||||
|
prices: dict of {productId: price}
|
||||||
|
ttl: time-to-live in seconds (default 30min)
|
||||||
|
"""
|
||||||
|
if not prices:
|
||||||
|
return
|
||||||
|
|
||||||
|
key = f"session:{session_id}:prices"
|
||||||
|
# use Redis hash for O(1) lookup per product
|
||||||
|
self.redis_client.hset(key, mapping={k: str(v) for k, v in prices.items()})
|
||||||
|
self.redis_client.expire(key, ttl)
|
||||||
|
|
||||||
|
def get_session_price(self, session_id: str, product_id: str) -> Optional[float]:
|
||||||
|
"""
|
||||||
|
Lookup price for (sessionId, productId).
|
||||||
|
THIS is the read path for fast provider lookup.
|
||||||
|
|
||||||
|
Returns: price or None if not found
|
||||||
|
"""
|
||||||
|
key = f"session:{session_id}:prices"
|
||||||
|
price_str = self.redis_client.hget(key, product_id)
|
||||||
|
|
||||||
|
if price_str is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return float(price_str.decode('utf-8') if isinstance(price_str, bytes) else price_str)
|
||||||
|
|
||||||
|
def get_session_all_prices(self, session_id: str) -> Dict[str, float]:
|
||||||
|
"""Get all prices for a session."""
|
||||||
|
key = f"session:{session_id}:prices"
|
||||||
|
prices_raw = self.redis_client.hgetall(key)
|
||||||
|
|
||||||
|
if not prices_raw:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
return {
|
||||||
|
(k.decode('utf-8') if isinstance(k, bytes) else k): float(v.decode('utf-8') if isinstance(v, bytes) else v)
|
||||||
|
for k, v in prices_raw.items()
|
||||||
|
}
|
||||||
|
|||||||
@@ -30,6 +30,8 @@ export async function GET(req: NextRequest) {
|
|||||||
const providerUrl = process.env.PRICING_PROVIDER_URL || 'http://localhost:5001';
|
const providerUrl = process.env.PRICING_PROVIDER_URL || 'http://localhost:5001';
|
||||||
try {
|
try {
|
||||||
const queryParams = new URLSearchParams();
|
const queryParams = new URLSearchParams();
|
||||||
|
// THIS is our entry point into the dynamic pricing where we reference the context of the sesion and experiment and ask for a price to assign to the trajectory which is expressed
|
||||||
|
// The whole pipeline gets triggered from here.
|
||||||
if (sessionId) queryParams.append('sessionId', sessionId);
|
if (sessionId) queryParams.append('sessionId', sessionId);
|
||||||
if (experimentId) queryParams.append('experimentId', experimentId);
|
if (experimentId) queryParams.append('experimentId', experimentId);
|
||||||
|
|
||||||
@@ -55,11 +57,11 @@ export async function GET(req: NextRequest) {
|
|||||||
price = Math.round(randomBase * 100) / 100;
|
price = Math.round(randomBase * 100) / 100;
|
||||||
}
|
}
|
||||||
|
|
||||||
// log price to kafka for elasticity computation
|
// log price to kafka asynchronously (non-blocking)
|
||||||
if (sessionId) {
|
if (sessionId) {
|
||||||
const backendUrl = process.env.BACKEND_URL || 'http://localhost:5000';
|
const backendUrl = process.env.BACKEND_URL || 'http://localhost:5000';
|
||||||
try {
|
// fire and forget - don't await to avoid blocking response
|
||||||
await fetch(`${backendUrl}/api/kafka/price-log`, {
|
fetch(`${backendUrl}/api/kafka/price-log`, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { 'Content-Type': 'application/json' },
|
headers: { 'Content-Type': 'application/json' },
|
||||||
body: JSON.stringify({
|
body: JSON.stringify({
|
||||||
@@ -70,10 +72,11 @@ export async function GET(req: NextRequest) {
|
|||||||
storeMode,
|
storeMode,
|
||||||
ts: timestamp,
|
ts: timestamp,
|
||||||
}),
|
}),
|
||||||
});
|
}).catch(err => {
|
||||||
} catch (err) {
|
if (process.env.NODE_ENV === 'development') {
|
||||||
console.error('[price-log-error]', err);
|
console.error('[price-log-error]', err);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (process.env.NODE_ENV === 'development') {
|
if (process.env.NODE_ENV === 'development') {
|
||||||
|
|||||||
Reference in New Issue
Block a user