cleaning old pipeline and vectorization

This commit is contained in:
2025-11-28 14:20:05 +01:00
parent 1054fe7720
commit f3bc81e0ed
6 changed files with 17 additions and 592 deletions

View File

@@ -1,119 +0,0 @@
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
from supabase import create_client, Client
from typing import Optional, Literal
import os
import logging
log = logging.getLogger(__name__)
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class ChunkInteractionsIntoSteps(BaseEstimator, TransformerMixin):
"""
Split interaction data into time windows for temporal analysis.
Returns a list of dataframes, one per time window.
"""
def __init__(self,
window_size:str='1h',
ts_col:str='ts',
return_metadata:bool=True):
"""
Args:
window_size: pandas freq string ('1h', '30T', '1D', etc)
ts_col: timestamp column name
return_metadata: if True, return dict with metadata per chunk
"""
self.window_size = window_size
self.ts_col = ts_col
self.return_metadata = return_metadata
def fit(self, X):
return self
def transform(self, interactions: pd.DataFrame):
"""
Returns:
if return_metadata=False: list of dataframes, one per window
if return_metadata=True: list of dicts with keys:
- 'data': dataframe for this window
- 'window_start': start timestamp
- 'window_end': end timestamp
- 'window_idx': integer index
"""
if interactions.empty:
return []
df = interactions.copy()
# ensure timestamp is datetime
if not pd.api.types.is_datetime64_any_dtype(df[self.ts_col]):
df[self.ts_col] = pd.to_datetime(df[self.ts_col])
# sort by time
df = df.sort_values(self.ts_col)
# assign window
df['_window'] = df[self.ts_col].dt.floor(self.window_size)
# group by window
chunks = []
for idx, (window_start, group) in enumerate(df.groupby('_window')):
chunk_data = group.drop(columns=['_window'])
if self.return_metadata:
chunks.append({
'data': chunk_data,
'window_start': window_start,
'window_end': window_start + pd.Timedelta(self.window_size),
'window_idx': idx
})
else:
chunks.append(chunk_data)
return chunks
class DemandEstimator(BaseEstimator, TransformerMixin):
def __init__(self,
store_mode:str='hotel',
session_filter:str="",
experiment_filter:str=""):
self.store=store_mode
self.session_filter=session_filter if len(session_filter)>0 else None
self.experiment_filter=experiment_filter if len(experiment_filter)>0 else None
def fit(self, X):
return self
def transform(self, interactions : pd.DataFrame):
if interactions.empty:
return pd.DataFrame(columns=["productId", "demand_score"])
if self.session_filter:
interactions = interactions[interactions['sessionId'] == self.session_filter]
if self.experiment_filter:
interactions = interactions[interactions['experimentId'] == self.experiment_filter]
products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute()
products = pd.DataFrame(products.data)
unique_products = products['id'].unique()
log.info(f"Demand estimator found {len(unique_products)} in data")
# filter out rows without productId
interactions_with_products = interactions.dropna(subset=['productId'])
if interactions_with_products.empty:
# no interactions with products, return all zeros
return pd.DataFrame({
'productId': unique_products,
'demand_score': 0
})
# TODO: improve demand score calculation rather than just counting interactions (use weights..)
# while maintaining simplicity of a simple cross tab approach
product_demand = pd.crosstab(interactions_with_products['productId'], "no_of_interactions")
product_demand = product_demand.reindex(unique_products, fill_value=0).reset_index()
product_demand.columns = ['productId', 'demand_score']
return product_demand

View File

@@ -130,25 +130,24 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin):
def _build_product_timeseries(self, aligned_chunks):
"""Build time series [price, quantity] per product."""
series_by_product = {}
# vectorize chunk merging instead of iterating rows
all_merged = []
for chunk in aligned_chunks:
demand_df = chunk['demand']
price_df = chunk['prices']
merged = chunk['demand'].merge(chunk['prices'], on='productId', how='inner')
merged['timestamp'] = chunk['window_start']
all_merged.append(merged[['productId', 'timestamp', 'price', 'demand_score']])
# merge on productId
merged = demand_df.merge(price_df, on='productId', how='inner')
if not all_merged:
return {}
for _, row in merged.iterrows():
pid = row['productId']
if pid not in series_by_product:
series_by_product[pid] = []
series_by_product[pid].append({
'timestamp': chunk['window_start'],
'price': row['price'],
'quantity': row['demand_score']
})
# concat all chunks and group by productId in one pass
combined = pd.concat(all_merged, ignore_index=True)
series_by_product = {
pid: group[['timestamp', 'price', 'demand_score']].rename(
columns={'demand_score': 'quantity'}
).to_dict('records')
for pid, group in combined.groupby('productId')
}
return series_by_product

View File

@@ -1,207 +0,0 @@
import pandas as pd
import json
import numpy as np
import os
import requests
from dotenv import load_dotenv
from sklearn.base import BaseEstimator, TransformerMixin
from supabase import create_client, Client
from typing import Tuple, List, Dict
load_dotenv()
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000")
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")
N_PRICE_BUCKETS = 5
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class KafkaDataFetcher(BaseEstimator, TransformerMixin):
def __init__(self, topic: str = "user-interactions"):
self.topic = topic # also can be price-logs
def fit(self, X=None, y=None):
return self
def transform(self, X=None):
resp = requests.get(f"{BACKEND_URL}/api/kafka/dump?topic={self.topic}")
resp.raise_for_status()
data = resp.json()
if not data.get('success') or not data.get('data'):
return pd.DataFrame()
df = pd.DataFrame(data['data'])
if self.topic == 'user-interactions':
if 'metadata' in df.columns: # explode metadata col json
df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
df = df.dropna(subset=['eventName'])
# remape dateIndex
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
return df
class ExperimentJoiner(BaseEstimator, TransformerMixin):
def fit(self, X=None, y=None):
return self
def transform(self, df):
if df.empty or 'experimentId' not in df.columns:
return df
unique_exp_ids = df['experimentId'].dropna().unique()
if len(unique_exp_ids) == 0:
return df
resp = supabase.table('experiments').select(
'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)'
).in_('id', unique_exp_ids.tolist()).execute()
if not resp.data:
return df
exp_df = pd.DataFrame(resp.data)
# flatten task nested object if present
if 'task' in exp_df.columns and exp_df['task'].notnull().any():
task_normalized = pd.json_normalize(exp_df['task'].dropna())
task_normalized.index = exp_df[exp_df['task'].notnull()].index
exp_df = exp_df.drop(columns=['task']).join(task_normalized, rsuffix='_task')
# rename experiment columns for clarity
exp_df = exp_df.rename(columns={
'id': 'experimentId',
'subject_name': 'exp_subject',
'xp_human_only': 'exp_human_only',
'xp_market_mode': 'exp_market_mode',
'xp_task_id': 'exp_task_id'
})
df = df.merge(exp_df, on='experimentId', how='left')
return df
class EventTitleAugmenter(BaseEstimator, TransformerMixin):
def fit(self, X=None, y=None):
return self
def transform(self, df):
# from taking standard view_item_page in eventName to view_item_page_{metadata_schema}
# we want metadata schema to create product specific event names
# only create price buckets if we have enough unique prices
if df["metadata_price"].notnull().sum() > 0:
try:
price_buckets = pd.qcut(
df["metadata_price"],
q=N_PRICE_BUCKETS,
labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)],
duplicates='drop' # handle duplicate bin edges
)
except ValueError:
# fallback: if still not enough unique values, use cut with fixed ranges or just use raw price
price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "")
else:
price_buckets = pd.Series([""] * len(df), index=df.index)
# metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name
# TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page
df["metadata_schema"] = np.where(
df["productId"].notnull() & df["metadata_price"].notnull(),
"_" + df["productId"].astype(str) + "@" + price_buckets.astype(str),
""
)
df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str)
return df
def chunk_shared_data(interactions_df: pd.DataFrame,
price_logs_df: pd.DataFrame,
window_size: str = '30s',
ts_col: str = 'ts') -> Tuple[List[Dict], List[Dict]]:
"""
Chunk interaction and price data into aligned time windows.
Args:
interactions_df: interaction data with timestamp column
price_logs_df: price log data with timestamp column
window_size: pandas freq string ('30s', '1min', '1h', etc)
ts_col: name of timestamp column
Returns:
tuple of (interaction_chunks, price_chunks) where each is list of dicts:
{
'window_start': timestamp,
'window_end': timestamp,
'data': dataframe for this window
}
"""
if interactions_df.empty and price_logs_df.empty:
return [], []
# convert timestamps to datetime
interactions_df = interactions_df.copy()
price_logs_df = price_logs_df.copy()
if not interactions_df.empty:
if not pd.api.types.is_datetime64_any_dtype(interactions_df[ts_col]):
interactions_df[ts_col] = pd.to_datetime(interactions_df[ts_col])
if not price_logs_df.empty:
if not pd.api.types.is_datetime64_any_dtype(price_logs_df[ts_col]):
price_logs_df[ts_col] = pd.to_datetime(price_logs_df[ts_col])
# find global time bounds
times = []
if not interactions_df.empty:
times.extend([interactions_df[ts_col].min(), interactions_df[ts_col].max()])
if not price_logs_df.empty:
times.extend([price_logs_df[ts_col].min(), price_logs_df[ts_col].max()])
if not times:
return [], []
earliest = min(times)
latest = max(times)
# create shared time windows
windows = pd.date_range(start=earliest, end=latest, freq=window_size)
if len(windows) < 2:
return [], []
# chunk both datasets
interaction_chunks = []
price_chunks = []
for i in range(len(windows) - 1):
window_start = windows[i]
window_end = windows[i + 1]
# filter interactions in this window
if not interactions_df.empty:
mask = (interactions_df[ts_col] >= window_start) & (interactions_df[ts_col] < window_end)
interaction_chunk = interactions_df[mask]
else:
interaction_chunk = pd.DataFrame()
interaction_chunks.append({
'window_start': window_start,
'window_end': window_end,
'data': interaction_chunk
})
# filter price logs in this window
if not price_logs_df.empty:
mask = (price_logs_df[ts_col] >= window_start) & (price_logs_df[ts_col] < window_end)
price_chunk = price_logs_df[mask]
else:
price_chunk = pd.DataFrame()
price_chunks.append({
'window_start': window_start,
'window_end': window_end,
'data': price_chunk
})
return interaction_chunks, price_chunks

View File

@@ -1,158 +0,0 @@
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
def build_transition_prob_matrix(df: pd.DataFrame):
df = df.dropna(subset=['eventName'])
events = df['eventName'].tolist()
labels = pd.Index(events).unique().tolist()
idx = {e:i for i,e in enumerate(labels)}
M = np.zeros((len(labels), len(labels)), dtype=float)
for a, b in zip(events, events[1:]):
M[idx[a], idx[b]] += 1
row_sums = M.sum(axis=1, keepdims=True)
with np.errstate(divide='ignore', invalid='ignore'):
P = np.divide(M, row_sums, where=row_sums>0) # row-normalized
return P, labels
# https://medium.com/data-science/time-series-data-markov-transition-matrices-7060771e362b
from graphviz import Digraph
import numpy as np
import pandas as pd
def _as_prob_df(matrix, labels=None):
"""Return a square DataFrame with index=columns=labels."""
if isinstance(matrix, pd.DataFrame):
# Ensure square and aligned
assert (matrix.index == matrix.columns).all(), "Index/columns must match."
return matrix
matrix = np.asarray(matrix, dtype=float)
assert matrix.shape[0] == matrix.shape[1], "Matrix must be square."
if labels is None:
raise ValueError("labels are required when matrix is not a DataFrame")
assert len(labels) == matrix.shape[0], "labels length must match matrix size."
return pd.DataFrame(matrix, index=list(labels), columns=list(labels))
def _df_to_edgelist(P: pd.DataFrame, threshold=0.0, round_digits=2):
"""Build weighted edges > threshold."""
edges = []
for src in P.index:
for dst in P.columns:
w = float(P.loc[src, dst])
if w > threshold:
edges.append((str(src), str(dst), f"{w:.{round_digits}f}"))
return edges
def render_graph(fname, matrix, ls_index=None, threshold=0.0, fmt="svg", view=False):
"""
fname: output file stem (no extension)
matrix: NumPy array or pandas DataFrame of transition PROBABILITIES
ls_index: ordered labels (required if matrix is not a DataFrame)
threshold: hide edges with weight <= threshold
fmt: 'svg'|'png'|'pdf' etc.
view: open after rendering
"""
P = _as_prob_df(matrix, labels=ls_index)
edges = _df_to_edgelist(P, threshold=threshold)
g = Digraph(format=fmt)
g.attr(rankdir="LR", size="30")
g.attr("node", shape="circle")
# ensure isolated nodes appear
for node in P.index:
g.node(str(node), width="1", height="1")
for src, dst, label in edges:
g.edge(src, dst, label=label)
g.render(fname, view=view, cleanup=True)
return g
class TransitionProbMatrixTransformer(BaseEstimator, TransformerMixin):
def __init__(self, threshold=0.0):
self.threshold = threshold
self.P_ = None
self.labels_ = None
def fit(self, X: pd.DataFrame, y=None):
P, labels = build_transition_prob_matrix(X)
self.P_ = P
self.labels_ = labels
return self
def transform(self, X: pd.DataFrame = None):
return self.P_, self.labels_
def render(self, fname: str, fmt="svg", view=False):
if self.P_ is None or self.labels_ is None:
raise ValueError("Transformer has not been fitted yet.")
return render_graph(
fname,
self.P_,
ls_index=self.labels_,
threshold=self.threshold,
fmt=fmt,
view=view
)
class SessionTransitionProbMatrixTransformer(BaseEstimator, TransformerMixin):
def __init__(self, threshold=0.0, session_col='sessionId'):
self.threshold = threshold
self.session_col = session_col
self.session_matrices_ = None
def fit(self, X: pd.DataFrame, y=None):
if self.session_col not in X.columns:
raise ValueError(f"Column '{self.session_col}' not found in DataFrame")
session_matrices = {}
for session_id, grp in X.groupby(self.session_col):
if len(grp) > 1: # need at least 2 events for transitions
P, labels = build_transition_prob_matrix(grp)
session_matrices[session_id] = {'matrix': P, 'labels': labels}
self.session_matrices_ = session_matrices
return self
def transform(self, X: pd.DataFrame = None):
if self.session_matrices_ is None:
raise ValueError("Transformer has not been fitted yet.")
return pd.Series(self.session_matrices_)
def render_session(self, session_id: str, fname: str, fmt="svg", view=False):
if self.session_matrices_ is None:
raise ValueError("Transformer has not been fitted yet.")
if session_id not in self.session_matrices_:
raise ValueError(f"Session '{session_id}' not found in fitted data.")
sess_data = self.session_matrices_[session_id]
return render_graph(
fname,
sess_data['matrix'],
ls_index=sess_data['labels'],
threshold=self.threshold,
fmt=fmt,
view=view
)
if __name__ == "__main__":
# Example usage
data = {
'eventName': [
'A', 'B', 'A', 'C', 'B', 'A', 'A', 'C', 'B', 'C',
'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A'
]
}
df = pd.DataFrame(data)
transformer = TransitionProbMatrixTransformer(threshold=0.1)
transformer.fit(df)
P, labels = transformer.transform(None)
print("Transition Probability Matrix:")
print(pd.DataFrame(P, index=labels, columns=labels))
# Render the graph
transformer.render("transition_graph", fmt="svg", view=False)

View File

@@ -1,90 +0,0 @@
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import pandas as pd
import logging
log = logging.getLogger(__name__)
from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data
from mapping import SessionTransitionProbMatrixTransformer, render_graph
from demand import DemandEstimator, ChunkInteractionsIntoSteps
from elasticity import TemporalElasticityEstimator, aggregate_price_logs
# elasticity pipeline components (not sklearn compatible, manual orchestration)
def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store_mode='hotel'):
"""
Compute price elasticity from interaction and price data.
Args:
interactions_df: raw interaction data from demand_data_pipeline
price_logs_df: price log data from price_data_pipeline
window_size: time window for chunking
store_mode: 'hotel' or 'airline'
Returns:
df with [productId, elasticity, std_error, n_obs]
"""
# step 1: chunk interactions into time windows
chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True)
interaction_chunks = chunker.transform(interactions_df)
log.info(f"Chunked interactions into {len(interaction_chunks)} windows of size {window_size}")
if not interaction_chunks:
return None
# step 2: compute demand per window
demand_estimator = DemandEstimator(store_mode=store_mode)
demand_chunks = []
for chunk in interaction_chunks:
demand_vector = demand_estimator.transform(chunk['data'])
demand_chunks.append({
'window_start': chunk['window_start'],
'window_end': chunk['window_end'],
'demand_vector': demand_vector # each has a full list of all products, even if demand is 0
})
# [q_chunk1, q_chunk2, ...]
# step 3: aggregate price logs into windows
price_chunks = aggregate_price_logs(price_logs_df, window_size=window_size)
# step 4: compute elasticity
elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2)
elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks, store_mode=store_mode)
return elasticity_df
# exposable pipelines
interaction_pipeline = Pipeline([
('kafka_fetch', KafkaDataFetcher(topic='user-interactions')),
('experiment_join', ExperimentJoiner()),
('event_augment', EventTitleAugmenter()),
])
price_data_pipeline = Pipeline([
('kafka_fetch', KafkaDataFetcher(topic='price-logs')),
])
# interaction_data + price_data -> elasticity (demand)
# elasticity -> pricing
pricing_pipeline = Pipeline([
('demand_estimation', DemandEstimator()),
])
if __name__ == "__main__":
# fetch both datasets
interaction_data = interaction_pipeline.fit_transform(None)
pricing_data = price_data_pipeline.fit_transform(None)
if interaction_data.empty or pricing_data.empty:
print("Insufficient data for elasticity computation"); exit(0)
# compute elasticity via unified pipeline
window_size = "30s"
elasticity_results = elasticity_pipeline(interaction_data, pricing_data, window_size=window_size)
elasticity_value_array = elasticity_results['elasticity'].values if elasticity_results is not None else np.array([])
print(elasticity_value_array)
if elasticity_results is not None and not elasticity_results.empty:
print(elasticity_results.to_string(index=False))
else:
print("\nInsufficient data for elasticity computation")