feature: e2e pricing pipeline with inference

This commit is contained in:
2025-11-27 12:57:16 +01:00
parent 5b87fde8ed
commit 40a57bc10b
4 changed files with 97 additions and 17 deletions

View File

@@ -4,9 +4,11 @@ import pandas as pd
from supabase import create_client, Client from supabase import create_client, Client
from typing import Optional, Literal from typing import Optional, Literal
import os import os
import logging
log = logging.getLogger(__name__)
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL") SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
@@ -97,6 +99,7 @@ class DemandEstimator(BaseEstimator, TransformerMixin):
products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute() products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute()
products = pd.DataFrame(products.data) products = pd.DataFrame(products.data)
unique_products = products['id'].unique() unique_products = products['id'].unique()
log.info(f"Demand estimator found {len(unique_products)} in data")
# filter out rows without productId # filter out rows without productId
interactions_with_products = interactions.dropna(subset=['productId']) interactions_with_products = interactions.dropna(subset=['productId'])

View File

@@ -2,6 +2,13 @@ import numpy as np
import pandas as pd import pandas as pd
from typing import List, Dict, Optional from typing import List, Dict, Optional
from sklearn.base import BaseEstimator, TransformerMixin from sklearn.base import BaseEstimator, TransformerMixin
from supabase import create_client, Client
import os
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class TemporalElasticityEstimator(BaseEstimator, TransformerMixin): class TemporalElasticityEstimator(BaseEstimator, TransformerMixin):
""" """
@@ -31,19 +38,31 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin):
def transform(self, def transform(self,
demand_chunks: List[Dict], demand_chunks: List[Dict],
price_chunks: List[Dict]) -> pd.DataFrame: price_chunks: List[Dict],
store_mode: str = 'hotel') -> pd.DataFrame:
""" """
Args: Args:
demand_chunks: list from ChunkInteractionsIntoSteps + DemandEstimator demand_chunks: list from ChunkInteractionsIntoSteps + DemandEstimator
each item: {'window_start', 'window_end', 'demand_vector'} each item: {'window_start', 'window_end', 'demand_vector'}
price_chunks: list of dicts with {'window_start', 'window_end', 'price_vector'} price_chunks: list of dicts with {'window_start', 'window_end', 'price_vector'}
store_mode: 'hotel' or 'airline' to fetch all products
Returns: Returns:
df with [productId, elasticity, std_error, n_observations] df with [productId, elasticity, std_error, n_observations]
""" """
# fetch all products from database
all_products = supabase.table(f'{store_mode}_products').select("id").execute()
all_product_ids = [p['id'] for p in all_products.data]
aligned = self._align_chunks(demand_chunks, price_chunks) aligned = self._align_chunks(demand_chunks, price_chunks)
if not aligned: if not aligned:
return pd.DataFrame(columns=['productId', 'elasticity', 'std_error', 'n_obs']) # return all products with zero elasticity
return pd.DataFrame({
'productId': all_product_ids,
'elasticity': 0.0,
'std_error': 0.0,
'n_obs': 0
})
# build time series per product # build time series per product
product_series = self._build_product_timeseries(aligned) product_series = self._build_product_timeseries(aligned)
@@ -73,7 +92,22 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin):
'n_obs': len(series) 'n_obs': len(series)
}) })
return pd.DataFrame(elasticities) result_df = pd.DataFrame(elasticities)
# fill in missing products with zero elasticity
observed_pids = set(result_df['productId'].unique())
missing_pids = [pid for pid in all_product_ids if pid not in observed_pids]
if missing_pids:
missing_df = pd.DataFrame({
'productId': missing_pids,
'elasticity': 0.0,
'std_error': 0.0,
'n_obs': 0
})
result_df = pd.concat([result_df, missing_df], ignore_index=True)
return result_df
def _align_chunks(self, demand_chunks, price_chunks): def _align_chunks(self, demand_chunks, price_chunks):
"""Align demand and price data by matching time windows.""" """Align demand and price data by matching time windows."""
@@ -219,7 +253,8 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin):
def aggregate_price_logs(price_logs: pd.DataFrame, def aggregate_price_logs(price_logs: pd.DataFrame,
window_size: str = '1H', window_size: str = '1H',
ts_col: str = 'ts') -> List[Dict]: ts_col: str = 'ts',
store_mode : str = 'hotel') -> List[Dict]:
""" """
Recover price vectors treating prices as persistent state changes. Recover price vectors treating prices as persistent state changes.
@@ -245,6 +280,9 @@ def aggregate_price_logs(price_logs: pd.DataFrame,
df[ts_col] = pd.to_datetime(df[ts_col]) df[ts_col] = pd.to_datetime(df[ts_col])
df = df.sort_values([ts_col, 'productId']) df = df.sort_values([ts_col, 'productId'])
all_products=supabase.table(f'{store_mode}_products').select("id, room_type, date_index, metadata, availability").execute()
all_products = pd.DataFrame(all_products.data)
unique_products = all_products['id'].unique()
# generate windows across data range # generate windows across data range
min_time, max_time = df[ts_col].min(), df[ts_col].max() min_time, max_time = df[ts_col].min(), df[ts_col].max()
@@ -261,7 +299,8 @@ def aggregate_price_logs(price_logs: pd.DataFrame,
price_vector = [] price_vector = []
# all products with price history by window_end # all products with price history by window_end
historical_products = df[df[ts_col] < window_end]['productId'].unique() #historical_products = df[df[ts_col] < window_end]['productId'].unique()
historical_products = unique_products.tolist()
for pid in historical_products: for pid in historical_products:
product_data = df[df['productId'] == pid] product_data = df[df['productId'] == pid]

View File

@@ -1,6 +1,8 @@
from sklearn.pipeline import Pipeline from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler from sklearn.preprocessing import StandardScaler
import pandas as pd import pandas as pd
import logging
log = logging.getLogger(__name__)
from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data
from mapping import SessionTransitionProbMatrixTransformer, render_graph from mapping import SessionTransitionProbMatrixTransformer, render_graph
@@ -26,7 +28,7 @@ def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store
# step 1: chunk interactions into time windows # step 1: chunk interactions into time windows
chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True) chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True)
interaction_chunks = chunker.transform(interactions_df) interaction_chunks = chunker.transform(interactions_df)
print(len(interaction_chunks)) log.info(f"Chunked interactions into {len(interaction_chunks)} windows of size {window_size}")
if not interaction_chunks: if not interaction_chunks:
return None return None
@@ -39,15 +41,16 @@ def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store
demand_chunks.append({ demand_chunks.append({
'window_start': chunk['window_start'], 'window_start': chunk['window_start'],
'window_end': chunk['window_end'], 'window_end': chunk['window_end'],
'demand_vector': demand_vector 'demand_vector': demand_vector # each has a full list of all products, even if demand is 0
}) })
# [q_chunk1, q_chunk2, ...]
# step 3: aggregate price logs into windows # step 3: aggregate price logs into windows
price_chunks = aggregate_price_logs(price_logs_df, window_size=window_size) price_chunks = aggregate_price_logs(price_logs_df, window_size=window_size)
# step 4: compute elasticity # step 4: compute elasticity
elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2) elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2)
elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks) elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks, store_mode=store_mode)
return elasticity_df return elasticity_df
@@ -63,6 +66,9 @@ price_data_pipeline = Pipeline([
('kafka_fetch', KafkaDataFetcher(topic='price-logs')), ('kafka_fetch', KafkaDataFetcher(topic='price-logs')),
]) ])
# interaction_data + price_data -> elasticity (demand)
# elasticity -> pricing
pricing_pipeline = Pipeline([ pricing_pipeline = Pipeline([
('demand_estimation', DemandEstimator()), ('demand_estimation', DemandEstimator()),
]) ])

View File

@@ -34,8 +34,14 @@ from abc import ABC, abstractmethod
from sklearn.base import BaseEstimator, TransformerMixin from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import os
from supabase import create_client, Client
from pipeline import interaction_pipeline, price_data_pipeline, elasticity_pipeline from pipeline import interaction_pipeline, price_data_pipeline, elasticity_pipeline
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
def expected_revenue(prices: np.ndarray, demand: np.ndarray) -> float: def expected_revenue(prices: np.ndarray, demand: np.ndarray) -> float:
"""Returns: expected revenue R_t = P_t^T * Q_t""" """Returns: expected revenue R_t = P_t^T * Q_t"""
return float(np.dot(prices, demand)) return float(np.dot(prices, demand))
@@ -85,21 +91,47 @@ class SimpleLinearPricingFunction(PricingFunction):
# Example usage: # Example usage:
if __name__ == "__main__": if __name__ == "__main__":
store_mode = 'hotel'
interaction_data = interaction_pipeline.fit_transform(None) interaction_data = interaction_pipeline.fit_transform(None)
price_data = price_data_pipeline.fit_transform(None) price_data = price_data_pipeline.fit_transform(None)
elasticity_df = elasticity_pipeline(interaction_data, price_data, window_size="30s") elasticity_df = elasticity_pipeline(interaction_data, price_data, window_size="30s", store_mode=store_mode)
# align elasticity with price data by productId, fill missing with 0 # fetch all products with base prices from database
if not price_data.empty and elasticity_df is not None and not elasticity_df.empty: products_resp = supabase.table(f'{store_mode}_products').select("id, metadata").execute()
# TODO FIX: we might have duplicate productIds products_df = pd.DataFrame(products_resp.data)
price_data_merged = price_data.merge(
# extract base_price from metadata
products_df['base_price'] = products_df['metadata'].apply(lambda m: m.get('base_price', 0) if isinstance(m, dict) else 0)
products_df = products_df.rename(columns={'id': 'productId'})[['productId', 'base_price']]
# override with logged prices where available
if not price_data.empty:
if 'ts' in price_data.columns and not pd.api.types.is_datetime64_any_dtype(price_data['ts']):
price_data['ts'] = pd.to_datetime(price_data['ts'])
# get latest logged price per product
price_logs_agg = price_data.sort_values('ts').groupby('productId', as_index=False).last()
# merge: start with all products (base prices), override with logged prices
products_df = products_df.merge(
price_logs_agg[['productId', 'price']],
on='productId',
how='left'
)
products_df['final_price'] = products_df['price'].fillna(products_df['base_price'])
else:
products_df['final_price'] = products_df['base_price']
# merge with elasticity
if elasticity_df is not None and not elasticity_df.empty:
price_data_merged = products_df[['productId', 'final_price']].merge(
elasticity_df[['productId', 'elasticity']], elasticity_df[['productId', 'elasticity']],
on='productId', on='productId',
how='left' how='left'
).fillna({'elasticity': 0.0}) # is it possible we are spilling some elasticities into other products ).fillna({'elasticity': 0.0})
prices = price_data_merged['price'].values prices = price_data_merged['final_price'].values
elasticities = price_data_merged['elasticity'].values elasticities = price_data_merged['elasticity'].values
else: else:
prices = np.array([]) prices = np.array([])