fixing backend dumping

This commit is contained in:
2025-11-15 15:33:37 +01:00
parent d42ab56c1e
commit 55a760713f
2 changed files with 88 additions and 48 deletions

View File

@@ -7,7 +7,7 @@ import uvicorn
import os
import json
from datetime import datetime
from kafka import KafkaProducer, KafkaAdminClient
from kafka import KafkaProducer, KafkaAdminClient, KafkaConsumer
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from dotenv import load_dotenv
@@ -22,7 +22,7 @@ def get_producer() -> KafkaProducer:
global _producer
if _producer is None:
host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '29092') # use internal broker port
port = os.getenv('KAFKA_PORT', '9092')
broker = f'{host}:{port}' if port else host
print(f"[KAFKA_INIT] Connecting to broker: {broker}")
_producer = KafkaProducer(
@@ -61,7 +61,7 @@ app.add_middleware(
async def startup_event():
"""create kafka topics on startup"""
host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '29092')
port = os.getenv('KAFKA_PORT', '9092')
broker = f'{host}:{port}'
try:
@@ -125,10 +125,62 @@ async def ingest_logs(event: EventPayload):
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/kafka/dump")
def dump_logs(
    last_n: Optional[int] = None,
    t_start: Optional[str] = None,
    t_end: Optional[str] = None
):
    """dump messages from the user-interactions topic

    a short-lived consumer (auto_offset_reset='earliest', offsets never
    committed) reads the topic until no new message arrives for
    consumer_timeout_ms (5 seconds), then the collected events are filtered.

    params:
        last_n: keep only the last n messages after time filtering;
            None, 0 or a negative value returns everything
        t_start: inclusive lower bound for the event 'ts' field
        t_end: inclusive upper bound for the event 'ts' field

    the bounds are compared against 'ts' as plain values, so they are only
    meaningful when both sides use the same iso timestamp layout
    (NOTE(review): confirm the format producers write into 'ts').
    events without a 'ts' value are dropped whenever a bound is given.

    returns:
        dict with "success", "count" and "data" (list of decoded events)

    raises:
        HTTPException: status 500 carrying the error text when the dump fails
    """
    host = os.getenv('KAFKA_HOST', 'localhost')
    port = os.getenv('KAFKA_PORT', '9092')
    broker = f'{host}:{port}'

    try:
        consumer = KafkaConsumer(
            'user-interactions',
            bootstrap_servers=[broker],
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            consumer_timeout_ms=5000
        )
        try:
            # iteration ends once consumer_timeout_ms passes without a message
            events = [msg.value for msg in consumer]
        finally:
            # always release the broker connection, also when a message
            # fails to deserialize half way through the topic
            consumer.close()

        # apply filters
        if t_start or t_end:
            # filter by timestamp range if provided
            events = [
                e for e in events
                if e.get('ts')
                and not (t_start and e['ts'] < t_start)
                and not (t_end and e['ts'] > t_end)
            ]

        if last_n and last_n > 0:
            events = events[-last_n:]

        return {"success": True, "count": len(events), "data": events}
    except Exception as e:
        import traceback
        print(f"[DUMP_ERROR] {e}")
        print(traceback.format_exc())
        # keep the original error attached as the cause for easier debugging
        raise HTTPException(status_code=500, detail=str(e)) from e

View File

@@ -1,51 +1,28 @@
from kafka import KafkaConsumer
import pandas as pd
import json
import numpy as np
import os
import requests
from dotenv import load_dotenv
from sklearn.base import BaseEstimator, TransformerMixin
# import matplotlib.pyplot as plt
# from IPython.display import display, SVG, Image
# load settings via python-dotenv before the os.getenv lookups below
load_dotenv()
# kafka broker address and topic name, each read from the environment with a local default
# NOTE(review): the KAFKA_PORT fallback is the int 9092 while values coming from the environment are strings
KAFKA_HOST=os.getenv("KAFKA_HOST", "localhost")
KAFKA_PORT=os.getenv("KAFKA_PORT", 9092)
TOPIC = os.getenv("KAFKA_TOPIC", "user-interactions")
# base url of the backend that serves the /api/kafka/dump endpoint
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000")
# number of quantile buckets requested when bucketing metadata_price
N_PRICE_BUCKETS = 5
def get_data_from_kafka() -> pd.DataFrame:
    """fetch all events from backend dump endpoint

    calls GET {BACKEND_URL}/api/kafka/dump and turns the returned event list
    into a dataframe. the nested 'metadata' dict of every event is flattened
    into 'metadata_<key>' columns and rows without an 'eventName' are dropped.

    columns seen in an earlier sample of the topic (may differ per dataset):
    sessionId, eventName, page, productId, storeMode, userAgent, ts,
    metadata_referrer, metadata_roomType, metadata_price, metadata_nights,
    metadata_elementText, metadata_dwellTime

    returns:
        dataframe of events; empty when the backend reports no success
        or returns no data

    raises:
        requests.RequestException: on connection errors, timeouts or a
            non-2xx response
    """
    # never wait forever on the backend; the dump endpoint blocks while it
    # drains kafka, so the limit is generous but finite
    resp = requests.get(f"{BACKEND_URL}/api/kafka/dump", timeout=60)
    resp.raise_for_status()
    data = resp.json()

    if not data.get('success') or not data.get('data'):
        return pd.DataFrame()

    df = pd.DataFrame(data['data'])

    # explode metadata col json
    if 'metadata' in df.columns:
        # events without metadata show up as None/NaN; normalise them as
        # empty dicts so every row keeps its position in the result
        metadata = [m if isinstance(m, dict) else {} for m in df.pop("metadata")]
        flat = pd.json_normalize(metadata, sep=".").add_prefix("metadata_")
        df = df.join(flat)
    df = df.dropna(subset=['eventName'])
    return df
@@ -58,11 +35,22 @@ def join_with_experiments(df: pd.DataFrame) -> pd.DataFrame:
def augment_event_titles(df: pd.DataFrame) -> pd.DataFrame:
# from taking standard view_item_page in eventName to view_item_page_{metadata_schema}
# we want metadata schema to create product specific event names
price_buckets = pd.qcut(
df["metadata_price"],
q=N_PRICE_BUCKETS,
labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)]
)
# only create price buckets if we have enough unique prices
if df["metadata_price"].notnull().sum() > 0:
try:
price_buckets = pd.qcut(
df["metadata_price"],
q=N_PRICE_BUCKETS,
labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)],
duplicates='drop' # handle duplicate bin edges
)
except ValueError:
# fallback: if still not enough unique values, use cut with fixed ranges or just use raw price
price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "")
else:
price_buckets = pd.Series([""] * len(df), index=df.index)
# metadata_schema: _product_id@price_bucket_{i} only if we have product metadata, otherwise keep original event name
# TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page
df["metadata_schema"] = np.where(