First pricing implementation (#27)

* first implementation of elasticity demand computation

* chore: fixing test :(

* feature: rudimentary definition of pricing pipeline

* chore: fixing cross product missing data

* add warning

* feature: e2e pricing pipeline with inference
This commit is contained in:
Daniel Alves Rösel
2025-11-27 18:25:27 +01:00
committed by GitHub
parent 8b76d24ade
commit c432c45343
8 changed files with 829 additions and 39 deletions

View File

@@ -64,6 +64,14 @@ class EventPayload(BaseModel):
userAgent: Optional[str] = None userAgent: Optional[str] = None
ts: Optional[str] = None ts: Optional[str] = None
class PriceLogPayload(BaseModel):
    """Request body of the price-log ingestion endpoint: one price shown for one product."""
    productId: str
    price: float
    sessionId: str
    experimentId: Optional[str] = None
    storeMode: str
    # ingest_price_log fills this with the current UTC time when it is missing
    ts: Optional[str] = None
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=["*"], allow_origins=["*"],
@@ -87,7 +95,8 @@ async def startup_event():
) )
topics = [ topics = [
NewTopic(name='user-interactions', num_partitions=3, replication_factor=1) NewTopic(name='user-interactions', num_partitions=3, replication_factor=1),
NewTopic(name='price-logs', num_partitions=3, replication_factor=1)
] ]
admin.create_topics(new_topics=topics, validate_only=False) admin.create_topics(new_topics=topics, validate_only=False)
@@ -139,26 +148,52 @@ async def ingest_logs(event: EventPayload):
print(traceback.format_exc()) print(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/kafka/price-log")
async def ingest_price_log(price_log: PriceLogPayload):
    """
    Publish one price observation to the 'price-logs' kafka topic.

    The message is keyed by productId. A missing timestamp is replaced with
    the current UTC time in ISO format. Delivery errors reported later by the
    producer are only printed; any exception raised while enqueuing the
    message is turned into an HTTP 500.
    """
    try:
        if not price_log.ts:
            price_log.ts = datetime.utcnow().isoformat() + 'Z'

        pending = get_producer().send(
            'price-logs',
            key=price_log.productId,
            value=price_log.model_dump(),
        )
        # delivery is asynchronous: failures surface through this callback
        pending.add_errback(lambda err: print(f"[KAFKA_PRICE_LOG_ERROR] {err}"))
    except Exception as e:
        import traceback
        print(f"[PRICE_LOG_ERROR] {e}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))

    return {"success": True}
@app.get("/api/kafka/dump") @app.get("/api/kafka/dump")
def dump_logs( def dump_logs(
topic: str = 'user-interactions',
last_n: Optional[int] = None, last_n: Optional[int] = None,
t_start: Optional[str] = None, t_start: Optional[str] = None,
t_end: Optional[str] = None t_end: Optional[str] = None
): ):
"""dump all messages from user-interactions topic """dump all messages from specified kafka topic
params: params:
topic: kafka topic to dump (default: user-interactions)
last_n: return only last n messages (default: all) last_n: return only last n messages (default: all)
t_start: filter by start timestamp iso format (future use) t_start: filter by start timestamp iso format
t_end: filter by end timestamp iso format (future use) t_end: filter by end timestamp iso format
""" """
if topic not in ['user-interactions', 'price-logs']:
raise HTTPException(status_code=400, detail="Invalid topic")
host = os.getenv('KAFKA_HOST', 'localhost') host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '9092') port = os.getenv('KAFKA_PORT', '9092')
broker = f'{host}:{port}' broker = f'{host}:{port}'
try: try:
consumer = KafkaConsumer( consumer = KafkaConsumer(
'user-interactions', topic,
bootstrap_servers=[broker], bootstrap_servers=[broker],
auto_offset_reset='earliest', auto_offset_reset='earliest',
enable_auto_commit=False, enable_auto_commit=False,
@@ -174,7 +209,6 @@ def dump_logs(
# apply filters # apply filters
if t_start or t_end: if t_start or t_end:
# filter by timestamp range if provided
filtered = [] filtered = []
for e in events: for e in events:
ts = e.get('ts') ts = e.get('ts')

View File

@@ -38,7 +38,10 @@ def get_agent(agent_type: AgentTypes, **kwargs) -> Agent:
if __name__ == "__main__": if __name__ == "__main__":
import asyncio import asyncio
JTBD= "Name all the products on this site and try to find out more about each product by clicking into them (they might not open)" JTBD= "Find me the cheapest room in Madrid for 2 people in the next two days, review each hotel room in detail and then add it to cart."
agent = get_agent(AgentTypes.GENERIC_BROWSER_USE_AGENT, goal=JTBD, url="http://localhost:3000/products", timeout=300) agent = get_agent(AgentTypes.GENERIC_BROWSER_USE_AGENT,
goal=JTBD,
url="http://localhost:3000/start-task?uuid=d10f5ab3-a7b7-4e97-8d94-ab06f1537c0a",
timeout=300)
R=asyncio.run(agent.act()) R=asyncio.run(agent.act())
print(R) print(R)

View File

@@ -2,14 +2,81 @@ from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from supabase import create_client, Client from supabase import create_client, Client
import pandas as pd from typing import Optional, Literal
import os import os
import logging
log = logging.getLogger(__name__)
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL") SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class ChunkInteractionsIntoSteps(BaseEstimator, TransformerMixin):
    """
    Split interaction data into time windows for temporal analysis.

    Rows are bucketed by flooring their timestamp to `window_size`; only
    windows that contain at least one row are returned, ordered by time.
    """

    def __init__(self,
                 window_size: str = '1h',
                 ts_col: str = 'ts',
                 return_metadata: bool = True):
        """
        Args:
            window_size: fixed pandas frequency string ('30s', '1h', '1D', etc)
            ts_col: timestamp column name
            return_metadata: if True, return dict with metadata per chunk
        """
        self.window_size = window_size
        self.ts_col = ts_col
        self.return_metadata = return_metadata

    def fit(self, X=None, y=None):
        """Stateless: nothing to learn. `y` is accepted so sklearn can pass targets."""
        return self

    def transform(self, interactions: pd.DataFrame):
        """
        Args:
            interactions: dataframe holding a `ts_col` column (datetime, or
                anything `pd.to_datetime` can parse)

        Returns:
            [] if `interactions` is empty, otherwise
            if return_metadata=False: list of dataframes, one per window
            if return_metadata=True: list of dicts with keys:
                - 'data': dataframe for this window
                - 'window_start': start timestamp
                - 'window_end': end timestamp
                - 'window_idx': integer index
        """
        if interactions.empty:
            return []

        # work on a copy so the caller's frame is never mutated
        df = interactions.copy()

        # ensure timestamp is datetime
        if not pd.api.types.is_datetime64_any_dtype(df[self.ts_col]):
            df[self.ts_col] = pd.to_datetime(df[self.ts_col])

        df = df.sort_values(self.ts_col)
        df['_window'] = df[self.ts_col].dt.floor(self.window_size)

        # the window length is constant, so resolve it once (only needed for metadata)
        window_length = pd.Timedelta(self.window_size) if self.return_metadata else None

        chunks = []
        for idx, (window_start, group) in enumerate(df.groupby('_window')):
            chunk_data = group.drop(columns=['_window'])
            if self.return_metadata:
                chunks.append({
                    'data': chunk_data,
                    'window_start': window_start,
                    'window_end': window_start + window_length,
                    'window_idx': idx,
                })
            else:
                chunks.append(chunk_data)
        return chunks
class DemandEstimator(BaseEstimator, TransformerMixin): class DemandEstimator(BaseEstimator, TransformerMixin):
def __init__(self, def __init__(self,
store_mode:str='hotel', store_mode:str='hotel',
@@ -28,12 +95,25 @@ class DemandEstimator(BaseEstimator, TransformerMixin):
interactions = interactions[interactions['sessionId'] == self.session_filter] interactions = interactions[interactions['sessionId'] == self.session_filter]
if self.experiment_filter: if self.experiment_filter:
interactions = interactions[interactions['experimentId'] == self.experiment_filter] interactions = interactions[interactions['experimentId'] == self.experiment_filter]
products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute() products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute()
products = pd.DataFrame(products.data) products = pd.DataFrame(products.data)
unique_products = products['id'].unique() unique_products = products['id'].unique()
log.info(f"Demand estimator found {len(unique_products)} in data")
# filter out rows without productId
interactions_with_products = interactions.dropna(subset=['productId'])
if interactions_with_products.empty:
# no interactions with products, return all zeros
return pd.DataFrame({
'productId': unique_products,
'demand_score': 0
})
# TODO: improve demand score calculation rather than just counting interactions (use weights..) # TODO: improve demand score calculation rather than just counting interactions (use weights..)
# while maintaining simplicity of a simple cross tab approach # while maintaining simplicity of a simple cross tab approach
product_demand = pd.crosstab(interactions['productId'], "no_of_interactions") product_demand = pd.crosstab(interactions_with_products['productId'], "no_of_interactions")
product_demand = product_demand.reindex(unique_products, fill_value=0).reset_index() product_demand = product_demand.reindex(unique_products, fill_value=0).reset_index()
product_demand.columns = ['productId', 'demand_score'] product_demand.columns = ['productId', 'demand_score']
return product_demand return product_demand

View File

@@ -0,0 +1,333 @@
import numpy as np
import pandas as pd
from typing import List, Dict, Optional
from sklearn.base import BaseEstimator, TransformerMixin
from supabase import create_client, Client
import os
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class TemporalElasticityEstimator(BaseEstimator, TransformerMixin):
    """
    Compute price elasticity from time-series demand and price data.

    Elasticity = (% change in quantity) / (% change in price)

    Works with chunked time-window data from ChunkInteractionsIntoSteps.
    Products without a usable estimate are reported with elasticity 0.0.
    """

    # columns of the frame returned by transform(), in output order
    _RESULT_COLUMNS = ['productId', 'elasticity', 'std_error', 'n_obs']

    def __init__(self,
                 method: str = 'point',
                 min_observations: int = 2,
                 smooth_window: Optional[int] = None):
        """
        Args:
            method: 'point' (point elasticity) or 'arc' (arc elasticity)
            min_observations: min data points needed per product
            smooth_window: if set, apply rolling avg smoothing to time series
        """
        self.method = method
        self.min_observations = min_observations
        self.smooth_window = smooth_window

    def fit(self, X=None, y=None):
        """Stateless: nothing to learn. `y` is accepted so sklearn can pass targets."""
        return self

    def transform(self,
                  demand_chunks: List[Dict],
                  price_chunks: List[Dict],
                  store_mode: str = 'hotel') -> pd.DataFrame:
        """
        Args:
            demand_chunks: list of dicts with {'window_start', 'window_end', 'demand_vector'}
                where demand_vector is a df with [productId, demand_score]
            price_chunks: list of dicts with {'window_start', 'window_end', 'price_vector'}
                where price_vector is a df with [productId, price]
            store_mode: selects the '<store_mode>_products' table listing all products

        Returns:
            df with [productId, elasticity, std_error, n_obs], one row per
            product: products with observations first, then the remaining
            products of the table with zeros.

        Raises:
            ValueError: if `method` is neither 'point' nor 'arc' (raised once a
                product has enough valid observations to be estimated)
        """
        # fetch all products from database
        all_products = supabase.table(f'{store_mode}_products').select("id").execute()
        all_product_ids = [p['id'] for p in all_products.data]

        aligned = self._align_chunks(demand_chunks, price_chunks)
        product_series = self._build_product_timeseries(aligned) if aligned else {}

        rows = []
        for pid, series in product_series.items():
            if len(series) < self.min_observations:
                # not enough data: report a neutral elasticity
                estimate = self._no_estimate()
            else:
                if self.smooth_window and len(series) >= self.smooth_window:
                    series = self._smooth_series(series, self.smooth_window)
                estimate = self._compute_elasticity(series)
            rows.append({
                'productId': pid,
                'elasticity': estimate['value'],
                'std_error': estimate.get('std_error', 0.0),
                'n_obs': len(series),
            })

        # every product of the store appears in the output, observed or not
        observed = {row['productId'] for row in rows}
        rows.extend(
            {'productId': pid, 'elasticity': 0.0, 'std_error': 0.0, 'n_obs': 0}
            for pid in all_product_ids
            if pid not in observed
        )

        # explicit columns keep the schema stable even when there are no rows
        return pd.DataFrame(rows, columns=self._RESULT_COLUMNS)

    @staticmethod
    def _no_estimate():
        """Neutral result used whenever an elasticity cannot be computed."""
        return {'value': 0.0, 'std_error': 0.0}

    def _align_chunks(self, demand_chunks, price_chunks):
        """Align demand and price data by matching time windows (same 'window_start')."""
        price_lookup = {chunk['window_start']: chunk for chunk in price_chunks}

        aligned = []
        for demand_chunk in demand_chunks:
            window_start = demand_chunk['window_start']
            price_chunk = price_lookup.get(window_start)
            if price_chunk is None:
                continue  # no price information for this window
            aligned.append({
                'window_start': window_start,
                'window_end': demand_chunk['window_end'],
                'demand': demand_chunk['demand_vector'],
                'prices': price_chunk['price_vector'],
            })
        return aligned

    def _build_product_timeseries(self, aligned_chunks):
        """Build time series [price, quantity] per product, in chunk order."""
        series_by_product = {}
        for chunk in aligned_chunks:
            # only products present in both the demand and the price vector
            merged = chunk['demand'].merge(chunk['prices'], on='productId', how='inner')
            for pid, price, quantity in zip(merged['productId'],
                                            merged['price'],
                                            merged['demand_score']):
                series_by_product.setdefault(pid, []).append({
                    'timestamp': chunk['window_start'],
                    'price': price,
                    'quantity': quantity,
                })
        return series_by_product

    def _smooth_series(self, series, window):
        """Apply centered rolling average smoothing; edge points without a full window are dropped."""
        df = pd.DataFrame(series)
        df['price_smooth'] = df['price'].rolling(window=window, center=True).mean()
        df['quantity_smooth'] = df['quantity'].rolling(window=window, center=True).mean()
        df = df.dropna()
        return [
            {'timestamp': ts, 'price': price, 'quantity': quantity}
            for ts, price, quantity in zip(df['timestamp'],
                                           df['price_smooth'],
                                           df['quantity_smooth'])
        ]

    def _compute_elasticity(self, series):
        """Compute elasticity from one product's time series using `self.method`."""
        if len(series) < 2:
            return self._no_estimate()

        prices = np.array([s['price'] for s in series], dtype=float)
        quantities = np.array([s['quantity'] for s in series], dtype=float)

        # filter out zero/negative values (logs and percentage changes need positives)
        valid = (prices > 0) & (quantities > 0)
        if valid.sum() < 2:
            return self._no_estimate()
        prices = prices[valid]
        quantities = quantities[valid]

        if self.method == 'point':
            return self._point_elasticity(prices, quantities)
        elif self.method == 'arc':
            return self._arc_elasticity(prices, quantities)
        else:
            raise ValueError(f"Unknown method: {self.method}")

    def _point_elasticity(self, prices, quantities):
        """
        Point elasticity using log-log regression.
        log(Q) = a + b*log(P), elasticity = b

        The slope is the OLS estimate Sxy / Sxx, with both sums taken over the
        same centered data so numerator and denominator share one normalisation.
        """
        n = len(prices)
        if n < 2:
            return self._no_estimate()

        dx = np.log(prices) - np.log(prices).mean()
        dy = np.log(quantities) - np.log(quantities).mean()

        sxx = float(np.dot(dx, dx))
        if sxx == 0:
            # price never moved: the slope is undefined
            return self._no_estimate()

        b = float(np.dot(dx, dy)) / sxx

        # std error estimate; needs at least one residual degree of freedom
        if n <= 2:
            se_b = 0.0
        else:
            residuals = dy - b * dx
            mse = float(np.dot(residuals, residuals)) / (n - 2)
            se_b = float(np.sqrt(mse / sxx))

        return {'value': b, 'std_error': se_b}

    def _arc_elasticity(self, prices, quantities):
        """
        Arc elasticity: average of period-over-period elasticities.
        E_t = (ΔQ/Q_avg) / (ΔP/P_avg)

        Periods without a price change are skipped; if none remain the
        neutral estimate is returned.
        """
        estimates = []
        for p1, p2, q1, q2 in zip(prices[:-1], prices[1:],
                                  quantities[:-1], quantities[1:]):
            p_avg = (p1 + p2) / 2
            q_avg = (q1 + q2) / 2
            delta_p = p2 - p1
            if p_avg == 0 or q_avg == 0 or delta_p == 0:
                continue
            estimates.append(((q2 - q1) / q_avg) / (delta_p / p_avg))

        if not estimates:
            # callers index the result, so never hand back None
            return self._no_estimate()

        return {
            'value': float(np.mean(estimates)),
            'std_error': float(np.std(estimates) / np.sqrt(len(estimates))),
        }
def aggregate_price_logs(price_logs: pd.DataFrame,
                         window_size: str = '1h',
                         ts_col: str = 'ts',
                         store_mode: str = 'hotel') -> List[Dict]:
    """
    Recover price vectors treating prices as persistent state changes.

    Prices are set-operations that persist until next change. For each window:
    - If price logs exist: average all changes within window
    - If no logs: carry forward the last price logged before the window
    - If the product has no log yet: it is left out of that window

    Args:
        price_logs: df with [productId, price, ts, ...]
        window_size: time window size matching ChunkInteractionsIntoSteps
        ts_col: timestamp column name
        store_mode: selects the '<store_mode>_products' table whose ids define
            which products are reported

    Returns:
        list of dicts with {'window_start', 'window_end', 'price_vector'}
        where price_vector is df with [productId, price]; windows in which no
        product has a price are omitted.
    """
    if price_logs.empty:
        return []

    df = price_logs.copy()
    if not pd.api.types.is_datetime64_any_dtype(df[ts_col]):
        df[ts_col] = pd.to_datetime(df[ts_col])
    df = df.sort_values([ts_col, 'productId'])

    # only the ids are needed; de-duplicate while keeping table order
    products_resp = supabase.table(f'{store_mode}_products').select("id").execute()
    product_ids = list(dict.fromkeys(row['id'] for row in products_resp.data))

    # split the (time-sorted) logs per product once instead of once per window
    history_by_product = {
        pid: logs for pid, logs in df.groupby('productId', sort=False)
    }

    # generate windows across data range, aligned to the floor of the first log
    min_time, max_time = df[ts_col].min(), df[ts_col].max()
    windows = pd.date_range(
        start=min_time.floor(window_size),
        end=max_time,
        freq=window_size,
    )
    step = pd.Timedelta(window_size)

    chunks = []
    for window_start in windows:
        window_end = window_start + step
        price_vector = []

        for pid in product_ids:
            history = history_by_product.get(pid)
            if history is None:
                continue  # product never had a price logged

            stamps = history[ts_col]
            in_window = history.loc[(stamps >= window_start) & (stamps < window_end), 'price']
            if not in_window.empty:
                # average changes within window
                price = in_window.mean()
            else:
                # carry forward: most recent price logged before this window
                earlier = history.loc[stamps < window_end, 'price']
                if earlier.empty:
                    continue
                price = earlier.iloc[-1]

            price_vector.append({'productId': pid, 'price': price})

        if price_vector:
            chunks.append({
                'window_start': window_start,
                'window_end': window_end,
                'price_vector': pd.DataFrame(price_vector),
            })

    return chunks

View File

@@ -6,6 +6,7 @@ import requests
from dotenv import load_dotenv from dotenv import load_dotenv
from sklearn.base import BaseEstimator, TransformerMixin from sklearn.base import BaseEstimator, TransformerMixin
from supabase import create_client, Client from supabase import create_client, Client
from typing import Tuple, List, Dict
load_dotenv() load_dotenv()
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000") BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000")
@@ -17,11 +18,13 @@ supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class KafkaDataFetcher(BaseEstimator, TransformerMixin): class KafkaDataFetcher(BaseEstimator, TransformerMixin):
def __init__(self, topic: str = "user-interactions"):
self.topic = topic # also can be price-logs
def fit(self, X=None, y=None): def fit(self, X=None, y=None):
return self return self
def transform(self, X=None): def transform(self, X=None):
resp = requests.get(f"{BACKEND_URL}/api/kafka/dump") resp = requests.get(f"{BACKEND_URL}/api/kafka/dump?topic={self.topic}")
resp.raise_for_status() resp.raise_for_status()
data = resp.json() data = resp.json()
@@ -29,12 +32,12 @@ class KafkaDataFetcher(BaseEstimator, TransformerMixin):
return pd.DataFrame() return pd.DataFrame()
df = pd.DataFrame(data['data']) df = pd.DataFrame(data['data'])
# explode metadata col json if self.topic == 'user-interactions':
if 'metadata' in df.columns: if 'metadata' in df.columns: # explode metadata col json
df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
df = df.dropna(subset=['eventName']) df = df.dropna(subset=['eventName'])
# remape dateIndex # remape dateIndex
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
return df return df
@@ -110,3 +113,95 @@ class EventTitleAugmenter(BaseEstimator, TransformerMixin):
) )
df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str) df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str)
return df return df
def _rows_in_window(df: pd.DataFrame, ts_col: str, window_start, window_end) -> pd.DataFrame:
    """Rows of `df` with window_start <= ts < window_end (empty frame if `df` is empty)."""
    if df.empty:
        return pd.DataFrame()
    stamps = df[ts_col]
    return df[(stamps >= window_start) & (stamps < window_end)]


def chunk_shared_data(interactions_df: pd.DataFrame,
                      price_logs_df: pd.DataFrame,
                      window_size: str = '30s',
                      ts_col: str = 'ts') -> Tuple[List[Dict], List[Dict]]:
    """
    Chunk interaction and price data into aligned time windows.

    Windows start at the earliest timestamp found in either dataset and are
    half-open, [window_start, window_end). The final window always contains
    the latest timestamp, so no record is dropped.

    Args:
        interactions_df: interaction data with timestamp column
        price_logs_df: price log data with timestamp column
        window_size: fixed pandas freq string ('30s', '1min', '1h', etc)
        ts_col: name of timestamp column

    Returns:
        tuple of (interaction_chunks, price_chunks), two lists of equal length
        covering the same windows, where each item is a dict:
        {
            'window_start': timestamp,
            'window_end': timestamp,
            'data': dataframe for this window
        }
        Both lists are empty when both inputs are empty.
    """
    if interactions_df.empty and price_logs_df.empty:
        return [], []

    # convert timestamps to datetime on copies, leaving the inputs untouched
    interactions_df = interactions_df.copy()
    price_logs_df = price_logs_df.copy()
    for frame in (interactions_df, price_logs_df):
        if not frame.empty and not pd.api.types.is_datetime64_any_dtype(frame[ts_col]):
            frame[ts_col] = pd.to_datetime(frame[ts_col])

    # find global time bounds
    # NOTE(review): both frames must agree on timezone-awareness for min/max to compare
    bounds = []
    for frame in (interactions_df, price_logs_df):
        if not frame.empty:
            bounds.extend([frame[ts_col].min(), frame[ts_col].max()])
    if not bounds:
        return [], []
    earliest, latest = min(bounds), max(bounds)

    # shared window edges; extending the end by one step guarantees one edge
    # strictly after `latest`, so the trailing records get a window of their own
    edges = pd.date_range(start=earliest,
                          end=latest + pd.Timedelta(window_size),
                          freq=window_size)

    interaction_chunks = []
    price_chunks = []
    for window_start, window_end in zip(edges[:-1], edges[1:]):
        interaction_chunks.append({
            'window_start': window_start,
            'window_end': window_end,
            'data': _rows_in_window(interactions_df, ts_col, window_start, window_end),
        })
        price_chunks.append({
            'window_start': window_start,
            'window_end': window_end,
            'data': _rows_in_window(price_logs_df, ts_col, window_start, window_end),
        })

    return interaction_chunks, price_chunks

View File

@@ -1,22 +1,90 @@
from sklearn.pipeline import Pipeline from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler from sklearn.preprocessing import StandardScaler
import pandas as pd
import logging
log = logging.getLogger(__name__)
from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data
from mapping import SessionTransitionProbMatrixTransformer, render_graph from mapping import SessionTransitionProbMatrixTransformer, render_graph
from demand import DemandEstimator from demand import DemandEstimator, ChunkInteractionsIntoSteps
from elasticity import TemporalElasticityEstimator, aggregate_price_logs
# elasticity pipeline components (not sklearn compatible, manual orchestration)
def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store_mode='hotel'):
    """
    Compute price elasticity from interaction and price data.

    Args:
        interactions_df: raw interaction data from demand_data_pipeline
        price_logs_df: price log data from price_data_pipeline
        window_size: time window for chunking (shared by demand and price windows)
        store_mode: 'hotel' or 'airline'; used for every product lookup in the pipeline

    Returns:
        df with [productId, elasticity, std_error, n_obs],
        or None when the interactions yield no time window
    """
    # step 1: chunk interactions into time windows
    chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True)
    interaction_chunks = chunker.transform(interactions_df)
    log.info(f"Chunked interactions into {len(interaction_chunks)} windows of size {window_size}")
    if not interaction_chunks:
        return None

    # step 2: compute demand per window
    demand_estimator = DemandEstimator(store_mode=store_mode)
    demand_chunks = [
        {
            'window_start': chunk['window_start'],
            'window_end': chunk['window_end'],
            # each has a full list of all products, even if demand is 0
            'demand_vector': demand_estimator.transform(chunk['data']),
        }
        for chunk in interaction_chunks
    ]

    # step 3: aggregate price logs into windows; store_mode must be forwarded,
    # otherwise prices are always resolved against the default (hotel) products
    price_chunks = aggregate_price_logs(price_logs_df,
                                        window_size=window_size,
                                        store_mode=store_mode)

    # step 4: compute elasticity
    elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2)
    return elasticity_estimator.transform(demand_chunks, price_chunks, store_mode=store_mode)
# exposable pipelines # exposable pipelines
etl_pipeline = Pipeline([ interaction_pipeline = Pipeline([
('kafka_fetch', KafkaDataFetcher()), ('kafka_fetch', KafkaDataFetcher(topic='user-interactions')),
('experiment_join', ExperimentJoiner()), ('experiment_join', ExperimentJoiner()),
('event_augment', EventTitleAugmenter()), ('event_augment', EventTitleAugmenter()),
]) ])
price_data_pipeline = Pipeline([
('kafka_fetch', KafkaDataFetcher(topic='price-logs')),
])
# interaction_data + price_data -> elasticity (demand)
# elasticity -> pricing
pricing_pipeline = Pipeline([ pricing_pipeline = Pipeline([
('demand_estimation', DemandEstimator()), ('demand_estimation', DemandEstimator()),
]) ])
if __name__ == "__main__": if __name__ == "__main__":
processed_data = etl_pipeline.fit_transform(None) # fetch both datasets
pricing = pricing_pipeline.fit_transform(processed_data) interaction_data = interaction_pipeline.fit_transform(None)
print(pricing) pricing_data = price_data_pipeline.fit_transform(None)
if interaction_data.empty or pricing_data.empty:
print("Insufficient data for elasticity computation"); exit(0)
# compute elasticity via unified pipeline
window_size = "30s"
elasticity_results = elasticity_pipeline(interaction_data, pricing_data, window_size=window_size)
elasticity_value_array = elasticity_results['elasticity'].values if elasticity_results is not None else np.array([])
print(elasticity_value_array)
if elasticity_results is not None and not elasticity_results.empty:
print(elasticity_results.to_string(index=False))
else:
print("\nInsufficient data for elasticity computation")

View File

@@ -0,0 +1,153 @@
r"""
Our state space consists of:
$Q_t \in \mathbb{R}^n$ - demand at time t
$P_t \in \mathbb{R}^n$ - prices at time t
$S_t$ - some form of interaction session features
This is a single state, which we map under
$f: (Q_t, S_t, H_t) \to P_{t+1}$
with:
$H_t = \{Q_{t-k}, P_{t-k}, S_{t-k}\}$
f can be literally anything: analytical, learned, rule-based, or an RL policy.
Our goal is to maximize the expected revenue:
$E[R_T] = E[\sum_{t=1}^T P_t^\top Q_t]$
subject to $Q_t = g(P_t, S_t)$ (demand response to price, estimated via elasticity) and $P_t \geq C$ (prices above the cost floor), while additionally minimizing the following:
$L_{agent} = R_{oracle} - R_{observed}$
where $R_{oracle}$ = revenue if we knew agent intentions (from recon session) and $R_{observed}$ = revenue under the current pricing policy f.
I would start by defining a pricing function interface, standardizing how to train it on historical data, and defining how it should behave for online training (if we do that).
We also need to develop a solid benchmark that maps revenue and full KPIs from session interactions, so we can measure differences between price learning methods.
"""
from abc import ABC, abstractmethod
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
import os
from supabase import create_client, Client
from pipeline import interaction_pipeline, price_data_pipeline, elasticity_pipeline
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
def expected_revenue(prices: np.ndarray, demand: np.ndarray) -> float:
    """Revenue of one period: the inner product of prices and demand, R_t = P_t^T * Q_t."""
    revenue = np.dot(prices, demand)
    return float(revenue)
class StateSpace:
    """Snapshot of the pricing state at one time step t, plus its history."""

    def __init__(self,
                 demand: np.ndarray,
                 prices: np.ndarray,
                 session_features: pd.DataFrame):
        # demand and prices hold values only and are assumed to share the same
        # productId ordering
        self.demand = demand                      # Q_t
        self.prices = prices                      # P_t
        self.session_features = session_features  # S_t
        # H_t: starts empty, a fresh list per instance
        self.history = list()
class PricingFunction(BaseEstimator, TransformerMixin, ABC):
    """
    Interface of a pricing policy: learn from history with fit(), propose the
    next prices with transform(). Both must be overridden by subclasses.
    """

    def __init__(self):
        """The base interface takes no parameters."""

    def fit(self, historical_data):
        """
        Train the pricing function based on historical data.

        historical_data: list of StateSpace instances with known outcomes
        """
        message = "Train method must be implemented by subclass."
        raise NotImplementedError(message)

    def transform(self, state_space) -> np.ndarray:
        """
        Predict the next prices given the current state space.

        state_space: StateSpace instance
        Returns: predicted prices P_{t+1}
        """
        message = "Predict method must be implemented by subclass."
        raise NotImplementedError(message)
class SimpleLinearPricingFunction(PricingFunction):
    """Baseline policy: shift each price proportionally to its current demand."""

    def __init__(self, price_sensitivity: float = -0.1):
        super().__init__()
        # simple coefficient applied to demand
        self.price_sensitivity = price_sensitivity

    def fit(self, historical_data):
        """Nothing is learned; the coefficient is fixed at construction."""
        return self

    def transform(self, state_space: StateSpace) -> np.ndarray:
        """Return P_{t+1} = P_t + sensitivity * Q_t, clipped so no price is negative."""
        # this is not great
        adjustment = self.price_sensitivity * state_space.demand
        proposed = state_space.prices + adjustment
        return np.maximum(proposed, 0)
# Example usage: end-to-end run that fetches data, estimates elasticity,
# resolves one current price per product and asks the simple linear policy
# for the next prices.
if __name__ == "__main__":
    store_mode = 'hotel'
    # fetch interactions and price logs, then estimate elasticity per product
    interaction_data = interaction_pipeline.fit_transform(None)
    price_data = price_data_pipeline.fit_transform(None)
    elasticity_df = elasticity_pipeline(interaction_data, price_data, window_size="30s", store_mode=store_mode)
    # fetch all products with base prices from database
    products_resp = supabase.table(f'{store_mode}_products').select("id, metadata").execute()
    products_df = pd.DataFrame(products_resp.data)
    # extract base_price from metadata (0 when metadata is not a dict or has no base_price)
    products_df['base_price'] = products_df['metadata'].apply(lambda m: m.get('base_price', 0) if isinstance(m, dict) else 0)
    products_df = products_df.rename(columns={'id': 'productId'})[['productId', 'base_price']]
    # override with logged prices where available
    if not price_data.empty:
        if 'ts' in price_data.columns and not pd.api.types.is_datetime64_any_dtype(price_data['ts']):
            price_data['ts'] = pd.to_datetime(price_data['ts'])
        # get latest logged price per product
        price_logs_agg = price_data.sort_values('ts').groupby('productId', as_index=False).last()
        # merge: start with all products (base prices), override with logged prices
        products_df = products_df.merge(
            price_logs_agg[['productId', 'price']],
            on='productId',
            how='left'
        )
        # products without any logged price fall back to their base price
        products_df['final_price'] = products_df['price'].fillna(products_df['base_price'])
    else:
        products_df['final_price'] = products_df['base_price']
    # merge with elasticity (products without an estimate get 0.0)
    if elasticity_df is not None and not elasticity_df.empty:
        price_data_merged = products_df[['productId', 'final_price']].merge(
            elasticity_df[['productId', 'elasticity']],
            on='productId',
            how='left'
        ).fillna({'elasticity': 0.0})
        prices = price_data_merged['final_price'].values
        elasticities = price_data_merged['elasticity'].values
    else:
        # no elasticity available: the policy below runs on empty vectors
        prices = np.array([])
        elasticities = np.array([])
    print(elasticities)
    print(prices)
    # NOTE(review): elasticities are passed in the demand slot here; this looks
    # like a placeholder for Q_t - confirm the intended input
    state_space = StateSpace(
        demand=elasticities,
        prices=prices,
        session_features=interaction_data
    )
    pricing_function = SimpleLinearPricingFunction(price_sensitivity=-0.05)
    pricing_function.fit([]) # No training data for simple model
    predicted_prices = pricing_function.transform(state_space)
    print("Predicted Prices:", predicted_prices)

View File

@@ -13,17 +13,6 @@ export async function GET(req: NextRequest) {
const experimentId = searchParams.get('experimentId'); const experimentId = searchParams.get('experimentId');
const storeMode = process.env.NEXT_PUBLIC_STORE_MODE || 'shop'; const storeMode = process.env.NEXT_PUBLIC_STORE_MODE || 'shop';
// log in dev
if (process.env.NODE_ENV === 'development') {
console.log('[pricing-api]', {
productId,
sessionId,
experimentId,
storeMode,
timestamp: new Date().toISOString(),
});
}
if (!productId) { if (!productId) {
return NextResponse.json( return NextResponse.json(
{ error: 'productId is required' }, { error: 'productId is required' },
@@ -34,11 +23,46 @@ export async function GET(req: NextRequest) {
// stub: call external pricing provider (random for now) // stub: call external pricing provider (random for now)
const basePrice = 100 + Math.random() * 900; // 100-1000 range const basePrice = 100 + Math.random() * 900; // 100-1000 range
const price = Math.round(basePrice * 100) / 100; const price = Math.round(basePrice * 100) / 100;
const timestamp = new Date().toISOString();
// log price to kafka for elasticity computation
if (sessionId) {
const backendUrl = process.env.BACKEND_URL || 'http://localhost:5000';
try {
await fetch(`${backendUrl}/api/kafka/price-log`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
productId,
price,
sessionId,
experimentId: experimentId || undefined,
storeMode,
ts: timestamp,
}),
});
} catch (err) {
console.error('[price-log-error]', err);
// don't fail the pricing request if logging fails
}
}
// log in dev
if (process.env.NODE_ENV === 'development') {
console.log('[pricing-api]', {
productId,
sessionId,
experimentId,
storeMode,
price,
timestamp,
});
}
const response: PricingResponse = { const response: PricingResponse = {
price, price,
currency: 'EUR', currency: 'EUR',
cachedAt: new Date().toISOString(), cachedAt: timestamp,
}; };
return NextResponse.json(response); return NextResponse.json(response);