From 40a57bc10bb1be8e06d62e3e2447a75e6943e13b Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Thu, 27 Nov 2025 12:57:16 +0100 Subject: [PATCH] feature: e2e pricing pipeline with inference --- experiments/procesing/demand.py | 7 +++-- experiments/procesing/elasticity.py | 49 ++++++++++++++++++++++++++--- experiments/procesing/pipeline.py | 12 +++++-- experiments/procesing/pricing.py | 46 ++++++++++++++++++++++----- 4 files changed, 97 insertions(+), 17 deletions(-) diff --git a/experiments/procesing/demand.py b/experiments/procesing/demand.py index 4652146..ba3c6c6 100644 --- a/experiments/procesing/demand.py +++ b/experiments/procesing/demand.py @@ -4,9 +4,11 @@ import pandas as pd from supabase import create_client, Client from typing import Optional, Literal import os +import logging +log = logging.getLogger(__name__) -SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL") -SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") +SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "") +SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "") supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) @@ -97,6 +99,7 @@ class DemandEstimator(BaseEstimator, TransformerMixin): products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute() products = pd.DataFrame(products.data) unique_products = products['id'].unique() + log.info(f"Demand estimator found {len(unique_products)} in data") # filter out rows without productId interactions_with_products = interactions.dropna(subset=['productId']) diff --git a/experiments/procesing/elasticity.py b/experiments/procesing/elasticity.py index 5e8a7fe..eea3022 100644 --- a/experiments/procesing/elasticity.py +++ b/experiments/procesing/elasticity.py @@ -2,6 +2,13 @@ import numpy as np import pandas as pd from typing import List, Dict, Optional from sklearn.base import BaseEstimator, TransformerMixin +from supabase import create_client, Client +import os + +SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "") +SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "") + +supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) class TemporalElasticityEstimator(BaseEstimator, TransformerMixin): """ @@ -31,19 +38,31 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin): def transform(self, demand_chunks: List[Dict], - price_chunks: List[Dict]) -> pd.DataFrame: + price_chunks: List[Dict], + store_mode: str = 'hotel') -> pd.DataFrame: """ Args: demand_chunks: list from ChunkInteractionsIntoSteps + DemandEstimator each item: {'window_start', 'window_end', 'demand_vector'} price_chunks: list of dicts with {'window_start', 'window_end', 'price_vector'} + store_mode: 'hotel' or 'airline' to fetch all products Returns: df with [productId, elasticity, std_error, n_observations] """ + # fetch all products from database + all_products = supabase.table(f'{store_mode}_products').select("id").execute() + all_product_ids = [p['id'] for p in all_products.data] + aligned = self._align_chunks(demand_chunks, price_chunks) if not aligned: - return pd.DataFrame(columns=['productId', 'elasticity', 'std_error', 'n_obs']) + # return all products with zero elasticity + return pd.DataFrame({ + 'productId': all_product_ids, + 'elasticity': 0.0, + 'std_error': 0.0, + 'n_obs': 0 + }) # build time series per product product_series = self._build_product_timeseries(aligned) @@ -73,7 +92,22 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin): 'n_obs': len(series) }) - return pd.DataFrame(elasticities) + result_df = pd.DataFrame(elasticities) + + # fill in missing products with zero elasticity + observed_pids = set(result_df['productId'].unique()) + missing_pids = [pid for pid in all_product_ids if pid not in observed_pids] + + if missing_pids: + missing_df = pd.DataFrame({ + 'productId': missing_pids, + 'elasticity': 0.0, + 'std_error': 0.0, + 'n_obs': 0 + }) + result_df = pd.concat([result_df, missing_df], ignore_index=True) + + return result_df def _align_chunks(self, demand_chunks, price_chunks): """Align demand and price data by matching time windows.""" @@ -219,7 +253,8 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin): def aggregate_price_logs(price_logs: pd.DataFrame, window_size: str = '1H', - ts_col: str = 'ts') -> List[Dict]: + ts_col: str = 'ts', + store_mode : str = 'hotel') -> List[Dict]: """ Recover price vectors treating prices as persistent state changes. @@ -245,6 +280,9 @@ def aggregate_price_logs(price_logs: pd.DataFrame, df[ts_col] = pd.to_datetime(df[ts_col]) df = df.sort_values([ts_col, 'productId']) + all_products=supabase.table(f'{store_mode}_products').select("id, room_type, date_index, metadata, availability").execute() + all_products = pd.DataFrame(all_products.data) + unique_products = all_products['id'].unique() # generate windows across data range min_time, max_time = df[ts_col].min(), df[ts_col].max() @@ -261,7 +299,8 @@ def aggregate_price_logs(price_logs: pd.DataFrame, price_vector = [] # all products with price history by window_end - historical_products = df[df[ts_col] < window_end]['productId'].unique() + #historical_products = df[df[ts_col] < window_end]['productId'].unique() + historical_products = unique_products.tolist() for pid in historical_products: product_data = df[df['productId'] == pid] diff --git a/experiments/procesing/pipeline.py b/experiments/procesing/pipeline.py index 47d00d6..2efa4ae 100644 --- a/experiments/procesing/pipeline.py +++ b/experiments/procesing/pipeline.py @@ -1,6 +1,8 @@ from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler import pandas as pd +import logging +log = logging.getLogger(__name__) from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data from mapping import SessionTransitionProbMatrixTransformer, render_graph @@ -26,7 +28,7 @@ def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store # step 1: chunk interactions into time windows chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True) interaction_chunks = chunker.transform(interactions_df) - print(len(interaction_chunks)) + log.info(f"Chunked interactions into {len(interaction_chunks)} windows of size {window_size}") if not interaction_chunks: return None @@ -39,15 +41,16 @@ def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store demand_chunks.append({ 'window_start': chunk['window_start'], 'window_end': chunk['window_end'], - 'demand_vector': demand_vector + 'demand_vector': demand_vector # each has a full list of all products, even if demand is 0 }) + # [q_chunk1, q_chunk2, ...] # step 3: aggregate price logs into windows price_chunks = aggregate_price_logs(price_logs_df, window_size=window_size) # step 4: compute elasticity elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2) - elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks) + elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks, store_mode=store_mode) return elasticity_df @@ -63,6 +66,9 @@ price_data_pipeline = Pipeline([ ('kafka_fetch', KafkaDataFetcher(topic='price-logs')), ]) +# interaction_data + price_data -> elasticity (demand) +# elasticity -> pricing + pricing_pipeline = Pipeline([ ('demand_estimation', DemandEstimator()), ]) diff --git a/experiments/procesing/pricing.py b/experiments/procesing/pricing.py index 25c3725..afdcfdd 100644 --- a/experiments/procesing/pricing.py +++ b/experiments/procesing/pricing.py @@ -34,8 +34,14 @@ from abc import ABC, abstractmethod from sklearn.base import BaseEstimator, TransformerMixin import numpy as np import pandas as pd +import os +from supabase import create_client, Client from pipeline import interaction_pipeline, price_data_pipeline, elasticity_pipeline +SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "") +SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "") +supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) + def expected_revenue(prices: np.ndarray, demand: np.ndarray) -> float: """Returns: expected revenue R_t = P_t^T * Q_t""" return float(np.dot(prices, demand)) @@ -85,21 +91,47 @@ class SimpleLinearPricingFunction(PricingFunction): # Example usage: if __name__ == "__main__": + store_mode = 'hotel' interaction_data = interaction_pipeline.fit_transform(None) price_data = price_data_pipeline.fit_transform(None) - elasticity_df = elasticity_pipeline(interaction_data, price_data, window_size="30s") + elasticity_df = elasticity_pipeline(interaction_data, price_data, window_size="30s", store_mode=store_mode) - # align elasticity with price data by productId, fill missing with 0 - if not price_data.empty and elasticity_df is not None and not elasticity_df.empty: - # TODO FIX: we might have duplicate productIds - price_data_merged = price_data.merge( + # fetch all products with base prices from database + products_resp = supabase.table(f'{store_mode}_products').select("id, metadata").execute() + products_df = pd.DataFrame(products_resp.data) + + # extract base_price from metadata + products_df['base_price'] = products_df['metadata'].apply(lambda m: m.get('base_price', 0) if isinstance(m, dict) else 0) + products_df = products_df.rename(columns={'id': 'productId'})[['productId', 'base_price']] + + # override with logged prices where available + if not price_data.empty: + if 'ts' in price_data.columns and not pd.api.types.is_datetime64_any_dtype(price_data['ts']): + price_data['ts'] = pd.to_datetime(price_data['ts']) + + # get latest logged price per product + price_logs_agg = price_data.sort_values('ts').groupby('productId', as_index=False).last() + + # merge: start with all products (base prices), override with logged prices + products_df = products_df.merge( + price_logs_agg[['productId', 'price']], + on='productId', + how='left' + ) + products_df['final_price'] = products_df['price'].fillna(products_df['base_price']) + else: + products_df['final_price'] = products_df['base_price'] + + # merge with elasticity + if elasticity_df is not None and not elasticity_df.empty: + price_data_merged = products_df[['productId', 'final_price']].merge( elasticity_df[['productId', 'elasticity']], on='productId', how='left' - ).fillna({'elasticity': 0.0}) # is it possible we are spilling some elasticities into other products + ).fillna({'elasticity': 0.0}) - prices = price_data_merged['price'].values + prices = price_data_merged['final_price'].values elasticities = price_data_merged['elasticity'].values else: prices = np.array([])