diff --git a/experiments/data_export.ipynb b/experiments/data_export.ipynb
index 4f4c2af..87493b1 100644
--- a/experiments/data_export.ipynb
+++ b/experiments/data_export.ipynb
@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 51,
+ "execution_count": 10,
"id": "62eafcd9-5462-4063-8873-0e7fb9add907",
"metadata": {},
"outputs": [
@@ -12,7 +12,7 @@
"True"
]
},
- "execution_count": 51,
+ "execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
@@ -31,7 +31,7 @@
},
{
"cell_type": "code",
- "execution_count": 52,
+ "execution_count": 11,
"id": "4af65cb4-e8cf-4877-b2db-13ac19f3838f",
"metadata": {},
"outputs": [
@@ -84,7 +84,7 @@
},
{
"cell_type": "code",
- "execution_count": 53,
+ "execution_count": 12,
"id": "f6819a1c-32ab-49c7-845b-5df7bf60f561",
"metadata": {},
"outputs": [
@@ -419,7 +419,7 @@
"39 NaN NaN Seaside Resort 1200.0 "
]
},
- "execution_count": 53,
+ "execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
@@ -430,20 +430,20 @@
},
{
"cell_type": "code",
- "execution_count": 54,
+ "execution_count": 13,
"id": "380eca5f-8304-4fb2-be32-e8bcfd312085",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
- "['238dc588-a7ab-4c0e-bccd-6abca5076c66',\n",
- " 'f0317a5d-e424-44e9-b784-c8f7291ffe31',\n",
+ "['013fc334-4045-4d5a-8739-dd0a8766a63b',\n",
+ " '238dc588-a7ab-4c0e-bccd-6abca5076c66',\n",
" 'd176d7c9-4027-4702-9e31-2a71395cdda0',\n",
- " '013fc334-4045-4d5a-8739-dd0a8766a63b']"
+ " 'f0317a5d-e424-44e9-b784-c8f7291ffe31']"
]
},
- "execution_count": 54,
+ "execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
@@ -454,7 +454,7 @@
},
{
"cell_type": "code",
- "execution_count": 55,
+ "execution_count": 14,
"id": "f4ae6f81-dcb8-44be-aee7-30dbc3a6bae1",
"metadata": {},
"outputs": [],
@@ -464,7 +464,7 @@
},
{
"cell_type": "code",
- "execution_count": 56,
+ "execution_count": 15,
"id": "050d90a4-20a9-47f5-b998-c31178a54cb3",
"metadata": {},
"outputs": [],
@@ -485,7 +485,7 @@
},
{
"cell_type": "code",
- "execution_count": 57,
+ "execution_count": 16,
"id": "e68f9004-82f5-4826-aece-e3dc6e15a18f",
"metadata": {},
"outputs": [],
@@ -547,7 +547,7 @@
},
{
"cell_type": "code",
- "execution_count": 58,
+ "execution_count": 17,
"id": "e255a2c1-6454-4e5e-89f6-ef8ac51ab6cc",
"metadata": {},
"outputs": [
@@ -555,6 +555,99 @@
"name": "stdout",
"output_type": "stream",
"text": [
+ "013fc334-4045-4d5a-8739-dd0a8766a63b\n"
+ ]
+ },
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "[[0.00000000e+000 1.00000000e+000 0.00000000e+000 0.00000000e+000]\n",
+ " [0.00000000e+000 6.78571429e-001 2.85714286e-001 3.57142857e-002]\n",
+ " [0.00000000e+000 1.00000000e+000 0.00000000e+000 0.00000000e+000]\n",
+ " [2.05833592e-312 2.29175545e-312 4.94065646e-324 6.92110218e-310]]\n",
"238dc588-a7ab-4c0e-bccd-6abca5076c66\n"
]
},
@@ -648,7 +741,7 @@
"\n"
],
"text/plain": [
- ""
+ ""
]
},
"metadata": {},
@@ -662,6 +755,43 @@
" [0. 0.1875 0.375 0.4375 ]\n",
" [0. 1. 0. 0. ]\n",
" [0.14285714 0.85714286 0. 0. ]]\n",
+ "d176d7c9-4027-4702-9e31-2a71395cdda0\n"
+ ]
+ },
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "[[0.]]\n",
"f0317a5d-e424-44e9-b784-c8f7291ffe31\n"
]
},
@@ -708,7 +838,7 @@
"\n"
],
"text/plain": [
- ""
+ ""
]
},
"metadata": {},
@@ -719,151 +849,7 @@
"output_type": "stream",
"text": [
"[[5.0e-001 5.0e-001]\n",
- " [9.9e-324 1.5e-323]]\n",
- "d176d7c9-4027-4702-9e31-2a71395cdda0\n"
- ]
- },
- {
- "data": {
- "image/svg+xml": [
- "\n",
- "\n",
- "\n",
- "\n",
- "\n"
- ],
- "text/plain": [
- ""
- ]
- },
- "metadata": {},
- "output_type": "display_data"
- },
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[[0.]]\n",
- "013fc334-4045-4d5a-8739-dd0a8766a63b\n"
- ]
- },
- {
- "data": {
- "image/svg+xml": [
- "\n",
- "\n",
- "\n",
- "\n",
- "\n"
- ],
- "text/plain": [
- ""
- ]
- },
- "metadata": {},
- "output_type": "display_data"
- },
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[[0. 1. 0. 0. ]\n",
- " [0. 0.67857143 0.28571429 0.03571429]\n",
- " [0. 1. 0. 0. ]\n",
- " [0.14285714 0.85714286 0. 0. ]]\n"
+ " [9.9e-324 1.5e-323]]\n"
]
}
],
@@ -878,14 +864,6 @@
"for session in sessions:\n",
" print(explore_session(session))"
]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "4d278c2d-406e-4dc0-b219-5f7b236e852b",
- "metadata": {},
- "outputs": [],
- "source": []
}
],
"metadata": {
diff --git a/experiments/procesing/extract.py b/experiments/procesing/extract.py
new file mode 100644
index 0000000..2b438e7
--- /dev/null
+++ b/experiments/procesing/extract.py
@@ -0,0 +1,96 @@
+from kafka import KafkaConsumer
+import pandas as pd
+import json
+import numpy as np
+import os
+from dotenv import load_dotenv
+from sklearn.base import BaseEstimator, TransformerMixin
+# import matplotlib.pyplot as plt
+# from IPython.display import display, SVG, Image
+load_dotenv()
+
+
+KAFKA_HOST=os.getenv("KAFKA_HOST", "localhost")
+KAFKA_PORT=os.getenv("KAFKA_PORT", 9092)
+TOPIC = os.getenv("KAFKA_TOPIC", "user-interactions")
+N_PRICE_BUCKETS = 5
+
+def get_data_from_kafka() -> pd.DataFrame:
+ consumer = KafkaConsumer(
+ TOPIC,
+ enable_auto_commit=True,
+ value_deserializer=lambda x: json.loads(x.decode('utf-8')),
+ auto_offset_reset='earliest',
+ bootstrap_servers=[f"{KAFKA_HOST}:{KAFKA_PORT}"]
+ )
+ messages=consumer.poll(timeout_ms=1000,max_records=10000)
+ df = []
+ for m in messages.values():
+ for i in m:
+ df.append(i.value)
+ df = pd.DataFrame(df)
+ """
+ 0 sessionId 73 non-null object
+ 1 eventName 73 non-null object
+ 2 page 73 non-null object
+ 3 productId 67 non-null object
+ 4 storeMode 73 non-null object
+ 5 userAgent 73 non-null object
+ 6 ts 73 non-null object
+ 7 metadata_referrer 6 non-null object
+ 8 metadata_roomType 45 non-null object
+ 9 metadata_price 45 non-null float64
+ 10 metadata_nights 45 non-null float64
+ 11 metadata_elementText 22 non-null object
+ 12 metadata_dwellTime 22 non-null float64
+ """
+ # explode metadata col json
+ df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_"))
+ df = df.dropna(subset=['eventName'])
+ return df
+
+
+def join_with_experiments(df: pd.DataFrame) -> pd.DataFrame:
+ # TODO: Get experiments db from supabase and join on session_id
+ return df
+
+
+def augment_event_titles(df: pd.DataFrame) -> pd.DataFrame:
+ # from taking standard view_item_page in eventName to view_item_page_{metadata_schema}
+ # we want metadata schema to create product specific event names
+ price_buckets = pd.qcut(
+ df["metadata_price"],
+ q=N_PRICE_BUCKETS,
+ labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)]
+ )
+ # metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name
+ # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page
+ df["metadata_schema"] = np.where(
+ df["productId"].notnull() & df["metadata_price"].notnull(),
+ "_" + df["productId"].astype(str) + "@" + price_buckets.astype(str),
+ ""
+ )
+ df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str)
+ return df
+
+
+def extract() -> pd.DataFrame:
+ df = get_data_from_kafka()
+ df = join_with_experiments(df)
+ df = augment_event_titles(df)
+ return df
+
+
+class DataExtractor(BaseEstimator, TransformerMixin):
+ def fit(self, X=None, y=None):
+ return self
+
+ def transform(self, X=None):
+ return extract()
+
+
+if __name__ == "__main__":
+ df = extract()
+ print(df.head())
+ print(df.tail())
+ print(df.info())
diff --git a/experiments/procesing/mapping.py b/experiments/procesing/mapping.py
new file mode 100644
index 0000000..6c32b91
--- /dev/null
+++ b/experiments/procesing/mapping.py
@@ -0,0 +1,158 @@
+import numpy as np
+import pandas as pd
+from sklearn.base import BaseEstimator, TransformerMixin
+
+def build_transition_prob_matrix(df: pd.DataFrame):
+ df = df.dropna(subset=['eventName'])
+ events = df['eventName'].tolist()
+ labels = pd.Index(events).unique().tolist()
+ idx = {e:i for i,e in enumerate(labels)}
+ M = np.zeros((len(labels), len(labels)), dtype=float)
+ for a, b in zip(events, events[1:]):
+ M[idx[a], idx[b]] += 1
+ row_sums = M.sum(axis=1, keepdims=True)
+ with np.errstate(divide='ignore', invalid='ignore'):
+ P = np.divide(M, row_sums, where=row_sums>0) # row-normalized
+ return P, labels
+
+# https://medium.com/data-science/time-series-data-markov-transition-matrices-7060771e362b
+from graphviz import Digraph
+import numpy as np
+import pandas as pd
+
+def _as_prob_df(matrix, labels=None):
+ """Return a square DataFrame with index=columns=labels."""
+ if isinstance(matrix, pd.DataFrame):
+ # Ensure square and aligned
+ assert (matrix.index == matrix.columns).all(), "Index/columns must match."
+ return matrix
+ matrix = np.asarray(matrix, dtype=float)
+ assert matrix.shape[0] == matrix.shape[1], "Matrix must be square."
+ if labels is None:
+ raise ValueError("labels are required when matrix is not a DataFrame")
+ assert len(labels) == matrix.shape[0], "labels length must match matrix size."
+ return pd.DataFrame(matrix, index=list(labels), columns=list(labels))
+
+def _df_to_edgelist(P: pd.DataFrame, threshold=0.0, round_digits=2):
+ """Build weighted edges > threshold."""
+ edges = []
+ for src in P.index:
+ for dst in P.columns:
+ w = float(P.loc[src, dst])
+ if w > threshold:
+ edges.append((str(src), str(dst), f"{w:.{round_digits}f}"))
+ return edges
+
+def render_graph(fname, matrix, ls_index=None, threshold=0.0, fmt="svg", view=False):
+ """
+ fname: output file stem (no extension)
+ matrix: NumPy array or pandas DataFrame of transition PROBABILITIES
+ ls_index: ordered labels (required if matrix is not a DataFrame)
+ threshold: hide edges with weight <= threshold
+ fmt: 'svg'|'png'|'pdf' etc.
+ view: open after rendering
+ """
+ P = _as_prob_df(matrix, labels=ls_index)
+ edges = _df_to_edgelist(P, threshold=threshold)
+
+ g = Digraph(format=fmt)
+ g.attr(rankdir="LR", size="30")
+ g.attr("node", shape="circle")
+
+ # ensure isolated nodes appear
+ for node in P.index:
+ g.node(str(node), width="1", height="1")
+
+ for src, dst, label in edges:
+ g.edge(src, dst, label=label)
+
+ g.render(fname, view=view, cleanup=True)
+ return g
+
+
+class TransitionProbMatrixTransformer(BaseEstimator, TransformerMixin):
+ def __init__(self, threshold=0.0):
+ self.threshold = threshold
+ self.P_ = None
+ self.labels_ = None
+
+ def fit(self, X: pd.DataFrame, y=None):
+ P, labels = build_transition_prob_matrix(X)
+ self.P_ = P
+ self.labels_ = labels
+ return self
+
+ def transform(self, X: pd.DataFrame = None):
+ return self.P_, self.labels_
+
+ def render(self, fname: str, fmt="svg", view=False):
+ if self.P_ is None or self.labels_ is None:
+ raise ValueError("Transformer has not been fitted yet.")
+ return render_graph(
+ fname,
+ self.P_,
+ ls_index=self.labels_,
+ threshold=self.threshold,
+ fmt=fmt,
+ view=view
+ )
+
+
+class SessionTransitionProbMatrixTransformer(BaseEstimator, TransformerMixin):
+ def __init__(self, threshold=0.0, session_col='sessionId'):
+ self.threshold = threshold
+ self.session_col = session_col
+ self.session_matrices_ = None
+
+ def fit(self, X: pd.DataFrame, y=None):
+ if self.session_col not in X.columns:
+ raise ValueError(f"Column '{self.session_col}' not found in DataFrame")
+
+ session_matrices = {}
+ for session_id, grp in X.groupby(self.session_col):
+ if len(grp) > 1: # need at least 2 events for transitions
+ P, labels = build_transition_prob_matrix(grp)
+ session_matrices[session_id] = {'matrix': P, 'labels': labels}
+
+ self.session_matrices_ = session_matrices
+ return self
+
+ def transform(self, X: pd.DataFrame = None):
+ if self.session_matrices_ is None:
+ raise ValueError("Transformer has not been fitted yet.")
+ return pd.Series(self.session_matrices_)
+
+ def render_session(self, session_id: str, fname: str, fmt="svg", view=False):
+ if self.session_matrices_ is None:
+ raise ValueError("Transformer has not been fitted yet.")
+ if session_id not in self.session_matrices_:
+ raise ValueError(f"Session '{session_id}' not found in fitted data.")
+
+ sess_data = self.session_matrices_[session_id]
+ return render_graph(
+ fname,
+ sess_data['matrix'],
+ ls_index=sess_data['labels'],
+ threshold=self.threshold,
+ fmt=fmt,
+ view=view
+ )
+if __name__ == "__main__":
+ # Example usage
+ data = {
+ 'eventName': [
+ 'A', 'B', 'A', 'C', 'B', 'A', 'A', 'C', 'B', 'C',
+ 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A'
+ ]
+ }
+ df = pd.DataFrame(data)
+
+ transformer = TransitionProbMatrixTransformer(threshold=0.1)
+ transformer.fit(df)
+ P, labels = transformer.transform(None)
+
+ print("Transition Probability Matrix:")
+ print(pd.DataFrame(P, index=labels, columns=labels))
+
+ # Render the graph
+ transformer.render("transition_graph", fmt="svg", view=False)
diff --git a/experiments/procesing/pipeline.py b/experiments/procesing/pipeline.py
new file mode 100644
index 0000000..6b742b2
--- /dev/null
+++ b/experiments/procesing/pipeline.py
@@ -0,0 +1,19 @@
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import StandardScaler
+from extract import DataExtractor
+from mapping import SessionTransitionProbMatrixTransformer, render_graph
+
+
+if __name__ == "__main__":
+ steps = [
+ ('data_extraction', DataExtractor()),
+ ('transition_matrix', SessionTransitionProbMatrixTransformer(threshold=0.05)),
+ ]
+ pipeline = Pipeline(steps)
+ result = pipeline.fit_transform(None)
+ print(f"Number of sessions: {len(result)}\n")
+
+ for session_id, sess_data in result.items():
+ fname = f"session_{session_id}"
+ render_graph(fname, sess_data['matrix'], ls_index=sess_data['labels'], threshold=0.05, fmt="svg", view=False)
+ print(f"Rendered {fname}.svg")
diff --git a/requirements.txt b/requirements.txt
index 70db3f0..8bb3ed7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,4 @@ browser-use
pytest
pytest-asyncio
uv
+scikit-learn