6 catalog data and mode mappers (#25)

* supabase product proxy and rendering

* minor pipeline refactor

* refactoring and demand estimation

* trackion of date index searching

* fixing changes of imports

* data seeding

* chore: airline basic refactor

* feat: huge push of product changes and item review with cart

* refactored design

* chore: moving route elsewhere and align

* fix: build of web/

* chore: fixing paper build

* fixing chars
This commit is contained in:
Daniel Alves Rösel
2025-11-25 11:00:31 +01:00
committed by GitHub
parent 894ce87a5d
commit 8b76d24ade
29 changed files with 1390 additions and 1237 deletions

View File

@@ -15,106 +15,98 @@ N_PRICE_BUCKETS = 5
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
def get_data_from_kafka() -> pd.DataFrame:
"""fetch all events from backend dump endpoint"""
resp = requests.get(f"{BACKEND_URL}/api/kafka/dump")
resp.raise_for_status()
data = resp.json()
if not data.get('success') or not data.get('data'):
return pd.DataFrame()
df = pd.DataFrame(data['data'])
# explode metadata col json
if 'metadata' in df.columns:
df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
df = df.dropna(subset=['eventName'])
return df
def join_with_experiments(df: pd.DataFrame) -> pd.DataFrame:
if df.empty or 'experimentId' not in df.columns:
return df
unique_exp_ids = df['experimentId'].dropna().unique()
if len(unique_exp_ids) == 0:
return df
resp = supabase.table('experiments').select(
'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)'
).in_('id', unique_exp_ids.tolist()).execute()
if not resp.data:
return df
exp_df = pd.DataFrame(resp.data)
# flatten task nested object if present
if 'task' in exp_df.columns and exp_df['task'].notnull().any():
task_normalized = pd.json_normalize(exp_df['task'].dropna())
task_normalized.index = exp_df[exp_df['task'].notnull()].index
exp_df = exp_df.drop(columns=['task']).join(task_normalized, rsuffix='_task')
# rename experiment columns for clarity
exp_df = exp_df.rename(columns={
'id': 'experimentId',
'subject_name': 'exp_subject',
'xp_human_only': 'exp_human_only',
'xp_market_mode': 'exp_market_mode',
'xp_task_id': 'exp_task_id'
})
df = df.merge(exp_df, on='experimentId', how='left')
return df
def augment_event_titles(df: pd.DataFrame) -> pd.DataFrame:
# from taking standard view_item_page in eventName to view_item_page_{metadata_schema}
# we want metadata schema to create product specific event names
# only create price buckets if we have enough unique prices
if df["metadata_price"].notnull().sum() > 0:
try:
price_buckets = pd.qcut(
df["metadata_price"],
q=N_PRICE_BUCKETS,
labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)],
duplicates='drop' # handle duplicate bin edges
)
except ValueError:
# fallback: if still not enough unique values, use cut with fixed ranges or just use raw price
price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "")
else:
price_buckets = pd.Series([""] * len(df), index=df.index)
# metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name
# TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page
df["metadata_schema"] = np.where(
df["productId"].notnull() & df["metadata_price"].notnull(),
"_" + df["productId"].astype(str) + "@" + price_buckets.astype(str),
""
)
df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str)
return df
def extract() -> pd.DataFrame:
df = get_data_from_kafka()
df = join_with_experiments(df)
df = augment_event_titles(df)
return df
class DataExtractor(BaseEstimator, TransformerMixin):
class KafkaDataFetcher(BaseEstimator, TransformerMixin):
def fit(self, X=None, y=None):
return self
def transform(self, X=None):
return extract()
resp = requests.get(f"{BACKEND_URL}/api/kafka/dump")
resp.raise_for_status()
data = resp.json()
if not data.get('success') or not data.get('data'):
return pd.DataFrame()
df = pd.DataFrame(data['data'])
# explode metadata col json
if 'metadata' in df.columns:
df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
df = df.dropna(subset=['eventName'])
# remape dateIndex
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
return df
if __name__ == "__main__":
df = extract()
print(df.head())
print(df.tail())
print(df.info())
class ExperimentJoiner(BaseEstimator, TransformerMixin):
def fit(self, X=None, y=None):
return self
def transform(self, df):
if df.empty or 'experimentId' not in df.columns:
return df
unique_exp_ids = df['experimentId'].dropna().unique()
if len(unique_exp_ids) == 0:
return df
resp = supabase.table('experiments').select(
'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)'
).in_('id', unique_exp_ids.tolist()).execute()
if not resp.data:
return df
exp_df = pd.DataFrame(resp.data)
# flatten task nested object if present
if 'task' in exp_df.columns and exp_df['task'].notnull().any():
task_normalized = pd.json_normalize(exp_df['task'].dropna())
task_normalized.index = exp_df[exp_df['task'].notnull()].index
exp_df = exp_df.drop(columns=['task']).join(task_normalized, rsuffix='_task')
# rename experiment columns for clarity
exp_df = exp_df.rename(columns={
'id': 'experimentId',
'subject_name': 'exp_subject',
'xp_human_only': 'exp_human_only',
'xp_market_mode': 'exp_market_mode',
'xp_task_id': 'exp_task_id'
})
df = df.merge(exp_df, on='experimentId', how='left')
return df
class EventTitleAugmenter(BaseEstimator, TransformerMixin):
def fit(self, X=None, y=None):
return self
def transform(self, df):
# from taking standard view_item_page in eventName to view_item_page_{metadata_schema}
# we want metadata schema to create product specific event names
# only create price buckets if we have enough unique prices
if df["metadata_price"].notnull().sum() > 0:
try:
price_buckets = pd.qcut(
df["metadata_price"],
q=N_PRICE_BUCKETS,
labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)],
duplicates='drop' # handle duplicate bin edges
)
except ValueError:
# fallback: if still not enough unique values, use cut with fixed ranges or just use raw price
price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "")
else:
price_buckets = pd.Series([""] * len(df), index=df.index)
# metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name
# TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page
df["metadata_schema"] = np.where(
df["productId"].notnull() & df["metadata_price"].notnull(),
"_" + df["productId"].astype(str) + "@" + price_buckets.astype(str),
""
)
df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str)
return df