Files
PHANTOM/backend/server/app.py
2025-11-13 18:41:37 +01:00

138 lines
4.2 KiB
Python

# boilerplate code
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Any
import uvicorn
import os
import json
from datetime import datetime
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# kafka producer - lazy init
_producer: Optional[KafkaProducer] = None
def get_producer() -> KafkaProducer:
global _producer
if _producer is None:
host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '29092') # use internal broker port
broker = f'{host}:{port}' if port else host
print(f"[KAFKA_INIT] Connecting to broker: {broker}")
_producer = KafkaProducer(
bootstrap_servers=[broker],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks=1,
retries=3,
max_in_flight_requests_per_connection=5,
request_timeout_ms=30000,
api_version_auto_timeout_ms=10000,
max_block_ms=5000, # don't block send() for more than 5s
)
print(f"[KAFKA_INIT] Producer created successfully")
return _producer
class EventPayload(BaseModel):
sessionId: str
eventName: str
page: str
productId: Optional[str] = None
metadata: Optional[dict[str, Any]] = None
storeMode: str
userAgent: Optional[str] = None
ts: Optional[str] = None
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
async def startup_event():
"""create kafka topics on startup"""
host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '29092')
broker = f'{host}:{port}'
try:
print(f"[STARTUP] Creating Kafka topics on {broker}")
admin = KafkaAdminClient(
bootstrap_servers=[broker],
request_timeout_ms=10000,
)
topics = [
NewTopic(name='user-interactions', num_partitions=3, replication_factor=1)
]
admin.create_topics(new_topics=topics, validate_only=False)
print(f"[STARTUP] Topics created successfully")
admin.close()
except TopicAlreadyExistsError:
print(f"[STARTUP] Topics already exist, skipping creation")
except Exception as e:
print(f"[STARTUP] Failed to create topics: {e}")
print(f"[STARTUP] Will rely on auto-creation on first message")
@app.get("/health")
async def health():
kafka_status = "unknown"
try:
producer = get_producer()
# attempt to get cluster metadata to verify connection
producer.bootstrap_connected()
kafka_status = "connected"
except Exception as e:
kafka_status = f"error: {str(e)}"
return {
"status": "healthy",
"kafka": kafka_status,
"kafka_broker": f"{os.getenv('KAFKA_HOST', 'localhost')}:{os.getenv('KAFKA_PORT', '9092')}"
}
@app.post("/api/kafka/ingest")
async def ingest_logs(event: EventPayload):
try:
if not event.ts:
event.ts = datetime.utcnow().isoformat() + 'Z'
producer = get_producer()
future = producer.send(
'user-interactions',
key=event.sessionId,
value=event.model_dump()
)
# add callback for error logging but don't block
future.add_errback(lambda e: print(f"[KAFKA_SEND_ERROR] {e}"))
return {"success": True}
except Exception as e:
import traceback
print(f"[ERROR] {e}")
print(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/kafka/dump")
def dump_logs():
# TODO: implement a dump of logs of time period t_start to t_end (params of get)
# OR: allow for params of last_n logs as a param - creating two modes of the dumping
pass
if __name__ == "__main__":
PORT=int(os.getenv("BACKEND_PORT", 5000))
uvicorn.run("server:app", host="0.0.0.0", port=PORT, reload=True)