first implementation of elasticity demand computation

This commit is contained in:
2025-11-25 22:27:38 +01:00
parent 8b76d24ade
commit c639d99be2
8 changed files with 616 additions and 38 deletions

View File

@@ -2,7 +2,7 @@ from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
from supabase import create_client, Client
import pandas as pd
from typing import Optional, Literal
import os
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL")
@@ -10,6 +10,71 @@ SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class ChunkInteractionsIntoSteps(BaseEstimator, TransformerMixin):
"""
Split interaction data into time windows for temporal analysis.
Returns a list of dataframes, one per time window.
"""
def __init__(self,
window_size:str='1h',
ts_col:str='ts',
return_metadata:bool=True):
"""
Args:
window_size: pandas freq string ('1h', '30T', '1D', etc)
ts_col: timestamp column name
return_metadata: if True, return dict with metadata per chunk
"""
self.window_size = window_size
self.ts_col = ts_col
self.return_metadata = return_metadata
def fit(self, X):
return self
def transform(self, interactions: pd.DataFrame):
"""
Returns:
if return_metadata=False: list of dataframes, one per window
if return_metadata=True: list of dicts with keys:
- 'data': dataframe for this window
- 'window_start': start timestamp
- 'window_end': end timestamp
- 'window_idx': integer index
"""
if interactions.empty:
return []
df = interactions.copy()
# ensure timestamp is datetime
if not pd.api.types.is_datetime64_any_dtype(df[self.ts_col]):
df[self.ts_col] = pd.to_datetime(df[self.ts_col])
# sort by time
df = df.sort_values(self.ts_col)
# assign window
df['_window'] = df[self.ts_col].dt.floor(self.window_size)
# group by window
chunks = []
for idx, (window_start, group) in enumerate(df.groupby('_window')):
chunk_data = group.drop(columns=['_window'])
if self.return_metadata:
chunks.append({
'data': chunk_data,
'window_start': window_start,
'window_end': window_start + pd.Timedelta(self.window_size),
'window_idx': idx
})
else:
chunks.append(chunk_data)
return chunks
class DemandEstimator(BaseEstimator, TransformerMixin):
def __init__(self,
store_mode:str='hotel',
@@ -28,12 +93,24 @@ class DemandEstimator(BaseEstimator, TransformerMixin):
interactions = interactions[interactions['sessionId'] == self.session_filter]
if self.experiment_filter:
interactions = interactions[interactions['experimentId'] == self.experiment_filter]
products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute()
products = pd.DataFrame(products.data)
unique_products = products['id'].unique()
# filter out rows without productId
interactions_with_products = interactions.dropna(subset=['productId'])
if interactions_with_products.empty:
# no interactions with products, return all zeros
return pd.DataFrame({
'productId': unique_products,
'demand_score': 0
})
# TODO: improve demand score calculation rather than just counting interactions (use weights..)
# while maintaining simplicity of a simple cross tab approach
product_demand = pd.crosstab(interactions['productId'], "no_of_interactions")
product_demand = pd.crosstab(interactions_with_products['productId'], "no_of_interactions")
product_demand = product_demand.reindex(unique_products, fill_value=0).reset_index()
product_demand.columns = ['productId', 'demand_score']
return product_demand

View File

@@ -0,0 +1,285 @@
import numpy as np
import pandas as pd
from typing import List, Dict, Optional
from sklearn.base import BaseEstimator, TransformerMixin
class TemporalElasticityEstimator(BaseEstimator, TransformerMixin):
"""
Compute price elasticity from time-series demand and price data.
Elasticity = (% change in quantity) / (% change in price)
Works with chunked time-window data from ChunkInteractionsIntoSteps.
"""
def __init__(self,
method:str='point',
min_observations:int=2,
smooth_window:Optional[int]=None):
"""
Args:
method: 'point' (point elasticity) or 'arc' (arc elasticity)
min_observations: min data points needed per product
smooth_window: if set, apply rolling avg smoothing to time series
"""
self.method = method
self.min_observations = min_observations
self.smooth_window = smooth_window
def fit(self, X):
return self
def transform(self,
demand_chunks: List[Dict],
price_chunks: List[Dict]) -> pd.DataFrame:
"""
Args:
demand_chunks: list from ChunkInteractionsIntoSteps + DemandEstimator
each item: {'window_start', 'window_end', 'demand_vector'}
price_chunks: list of dicts with {'window_start', 'window_end', 'price_vector'}
Returns:
df with [productId, elasticity, std_error, n_observations]
"""
aligned = self._align_chunks(demand_chunks, price_chunks)
if not aligned:
return pd.DataFrame(columns=['productId', 'elasticity', 'std_error', 'n_obs'])
# build time series per product
product_series = self._build_product_timeseries(aligned)
# compute elasticity per product
elasticities = []
for pid, series in product_series.items():
if len(series) < self.min_observations:
continue
# apply smoothing if requested
if self.smooth_window and len(series) >= self.smooth_window:
series = self._smooth_series(series, self.smooth_window)
elast = self._compute_elasticity(series)
if elast is not None:
elasticities.append({
'productId': pid,
'elasticity': elast['value'],
'std_error': elast.get('std_error', np.nan),
'n_obs': len(series)
})
return pd.DataFrame(elasticities)
def _align_chunks(self, demand_chunks, price_chunks):
"""Align demand and price data by matching time windows."""
aligned = []
# create lookup for price chunks by window_start
price_lookup = {chunk['window_start']: chunk for chunk in price_chunks}
for demand_chunk in demand_chunks:
window_start = demand_chunk['window_start']
if window_start in price_lookup:
aligned.append({
'window_start': window_start,
'window_end': demand_chunk['window_end'],
'demand': demand_chunk['demand_vector'],
'prices': price_lookup[window_start]['price_vector']
})
return aligned
def _build_product_timeseries(self, aligned_chunks):
"""Build time series [price, quantity] per product."""
series_by_product = {}
for chunk in aligned_chunks:
demand_df = chunk['demand']
price_df = chunk['prices']
# merge on productId
merged = demand_df.merge(price_df, on='productId', how='inner')
for _, row in merged.iterrows():
pid = row['productId']
if pid not in series_by_product:
series_by_product[pid] = []
series_by_product[pid].append({
'timestamp': chunk['window_start'],
'price': row['price'],
'quantity': row['demand_score']
})
return series_by_product
def _smooth_series(self, series, window):
"""Apply rolling average smoothing."""
df = pd.DataFrame(series)
df['price_smooth'] = df['price'].rolling(window=window, center=True).mean()
df['quantity_smooth'] = df['quantity'].rolling(window=window, center=True).mean()
df = df.dropna()
return [{'timestamp': row['timestamp'],
'price': row['price_smooth'],
'quantity': row['quantity_smooth']}
for _, row in df.iterrows()]
def _compute_elasticity(self, series):
"""Compute elasticity from time series."""
if len(series) < 2:
return None
prices = np.array([s['price'] for s in series])
quantities = np.array([s['quantity'] for s in series])
# filter out zero/negative values
valid = (prices > 0) & (quantities > 0)
if valid.sum() < 2:
return None
prices = prices[valid]
quantities = quantities[valid]
if self.method == 'point':
return self._point_elasticity(prices, quantities)
elif self.method == 'arc':
return self._arc_elasticity(prices, quantities)
else:
raise ValueError(f"Unknown method: {self.method}")
def _point_elasticity(self, prices, quantities):
"""
Point elasticity using log-log regression.
log(Q) = a + b*log(P), elasticity = b
"""
if len(prices) < 2:
return None
log_p = np.log(prices)
log_q = np.log(quantities)
# simple linear regression
if log_p.std() == 0:
return None
cov = np.cov(log_p, log_q)[0, 1]
var = np.var(log_p)
b = cov / var
# std error estimate
residuals = log_q - (log_q.mean() + b * (log_p - log_p.mean()))
mse = (residuals ** 2).sum() / (len(prices) - 2)
se_b = np.sqrt(mse / (len(prices) * var))
return {'value': b, 'std_error': se_b}
def _arc_elasticity(self, prices, quantities):
"""
Arc elasticity: average of period-over-period elasticities.
E_t = (ΔQ/Q_avg) / (ΔP/P_avg)
"""
elasticities = []
for i in range(1, len(prices)):
p1, p2 = prices[i-1], prices[i]
q1, q2 = quantities[i-1], quantities[i]
p_avg = (p1 + p2) / 2
q_avg = (q1 + q2) / 2
if p_avg == 0 or q_avg == 0:
continue
delta_p = p2 - p1
delta_q = q2 - q1
if delta_p == 0:
continue
e = (delta_q / q_avg) / (delta_p / p_avg)
elasticities.append(e)
if not elasticities:
return None
return {
'value': np.mean(elasticities),
'std_error': np.std(elasticities) / np.sqrt(len(elasticities))
}
def aggregate_price_logs(price_logs: pd.DataFrame,
window_size: str = '1H',
ts_col: str = 'ts') -> List[Dict]:
"""
Recover price vectors treating prices as persistent state changes.
Prices are set-operations that persist until next change. For each window:
- If price logs exist: average all changes within window
- If no logs: carry forward last price before window end
Args:
price_logs: df with [productId, price, ts, ...]
window_size: time window size matching ChunkInteractionsIntoSteps
ts_col: timestamp column name
Returns:
list of dicts with {'window_start', 'window_end', 'price_vector'}
where price_vector is df with [productId, price]
"""
if price_logs.empty:
return []
df = price_logs.copy()
if not pd.api.types.is_datetime64_any_dtype(df[ts_col]):
df[ts_col] = pd.to_datetime(df[ts_col])
df = df.sort_values([ts_col, 'productId'])
# generate windows across data range
min_time, max_time = df[ts_col].min(), df[ts_col].max()
windows = pd.date_range(
start=min_time.floor(window_size),
end=max_time,
freq=window_size
)
chunks = []
for window_start in windows:
window_end = window_start + pd.Timedelta(window_size)
price_vector = []
# all products with price history by window_end
historical_products = df[df[ts_col] < window_end]['productId'].unique()
for pid in historical_products:
product_data = df[df['productId'] == pid]
# logs within window
in_window = product_data[
(product_data[ts_col] >= window_start) &
(product_data[ts_col] < window_end)
]
if not in_window.empty:
# average changes within window
price = in_window['price'].mean()
else:
# carry forward: last price before window end
before_window = product_data[product_data[ts_col] < window_end]
if before_window.empty:
continue
price = before_window['price'].iloc[-1]
price_vector.append({'productId': pid, 'price': price})
if price_vector:
chunks.append({
'window_start': window_start,
'window_end': window_end,
'price_vector': pd.DataFrame(price_vector)
})
return chunks

View File

@@ -6,6 +6,7 @@ import requests
from dotenv import load_dotenv
from sklearn.base import BaseEstimator, TransformerMixin
from supabase import create_client, Client
from typing import Tuple, List, Dict
load_dotenv()
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000")
@@ -17,11 +18,13 @@ supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class KafkaDataFetcher(BaseEstimator, TransformerMixin):
def __init__(self, topic: str = "user-interactions"):
self.topic = topic # also can be price-logs
def fit(self, X=None, y=None):
return self
def transform(self, X=None):
resp = requests.get(f"{BACKEND_URL}/api/kafka/dump")
resp = requests.get(f"{BACKEND_URL}/api/kafka/dump?topic={self.topic}")
resp.raise_for_status()
data = resp.json()
@@ -29,12 +32,12 @@ class KafkaDataFetcher(BaseEstimator, TransformerMixin):
return pd.DataFrame()
df = pd.DataFrame(data['data'])
# explode metadata col json
if 'metadata' in df.columns:
df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
df = df.dropna(subset=['eventName'])
# remape dateIndex
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
if self.topic == 'user-interactions':
if 'metadata' in df.columns: # explode metadata col json
df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
df = df.dropna(subset=['eventName'])
# remape dateIndex
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
return df
@@ -110,3 +113,95 @@ class EventTitleAugmenter(BaseEstimator, TransformerMixin):
)
df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str)
return df
def chunk_shared_data(interactions_df: pd.DataFrame,
price_logs_df: pd.DataFrame,
window_size: str = '30s',
ts_col: str = 'ts') -> Tuple[List[Dict], List[Dict]]:
"""
Chunk interaction and price data into aligned time windows.
Args:
interactions_df: interaction data with timestamp column
price_logs_df: price log data with timestamp column
window_size: pandas freq string ('30s', '1min', '1h', etc)
ts_col: name of timestamp column
Returns:
tuple of (interaction_chunks, price_chunks) where each is list of dicts:
{
'window_start': timestamp,
'window_end': timestamp,
'data': dataframe for this window
}
"""
if interactions_df.empty and price_logs_df.empty:
return [], []
# convert timestamps to datetime
interactions_df = interactions_df.copy()
price_logs_df = price_logs_df.copy()
if not interactions_df.empty:
if not pd.api.types.is_datetime64_any_dtype(interactions_df[ts_col]):
interactions_df[ts_col] = pd.to_datetime(interactions_df[ts_col])
if not price_logs_df.empty:
if not pd.api.types.is_datetime64_any_dtype(price_logs_df[ts_col]):
price_logs_df[ts_col] = pd.to_datetime(price_logs_df[ts_col])
# find global time bounds
times = []
if not interactions_df.empty:
times.extend([interactions_df[ts_col].min(), interactions_df[ts_col].max()])
if not price_logs_df.empty:
times.extend([price_logs_df[ts_col].min(), price_logs_df[ts_col].max()])
if not times:
return [], []
earliest = min(times)
latest = max(times)
# create shared time windows
windows = pd.date_range(start=earliest, end=latest, freq=window_size)
if len(windows) < 2:
return [], []
# chunk both datasets
interaction_chunks = []
price_chunks = []
for i in range(len(windows) - 1):
window_start = windows[i]
window_end = windows[i + 1]
# filter interactions in this window
if not interactions_df.empty:
mask = (interactions_df[ts_col] >= window_start) & (interactions_df[ts_col] < window_end)
interaction_chunk = interactions_df[mask]
else:
interaction_chunk = pd.DataFrame()
interaction_chunks.append({
'window_start': window_start,
'window_end': window_end,
'data': interaction_chunk
})
# filter price logs in this window
if not price_logs_df.empty:
mask = (price_logs_df[ts_col] >= window_start) & (price_logs_df[ts_col] < window_end)
price_chunk = price_logs_df[mask]
else:
price_chunk = pd.DataFrame()
price_chunks.append({
'window_start': window_start,
'window_end': window_end,
'data': price_chunk
})
return interaction_chunks, price_chunks

View File

@@ -1,22 +1,82 @@
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import pandas as pd
from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter
from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data
from mapping import SessionTransitionProbMatrixTransformer, render_graph
from demand import DemandEstimator
from demand import DemandEstimator, ChunkInteractionsIntoSteps
from elasticity import TemporalElasticityEstimator, aggregate_price_logs
# elasticity pipeline components (not sklearn compatible, manual orchestration)
def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store_mode='hotel'):
"""
Compute price elasticity from interaction and price data.
Args:
interactions_df: raw interaction data from demand_data_pipeline
price_logs_df: price log data from price_data_pipeline
window_size: time window for chunking
store_mode: 'hotel' or 'airline'
Returns:
df with [productId, elasticity, std_error, n_obs]
"""
# step 1: chunk interactions into time windows
chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True)
interaction_chunks = chunker.transform(interactions_df)
print(len(interaction_chunks))
if not interaction_chunks:
return None
# step 2: compute demand per window
demand_estimator = DemandEstimator(store_mode=store_mode)
demand_chunks = []
for chunk in interaction_chunks:
demand_vector = demand_estimator.transform(chunk['data'])
demand_chunks.append({
'window_start': chunk['window_start'],
'window_end': chunk['window_end'],
'demand_vector': demand_vector
})
# step 3: aggregate price logs into windows
price_chunks = aggregate_price_logs(price_logs_df, window_size=window_size)
# step 4: compute elasticity
elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2)
elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks)
return elasticity_df
# exposable pipelines
etl_pipeline = Pipeline([
('kafka_fetch', KafkaDataFetcher()),
interaction_pipeline = Pipeline([
('kafka_fetch', KafkaDataFetcher(topic='user-interactions')),
('experiment_join', ExperimentJoiner()),
('event_augment', EventTitleAugmenter()),
])
price_data_pipeline = Pipeline([
('kafka_fetch', KafkaDataFetcher(topic='price-logs')),
])
pricing_pipeline = Pipeline([
('demand_estimation', DemandEstimator()),
])
if __name__ == "__main__":
processed_data = etl_pipeline.fit_transform(None)
pricing = pricing_pipeline.fit_transform(processed_data)
print(pricing)
# fetch both datasets
interaction_data = interaction_pipeline.fit_transform(None)
pricing_data = price_data_pipeline.fit_transform(None)
if interaction_data.empty or pricing_data.empty:
print("Insufficient data for elasticity computation"); exit(0)
# compute elasticity via unified pipeline
window_size = "30s"
elasticity_results = elasticity_pipeline(interaction_data, pricing_data, window_size=window_size)
if elasticity_results is not None and not elasticity_results.empty:
print(elasticity_results.to_string(index=False))
else:
print("\nInsufficient data for elasticity computation")

View File