From d42ab56c1e9c13a3f6d6d9fd6751a9e38e8a9503 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Sat, 15 Nov 2025 12:57:46 +0100 Subject: [PATCH] proper pipeline to handle data and build matrices --- experiments/data_export.ipynb | 316 ++++++++++++++---------------- experiments/procesing/extract.py | 96 +++++++++ experiments/procesing/mapping.py | 158 +++++++++++++++ experiments/procesing/pipeline.py | 19 ++ requirements.txt | 1 + 5 files changed, 421 insertions(+), 169 deletions(-) create mode 100644 experiments/procesing/extract.py create mode 100644 experiments/procesing/mapping.py create mode 100644 experiments/procesing/pipeline.py diff --git a/experiments/data_export.ipynb b/experiments/data_export.ipynb index 4f4c2af..87493b1 100644 --- a/experiments/data_export.ipynb +++ b/experiments/data_export.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 51, + "execution_count": 10, "id": "62eafcd9-5462-4063-8873-0e7fb9add907", "metadata": {}, "outputs": [ @@ -12,7 +12,7 @@ "True" ] }, - "execution_count": 51, + "execution_count": 10, "metadata": {}, "output_type": "execute_result" } @@ -31,7 +31,7 @@ }, { "cell_type": "code", - "execution_count": 52, + "execution_count": 11, "id": "4af65cb4-e8cf-4877-b2db-13ac19f3838f", "metadata": {}, "outputs": [ @@ -84,7 +84,7 @@ }, { "cell_type": "code", - "execution_count": 53, + "execution_count": 12, "id": "f6819a1c-32ab-49c7-845b-5df7bf60f561", "metadata": {}, "outputs": [ @@ -419,7 +419,7 @@ "39 NaN NaN Seaside Resort 1200.0 " ] }, - "execution_count": 53, + "execution_count": 12, "metadata": {}, "output_type": "execute_result" } @@ -430,20 +430,20 @@ }, { "cell_type": "code", - "execution_count": 54, + "execution_count": 13, "id": "380eca5f-8304-4fb2-be32-e8bcfd312085", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "['238dc588-a7ab-4c0e-bccd-6abca5076c66',\n", - " 'f0317a5d-e424-44e9-b784-c8f7291ffe31',\n", + "['013fc334-4045-4d5a-8739-dd0a8766a63b',\n", + " '238dc588-a7ab-4c0e-bccd-6abca5076c66',\n", " 'd176d7c9-4027-4702-9e31-2a71395cdda0',\n", - " '013fc334-4045-4d5a-8739-dd0a8766a63b']" + " 'f0317a5d-e424-44e9-b784-c8f7291ffe31']" ] }, - "execution_count": 54, + "execution_count": 13, "metadata": {}, "output_type": "execute_result" } @@ -454,7 +454,7 @@ }, { "cell_type": "code", - "execution_count": 55, + "execution_count": 14, "id": "f4ae6f81-dcb8-44be-aee7-30dbc3a6bae1", "metadata": {}, "outputs": [], @@ -464,7 +464,7 @@ }, { "cell_type": "code", - "execution_count": 56, + "execution_count": 15, "id": "050d90a4-20a9-47f5-b998-c31178a54cb3", "metadata": {}, "outputs": [], @@ -485,7 +485,7 @@ }, { "cell_type": "code", - "execution_count": 57, + "execution_count": 16, "id": "e68f9004-82f5-4826-aece-e3dc6e15a18f", "metadata": {}, "outputs": [], @@ -547,7 +547,7 @@ }, { "cell_type": "code", - "execution_count": 58, + "execution_count": 17, "id": "e255a2c1-6454-4e5e-89f6-ef8ac51ab6cc", "metadata": {}, "outputs": [ @@ -555,6 +555,99 @@ "name": "stdout", "output_type": "stream", "text": [ + "013fc334-4045-4d5a-8739-dd0a8766a63b\n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "page_view\n", + "\n", + "page_view\n", + "\n", + "\n", + "\n", + "view_item_page\n", + "\n", + "view_item_page\n", + "\n", + "\n", + "\n", + "page_view->view_item_page\n", + "\n", + "\n", + "1.00\n", + "\n", + "\n", + "\n", + "view_item_page->view_item_page\n", + "\n", + "\n", + "0.68\n", + "\n", + "\n", + "\n", + "hover_over_title\n", + "\n", + "hover_over_title\n", + "\n", + "\n", + "\n", + "view_item_page->hover_over_title\n", + "\n", + "\n", + "0.29\n", + "\n", + "\n", + "\n", + "hover_over_paragraph\n", + "\n", + "hover_over_paragraph\n", + "\n", + "\n", + "\n", + "view_item_page->hover_over_paragraph\n", + "\n", + "\n", + "0.04\n", + "\n", + "\n", + "\n", + "hover_over_title->view_item_page\n", + "\n", + "\n", + "1.00\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[[0.00000000e+000 1.00000000e+000 0.00000000e+000 0.00000000e+000]\n", + " [0.00000000e+000 6.78571429e-001 2.85714286e-001 3.57142857e-002]\n", + " [0.00000000e+000 1.00000000e+000 0.00000000e+000 0.00000000e+000]\n", + " [2.05833592e-312 2.29175545e-312 4.94065646e-324 6.92110218e-310]]\n", "238dc588-a7ab-4c0e-bccd-6abca5076c66\n" ] }, @@ -648,7 +741,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -662,6 +755,43 @@ " [0. 0.1875 0.375 0.4375 ]\n", " [0. 1. 0. 0. ]\n", " [0.14285714 0.85714286 0. 0. ]]\n", + "d176d7c9-4027-4702-9e31-2a71395cdda0\n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "page_view\n", + "\n", + "page_view\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[[0.]]\n", "f0317a5d-e424-44e9-b784-c8f7291ffe31\n" ] }, @@ -708,7 +838,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -719,151 +849,7 @@ "output_type": "stream", "text": [ "[[5.0e-001 5.0e-001]\n", - " [9.9e-324 1.5e-323]]\n", - "d176d7c9-4027-4702-9e31-2a71395cdda0\n" - ] - }, - { - "data": { - "image/svg+xml": [ - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "page_view\n", - "\n", - "page_view\n", - "\n", - "\n", - "\n" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[[0.]]\n", - "013fc334-4045-4d5a-8739-dd0a8766a63b\n" - ] - }, - { - "data": { - "image/svg+xml": [ - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "page_view\n", - "\n", - "page_view\n", - "\n", - "\n", - "\n", - "view_item_page\n", - "\n", - "view_item_page\n", - "\n", - "\n", - "\n", - "page_view->view_item_page\n", - "\n", - "\n", - "1.00\n", - "\n", - "\n", - "\n", - "view_item_page->view_item_page\n", - "\n", - "\n", - "0.68\n", - "\n", - "\n", - "\n", - "hover_over_title\n", - "\n", - "hover_over_title\n", - "\n", - "\n", - "\n", - "view_item_page->hover_over_title\n", - "\n", - "\n", - "0.29\n", - "\n", - "\n", - "\n", - "hover_over_paragraph\n", - "\n", - "hover_over_paragraph\n", - "\n", - "\n", - "\n", - "view_item_page->hover_over_paragraph\n", - "\n", - "\n", - "0.04\n", - "\n", - "\n", - "\n", - "hover_over_title->view_item_page\n", - "\n", - "\n", - "1.00\n", - "\n", - "\n", - "\n", - "hover_over_paragraph->page_view\n", - "\n", - "\n", - "0.14\n", - "\n", - "\n", - "\n", - "hover_over_paragraph->view_item_page\n", - "\n", - "\n", - "0.86\n", - "\n", - "\n", - "\n" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[[0. 1. 0. 0. ]\n", - " [0. 0.67857143 0.28571429 0.03571429]\n", - " [0. 1. 0. 0. ]\n", - " [0.14285714 0.85714286 0. 0. ]]\n" + " [9.9e-324 1.5e-323]]\n" ] } ], @@ -878,14 +864,6 @@ "for session in sessions:\n", " print(explore_session(session))" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4d278c2d-406e-4dc0-b219-5f7b236e852b", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/experiments/procesing/extract.py b/experiments/procesing/extract.py new file mode 100644 index 0000000..2b438e7 --- /dev/null +++ b/experiments/procesing/extract.py @@ -0,0 +1,96 @@ +from kafka import KafkaConsumer +import pandas as pd +import json +import numpy as np +import os +from dotenv import load_dotenv +from sklearn.base import BaseEstimator, TransformerMixin +# import matplotlib.pyplot as plt +# from IPython.display import display, SVG, Image +load_dotenv() + + +KAFKA_HOST=os.getenv("KAFKA_HOST", "localhost") +KAFKA_PORT=os.getenv("KAFKA_PORT", 9092) +TOPIC = os.getenv("KAFKA_TOPIC", "user-interactions") +N_PRICE_BUCKETS = 5 + +def get_data_from_kafka() -> pd.DataFrame: + consumer = KafkaConsumer( + TOPIC, + enable_auto_commit=True, + value_deserializer=lambda x: json.loads(x.decode('utf-8')), + auto_offset_reset='earliest', + bootstrap_servers=[f"{KAFKA_HOST}:{KAFKA_PORT}"] + ) + messages=consumer.poll(timeout_ms=1000,max_records=10000) + df = [] + for m in messages.values(): + for i in m: + df.append(i.value) + df = pd.DataFrame(df) + """ + 0 sessionId 73 non-null object + 1 eventName 73 non-null object + 2 page 73 non-null object + 3 productId 67 non-null object + 4 storeMode 73 non-null object + 5 userAgent 73 non-null object + 6 ts 73 non-null object + 7 metadata_referrer 6 non-null object + 8 metadata_roomType 45 non-null object + 9 metadata_price 45 non-null float64 + 10 metadata_nights 45 non-null float64 + 11 metadata_elementText 22 non-null object + 12 metadata_dwellTime 22 non-null float64 + """ + # explode metadata col json + df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) + df = df.dropna(subset=['eventName']) + return df + + +def join_with_experiments(df: pd.DataFrame) -> pd.DataFrame: + # TODO: Get experiments db from supabase and join on session_id + return df + + +def augment_event_titles(df: pd.DataFrame) -> pd.DataFrame: + # from taking standard view_item_page in eventName to view_item_page_{metadata_schema} + # we want metadata schema to create product specific event names + price_buckets = pd.qcut( + df["metadata_price"], + q=N_PRICE_BUCKETS, + labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)] + ) + # metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name + # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page + df["metadata_schema"] = np.where( + df["productId"].notnull() & df["metadata_price"].notnull(), + "_" + df["productId"].astype(str) + "@" + price_buckets.astype(str), + "" + ) + df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str) + return df + + +def extract() -> pd.DataFrame: + df = get_data_from_kafka() + df = join_with_experiments(df) + df = augment_event_titles(df) + return df + + +class DataExtractor(BaseEstimator, TransformerMixin): + def fit(self, X=None, y=None): + return self + + def transform(self, X=None): + return extract() + + +if __name__ == "__main__": + df = extract() + print(df.head()) + print(df.tail()) + print(df.info()) diff --git a/experiments/procesing/mapping.py b/experiments/procesing/mapping.py new file mode 100644 index 0000000..6c32b91 --- /dev/null +++ b/experiments/procesing/mapping.py @@ -0,0 +1,158 @@ +import numpy as np +import pandas as pd +from sklearn.base import BaseEstimator, TransformerMixin + +def build_transition_prob_matrix(df: pd.DataFrame): + df = df.dropna(subset=['eventName']) + events = df['eventName'].tolist() + labels = pd.Index(events).unique().tolist() + idx = {e:i for i,e in enumerate(labels)} + M = np.zeros((len(labels), len(labels)), dtype=float) + for a, b in zip(events, events[1:]): + M[idx[a], idx[b]] += 1 + row_sums = M.sum(axis=1, keepdims=True) + with np.errstate(divide='ignore', invalid='ignore'): + P = np.divide(M, row_sums, where=row_sums>0) # row-normalized + return P, labels + +# https://medium.com/data-science/time-series-data-markov-transition-matrices-7060771e362b +from graphviz import Digraph +import numpy as np +import pandas as pd + +def _as_prob_df(matrix, labels=None): + """Return a square DataFrame with index=columns=labels.""" + if isinstance(matrix, pd.DataFrame): + # Ensure square and aligned + assert (matrix.index == matrix.columns).all(), "Index/columns must match." + return matrix + matrix = np.asarray(matrix, dtype=float) + assert matrix.shape[0] == matrix.shape[1], "Matrix must be square." + if labels is None: + raise ValueError("labels are required when matrix is not a DataFrame") + assert len(labels) == matrix.shape[0], "labels length must match matrix size." + return pd.DataFrame(matrix, index=list(labels), columns=list(labels)) + +def _df_to_edgelist(P: pd.DataFrame, threshold=0.0, round_digits=2): + """Build weighted edges > threshold.""" + edges = [] + for src in P.index: + for dst in P.columns: + w = float(P.loc[src, dst]) + if w > threshold: + edges.append((str(src), str(dst), f"{w:.{round_digits}f}")) + return edges + +def render_graph(fname, matrix, ls_index=None, threshold=0.0, fmt="svg", view=False): + """ + fname: output file stem (no extension) + matrix: NumPy array or pandas DataFrame of transition PROBABILITIES + ls_index: ordered labels (required if matrix is not a DataFrame) + threshold: hide edges with weight <= threshold + fmt: 'svg'|'png'|'pdf' etc. + view: open after rendering + """ + P = _as_prob_df(matrix, labels=ls_index) + edges = _df_to_edgelist(P, threshold=threshold) + + g = Digraph(format=fmt) + g.attr(rankdir="LR", size="30") + g.attr("node", shape="circle") + + # ensure isolated nodes appear + for node in P.index: + g.node(str(node), width="1", height="1") + + for src, dst, label in edges: + g.edge(src, dst, label=label) + + g.render(fname, view=view, cleanup=True) + return g + + +class TransitionProbMatrixTransformer(BaseEstimator, TransformerMixin): + def __init__(self, threshold=0.0): + self.threshold = threshold + self.P_ = None + self.labels_ = None + + def fit(self, X: pd.DataFrame, y=None): + P, labels = build_transition_prob_matrix(X) + self.P_ = P + self.labels_ = labels + return self + + def transform(self, X: pd.DataFrame = None): + return self.P_, self.labels_ + + def render(self, fname: str, fmt="svg", view=False): + if self.P_ is None or self.labels_ is None: + raise ValueError("Transformer has not been fitted yet.") + return render_graph( + fname, + self.P_, + ls_index=self.labels_, + threshold=self.threshold, + fmt=fmt, + view=view + ) + + +class SessionTransitionProbMatrixTransformer(BaseEstimator, TransformerMixin): + def __init__(self, threshold=0.0, session_col='sessionId'): + self.threshold = threshold + self.session_col = session_col + self.session_matrices_ = None + + def fit(self, X: pd.DataFrame, y=None): + if self.session_col not in X.columns: + raise ValueError(f"Column '{self.session_col}' not found in DataFrame") + + session_matrices = {} + for session_id, grp in X.groupby(self.session_col): + if len(grp) > 1: # need at least 2 events for transitions + P, labels = build_transition_prob_matrix(grp) + session_matrices[session_id] = {'matrix': P, 'labels': labels} + + self.session_matrices_ = session_matrices + return self + + def transform(self, X: pd.DataFrame = None): + if self.session_matrices_ is None: + raise ValueError("Transformer has not been fitted yet.") + return pd.Series(self.session_matrices_) + + def render_session(self, session_id: str, fname: str, fmt="svg", view=False): + if self.session_matrices_ is None: + raise ValueError("Transformer has not been fitted yet.") + if session_id not in self.session_matrices_: + raise ValueError(f"Session '{session_id}' not found in fitted data.") + + sess_data = self.session_matrices_[session_id] + return render_graph( + fname, + sess_data['matrix'], + ls_index=sess_data['labels'], + threshold=self.threshold, + fmt=fmt, + view=view + ) +if __name__ == "__main__": + # Example usage + data = { + 'eventName': [ + 'A', 'B', 'A', 'C', 'B', 'A', 'A', 'C', 'B', 'C', + 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A' + ] + } + df = pd.DataFrame(data) + + transformer = TransitionProbMatrixTransformer(threshold=0.1) + transformer.fit(df) + P, labels = transformer.transform(None) + + print("Transition Probability Matrix:") + print(pd.DataFrame(P, index=labels, columns=labels)) + + # Render the graph + transformer.render("transition_graph", fmt="svg", view=False) diff --git a/experiments/procesing/pipeline.py b/experiments/procesing/pipeline.py new file mode 100644 index 0000000..6b742b2 --- /dev/null +++ b/experiments/procesing/pipeline.py @@ -0,0 +1,19 @@ +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import StandardScaler +from extract import DataExtractor +from mapping import SessionTransitionProbMatrixTransformer, render_graph + + +if __name__ == "__main__": + steps = [ + ('data_extraction', DataExtractor()), + ('transition_matrix', SessionTransitionProbMatrixTransformer(threshold=0.05)), + ] + pipeline = Pipeline(steps) + result = pipeline.fit_transform(None) + print(f"Number of sessions: {len(result)}\n") + + for session_id, sess_data in result.items(): + fname = f"session_{session_id}" + render_graph(fname, sess_data['matrix'], ls_index=sess_data['labels'], threshold=0.05, fmt="svg", view=False) + print(f"Rendered {fname}.svg") diff --git a/requirements.txt b/requirements.txt index 70db3f0..8bb3ed7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ browser-use pytest pytest-asyncio uv +scikit-learn