diff --git a/backend/server/app.py b/backend/server/app.py index d57d1de..3830058 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -7,7 +7,7 @@ import uvicorn import os import json from datetime import datetime -from kafka import KafkaProducer, KafkaAdminClient +from kafka import KafkaProducer, KafkaAdminClient, KafkaConsumer from kafka.admin import NewTopic from kafka.errors import TopicAlreadyExistsError from dotenv import load_dotenv @@ -22,7 +22,7 @@ def get_producer() -> KafkaProducer: global _producer if _producer is None: host = os.getenv('KAFKA_HOST', 'localhost') - port = os.getenv('KAFKA_PORT', '29092') # use internal broker port + port = os.getenv('KAFKA_PORT', '9092') broker = f'{host}:{port}' if port else host print(f"[KAFKA_INIT] Connecting to broker: {broker}") _producer = KafkaProducer( @@ -61,7 +61,7 @@ app.add_middleware( async def startup_event(): """create kafka topics on startup""" host = os.getenv('KAFKA_HOST', 'localhost') - port = os.getenv('KAFKA_PORT', '29092') + port = os.getenv('KAFKA_PORT', '9092') broker = f'{host}:{port}' try: @@ -125,10 +125,62 @@ async def ingest_logs(event: EventPayload): raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/kafka/dump") -def dump_logs(): - # TODO: implement a dump of logs of time period t_start to t_end (params of get) - # OR: allow for params of last_n logs as a param - creating two modes of the dumping - pass +def dump_logs( + last_n: Optional[int] = None, + t_start: Optional[str] = None, + t_end: Optional[str] = None +): + """dump all messages from user-interactions topic + + params: + last_n: return only last n messages (default: all) + t_start: filter by start timestamp iso format (future use) + t_end: filter by end timestamp iso format (future use) + """ + host = os.getenv('KAFKA_HOST', 'localhost') + port = os.getenv('KAFKA_PORT', '9092') + broker = f'{host}:{port}' + + try: + consumer = KafkaConsumer( + 'user-interactions', + bootstrap_servers=[broker], + auto_offset_reset='earliest', + enable_auto_commit=False, + value_deserializer=lambda x: json.loads(x.decode('utf-8')), + consumer_timeout_ms=5000 + ) + + events = [] + for msg in consumer: + events.append(msg.value) + + consumer.close() + + # apply filters + if t_start or t_end: + # filter by timestamp range if provided + filtered = [] + for e in events: + ts = e.get('ts') + if ts: + if t_start and ts < t_start: + continue + if t_end and ts > t_end: + continue + filtered.append(e) + events = filtered + + if last_n and last_n > 0: + events = events[-last_n:] + + return {"success": True, "count": len(events), "data": events} + + except Exception as e: + import traceback + print(f"[DUMP_ERROR] {e}") + print(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) diff --git a/experiments/procesing/extract.py b/experiments/procesing/extract.py index 2b438e7..cfe73e2 100644 --- a/experiments/procesing/extract.py +++ b/experiments/procesing/extract.py @@ -1,51 +1,28 @@ -from kafka import KafkaConsumer import pandas as pd import json import numpy as np import os +import requests from dotenv import load_dotenv from sklearn.base import BaseEstimator, TransformerMixin -# import matplotlib.pyplot as plt -# from IPython.display import display, SVG, Image load_dotenv() - -KAFKA_HOST=os.getenv("KAFKA_HOST", "localhost") -KAFKA_PORT=os.getenv("KAFKA_PORT", 9092) -TOPIC = os.getenv("KAFKA_TOPIC", "user-interactions") +BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000") N_PRICE_BUCKETS = 5 def get_data_from_kafka() -> pd.DataFrame: - consumer = KafkaConsumer( - TOPIC, - enable_auto_commit=True, - value_deserializer=lambda x: json.loads(x.decode('utf-8')), - auto_offset_reset='earliest', - bootstrap_servers=[f"{KAFKA_HOST}:{KAFKA_PORT}"] - ) - messages=consumer.poll(timeout_ms=1000,max_records=10000) - df = [] - for m in messages.values(): - for i in m: - df.append(i.value) - df = pd.DataFrame(df) - """ - 0 sessionId 73 non-null object - 1 eventName 73 non-null object - 2 page 73 non-null object - 3 productId 67 non-null object - 4 storeMode 73 non-null object - 5 userAgent 73 non-null object - 6 ts 73 non-null object - 7 metadata_referrer 6 non-null object - 8 metadata_roomType 45 non-null object - 9 metadata_price 45 non-null float64 - 10 metadata_nights 45 non-null float64 - 11 metadata_elementText 22 non-null object - 12 metadata_dwellTime 22 non-null float64 - """ + """fetch all events from backend dump endpoint""" + resp = requests.get(f"{BACKEND_URL}/api/kafka/dump") + resp.raise_for_status() + data = resp.json() + + if not data.get('success') or not data.get('data'): + return pd.DataFrame() + + df = pd.DataFrame(data['data']) # explode metadata col json - df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) + if 'metadata' in df.columns: + df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) df = df.dropna(subset=['eventName']) return df @@ -58,11 +35,22 @@ def join_with_experiments(df: pd.DataFrame) -> pd.DataFrame: def augment_event_titles(df: pd.DataFrame) -> pd.DataFrame: # from taking standard view_item_page in eventName to view_item_page_{metadata_schema} # we want metadata schema to create product specific event names - price_buckets = pd.qcut( - df["metadata_price"], - q=N_PRICE_BUCKETS, - labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)] - ) + + # only create price buckets if we have enough unique prices + if df["metadata_price"].notnull().sum() > 0: + try: + price_buckets = pd.qcut( + df["metadata_price"], + q=N_PRICE_BUCKETS, + labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)], + duplicates='drop' # handle duplicate bin edges + ) + except ValueError: + # fallback: if still not enough unique values, use cut with fixed ranges or just use raw price + price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "") + else: + price_buckets = pd.Series([""] * len(df), index=df.index) + # metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page df["metadata_schema"] = np.where(