From f3bc81e0ed0975945eaddc3b0d283ed461439f71 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Fri, 28 Nov 2025 14:20:05 +0100 Subject: [PATCH] cleaning old pipeline and vectorization --- .../airflow/dags/elasticity_pricing_dag.py | 4 +- experiments/procesing/demand.py | 119 ---------- experiments/procesing/elasticity.py | 31 ++- experiments/procesing/extract.py | 207 ------------------ experiments/procesing/mapping.py | 158 ------------- experiments/procesing/pipeline.py | 90 -------- 6 files changed, 17 insertions(+), 592 deletions(-) delete mode 100644 experiments/procesing/demand.py delete mode 100644 experiments/procesing/extract.py delete mode 100644 experiments/procesing/mapping.py delete mode 100644 experiments/procesing/pipeline.py diff --git a/experiments/airflow/dags/elasticity_pricing_dag.py b/experiments/airflow/dags/elasticity_pricing_dag.py index a44ecf5..6dd3f2d 100644 --- a/experiments/airflow/dags/elasticity_pricing_dag.py +++ b/experiments/airflow/dags/elasticity_pricing_dag.py @@ -130,7 +130,7 @@ def compute_demand(**kwargs): return len(demand_chunks) def aggregate_price_logs(**kwargs): - """Task: Aggregate price logs into time windows (VECTORIZED)""" + """Task: Aggregate price logs into time windows """ ti = kwargs['ti'] df = pickle.loads(ti.xcom_pull(key='price_logs_raw')) @@ -139,7 +139,7 @@ def aggregate_price_logs(**kwargs): price_chunks = step.transform(df) ti.xcom_push(key='price_chunks', value=pickle.dumps(price_chunks)) - logging.info(f"Aggregated {len(price_chunks)} price chunks (vectorized)") + logging.info(f"Aggregated {len(price_chunks)} price chunks") return len(price_chunks) def compute_elasticity(**kwargs): diff --git a/experiments/procesing/demand.py b/experiments/procesing/demand.py deleted file mode 100644 index ba3c6c6..0000000 --- a/experiments/procesing/demand.py +++ /dev/null @@ -1,119 +0,0 @@ -from sklearn.base import BaseEstimator, TransformerMixin -import numpy as np -import pandas as pd -from supabase import create_client, Client -from typing import Optional, Literal -import os -import logging -log = logging.getLogger(__name__) - -SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "") -SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "") - -supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) - -class ChunkInteractionsIntoSteps(BaseEstimator, TransformerMixin): - """ - Split interaction data into time windows for temporal analysis. - Returns a list of dataframes, one per time window. - """ - def __init__(self, - window_size:str='1h', - ts_col:str='ts', - return_metadata:bool=True): - """ - Args: - window_size: pandas freq string ('1h', '30T', '1D', etc) - ts_col: timestamp column name - return_metadata: if True, return dict with metadata per chunk - """ - self.window_size = window_size - self.ts_col = ts_col - self.return_metadata = return_metadata - - def fit(self, X): - return self - - def transform(self, interactions: pd.DataFrame): - """ - Returns: - if return_metadata=False: list of dataframes, one per window - if return_metadata=True: list of dicts with keys: - - 'data': dataframe for this window - - 'window_start': start timestamp - - 'window_end': end timestamp - - 'window_idx': integer index - """ - if interactions.empty: - return [] - - df = interactions.copy() - - # ensure timestamp is datetime - if not pd.api.types.is_datetime64_any_dtype(df[self.ts_col]): - df[self.ts_col] = pd.to_datetime(df[self.ts_col]) - - # sort by time - df = df.sort_values(self.ts_col) - - # assign window - df['_window'] = df[self.ts_col].dt.floor(self.window_size) - - # group by window - chunks = [] - for idx, (window_start, group) in enumerate(df.groupby('_window')): - chunk_data = group.drop(columns=['_window']) - - if self.return_metadata: - chunks.append({ - 'data': chunk_data, - 'window_start': window_start, - 'window_end': window_start + pd.Timedelta(self.window_size), - 'window_idx': idx - }) - else: - chunks.append(chunk_data) - - return chunks - - -class DemandEstimator(BaseEstimator, TransformerMixin): - def __init__(self, - store_mode:str='hotel', - session_filter:str="", - experiment_filter:str=""): - self.store=store_mode - self.session_filter=session_filter if len(session_filter)>0 else None - self.experiment_filter=experiment_filter if len(experiment_filter)>0 else None - def fit(self, X): - return self - - def transform(self, interactions : pd.DataFrame): - if interactions.empty: - return pd.DataFrame(columns=["productId", "demand_score"]) - if self.session_filter: - interactions = interactions[interactions['sessionId'] == self.session_filter] - if self.experiment_filter: - interactions = interactions[interactions['experimentId'] == self.experiment_filter] - - products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute() - products = pd.DataFrame(products.data) - unique_products = products['id'].unique() - log.info(f"Demand estimator found {len(unique_products)} in data") - - # filter out rows without productId - interactions_with_products = interactions.dropna(subset=['productId']) - - if interactions_with_products.empty: - # no interactions with products, return all zeros - return pd.DataFrame({ - 'productId': unique_products, - 'demand_score': 0 - }) - - # TODO: improve demand score calculation rather than just counting interactions (use weights..) - # while maintaining simplicity of a simple cross tab approach - product_demand = pd.crosstab(interactions_with_products['productId'], "no_of_interactions") - product_demand = product_demand.reindex(unique_products, fill_value=0).reset_index() - product_demand.columns = ['productId', 'demand_score'] - return product_demand diff --git a/experiments/procesing/elasticity.py b/experiments/procesing/elasticity.py index eea3022..736364c 100644 --- a/experiments/procesing/elasticity.py +++ b/experiments/procesing/elasticity.py @@ -130,25 +130,24 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin): def _build_product_timeseries(self, aligned_chunks): """Build time series [price, quantity] per product.""" - series_by_product = {} - + # vectorize chunk merging instead of iterating rows + all_merged = [] for chunk in aligned_chunks: - demand_df = chunk['demand'] - price_df = chunk['prices'] + merged = chunk['demand'].merge(chunk['prices'], on='productId', how='inner') + merged['timestamp'] = chunk['window_start'] + all_merged.append(merged[['productId', 'timestamp', 'price', 'demand_score']]) - # merge on productId - merged = demand_df.merge(price_df, on='productId', how='inner') + if not all_merged: + return {} - for _, row in merged.iterrows(): - pid = row['productId'] - if pid not in series_by_product: - series_by_product[pid] = [] - - series_by_product[pid].append({ - 'timestamp': chunk['window_start'], - 'price': row['price'], - 'quantity': row['demand_score'] - }) + # concat all chunks and group by productId in one pass + combined = pd.concat(all_merged, ignore_index=True) + series_by_product = { + pid: group[['timestamp', 'price', 'demand_score']].rename( + columns={'demand_score': 'quantity'} + ).to_dict('records') + for pid, group in combined.groupby('productId') + } return series_by_product diff --git a/experiments/procesing/extract.py b/experiments/procesing/extract.py deleted file mode 100644 index 81eb1d3..0000000 --- a/experiments/procesing/extract.py +++ /dev/null @@ -1,207 +0,0 @@ -import pandas as pd -import json -import numpy as np -import os -import requests -from dotenv import load_dotenv -from sklearn.base import BaseEstimator, TransformerMixin -from supabase import create_client, Client -from typing import Tuple, List, Dict -load_dotenv() - -BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000") -SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL") -SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") -N_PRICE_BUCKETS = 5 - -supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) - - -class KafkaDataFetcher(BaseEstimator, TransformerMixin): - def __init__(self, topic: str = "user-interactions"): - self.topic = topic # also can be price-logs - def fit(self, X=None, y=None): - return self - - def transform(self, X=None): - resp = requests.get(f"{BACKEND_URL}/api/kafka/dump?topic={self.topic}") - resp.raise_for_status() - data = resp.json() - - if not data.get('success') or not data.get('data'): - return pd.DataFrame() - - df = pd.DataFrame(data['data']) - if self.topic == 'user-interactions': - if 'metadata' in df.columns: # explode metadata col json - df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) - df = df.dropna(subset=['eventName']) - # remape dateIndex - df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') - return df - - -class ExperimentJoiner(BaseEstimator, TransformerMixin): - def fit(self, X=None, y=None): - return self - - def transform(self, df): - if df.empty or 'experimentId' not in df.columns: - return df - - unique_exp_ids = df['experimentId'].dropna().unique() - if len(unique_exp_ids) == 0: - return df - - resp = supabase.table('experiments').select( - 'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)' - ).in_('id', unique_exp_ids.tolist()).execute() - - if not resp.data: - return df - - exp_df = pd.DataFrame(resp.data) - - # flatten task nested object if present - if 'task' in exp_df.columns and exp_df['task'].notnull().any(): - task_normalized = pd.json_normalize(exp_df['task'].dropna()) - task_normalized.index = exp_df[exp_df['task'].notnull()].index - exp_df = exp_df.drop(columns=['task']).join(task_normalized, rsuffix='_task') - - # rename experiment columns for clarity - exp_df = exp_df.rename(columns={ - 'id': 'experimentId', - 'subject_name': 'exp_subject', - 'xp_human_only': 'exp_human_only', - 'xp_market_mode': 'exp_market_mode', - 'xp_task_id': 'exp_task_id' - }) - - df = df.merge(exp_df, on='experimentId', how='left') - return df - - -class EventTitleAugmenter(BaseEstimator, TransformerMixin): - def fit(self, X=None, y=None): - return self - - def transform(self, df): - # from taking standard view_item_page in eventName to view_item_page_{metadata_schema} - # we want metadata schema to create product specific event names - - # only create price buckets if we have enough unique prices - if df["metadata_price"].notnull().sum() > 0: - try: - price_buckets = pd.qcut( - df["metadata_price"], - q=N_PRICE_BUCKETS, - labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)], - duplicates='drop' # handle duplicate bin edges - ) - except ValueError: - # fallback: if still not enough unique values, use cut with fixed ranges or just use raw price - price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "") - else: - price_buckets = pd.Series([""] * len(df), index=df.index) - - # metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name - # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page - df["metadata_schema"] = np.where( - df["productId"].notnull() & df["metadata_price"].notnull(), - "_" + df["productId"].astype(str) + "@" + price_buckets.astype(str), - "" - ) - df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str) - return df - - -def chunk_shared_data(interactions_df: pd.DataFrame, - price_logs_df: pd.DataFrame, - window_size: str = '30s', - ts_col: str = 'ts') -> Tuple[List[Dict], List[Dict]]: - """ - Chunk interaction and price data into aligned time windows. - - Args: - interactions_df: interaction data with timestamp column - price_logs_df: price log data with timestamp column - window_size: pandas freq string ('30s', '1min', '1h', etc) - ts_col: name of timestamp column - - Returns: - tuple of (interaction_chunks, price_chunks) where each is list of dicts: - { - 'window_start': timestamp, - 'window_end': timestamp, - 'data': dataframe for this window - } - """ - if interactions_df.empty and price_logs_df.empty: - return [], [] - - # convert timestamps to datetime - interactions_df = interactions_df.copy() - price_logs_df = price_logs_df.copy() - - if not interactions_df.empty: - if not pd.api.types.is_datetime64_any_dtype(interactions_df[ts_col]): - interactions_df[ts_col] = pd.to_datetime(interactions_df[ts_col]) - - if not price_logs_df.empty: - if not pd.api.types.is_datetime64_any_dtype(price_logs_df[ts_col]): - price_logs_df[ts_col] = pd.to_datetime(price_logs_df[ts_col]) - - # find global time bounds - times = [] - if not interactions_df.empty: - times.extend([interactions_df[ts_col].min(), interactions_df[ts_col].max()]) - if not price_logs_df.empty: - times.extend([price_logs_df[ts_col].min(), price_logs_df[ts_col].max()]) - - if not times: - return [], [] - - earliest = min(times) - latest = max(times) - - # create shared time windows - windows = pd.date_range(start=earliest, end=latest, freq=window_size) - - if len(windows) < 2: - return [], [] - - # chunk both datasets - interaction_chunks = [] - price_chunks = [] - - for i in range(len(windows) - 1): - window_start = windows[i] - window_end = windows[i + 1] - - # filter interactions in this window - if not interactions_df.empty: - mask = (interactions_df[ts_col] >= window_start) & (interactions_df[ts_col] < window_end) - interaction_chunk = interactions_df[mask] - else: - interaction_chunk = pd.DataFrame() - - interaction_chunks.append({ - 'window_start': window_start, - 'window_end': window_end, - 'data': interaction_chunk - }) - - # filter price logs in this window - if not price_logs_df.empty: - mask = (price_logs_df[ts_col] >= window_start) & (price_logs_df[ts_col] < window_end) - price_chunk = price_logs_df[mask] - else: - price_chunk = pd.DataFrame() - - price_chunks.append({ - 'window_start': window_start, - 'window_end': window_end, - 'data': price_chunk - }) - - return interaction_chunks, price_chunks diff --git a/experiments/procesing/mapping.py b/experiments/procesing/mapping.py deleted file mode 100644 index 6c32b91..0000000 --- a/experiments/procesing/mapping.py +++ /dev/null @@ -1,158 +0,0 @@ -import numpy as np -import pandas as pd -from sklearn.base import BaseEstimator, TransformerMixin - -def build_transition_prob_matrix(df: pd.DataFrame): - df = df.dropna(subset=['eventName']) - events = df['eventName'].tolist() - labels = pd.Index(events).unique().tolist() - idx = {e:i for i,e in enumerate(labels)} - M = np.zeros((len(labels), len(labels)), dtype=float) - for a, b in zip(events, events[1:]): - M[idx[a], idx[b]] += 1 - row_sums = M.sum(axis=1, keepdims=True) - with np.errstate(divide='ignore', invalid='ignore'): - P = np.divide(M, row_sums, where=row_sums>0) # row-normalized - return P, labels - -# https://medium.com/data-science/time-series-data-markov-transition-matrices-7060771e362b -from graphviz import Digraph -import numpy as np -import pandas as pd - -def _as_prob_df(matrix, labels=None): - """Return a square DataFrame with index=columns=labels.""" - if isinstance(matrix, pd.DataFrame): - # Ensure square and aligned - assert (matrix.index == matrix.columns).all(), "Index/columns must match." - return matrix - matrix = np.asarray(matrix, dtype=float) - assert matrix.shape[0] == matrix.shape[1], "Matrix must be square." - if labels is None: - raise ValueError("labels are required when matrix is not a DataFrame") - assert len(labels) == matrix.shape[0], "labels length must match matrix size." - return pd.DataFrame(matrix, index=list(labels), columns=list(labels)) - -def _df_to_edgelist(P: pd.DataFrame, threshold=0.0, round_digits=2): - """Build weighted edges > threshold.""" - edges = [] - for src in P.index: - for dst in P.columns: - w = float(P.loc[src, dst]) - if w > threshold: - edges.append((str(src), str(dst), f"{w:.{round_digits}f}")) - return edges - -def render_graph(fname, matrix, ls_index=None, threshold=0.0, fmt="svg", view=False): - """ - fname: output file stem (no extension) - matrix: NumPy array or pandas DataFrame of transition PROBABILITIES - ls_index: ordered labels (required if matrix is not a DataFrame) - threshold: hide edges with weight <= threshold - fmt: 'svg'|'png'|'pdf' etc. - view: open after rendering - """ - P = _as_prob_df(matrix, labels=ls_index) - edges = _df_to_edgelist(P, threshold=threshold) - - g = Digraph(format=fmt) - g.attr(rankdir="LR", size="30") - g.attr("node", shape="circle") - - # ensure isolated nodes appear - for node in P.index: - g.node(str(node), width="1", height="1") - - for src, dst, label in edges: - g.edge(src, dst, label=label) - - g.render(fname, view=view, cleanup=True) - return g - - -class TransitionProbMatrixTransformer(BaseEstimator, TransformerMixin): - def __init__(self, threshold=0.0): - self.threshold = threshold - self.P_ = None - self.labels_ = None - - def fit(self, X: pd.DataFrame, y=None): - P, labels = build_transition_prob_matrix(X) - self.P_ = P - self.labels_ = labels - return self - - def transform(self, X: pd.DataFrame = None): - return self.P_, self.labels_ - - def render(self, fname: str, fmt="svg", view=False): - if self.P_ is None or self.labels_ is None: - raise ValueError("Transformer has not been fitted yet.") - return render_graph( - fname, - self.P_, - ls_index=self.labels_, - threshold=self.threshold, - fmt=fmt, - view=view - ) - - -class SessionTransitionProbMatrixTransformer(BaseEstimator, TransformerMixin): - def __init__(self, threshold=0.0, session_col='sessionId'): - self.threshold = threshold - self.session_col = session_col - self.session_matrices_ = None - - def fit(self, X: pd.DataFrame, y=None): - if self.session_col not in X.columns: - raise ValueError(f"Column '{self.session_col}' not found in DataFrame") - - session_matrices = {} - for session_id, grp in X.groupby(self.session_col): - if len(grp) > 1: # need at least 2 events for transitions - P, labels = build_transition_prob_matrix(grp) - session_matrices[session_id] = {'matrix': P, 'labels': labels} - - self.session_matrices_ = session_matrices - return self - - def transform(self, X: pd.DataFrame = None): - if self.session_matrices_ is None: - raise ValueError("Transformer has not been fitted yet.") - return pd.Series(self.session_matrices_) - - def render_session(self, session_id: str, fname: str, fmt="svg", view=False): - if self.session_matrices_ is None: - raise ValueError("Transformer has not been fitted yet.") - if session_id not in self.session_matrices_: - raise ValueError(f"Session '{session_id}' not found in fitted data.") - - sess_data = self.session_matrices_[session_id] - return render_graph( - fname, - sess_data['matrix'], - ls_index=sess_data['labels'], - threshold=self.threshold, - fmt=fmt, - view=view - ) -if __name__ == "__main__": - # Example usage - data = { - 'eventName': [ - 'A', 'B', 'A', 'C', 'B', 'A', 'A', 'C', 'B', 'C', - 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A' - ] - } - df = pd.DataFrame(data) - - transformer = TransitionProbMatrixTransformer(threshold=0.1) - transformer.fit(df) - P, labels = transformer.transform(None) - - print("Transition Probability Matrix:") - print(pd.DataFrame(P, index=labels, columns=labels)) - - # Render the graph - transformer.render("transition_graph", fmt="svg", view=False) diff --git a/experiments/procesing/pipeline.py b/experiments/procesing/pipeline.py deleted file mode 100644 index 2efa4ae..0000000 --- a/experiments/procesing/pipeline.py +++ /dev/null @@ -1,90 +0,0 @@ -from sklearn.pipeline import Pipeline -from sklearn.preprocessing import StandardScaler -import pandas as pd -import logging -log = logging.getLogger(__name__) - -from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data -from mapping import SessionTransitionProbMatrixTransformer, render_graph -from demand import DemandEstimator, ChunkInteractionsIntoSteps -from elasticity import TemporalElasticityEstimator, aggregate_price_logs - - - -# elasticity pipeline components (not sklearn compatible, manual orchestration) -def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store_mode='hotel'): - """ - Compute price elasticity from interaction and price data. - - Args: - interactions_df: raw interaction data from demand_data_pipeline - price_logs_df: price log data from price_data_pipeline - window_size: time window for chunking - store_mode: 'hotel' or 'airline' - - Returns: - df with [productId, elasticity, std_error, n_obs] - """ - # step 1: chunk interactions into time windows - chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True) - interaction_chunks = chunker.transform(interactions_df) - log.info(f"Chunked interactions into {len(interaction_chunks)} windows of size {window_size}") - - if not interaction_chunks: - return None - - # step 2: compute demand per window - demand_estimator = DemandEstimator(store_mode=store_mode) - demand_chunks = [] - for chunk in interaction_chunks: - demand_vector = demand_estimator.transform(chunk['data']) - demand_chunks.append({ - 'window_start': chunk['window_start'], - 'window_end': chunk['window_end'], - 'demand_vector': demand_vector # each has a full list of all products, even if demand is 0 - }) - # [q_chunk1, q_chunk2, ...] - - # step 3: aggregate price logs into windows - price_chunks = aggregate_price_logs(price_logs_df, window_size=window_size) - - # step 4: compute elasticity - elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2) - elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks, store_mode=store_mode) - - return elasticity_df - - -# exposable pipelines -interaction_pipeline = Pipeline([ - ('kafka_fetch', KafkaDataFetcher(topic='user-interactions')), - ('experiment_join', ExperimentJoiner()), - ('event_augment', EventTitleAugmenter()), -]) - -price_data_pipeline = Pipeline([ - ('kafka_fetch', KafkaDataFetcher(topic='price-logs')), -]) - -# interaction_data + price_data -> elasticity (demand) -# elasticity -> pricing - -pricing_pipeline = Pipeline([ - ('demand_estimation', DemandEstimator()), -]) -if __name__ == "__main__": - # fetch both datasets - interaction_data = interaction_pipeline.fit_transform(None) - pricing_data = price_data_pipeline.fit_transform(None) - if interaction_data.empty or pricing_data.empty: - print("Insufficient data for elasticity computation"); exit(0) - # compute elasticity via unified pipeline - window_size = "30s" - elasticity_results = elasticity_pipeline(interaction_data, pricing_data, window_size=window_size) - elasticity_value_array = elasticity_results['elasticity'].values if elasticity_results is not None else np.array([]) - print(elasticity_value_array) - - if elasticity_results is not None and not elasticity_results.empty: - print(elasticity_results.to_string(index=False)) - else: - print("\nInsufficient data for elasticity computation")