diff --git a/experiments/procesing/__init__.py b/experiments/procesing/__init__.py new file mode 100644 index 0000000..48b91bf --- /dev/null +++ b/experiments/procesing/__init__.py @@ -0,0 +1,19 @@ +from .extract import ( + KafkaDataFetcher, + ExperimentJoiner, + EventTitleAugmenter, +) +from .demand import DemandEstimator +from .mapping import SessionTransitionProbMatrixTransformer, render_graph +from .pipeline import etl_pipeline, pricing_pipeline + +__all__ = [ + 'KafkaDataFetcher', + 'ExperimentJoiner', + 'EventTitleAugmenter', + 'DemandEstimator', + 'SessionTransitionProbMatrixTransformer', + 'render_graph', + 'etl_pipeline', + 'pricing_pipeline', +] diff --git a/experiments/procesing/demand.py b/experiments/procesing/demand.py new file mode 100644 index 0000000..d1924ec --- /dev/null +++ b/experiments/procesing/demand.py @@ -0,0 +1,39 @@ +from sklearn.base import BaseEstimator, TransformerMixin +import numpy as np +import pandas as pd +from supabase import create_client, Client +import pandas as pd +import os + +SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL") +SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") + +supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) + +class DemandEstimator(BaseEstimator, TransformerMixin): + def __init__(self, + store_mode:str='hotel', + session_filter:str="", + experiment_filter:str=""): + self.store=store_mode + self.session_filter=session_filter if len(session_filter)>0 else None + self.experiment_filter=experiment_filter if len(experiment_filter)>0 else None + def fit(self, X): + return self + + def transform(self, interactions : pd.DataFrame): + if interactions.empty: + return pd.DataFrame(columns=["productId", "demand_score"]) + if self.session_filter: + interactions = interactions[interactions['sessionId'] == self.session_filter] + if self.experiment_filter: + interactions = interactions[interactions['experimentId'] == self.experiment_filter] + products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute() + products = pd.DataFrame(products.data) + unique_products = products['id'].unique() + # TODO: improve demand score calculation rather than just counting interactions (use weights..) + # while maintaining simplicity of a simple cross tab approach + product_demand = pd.crosstab(interactions['productId'], "no_of_interactions") + product_demand = product_demand.reindex(unique_products, fill_value=0).reset_index() + product_demand.columns = ['productId', 'demand_score'] + return product_demand diff --git a/experiments/procesing/extract.py b/experiments/procesing/extract.py index 7fbb88c..bd77204 100644 --- a/experiments/procesing/extract.py +++ b/experiments/procesing/extract.py @@ -15,106 +15,98 @@ N_PRICE_BUCKETS = 5 supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) -def get_data_from_kafka() -> pd.DataFrame: - """fetch all events from backend dump endpoint""" - resp = requests.get(f"{BACKEND_URL}/api/kafka/dump") - resp.raise_for_status() - data = resp.json() - if not data.get('success') or not data.get('data'): - return pd.DataFrame() - - df = pd.DataFrame(data['data']) - # explode metadata col json - if 'metadata' in df.columns: - df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) - df = df.dropna(subset=['eventName']) - return df - - -def join_with_experiments(df: pd.DataFrame) -> pd.DataFrame: - if df.empty or 'experimentId' not in df.columns: - return df - - unique_exp_ids = df['experimentId'].dropna().unique() - if len(unique_exp_ids) == 0: - return df - - resp = supabase.table('experiments').select( - 'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)' - ).in_('id', unique_exp_ids.tolist()).execute() - - if not resp.data: - return df - - exp_df = pd.DataFrame(resp.data) - - # flatten task nested object if present - if 'task' in exp_df.columns and exp_df['task'].notnull().any(): - task_normalized = pd.json_normalize(exp_df['task'].dropna()) - task_normalized.index = exp_df[exp_df['task'].notnull()].index - exp_df = exp_df.drop(columns=['task']).join(task_normalized, rsuffix='_task') - - # rename experiment columns for clarity - exp_df = exp_df.rename(columns={ - 'id': 'experimentId', - 'subject_name': 'exp_subject', - 'xp_human_only': 'exp_human_only', - 'xp_market_mode': 'exp_market_mode', - 'xp_task_id': 'exp_task_id' - }) - - df = df.merge(exp_df, on='experimentId', how='left') - return df - - -def augment_event_titles(df: pd.DataFrame) -> pd.DataFrame: - # from taking standard view_item_page in eventName to view_item_page_{metadata_schema} - # we want metadata schema to create product specific event names - - # only create price buckets if we have enough unique prices - if df["metadata_price"].notnull().sum() > 0: - try: - price_buckets = pd.qcut( - df["metadata_price"], - q=N_PRICE_BUCKETS, - labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)], - duplicates='drop' # handle duplicate bin edges - ) - except ValueError: - # fallback: if still not enough unique values, use cut with fixed ranges or just use raw price - price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "") - else: - price_buckets = pd.Series([""] * len(df), index=df.index) - - # metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name - # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page - df["metadata_schema"] = np.where( - df["productId"].notnull() & df["metadata_price"].notnull(), - "_" + df["productId"].astype(str) + "@" + price_buckets.astype(str), - "" - ) - df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str) - return df - - -def extract() -> pd.DataFrame: - df = get_data_from_kafka() - df = join_with_experiments(df) - df = augment_event_titles(df) - return df - - -class DataExtractor(BaseEstimator, TransformerMixin): +class KafkaDataFetcher(BaseEstimator, TransformerMixin): def fit(self, X=None, y=None): return self def transform(self, X=None): - return extract() + resp = requests.get(f"{BACKEND_URL}/api/kafka/dump") + resp.raise_for_status() + data = resp.json() + + if not data.get('success') or not data.get('data'): + return pd.DataFrame() + + df = pd.DataFrame(data['data']) + # explode metadata col json + if 'metadata' in df.columns: + df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) + df = df.dropna(subset=['eventName']) + # remape dateIndex + df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') + return df -if __name__ == "__main__": - df = extract() - print(df.head()) - print(df.tail()) - print(df.info()) +class ExperimentJoiner(BaseEstimator, TransformerMixin): + def fit(self, X=None, y=None): + return self + + def transform(self, df): + if df.empty or 'experimentId' not in df.columns: + return df + + unique_exp_ids = df['experimentId'].dropna().unique() + if len(unique_exp_ids) == 0: + return df + + resp = supabase.table('experiments').select( + 'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)' + ).in_('id', unique_exp_ids.tolist()).execute() + + if not resp.data: + return df + + exp_df = pd.DataFrame(resp.data) + + # flatten task nested object if present + if 'task' in exp_df.columns and exp_df['task'].notnull().any(): + task_normalized = pd.json_normalize(exp_df['task'].dropna()) + task_normalized.index = exp_df[exp_df['task'].notnull()].index + exp_df = exp_df.drop(columns=['task']).join(task_normalized, rsuffix='_task') + + # rename experiment columns for clarity + exp_df = exp_df.rename(columns={ + 'id': 'experimentId', + 'subject_name': 'exp_subject', + 'xp_human_only': 'exp_human_only', + 'xp_market_mode': 'exp_market_mode', + 'xp_task_id': 'exp_task_id' + }) + + df = df.merge(exp_df, on='experimentId', how='left') + return df + + +class EventTitleAugmenter(BaseEstimator, TransformerMixin): + def fit(self, X=None, y=None): + return self + + def transform(self, df): + # from taking standard view_item_page in eventName to view_item_page_{metadata_schema} + # we want metadata schema to create product specific event names + + # only create price buckets if we have enough unique prices + if df["metadata_price"].notnull().sum() > 0: + try: + price_buckets = pd.qcut( + df["metadata_price"], + q=N_PRICE_BUCKETS, + labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)], + duplicates='drop' # handle duplicate bin edges + ) + except ValueError: + # fallback: if still not enough unique values, use cut with fixed ranges or just use raw price + price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "") + else: + price_buckets = pd.Series([""] * len(df), index=df.index) + + # metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name + # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page + df["metadata_schema"] = np.where( + df["productId"].notnull() & df["metadata_price"].notnull(), + "_" + df["productId"].astype(str) + "@" + price_buckets.astype(str), + "" + ) + df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str) + return df diff --git a/experiments/procesing/pipeline.py b/experiments/procesing/pipeline.py index 0465317..2d93c05 100644 --- a/experiments/procesing/pipeline.py +++ b/experiments/procesing/pipeline.py @@ -1,20 +1,22 @@ from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler -from extract import DataExtractor +from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter from mapping import SessionTransitionProbMatrixTransformer, render_graph from demand import DemandEstimator # exposable pipelines etl_pipeline = Pipeline([ - ('data_extraction', DataExtractor()), + ('kafka_fetch', KafkaDataFetcher()), + ('experiment_join', ExperimentJoiner()), + ('event_augment', EventTitleAugmenter()), ]) pricing_pipeline = Pipeline([ ('demand_estimation', DemandEstimator()), - ('scaling', StandardScaler()), ]) if __name__ == "__main__": processed_data = etl_pipeline.fit_transform(None) pricing = pricing_pipeline.fit_transform(processed_data) + print(pricing)