diff --git a/Makefile b/Makefile index d9eaac5..d4fd961 100644 --- a/Makefile +++ b/Makefile @@ -49,4 +49,8 @@ install: $(VENV) test: $(VENV) $(PYTEST) -v +count-lines: + @find . \( -path '*/node_modules' -o -path '*/.venv' -o -path '*/venv' \) -prune -o \ + \( -name "*.ts" -o -name "*.py" \) -type f -print0 | xargs -0 cat | wc -l + .PHONY: all pdf clean watch run.webapp install test diff --git a/backend/provider/app.py b/backend/provider/app.py index d2752ac..be7d0e7 100644 --- a/backend/provider/app.py +++ b/backend/provider/app.py @@ -57,8 +57,9 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti def __init__(self, backend_url: str): SupabaseProvider.__init__(self) BackendAPIProvider.__init__(self, backend_url=backend_url) + context = PipelineContext( - provider=Provider(backend_url=os.getenv("BACKEND_API_URL")), + provider=Provider(backend_url=os.getenv("BACKEND_URL")), store_mode=mode ) @@ -66,7 +67,6 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti elasticity_df = registry.get_elasticity('latest') if pricing_model is None or elasticity_df is None: - # fallback to base price if no model available return PriceResponse( productId=productId, price=base_price, @@ -75,7 +75,6 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti elasticity=None ) - # build full state space for all products in catalog products = context.products if products.empty: raise HTTPException(500, "No products available in catalog") @@ -94,28 +93,66 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti how='left' ).fillna({'elasticity': 0.0}) - # use fitted pricer's mean_demand if available, else default to 10.0 + # compute demand: use pricer's mean_demand if available, else default demand_values = (pricing_model.mean_demand if hasattr(pricing_model, 'mean_demand') and pricing_model.mean_demand is not None else np.ones(len(merged)) * 10.0) + # build state space with session features if sessionId provided + session_features = pd.DataFrame() + if sessionId: + try: + # fetch recent session interactions from backend + from procesing.steps.session import ExtractSessionFeaturesStep + import requests + from datetime import datetime, timedelta + + t_end = datetime.utcnow() + t_start = t_end - timedelta(hours=1) + backend_url = os.getenv("BACKEND_URL") + print(backend_url) + + resp = requests.get( + f"{os.getenv('BACKEND_URL')}/api/kafka/dump", # TODO: THIS IS SHIT, must fix this + params={'topic': 'user-interactions', 't_start': t_start.isoformat(), 't_end': t_end.isoformat()}, + timeout=2 + ) + + if resp.ok: + msgs = resp.json().get('messages', []) + interactions_df = pd.DataFrame(msgs) + + if not interactions_df.empty and 'sessionId' in interactions_df.columns: + session_interactions = interactions_df[interactions_df['sessionId'] == sessionId] + + if not session_interactions.empty: + extractor = ExtractSessionFeaturesStep(context=context) + session_features_df = extractor.transform(session_interactions) + + if not session_features_df.empty: + session_features = session_features_df.drop(columns=['sessionId']) + except Exception as e: + print(f"[session-features-error] {e}") + # continue without session features + state = StateSpace( demand=demand_values, prices=merged['base_price'].values, - session_features=pd.DataFrame() + session_features=session_features, + product_ids=merged['productId'].values, + elasticity=merged['elasticity'].values, + metadata={'sessionId': sessionId, 'experimentId': experimentId} ) oracle = PredictPricesStep(context=context) prices_df = oracle.transform((pricing_model, state)) - # extract price for requested product product_price_row = prices_df[prices_df['productId'] == productId] if product_price_row.empty: raise HTTPException(404, f"No pricing available for product {productId}") optimal_price = float(product_price_row['predicted_price'].iloc[0]) - # extract elasticity if available product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId] product_elasticity = (float(product_elasticity_row['elasticity'].iloc[0]) if not product_elasticity_row.empty else None) diff --git a/experiments/procesing/pricers/__init__.py b/experiments/procesing/pricers/__init__.py index 1a82b17..a812d8f 100644 --- a/experiments/procesing/pricers/__init__.py +++ b/experiments/procesing/pricers/__init__.py @@ -1,10 +1,13 @@ from procesing.pricers.base import PricingFunction from procesing.pricers.elasticity import ElasticityBasedPricer from procesing.pricers.simple import StaticPricer, RandomPricer +from procesing.pricers.session_aware import SessionAwarePricer, ProductSpecificSessionPricer __all__ = [ 'PricingFunction', 'ElasticityBasedPricer', 'StaticPricer', - 'RandomPricer' + 'RandomPricer', + 'SessionAwarePricer', + 'ProductSpecificSessionPricer' ] diff --git a/experiments/procesing/pricers/base.py b/experiments/procesing/pricers/base.py index 081b7a5..b2ada0c 100644 --- a/experiments/procesing/pricers/base.py +++ b/experiments/procesing/pricers/base.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, List import numpy as np import pandas as pd @@ -6,23 +7,64 @@ import pandas as pd class PricingFunction(ABC): """ Abstract base for pricing functions. - Defines the mapping f: StateSpace -> prices + + Defines mapping: f(Q_t, P_t, S_t, H_t) -> P_{t+1} + + Where: + Q_t ∈ R^n: demand vector at time t + P_t ∈ R^n: price vector at time t + S_t: session features (behavioral signals, interactions) + H_t = {Q_{t-k}, P_{t-k}, S_{t-k}}: historical state trajectory + + Objective: + maximize E[R_T] = E[Σ P_t^T · Q_t] + subject to: + Q_t = g(P_t, S_t) (demand response via elasticity) + P_t ≥ C (cost floor) + minimize L_agent = R_oracle - R_observed """ @abstractmethod - def fit(self, historical_data: pd.DataFrame): - """Train/calibrate the pricing function on historical data""" + def fit(self, historical_data: pd.DataFrame, **kwargs): + """ + Offline training on historical data. + + Args: + historical_data: DataFrame with elasticity, prices, demand signals + **kwargs: additional training parameters + """ pass @abstractmethod def predict(self, state_space) -> np.ndarray: """ - Generate prices given current state space. + Generate optimal prices given current state. Args: - state_space: StateSpace object containing demand, prices, session features + state_space: StateSpace object containing Q_t, P_t, S_t, H_t Returns: - prices: price vector P_{t+1} in R^n + P_{t+1}: price vector in R^n """ pass + + def update(self, observation: Dict[str, Any]): + """ + Online learning update (optional). + + Args: + observation: dict with {state, action, reward, next_state} + - state: StateSpace before pricing decision + - action: prices shown (P_t) + - reward: revenue/conversion signal + - next_state: StateSpace after user interaction + """ + pass # default: no online learning + + def get_params(self) -> Dict[str, Any]: + """Return pricing function parameters for serialization.""" + return {} + + def set_params(self, params: Dict[str, Any]): + """Load pricing function parameters from dict.""" + pass diff --git a/experiments/procesing/steps/pricing.py b/experiments/procesing/steps/pricing.py index f8a8eb3..c99f83e 100755 --- a/experiments/procesing/steps/pricing.py +++ b/experiments/procesing/steps/pricing.py @@ -1,30 +1,77 @@ import numpy as np import pandas as pd +from typing import Optional, List, Dict, Any +from dataclasses import dataclass, field from procesing.steps.base import BaseContextStep from procesing.pricers import ElasticityBasedPricer +@dataclass class StateSpace: - """State representation for pricing functions""" - def __init__(self, - demand: np.ndarray, - prices: np.ndarray, - session_features: pd.DataFrame = None): - self.demand = demand - self.prices = prices - self.session_features = session_features if session_features is not None else pd.DataFrame() + """ + State representation for pricing functions. + + Components: + Q_t: demand ∈ R^n (current demand signal per product) + P_t: prices ∈ R^n (current/base prices) + S_t: session_features (behavioral signals, interaction data) + H_t: history = {Q_{t-k}, P_{t-k}, S_{t-k}} for k in [1, history_length] + + Additionally stores: + - product_ids: product identifiers (n,) + - elasticity: price elasticity per product (n,) + - metadata: arbitrary context (experiment_id, timestamp, etc.) + """ + demand: np.ndarray # Q_t ∈ R^n + prices: np.ndarray # P_t ∈ R^n + session_features: pd.DataFrame = field(default_factory=pd.DataFrame) # S_t + + # augmented state components + product_ids: Optional[np.ndarray] = None + elasticity: Optional[np.ndarray] = None + + # historical trajectory H_t = {(Q_{t-k}, P_{t-k}, S_{t-k})} + history: List[Dict[str, Any]] = field(default_factory=list) + + # metadata for context + metadata: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self): + """Validate dimensions.""" + n = len(self.demand) + assert len(self.prices) == n, "demand and prices must have same dimension" + if self.elasticity is not None: + assert len(self.elasticity) == n, "elasticity must match dimension" + if self.product_ids is not None: + assert len(self.product_ids) == n, "product_ids must match dimension" + + @property + def n_products(self) -> int: + """Number of products in state space.""" + return len(self.demand) + + def add_history(self, q: np.ndarray, p: np.ndarray, s: pd.DataFrame, max_length: int = 10): + """Append historical state to trajectory H_t.""" + self.history.append({'demand': q, 'prices': p, 'session_features': s}) + if len(self.history) > max_length: + self.history.pop(0) + + def get_history_window(self, k: int = 5) -> List[Dict[str, Any]]: + """Retrieve last k historical states.""" + return self.history[-k:] if len(self.history) >= k else self.history class BuildStateSpaceStep(BaseContextStep): """ - Build state space from elasticity and price data. - Input: elasticity_df - Output: StateSpace instance + Build state space from elasticity, demand, and price data. + + Input: elasticity_df [productId, elasticity, ...], optional demand_df + Output: StateSpace instance with Q_t, P_t, elasticity, product_ids """ - def transform(self, elasticity_df: pd.DataFrame): + def transform(self, elasticity_df: pd.DataFrame, demand_df: Optional[pd.DataFrame] = None): products = self.context.products - # fetch current/base prices from product metadata + # extract base prices from product metadata products_with_prices = products.copy() if 'metadata' in products_with_prices.columns: products_with_prices['base_price'] = products_with_prices['metadata'].apply( @@ -42,10 +89,25 @@ class BuildStateSpaceStep(BaseContextStep): how='left' ).fillna({'elasticity': 0.0, 'base_price': 0.0}) + # merge with demand if provided, else use default + if demand_df is not None and 'demand' in demand_df.columns: + merged = merged.merge( + demand_df[['productId', 'demand']], + on='productId', + how='left' + ).fillna({'demand': 0.0}) + demand_vector = merged['demand'].values + else: + # default: uniform demand or use elasticity as proxy + demand_vector = np.ones(len(merged)) * 10.0 + return StateSpace( - demand=merged['elasticity'].values, + demand=demand_vector, prices=merged['base_price'].values, - session_features=pd.DataFrame() + session_features=pd.DataFrame(), + product_ids=merged['productId'].values, + elasticity=merged['elasticity'].values, + metadata={'timestamp': pd.Timestamp.now().isoformat()} ) diff --git a/web/src/app/api/pricing/route.ts b/web/src/app/api/pricing/route.ts index a809f99..959b46c 100644 --- a/web/src/app/api/pricing/route.ts +++ b/web/src/app/api/pricing/route.ts @@ -20,10 +20,40 @@ export async function GET(req: NextRequest) { ); } - // stub: call external pricing provider (random for now) - const basePrice = 100 + Math.random() * 900; // 100-1000 range - const price = Math.round(basePrice * 100) / 100; const timestamp = new Date().toISOString(); + let price: number; + let basePrice: number | undefined; + let markup: number | undefined; + let elasticity: number | undefined; + + // call real pricing provider + const providerUrl = process.env.PRICING_PROVIDER_URL || 'http://localhost:5001'; + try { + const queryParams = new URLSearchParams(); + if (sessionId) queryParams.append('sessionId', sessionId); + if (experimentId) queryParams.append('experimentId', experimentId); + + const providerResponse = await fetch( + `${providerUrl}/api/${storeMode}/price/${productId}?${queryParams.toString()}`, + { headers: { 'Accept': 'application/json' }, cache: 'no-store' } + ); + + if (!providerResponse.ok) { + throw new Error(`Provider returned ${providerResponse.status}`); + } + + const providerData = await providerResponse.json(); + price = providerData.price; + basePrice = providerData.base_price; + markup = providerData.markup; + elasticity = providerData.elasticity; + + } catch (err) { + console.error('[pricing-provider-error]', err); + // fallback to random pricing if provider unavailable + const randomBase = 100 + Math.random() * 900; + price = Math.round(randomBase * 100) / 100; + } // log price to kafka for elasticity computation if (sessionId) { @@ -43,19 +73,13 @@ export async function GET(req: NextRequest) { }); } catch (err) { console.error('[price-log-error]', err); - // don't fail the pricing request if logging fails } } - // log in dev if (process.env.NODE_ENV === 'development') { console.log('[pricing-api]', { - productId, - sessionId, - experimentId, - storeMode, - price, - timestamp, + productId, sessionId, experimentId, storeMode, + price, basePrice, markup, elasticity, timestamp, }); }