chore: cleaning up provider of prices

This commit is contained in:
2025-11-28 16:23:44 +01:00
parent b5c71e713b
commit e9d9c0e319
2 changed files with 37 additions and 119 deletions

View File

@@ -1,43 +1,18 @@
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Any, Literal
import uvicorn
import os
import sys
import json
import numpy as np
import pandas as pd
from typing import Literal, Optional
import uvicorn, os, sys
from supabase import create_client, Client
try:
# in docker container, paths are set via PYTHONPATH
from model_registry import ModelRegistry
from pricing import StateSpace, ElasticityBasedPricingFunction
except ImportError:
# local dev: add paths manually
lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../lib'))
procesing_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../experiments/procesing'))
sys.path.insert(0, lib_path)
sys.path.insert(0, procesing_path)
# Local imports of registry and pricing function
from model_registry import ModelRegistry
from pricing import StateSpace, ElasticityBasedPricingFunction
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# Config
app = FastAPI(title="PHANTOM Pricing Provider")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
supabase: Client = create_client(os.getenv("NEXT_PUBLIC_SUPABASE_URL"), os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY"))
registry = ModelRegistry()
class PriceResponse(BaseModel):
@@ -49,93 +24,47 @@ class PriceResponse(BaseModel):
model_version: str = 'latest'
@app.get("/health")
def health():
redis_ok = registry.health_check()
return {"status": "healthy", "redis": redis_ok}
def health() -> dict:
return {"status": "healthy", "redis": registry.health_check()}
@app.get("/api/{mode}/price/{productId}")
def get_price(
mode: Literal['hotel', 'airline'],
productId: str,
sessionId: Optional[str] = Query(None),
experimentId: Optional[str] = Query(None)
) -> PriceResponse:
"""
Dynamic pricing endpoint.
@app.get("/api/{mode}/price/{productId}", response_model=PriceResponse)
def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Optional[str] = Query(None), experimentId: Optional[str] = Query(None)):
product = supabase.table(f'{mode}_products').select("metadata").eq('id', productId).execute().data[0]
if not product: raise HTTPException(404, f"Product {productId} not found")
1. Fetch product base price from Supabase
2. Load latest elasticity + pricing model from registry
3. Compute optimal price
5. Return price to client
"""
# fetch product
product_resp = supabase.table(f'{mode}_products').select("id, metadata").eq('id', productId).execute()
if not product_resp.data or len(product_resp.data) == 0:
raise HTTPException(status_code=404, detail=f"Product {productId} not found")
product = product_resp.data[0]
metadata = product.get('metadata', {})
metadata = product['metadata']
base_price = metadata.get('base_price', 100.0)
# load elasticity data
elasticity_df = registry.get_elasticity('latest')
if elasticity_df is None:
# no model available, return base price
final_price = base_price
elasticity_value = None
else:
# load or create pricing model
pricing_model = registry.get_pricing_model('latest')
if not elasticity_df:
return PriceResponse(productId=productId, price=base_price, base_price=base_price, markup=1.0)
if pricing_model is None:
# create default model
pricing_model = ElasticityBasedPricingFunction()
pricing_model.fit(elasticity_df)
pricing_model = registry.get_pricing_model('latest') or ElasticityBasedPricingFunction().fit(elasticity_df)
product_elasticity = elasticity_df[elasticity_df['productId'] == productId]['elasticity'].iloc[0] if (elasticity_row := elasticity_df[elasticity_df['productId'] == productId]).any().any() else None
# get elasticity for this product
product_elasticity = elasticity_df[elasticity_df['productId'] == productId]
elasticity_value = float(product_elasticity['elasticity'].iloc[0]) if not product_elasticity.empty else None
# construct state space (single product)
state = StateSpace(
demand=np.array([0.0]), # demand not needed for elasticity-based pricing
prices=np.array([base_price]),
session_features=pd.DataFrame()
)
# compute optimal price
optimal_prices = pricing_model.transform(state, product_ids=np.array([productId]))
final_price = float(optimal_prices[0])
state = StateSpace(np.array([0.0]), np.array([base_price]), pd.DataFrame())
optimal_price = pricing_model.transform(state, np.array([productId]))[0]
return PriceResponse(
productId=productId,
price=final_price,
price=float(optimal_price),
base_price=base_price,
markup=final_price / base_price if base_price > 0 else 1.0,
elasticity=elasticity_value,
model_version='latest'
markup=optimal_price/base_price,
elasticity=float(product_elasticity) if product_elasticity is not None else None
)
@app.get("/models")
def list_models():
"""List all registered models in the registry."""
return registry.list_models()
def list_models(): return registry.list_models()
@app.post("/models/reload")
def reload_models():
"""Force reload of models from registry (useful after pipeline updates)."""
elasticity = registry.get_elasticity('latest')
pricing_model = registry.get_pricing_model('latest')
elasticity, pricing_model = registry.get_elasticity('latest'), registry.get_pricing_model('latest')
return {
"elasticity_loaded": elasticity is not None,
"elasticity_loaded": bool(elasticity),
"n_products": len(elasticity) if elasticity is not None else 0,
"pricing_model_loaded": pricing_model is not None,
"pricing_model_loaded": bool(pricing_model),
"model_class": pricing_model.__class__.__name__ if pricing_model else None
}
if __name__ == "__main__":
port = int(os.getenv("PROVIDER_PORT", "5001"))
uvicorn.run(app, host="0.0.0.0", port=port)
uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PROVIDER_PORT", "5001")))