Files
PHANTOM/backend/server/app.py
2025-11-13 18:21:07 +01:00

106 lines
3.0 KiB
Python

# boilerplate code
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Any
import uvicorn
import os
import json
from datetime import datetime
from kafka import KafkaProducer
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# kafka producer - lazy init
_producer: Optional[KafkaProducer] = None
def get_producer() -> KafkaProducer:
global _producer
if _producer is None:
host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '9092')
broker = f'{host}:{port}' if port else host
_producer = KafkaProducer(
bootstrap_servers=[broker],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks=1, # wait for leader ack only
retries=3,
max_in_flight_requests_per_connection=5,
request_timeout_ms=30000,
api_version_auto_timeout_ms=10000,
)
return _producer
class EventPayload(BaseModel):
sessionId: str
eventName: str
page: str
productId: Optional[str] = None
metadata: Optional[dict[str, Any]] = None
storeMode: str
userAgent: Optional[str] = None
ts: Optional[str] = None
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health")
async def health():
kafka_status = "unknown"
try:
producer = get_producer()
# attempt to get cluster metadata to verify connection
producer.bootstrap_connected()
kafka_status = "connected"
except Exception as e:
kafka_status = f"error: {str(e)}"
return {
"status": "healthy",
"kafka": kafka_status,
"kafka_broker": f"{os.getenv('KAFKA_HOST', 'localhost')}:{os.getenv('KAFKA_PORT', '9092')}"
}
@app.post("/api/kafka/ingest")
async def ingest_logs(event: EventPayload):
try:
if not event.ts:
event.ts = datetime.utcnow().isoformat() + 'Z'
producer = get_producer()
future = producer.send(
'user-interactions',
key=event.sessionId,
value=event.model_dump()
)
# add callback for error logging but don't block
future.add_errback(lambda e: print(f"[KAFKA_SEND_ERROR] {e}"))
return {"success": True}
except Exception as e:
import traceback
print(f"[ERROR] {e}")
print(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/kafka/dump")
def dump_logs():
# TODO: implement a dump of logs of time period t_start to t_end (params of get)
# OR: allow for params of last_n logs as a param - creating two modes of the dumping
pass
if __name__ == "__main__":
PORT=int(os.getenv("BACKEND_PORT", 5000))
uvicorn.run("server:app", host="0.0.0.0", port=PORT, reload=True)