refactoring and demand estimation

This commit is contained in:
2025-11-22 22:07:07 +01:00
parent 2661b841fc
commit 2ae027dba2
4 changed files with 151 additions and 99 deletions

View File

@@ -0,0 +1,19 @@
from .extract import (
KafkaDataFetcher,
ExperimentJoiner,
EventTitleAugmenter,
)
from .demand import DemandEstimator
from .mapping import SessionTransitionProbMatrixTransformer, render_graph
from .pipeline import etl_pipeline, pricing_pipeline
# Public API of the package: exactly the names imported above from the
# extract, demand, mapping and pipeline submodules.
__all__ = [
'KafkaDataFetcher',
'ExperimentJoiner',
'EventTitleAugmenter',
'DemandEstimator',
'SessionTransitionProbMatrixTransformer',
'render_graph',
'etl_pipeline',
'pricing_pipeline',
]

View File

@@ -0,0 +1,39 @@
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
from supabase import create_client, Client
import pandas as pd
import os
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class DemandEstimator(BaseEstimator, TransformerMixin):
    """Score each product of a store by the number of interactions that reference it.

    Parameters
    ----------
    store_mode : str, default='hotel'
        Prefix of the Supabase products table that is read
        (``f'{store_mode}_products'``).
    session_filter : str, default=""
        When non-empty, only interactions whose ``sessionId`` equals this
        value are counted.
    experiment_filter : str, default=""
        When non-empty, only interactions whose ``experimentId`` equals this
        value are counted.
    """

    def __init__(self,
                 store_mode: str = 'hotel',
                 session_filter: str = "",
                 experiment_filter: str = ""):
        # scikit-learn reads every constructor argument back from an attribute
        # of the same name (get_params / set_params / clone / repr), so the
        # values are stored verbatim; "empty means no filter" is resolved in
        # transform() through truthiness instead of being rewritten here.
        self.store_mode = store_mode
        self.session_filter = session_filter
        self.experiment_filter = experiment_filter

    @property
    def store(self):
        """Backward-compatible alias for ``store_mode``."""
        return self.store_mode

    @store.setter
    def store(self, value):
        self.store_mode = value

    def fit(self, X, y=None):
        """No-op fit; the estimator learns nothing. Returns ``self``."""
        return self

    def transform(self, interactions: pd.DataFrame):
        """Count interactions per product.

        Returns a DataFrame with columns ``productId`` and ``demand_score``
        holding one row per distinct ``id`` of the products table; products
        without any (remaining) interaction get a score of 0. An empty input
        frame, or an empty products table, yields an empty frame with those
        two columns.
        """
        if interactions.empty:
            return pd.DataFrame(columns=["productId", "demand_score"])

        # Empty string and None both mean "do not filter".
        if self.session_filter:
            interactions = interactions[interactions['sessionId'] == self.session_filter]
        if self.experiment_filter:
            interactions = interactions[interactions['experimentId'] == self.experiment_filter]

        response = supabase.table(f'{self.store_mode}_products').select("id, room_type, date_index, metadata, availability").execute()
        products = pd.DataFrame(response.data)
        if products.empty:
            # No catalogue rows: there is no 'id' column to index on.
            return pd.DataFrame(columns=["productId", "demand_score"])
        unique_products = products['id'].unique()

        # TODO: improve demand score calculation rather than just counting
        # interactions (use weights..) while keeping the approach simple.
        # value_counts stays well-defined when the filters removed every row,
        # in which case every product simply receives 0.
        interaction_counts = interactions['productId'].value_counts()
        product_demand = (
            interaction_counts
            .reindex(unique_products, fill_value=0)
            .rename_axis('productId')
            .reset_index(name='demand_score')
        )
        return product_demand

View File

@@ -15,106 +15,98 @@ N_PRICE_BUCKETS = 5
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
def get_data_from_kafka() -> pd.DataFrame:
"""fetch all events from backend dump endpoint"""
resp = requests.get(f"{BACKEND_URL}/api/kafka/dump")
resp.raise_for_status()
data = resp.json()
if not data.get('success') or not data.get('data'): class KafkaDataFetcher(BaseEstimator, TransformerMixin):
return pd.DataFrame()
df = pd.DataFrame(data['data'])
# explode metadata col json
if 'metadata' in df.columns:
df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
df = df.dropna(subset=['eventName'])
return df
def join_with_experiments(df: pd.DataFrame) -> pd.DataFrame:
    """Left-join experiment attributes (and their nested task fields) onto the events.

    The input frame is returned as-is when it is empty, has no
    ``experimentId`` column, contains no non-null experiment ids, or the
    experiments query returns no rows.
    """
    if 'experimentId' not in df.columns or df.empty:
        return df

    experiment_ids = df['experimentId'].dropna().unique().tolist()
    if not experiment_ids:
        return df

    query = supabase.table('experiments').select(
        'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)'
    )
    rows = query.in_('id', experiment_ids).execute().data
    if not rows:
        return df

    experiments = pd.DataFrame(rows)

    # Spread the nested task object into flat columns, re-aligned to the
    # experiment rows that actually carry a task.
    if 'task' in experiments.columns:
        has_task = experiments['task'].notnull()
        if has_task.any():
            task_columns = pd.json_normalize(experiments['task'].dropna())
            task_columns.index = experiments.loc[has_task].index
            experiments = experiments.drop(columns=['task']).join(task_columns, rsuffix='_task')

    # Prefix experiment attributes so they read clearly next to event columns.
    column_names = {
        'id': 'experimentId',
        'subject_name': 'exp_subject',
        'xp_human_only': 'exp_human_only',
        'xp_market_mode': 'exp_market_mode',
        'xp_task_id': 'exp_task_id',
    }
    experiments = experiments.rename(columns=column_names)

    return df.merge(experiments, on='experimentId', how='left')
def augment_event_titles(df: pd.DataFrame) -> pd.DataFrame:
    """Append a product/price-bucket suffix to ``eventName``.

    Rows that have both a ``productId`` and a ``metadata_price`` get
    ``_{productId}@{bucket}`` appended to their event name (for example
    ``view_item_page`` becomes ``view_item_page_{productId}@PB_3``); all other
    rows keep their original name. The suffix is also stored in a
    ``metadata_schema`` column.

    The input frame is not modified; a copy is returned. When any of the
    ``eventName``, ``productId`` or ``metadata_price`` columns is missing
    (for instance an empty frame without columns), the copy is returned
    without changes.
    """
    out = df.copy()
    required = ("eventName", "productId", "metadata_price")
    if any(column not in out.columns for column in required):
        return out

    prices = out["metadata_price"]
    # Buckets are only computed when at least one price is present.
    if prices.notnull().any():
        try:
            price_buckets = pd.qcut(
                prices,
                q=N_PRICE_BUCKETS,
                labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)],
                duplicates='drop'  # handle duplicate bin edges
            )
        except ValueError:
            # Quantile bucketing was not possible (e.g. too few distinct
            # prices): fall back to labelling with the integer price itself.
            price_buckets = prices.apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "")
    else:
        price_buckets = pd.Series([""] * len(out), index=out.index)

    # metadata_schema: _product_id@price_bucket only if we have product
    # metadata, otherwise keep the original event name.
    # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page
    out["metadata_schema"] = np.where(
        out["productId"].notnull() & prices.notnull(),
        "_" + out["productId"].astype(str) + "@" + price_buckets.astype(str),
        ""
    )
    out["eventName"] = out["eventName"] + out["metadata_schema"].astype(str)
    return out
def extract() -> pd.DataFrame:
    """Run the extraction chain: fetch the Kafka dump, join experiments, augment event names."""
    events = get_data_from_kafka()
    with_experiments = join_with_experiments(events)
    return augment_event_titles(with_experiments)
class DataExtractor(BaseEstimator, TransformerMixin):
def fit(self, X=None, y=None): def fit(self, X=None, y=None):
return self return self
def transform(self, X=None): def transform(self, X=None):
return extract() resp = requests.get(f"{BACKEND_URL}/api/kafka/dump")
resp.raise_for_status()
data = resp.json()
if not data.get('success') or not data.get('data'):
return pd.DataFrame()
df = pd.DataFrame(data['data'])
# explode metadata col json
if 'metadata' in df.columns:
df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
df = df.dropna(subset=['eventName'])
# remape dateIndex
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
return df
if __name__ == "__main__": class ExperimentJoiner(BaseEstimator, TransformerMixin):
df = extract() def fit(self, X=None, y=None):
print(df.head()) return self
print(df.tail())
print(df.info()) def transform(self, df):
if df.empty or 'experimentId' not in df.columns:
return df
unique_exp_ids = df['experimentId'].dropna().unique()
if len(unique_exp_ids) == 0:
return df
resp = supabase.table('experiments').select(
'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)'
).in_('id', unique_exp_ids.tolist()).execute()
if not resp.data:
return df
exp_df = pd.DataFrame(resp.data)
# flatten task nested object if present
if 'task' in exp_df.columns and exp_df['task'].notnull().any():
task_normalized = pd.json_normalize(exp_df['task'].dropna())
task_normalized.index = exp_df[exp_df['task'].notnull()].index
exp_df = exp_df.drop(columns=['task']).join(task_normalized, rsuffix='_task')
# rename experiment columns for clarity
exp_df = exp_df.rename(columns={
'id': 'experimentId',
'subject_name': 'exp_subject',
'xp_human_only': 'exp_human_only',
'xp_market_mode': 'exp_market_mode',
'xp_task_id': 'exp_task_id'
})
df = df.merge(exp_df, on='experimentId', how='left')
return df
class EventTitleAugmenter(BaseEstimator, TransformerMixin):
    """Transformer that appends a product/price-bucket suffix to ``eventName``.

    Rows that have both a ``productId`` and a ``metadata_price`` get
    ``_{productId}@{bucket}`` appended to their event name; all other rows
    keep their original name. The suffix is also stored in a
    ``metadata_schema`` column.
    """

    def fit(self, X=None, y=None):
        """No-op fit; the transformer is stateless. Returns ``self``."""
        return self

    def transform(self, df):
        """Return a copy of ``df`` with augmented event names.

        The input frame is not modified. When any of the ``eventName``,
        ``productId`` or ``metadata_price`` columns is missing (for instance
        an empty frame without columns), the copy is returned without changes.
        """
        out = df.copy()
        required = ("eventName", "productId", "metadata_price")
        if any(column not in out.columns for column in required):
            return out

        prices = out["metadata_price"]
        # Buckets are only computed when at least one price is present.
        if prices.notnull().any():
            try:
                price_buckets = pd.qcut(
                    prices,
                    q=N_PRICE_BUCKETS,
                    labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)],
                    duplicates='drop'  # handle duplicate bin edges
                )
            except ValueError:
                # Quantile bucketing was not possible (e.g. too few distinct
                # prices): fall back to labelling with the integer price.
                price_buckets = prices.apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "")
        else:
            price_buckets = pd.Series([""] * len(out), index=out.index)

        # metadata_schema: _product_id@price_bucket only if we have product
        # metadata, otherwise keep the original event name.
        # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page
        out["metadata_schema"] = np.where(
            out["productId"].notnull() & prices.notnull(),
            "_" + out["productId"].astype(str) + "@" + price_buckets.astype(str),
            ""
        )
        out["eventName"] = out["eventName"] + out["metadata_schema"].astype(str)
        return out

View File

@@ -1,20 +1,22 @@
from sklearn.pipeline import Pipeline from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler from sklearn.preprocessing import StandardScaler
from extract import DataExtractor from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter
from mapping import SessionTransitionProbMatrixTransformer, render_graph from mapping import SessionTransitionProbMatrixTransformer, render_graph
from demand import DemandEstimator from demand import DemandEstimator
# exposable pipelines # exposable pipelines
etl_pipeline = Pipeline([ etl_pipeline = Pipeline([
('data_extraction', DataExtractor()), ('kafka_fetch', KafkaDataFetcher()),
('experiment_join', ExperimentJoiner()),
('event_augment', EventTitleAugmenter()),
]) ])
pricing_pipeline = Pipeline([ pricing_pipeline = Pipeline([
('demand_estimation', DemandEstimator()), ('demand_estimation', DemandEstimator()),
('scaling', StandardScaler()),
]) ])
if __name__ == "__main__": if __name__ == "__main__":
processed_data = etl_pipeline.fit_transform(None) processed_data = etl_pipeline.fit_transform(None)
pricing = pricing_pipeline.fit_transform(processed_data) pricing = pricing_pipeline.fit_transform(processed_data)
print(pricing)