chore: redefined and connected pricers

This commit is contained in:
2025-11-29 17:40:05 +01:00
parent dd33f83e10
commit d0a5748ca1
6 changed files with 212 additions and 40 deletions

View File

@@ -49,4 +49,8 @@ install: $(VENV)
test: $(VENV) test: $(VENV)
$(PYTEST) -v $(PYTEST) -v
count-lines:
@find . \( -path '*/node_modules' -o -path '*/.venv' -o -path '*/venv' \) -prune -o \
\( -name "*.ts" -o -name "*.py" \) -type f -print0 | xargs -0 cat | wc -l
.PHONY: all pdf clean watch run.webapp install test .PHONY: all pdf clean watch run.webapp install test

View File

@@ -57,8 +57,9 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti
def __init__(self, backend_url: str): def __init__(self, backend_url: str):
SupabaseProvider.__init__(self) SupabaseProvider.__init__(self)
BackendAPIProvider.__init__(self, backend_url=backend_url) BackendAPIProvider.__init__(self, backend_url=backend_url)
context = PipelineContext( context = PipelineContext(
provider=Provider(backend_url=os.getenv("BACKEND_API_URL")), provider=Provider(backend_url=os.getenv("BACKEND_URL")),
store_mode=mode store_mode=mode
) )
@@ -66,7 +67,6 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti
elasticity_df = registry.get_elasticity('latest') elasticity_df = registry.get_elasticity('latest')
if pricing_model is None or elasticity_df is None: if pricing_model is None or elasticity_df is None:
# fallback to base price if no model available
return PriceResponse( return PriceResponse(
productId=productId, productId=productId,
price=base_price, price=base_price,
@@ -75,7 +75,6 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti
elasticity=None elasticity=None
) )
# build full state space for all products in catalog
products = context.products products = context.products
if products.empty: if products.empty:
raise HTTPException(500, "No products available in catalog") raise HTTPException(500, "No products available in catalog")
@@ -94,28 +93,66 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti
how='left' how='left'
).fillna({'elasticity': 0.0}) ).fillna({'elasticity': 0.0})
# use fitted pricer's mean_demand if available, else default to 10.0 # compute demand: use pricer's mean_demand if available, else default
demand_values = (pricing_model.mean_demand demand_values = (pricing_model.mean_demand
if hasattr(pricing_model, 'mean_demand') and pricing_model.mean_demand is not None if hasattr(pricing_model, 'mean_demand') and pricing_model.mean_demand is not None
else np.ones(len(merged)) * 10.0) else np.ones(len(merged)) * 10.0)
# build state space with session features if sessionId provided
session_features = pd.DataFrame()
if sessionId:
try:
# fetch recent session interactions from backend
from procesing.steps.session import ExtractSessionFeaturesStep
import requests
from datetime import datetime, timedelta
t_end = datetime.utcnow()
t_start = t_end - timedelta(hours=1)
backend_url = os.getenv("BACKEND_URL")
print(backend_url)
resp = requests.get(
f"{os.getenv('BACKEND_URL')}/api/kafka/dump", # TODO: THIS IS SHIT, must fix this
params={'topic': 'user-interactions', 't_start': t_start.isoformat(), 't_end': t_end.isoformat()},
timeout=2
)
if resp.ok:
msgs = resp.json().get('messages', [])
interactions_df = pd.DataFrame(msgs)
if not interactions_df.empty and 'sessionId' in interactions_df.columns:
session_interactions = interactions_df[interactions_df['sessionId'] == sessionId]
if not session_interactions.empty:
extractor = ExtractSessionFeaturesStep(context=context)
session_features_df = extractor.transform(session_interactions)
if not session_features_df.empty:
session_features = session_features_df.drop(columns=['sessionId'])
except Exception as e:
print(f"[session-features-error] {e}")
# continue without session features
state = StateSpace( state = StateSpace(
demand=demand_values, demand=demand_values,
prices=merged['base_price'].values, prices=merged['base_price'].values,
session_features=pd.DataFrame() session_features=session_features,
product_ids=merged['productId'].values,
elasticity=merged['elasticity'].values,
metadata={'sessionId': sessionId, 'experimentId': experimentId}
) )
oracle = PredictPricesStep(context=context) oracle = PredictPricesStep(context=context)
prices_df = oracle.transform((pricing_model, state)) prices_df = oracle.transform((pricing_model, state))
# extract price for requested product
product_price_row = prices_df[prices_df['productId'] == productId] product_price_row = prices_df[prices_df['productId'] == productId]
if product_price_row.empty: if product_price_row.empty:
raise HTTPException(404, f"No pricing available for product {productId}") raise HTTPException(404, f"No pricing available for product {productId}")
optimal_price = float(product_price_row['predicted_price'].iloc[0]) optimal_price = float(product_price_row['predicted_price'].iloc[0])
# extract elasticity if available
product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId] product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId]
product_elasticity = (float(product_elasticity_row['elasticity'].iloc[0]) product_elasticity = (float(product_elasticity_row['elasticity'].iloc[0])
if not product_elasticity_row.empty else None) if not product_elasticity_row.empty else None)

View File

@@ -1,10 +1,13 @@
from procesing.pricers.base import PricingFunction from procesing.pricers.base import PricingFunction
from procesing.pricers.elasticity import ElasticityBasedPricer from procesing.pricers.elasticity import ElasticityBasedPricer
from procesing.pricers.simple import StaticPricer, RandomPricer from procesing.pricers.simple import StaticPricer, RandomPricer
from procesing.pricers.session_aware import SessionAwarePricer, ProductSpecificSessionPricer
__all__ = [ __all__ = [
'PricingFunction', 'PricingFunction',
'ElasticityBasedPricer', 'ElasticityBasedPricer',
'StaticPricer', 'StaticPricer',
'RandomPricer' 'RandomPricer',
'SessionAwarePricer',
'ProductSpecificSessionPricer'
] ]

View File

@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
import numpy as np import numpy as np
import pandas as pd import pandas as pd
@@ -6,23 +7,64 @@ import pandas as pd
class PricingFunction(ABC): class PricingFunction(ABC):
""" """
Abstract base for pricing functions. Abstract base for pricing functions.
Defines the mapping f: StateSpace -> prices
Defines mapping: f(Q_t, P_t, S_t, H_t) -> P_{t+1}
Where:
Q_t ∈ R^n: demand vector at time t
P_t ∈ R^n: price vector at time t
S_t: session features (behavioral signals, interactions)
H_t = {Q_{t-k}, P_{t-k}, S_{t-k}}: historical state trajectory
Objective:
maximize E[R_T] = E[Σ P_t^T · Q_t]
subject to:
Q_t = g(P_t, S_t) (demand response via elasticity)
P_t ≥ C (cost floor)
minimize L_agent = R_oracle - R_observed
""" """
@abstractmethod @abstractmethod
def fit(self, historical_data: pd.DataFrame): def fit(self, historical_data: pd.DataFrame, **kwargs):
"""Train/calibrate the pricing function on historical data""" """
Offline training on historical data.
Args:
historical_data: DataFrame with elasticity, prices, demand signals
**kwargs: additional training parameters
"""
pass pass
@abstractmethod @abstractmethod
def predict(self, state_space) -> np.ndarray: def predict(self, state_space) -> np.ndarray:
""" """
Generate prices given current state space. Generate optimal prices given current state.
Args: Args:
state_space: StateSpace object containing demand, prices, session features state_space: StateSpace object containing Q_t, P_t, S_t, H_t
Returns: Returns:
prices: price vector P_{t+1} in R^n P_{t+1}: price vector in R^n
""" """
pass pass
def update(self, observation: Dict[str, Any]):
"""
Online learning update (optional).
Args:
observation: dict with {state, action, reward, next_state}
- state: StateSpace before pricing decision
- action: prices shown (P_t)
- reward: revenue/conversion signal
- next_state: StateSpace after user interaction
"""
pass # default: no online learning
def get_params(self) -> Dict[str, Any]:
"""Return pricing function parameters for serialization."""
return {}
def set_params(self, params: Dict[str, Any]):
"""Load pricing function parameters from dict."""
pass

View File

@@ -1,30 +1,77 @@
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
from procesing.steps.base import BaseContextStep from procesing.steps.base import BaseContextStep
from procesing.pricers import ElasticityBasedPricer from procesing.pricers import ElasticityBasedPricer
@dataclass
class StateSpace: class StateSpace:
"""State representation for pricing functions""" """
def __init__(self, State representation for pricing functions.
demand: np.ndarray,
prices: np.ndarray, Components:
session_features: pd.DataFrame = None): Q_t: demand ∈ R^n (current demand signal per product)
self.demand = demand P_t: prices ∈ R^n (current/base prices)
self.prices = prices S_t: session_features (behavioral signals, interaction data)
self.session_features = session_features if session_features is not None else pd.DataFrame() H_t: history = {Q_{t-k}, P_{t-k}, S_{t-k}} for k in [1, history_length]
Additionally stores:
- product_ids: product identifiers (n,)
- elasticity: price elasticity per product (n,)
- metadata: arbitrary context (experiment_id, timestamp, etc.)
"""
demand: np.ndarray # Q_t ∈ R^n
prices: np.ndarray # P_t ∈ R^n
session_features: pd.DataFrame = field(default_factory=pd.DataFrame) # S_t
# augmented state components
product_ids: Optional[np.ndarray] = None
elasticity: Optional[np.ndarray] = None
# historical trajectory H_t = {(Q_{t-k}, P_{t-k}, S_{t-k})}
history: List[Dict[str, Any]] = field(default_factory=list)
# metadata for context
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
"""Validate dimensions."""
n = len(self.demand)
assert len(self.prices) == n, "demand and prices must have same dimension"
if self.elasticity is not None:
assert len(self.elasticity) == n, "elasticity must match dimension"
if self.product_ids is not None:
assert len(self.product_ids) == n, "product_ids must match dimension"
@property
def n_products(self) -> int:
"""Number of products in state space."""
return len(self.demand)
def add_history(self, q: np.ndarray, p: np.ndarray, s: pd.DataFrame, max_length: int = 10):
"""Append historical state to trajectory H_t."""
self.history.append({'demand': q, 'prices': p, 'session_features': s})
if len(self.history) > max_length:
self.history.pop(0)
def get_history_window(self, k: int = 5) -> List[Dict[str, Any]]:
"""Retrieve last k historical states."""
return self.history[-k:] if len(self.history) >= k else self.history
class BuildStateSpaceStep(BaseContextStep): class BuildStateSpaceStep(BaseContextStep):
""" """
Build state space from elasticity and price data. Build state space from elasticity, demand, and price data.
Input: elasticity_df
Output: StateSpace instance Input: elasticity_df [productId, elasticity, ...], optional demand_df
Output: StateSpace instance with Q_t, P_t, elasticity, product_ids
""" """
def transform(self, elasticity_df: pd.DataFrame): def transform(self, elasticity_df: pd.DataFrame, demand_df: Optional[pd.DataFrame] = None):
products = self.context.products products = self.context.products
# fetch current/base prices from product metadata # extract base prices from product metadata
products_with_prices = products.copy() products_with_prices = products.copy()
if 'metadata' in products_with_prices.columns: if 'metadata' in products_with_prices.columns:
products_with_prices['base_price'] = products_with_prices['metadata'].apply( products_with_prices['base_price'] = products_with_prices['metadata'].apply(
@@ -42,10 +89,25 @@ class BuildStateSpaceStep(BaseContextStep):
how='left' how='left'
).fillna({'elasticity': 0.0, 'base_price': 0.0}) ).fillna({'elasticity': 0.0, 'base_price': 0.0})
# merge with demand if provided, else use default
if demand_df is not None and 'demand' in demand_df.columns:
merged = merged.merge(
demand_df[['productId', 'demand']],
on='productId',
how='left'
).fillna({'demand': 0.0})
demand_vector = merged['demand'].values
else:
# default: uniform demand or use elasticity as proxy
demand_vector = np.ones(len(merged)) * 10.0
return StateSpace( return StateSpace(
demand=merged['elasticity'].values, demand=demand_vector,
prices=merged['base_price'].values, prices=merged['base_price'].values,
session_features=pd.DataFrame() session_features=pd.DataFrame(),
product_ids=merged['productId'].values,
elasticity=merged['elasticity'].values,
metadata={'timestamp': pd.Timestamp.now().isoformat()}
) )

View File

@@ -20,10 +20,40 @@ export async function GET(req: NextRequest) {
); );
} }
// stub: call external pricing provider (random for now)
const basePrice = 100 + Math.random() * 900; // 100-1000 range
const price = Math.round(basePrice * 100) / 100;
const timestamp = new Date().toISOString(); const timestamp = new Date().toISOString();
let price: number;
let basePrice: number | undefined;
let markup: number | undefined;
let elasticity: number | undefined;
// call real pricing provider
const providerUrl = process.env.PRICING_PROVIDER_URL || 'http://localhost:5001';
try {
const queryParams = new URLSearchParams();
if (sessionId) queryParams.append('sessionId', sessionId);
if (experimentId) queryParams.append('experimentId', experimentId);
const providerResponse = await fetch(
`${providerUrl}/api/${storeMode}/price/${productId}?${queryParams.toString()}`,
{ headers: { 'Accept': 'application/json' }, cache: 'no-store' }
);
if (!providerResponse.ok) {
throw new Error(`Provider returned ${providerResponse.status}`);
}
const providerData = await providerResponse.json();
price = providerData.price;
basePrice = providerData.base_price;
markup = providerData.markup;
elasticity = providerData.elasticity;
} catch (err) {
console.error('[pricing-provider-error]', err);
// fallback to random pricing if provider unavailable
const randomBase = 100 + Math.random() * 900;
price = Math.round(randomBase * 100) / 100;
}
// log price to kafka for elasticity computation // log price to kafka for elasticity computation
if (sessionId) { if (sessionId) {
@@ -43,19 +73,13 @@ export async function GET(req: NextRequest) {
}); });
} catch (err) { } catch (err) {
console.error('[price-log-error]', err); console.error('[price-log-error]', err);
// don't fail the pricing request if logging fails
} }
} }
// log in dev
if (process.env.NODE_ENV === 'development') { if (process.env.NODE_ENV === 'development') {
console.log('[pricing-api]', { console.log('[pricing-api]', {
productId, productId, sessionId, experimentId, storeMode,
sessionId, price, basePrice, markup, elasticity, timestamp,
experimentId,
storeMode,
price,
timestamp,
}); });
} }