diff --git a/backend/provider/app.py b/backend/provider/app.py index be7d0e7..0e91980 100644 --- a/backend/provider/app.py +++ b/backend/provider/app.py @@ -24,6 +24,7 @@ from procesing.steps import ( ) from procesing import PipelineContext sys.path.append(os.path.dirname(os.path.abspath(__file__))+ "/../../lib/") +print(os.path.dirname(os.path.abspath(__file__))+ "/../../lib/") from lib.model_registry import ModelRegistry # Config @@ -53,20 +54,12 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti metadata = product['metadata'] base_price = metadata.get('base_price', 100.0) - class Provider(SupabaseProvider, BackendAPIProvider): - def __init__(self, backend_url: str): - SupabaseProvider.__init__(self) - BackendAPIProvider.__init__(self, backend_url=backend_url) - - context = PipelineContext( - provider=Provider(backend_url=os.getenv("BACKEND_URL")), - store_mode=mode - ) - - pricing_model = registry.get_pricing_model('latest') + # fetch pre-computed prices from registry + prices_df = registry.get_prices('latest') elasticity_df = registry.get_elasticity('latest') - if pricing_model is None or elasticity_df is None: + if prices_df is None: + # fallback: no pre-computed prices available return PriceResponse( productId=productId, price=base_price, @@ -75,87 +68,26 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti elasticity=None ) - products = context.products - if products.empty: - raise HTTPException(500, "No products available in catalog") - - # merge elasticity with product base prices - products_with_meta = products.copy() - products_with_meta['base_price'] = products_with_meta['metadata'].apply( - lambda m: m.get('base_price', 100.0) if isinstance(m, dict) else 100.0 - ) - - merged = products_with_meta[['id', 'base_price']].rename( - columns={'id': 'productId'} - ).merge( - elasticity_df[['productId', 'elasticity']], - on='productId', - how='left' - ).fillna({'elasticity': 0.0}) - - # compute demand: use pricer's mean_demand if available, else default - demand_values = (pricing_model.mean_demand - if hasattr(pricing_model, 'mean_demand') and pricing_model.mean_demand is not None - else np.ones(len(merged)) * 10.0) - - # build state space with session features if sessionId provided - session_features = pd.DataFrame() - if sessionId: - try: - # fetch recent session interactions from backend - from procesing.steps.session import ExtractSessionFeaturesStep - import requests - from datetime import datetime, timedelta - - t_end = datetime.utcnow() - t_start = t_end - timedelta(hours=1) - backend_url = os.getenv("BACKEND_URL") - print(backend_url) - - resp = requests.get( - f"{os.getenv('BACKEND_URL')}/api/kafka/dump", # TODO: THIS IS SHIT, must fix this - params={'topic': 'user-interactions', 't_start': t_start.isoformat(), 't_end': t_end.isoformat()}, - timeout=2 - ) - - if resp.ok: - msgs = resp.json().get('messages', []) - interactions_df = pd.DataFrame(msgs) - - if not interactions_df.empty and 'sessionId' in interactions_df.columns: - session_interactions = interactions_df[interactions_df['sessionId'] == sessionId] - - if not session_interactions.empty: - extractor = ExtractSessionFeaturesStep(context=context) - session_features_df = extractor.transform(session_interactions) - - if not session_features_df.empty: - session_features = session_features_df.drop(columns=['sessionId']) - except Exception as e: - print(f"[session-features-error] {e}") - # continue without session features - - state = StateSpace( - demand=demand_values, - prices=merged['base_price'].values, - session_features=session_features, - product_ids=merged['productId'].values, - elasticity=merged['elasticity'].values, - metadata={'sessionId': sessionId, 'experimentId': experimentId} - ) - - oracle = PredictPricesStep(context=context) - prices_df = oracle.transform((pricing_model, state)) - + # lookup pre-computed price for this product product_price_row = prices_df[prices_df['productId'] == productId] if product_price_row.empty: - raise HTTPException(404, f"No pricing available for product {productId}") + # product not in pre-computed prices, fallback to base + return PriceResponse( + productId=productId, + price=base_price, + base_price=base_price, + markup=1.0, + elasticity=None + ) optimal_price = float(product_price_row['predicted_price'].iloc[0]) - product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId] - product_elasticity = (float(product_elasticity_row['elasticity'].iloc[0]) - if not product_elasticity_row.empty else None) + # get elasticity if available + product_elasticity = None + if elasticity_df is not None: + product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId] + if not product_elasticity_row.empty: + product_elasticity = float(product_elasticity_row['elasticity'].iloc[0]) return PriceResponse( productId=productId, diff --git a/docker-compose.yml b/docker-compose.yml index a806b83..e64eaea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -208,6 +208,7 @@ services: - KAFKA_PORT=29092 - NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL} - NEXT_PUBLIC_SUPABASE_ANON_KEY=${NEXT_PUBLIC_SUPABASE_ANON_KEY} + - BACKEND_URL=http://localhost:5000 ports: - "${PROVIDER_PORT:-5001}:5001" volumes: diff --git a/experiments/airflow/dags/elasticity_pricing_dag.py b/experiments/airflow/dags/elasticity_pricing_dag.py index 6dd3f2d..4d6830d 100644 --- a/experiments/airflow/dags/elasticity_pricing_dag.py +++ b/experiments/airflow/dags/elasticity_pricing_dag.py @@ -202,7 +202,7 @@ def predict_prices(**kwargs): return len(prices_df) def publish_results(**kwargs): - """Task: Publish elasticity and pricing results to model registry""" + """Task: Publish elasticity, pricing model, and predicted prices to registry""" ti = kwargs['ti'] elasticity_df = pickle.loads(ti.xcom_pull(key='elasticity_results')) prices_df = pickle.loads(ti.xcom_pull(key='predicted_prices')) @@ -222,7 +222,6 @@ def publish_results(**kwargs): registry.publish_elasticity(elasticity_df, model_name='latest', metadata=metadata) - # get fitted pricer from XCom pricer = pickle.loads(ti.xcom_pull(key='pricer')) registry.publish_pricing_model( pricer, @@ -230,10 +229,13 @@ def publish_results(**kwargs): metadata={**metadata, 'model_type': type(pricer).__name__} ) - logging.info(f"Published elasticity + pricing for {len(elasticity_df)} products") + registry.publish_prices(prices_df, model_name='latest', metadata=metadata) + + logging.info(f"Published elasticity + pricing + prices for {len(elasticity_df)} products") return { 'n_products': len(elasticity_df), + 'n_prices': len(prices_df), 'registry_status': 'success', 'elasticity_mean': float(elasticity_df['elasticity'].mean()) } diff --git a/experiments/procesing/pipelines.py b/experiments/procesing/pipelines.py index ac95314..cae639f 100644 --- a/experiments/procesing/pipelines.py +++ b/experiments/procesing/pipelines.py @@ -121,6 +121,8 @@ if __name__ == '__main__': context = PipelineContext( provider=Provider(backend_url="http://localhost:5000"), store_mode='hotel', + # 15 min not month + window_size='15min', ) elasticity_df, prices_df = full_pipeline(context) diff --git a/experiments/procesing/steps/pricing.py b/experiments/procesing/steps/pricing.py index c99f83e..6439474 100755 --- a/experiments/procesing/steps/pricing.py +++ b/experiments/procesing/steps/pricing.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd from typing import Optional, List, Dict, Any from dataclasses import dataclass, field +from experiments.procesing.pricers.simple import StaticPricer from procesing.steps.base import BaseContextStep from procesing.pricers import ElasticityBasedPricer @@ -113,17 +114,17 @@ class BuildStateSpaceStep(BaseContextStep): class FitPricingFunctionStep(BaseContextStep): """ - Fit pricing function using elasticity data. - Input: elasticity_df + Fit pricing function using data. + Input: pricing_data Output: fitted pricing function instance """ - def transform(self, elasticity_df: pd.DataFrame): - pricing_class = self.context.config.get('pricing_function_class', ElasticityBasedPricer) + def transform(self, pricing_data: pd.DataFrame): + pricing_class = self.context.config.get('pricing_function_class', StaticPricer) pricing_params = self.context.config.get('pricing_function_params', {}) pricer = pricing_class(**pricing_params) - pricer.fit(elasticity_df) + pricer.fit(pricing_data) return pricer diff --git a/lib/model_registry.py b/lib/model_registry.py index 08233a0..92d7934 100755 --- a/lib/model_registry.py +++ b/lib/model_registry.py @@ -26,6 +26,7 @@ class ModelRegistry: self.metadata_prefix = "model:meta:" self.data_prefix = "model:data:" self.elasticity_prefix = "elasticity:" + self.prices_prefix = "prices:" def publish_elasticity(self, elasticity_df: pd.DataFrame, @@ -130,6 +131,46 @@ class ModelRegistry: return models + def publish_prices(self, + prices_df: pd.DataFrame, + model_name: str = 'latest', + metadata: Optional[Dict[str, Any]] = None): + """Store predicted prices in registry. + + Args: + prices_df: df with [productId, predicted_price, ...] + model_name: identifier for this price snapshot + metadata: additional info + """ + key = f"{self.prices_prefix}{model_name}" + data_json = prices_df.to_json(orient='records') + + self.redis_client.set(key, data_json) + + meta = metadata or {} + meta.update({ + 'n_products': len(prices_df), + 'model_type': 'predicted_prices' + }) + + meta_key = f"{self.metadata_prefix}prices_{model_name}" + self.redis_client.set(meta_key, json.dumps(meta)) + + log.info(f"Published prices '{model_name}' for {len(prices_df)} products") + + def get_prices(self, model_name: str = 'latest') -> Optional[pd.DataFrame]: + """Retrieve predicted prices from registry.""" + key = f"{self.prices_prefix}{model_name}" + data_json = self.redis_client.get(key) + + if data_json is None: + return None + + if isinstance(data_json, bytes): + data_json = data_json.decode('utf-8') + + return pd.read_json(data_json, orient='records') + def health_check(self) -> bool: """Check if Redis connection is alive.""" try: