refactoed kafka ingestion to go via backend not web-db

This commit is contained in:
2025-11-13 17:42:04 +01:00
parent ba8c42e30e
commit 6a2c41bbdb
8 changed files with 147 additions and 38 deletions

View File

@@ -7,7 +7,12 @@ NEXT_PUBLIC_API_BASE=http://localhost:3000 # base URL for API endpoints, must b
NEXT_PUBLIC_APP_ENV=dev # application environment: 'dev' or 'prod' - controls logging, error handling NEXT_PUBLIC_APP_ENV=dev # application environment: 'dev' or 'prod' - controls logging, error handling
NEXT_PUBLIC_HOVER_THRESHOLD=1200 # hover threshold in milliseconds for UI interactions NEXT_PUBLIC_HOVER_THRESHOLD=1200 # hover threshold in milliseconds for UI interactions
# Backend service
BACKEND_URL=http://localhost:5000 # backend API URL for kafka ingestion (set to railway service URL in prod)
# Service ports - used by docker-compose and service communication # Service ports - used by docker-compose and service communication
BACKEND_PORT=5000 # backend server port for kafka ingestion API
KAFKA_HOST=localhost # kafka broker hostname - set to remote host in prod (e.g., kafka.example.com)
KAFKA_PORT=9092 # kafka broker port for event streaming KAFKA_PORT=9092 # kafka broker port for event streaming
REDIS_PORT=6377 # redis port for worker queue and caching REDIS_PORT=6377 # redis port for worker queue and caching
REDPANDA_CONSOLE_PORT=8084 # redpanda console UI port for kafka monitoring REDPANDA_CONSOLE_PORT=8084 # redpanda console UI port for kafka monitoring

99
backend/server/app.py Normal file
View File

@@ -0,0 +1,99 @@
# boilerplate code
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Any
import uvicorn
import os
import json
from datetime import datetime
from kafka import KafkaProducer
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# kafka producer - lazy init
_producer: Optional[KafkaProducer] = None
def get_producer() -> KafkaProducer:
global _producer
if _producer is None:
host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '9092')
broker = f'{host}:{port}' if port else host
_producer = KafkaProducer(
bootstrap_servers=[broker],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
)
return _producer
class EventPayload(BaseModel):
sessionId: str
eventName: str
page: str
productId: Optional[str] = None
metadata: Optional[dict[str, Any]] = None
storeMode: str
userAgent: Optional[str] = None
ts: Optional[str] = None
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health")
async def health():
kafka_status = "unknown"
try:
producer = get_producer()
# attempt to get cluster metadata to verify connection
producer.bootstrap_connected()
kafka_status = "connected"
except Exception as e:
kafka_status = f"error: {str(e)}"
return {
"status": "healthy",
"kafka": kafka_status,
"kafka_broker": f"{os.getenv('KAFKA_HOST', 'localhost')}:{os.getenv('KAFKA_PORT', '9092')}"
}
@app.post("/api/kafka/ingest")
async def ingest_logs(event: EventPayload):
try:
if not event.ts:
event.ts = datetime.utcnow().isoformat() + 'Z'
producer = get_producer()
producer.send(
'user-interactions',
key=event.sessionId,
value=event.model_dump()
)
producer.flush(timeout=5)
return {"success": True}
except Exception as e:
import traceback
print(f"[ERROR] {e}")
print(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/kafka/dump")
def dump_logs():
# TODO: implement a dump of logs of time period t_start to t_end (params of get)
# OR: allow for params of last_n logs as a param - creating two modes of the dumping
pass
if __name__ == "__main__":
PORT=int(os.getenv("BACKEND_PORT", 5000))
uvicorn.run("server:app", host="0.0.0.0", port=PORT, reload=True)

View File

@@ -0,0 +1,5 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
kafka-python==2.0.2
pydantic==2.5.0
python-dotenv==1.0.0

View File

@@ -1,4 +1,18 @@
services: services:
backend:
container_name: "PHANTOM-backend"
build:
context: .
dockerfile: docker/backend.Dockerfile
ports:
- "${BACKEND_PORT:-5000}:5000"
environment:
- KAFKA_HOST=kafka
- KAFKA_PORT=29092
depends_on:
- kafka
restart: unless-stopped
redis: redis:
container_name: "PHANTOM-redis" container_name: "PHANTOM-redis"
image: redis:7-alpine image: redis:7-alpine
@@ -7,6 +21,7 @@ services:
volumes: volumes:
- phantom_redis_data:/data - phantom_redis_data:/data
restart: unless-stopped restart: unless-stopped
zookeeper: zookeeper:
container_name: "PHANTOM-zookeeper" container_name: "PHANTOM-zookeeper"
image: confluentinc/cp-zookeeper:latest image: confluentinc/cp-zookeeper:latest

12
docker/backend.Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM python:3.11-slim
WORKDIR /app
COPY backend/server/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY backend/server/app.py .
EXPOSE 5000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "5000"]

View File

@@ -8,7 +8,6 @@
"start": "next start" "start": "next start"
}, },
"dependencies": { "dependencies": {
"kafkajs": "^2.2.4",
"next": "16.0.0", "next": "16.0.0",
"react": "19.2.0", "react": "19.2.0",
"react-dom": "19.2.0", "react-dom": "19.2.0",

View File

@@ -1,7 +1,8 @@
import { NextRequest, NextResponse } from 'next/server'; import { NextRequest, NextResponse } from 'next/server';
import { sendEvent } from '@/lib/kafka';
import type { EventBase } from '@/lib/events'; import type { EventBase } from '@/lib/events';
const BACKEND_URL = process.env.BACKEND_URL || 'http://localhost:5000';
export async function POST(req: NextRequest) { export async function POST(req: NextRequest) {
try { try {
const body = await req.json(); const body = await req.json();
@@ -16,7 +17,15 @@ export async function POST(req: NextRequest) {
ts: body.ts || new Date().toISOString(), ts: body.ts || new Date().toISOString(),
}; };
await sendEvent(event); const res = await fetch(`${BACKEND_URL}/api/kafka/ingest`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(event),
});
if (!res.ok) {
throw new Error(`Backend returned ${res.status}`);
}
if (process.env.NEXT_PUBLIC_APP_ENV === 'dev') { if (process.env.NEXT_PUBLIC_APP_ENV === 'dev') {
console.log('[ingest]', event); console.log('[ingest]', event);

View File

@@ -1,35 +0,0 @@
import { Kafka, Producer } from 'kafkajs';
import type { EventBase } from './events';
let producer: Producer | null = null;
const kafka = new Kafka({
clientId: 'phantom-web',
brokers: [`${process.env.KAFKA_HOST || 'localhost'}:${process.env.KAFKA_PORT || '9092'}`],
});
export const getProducer = async (): Promise<Producer> => {
if (!producer) {
producer = kafka.producer();
await producer.connect();
}
return producer;
};
export const sendEvent = async (ev: EventBase) => {
const p = await getProducer();
await p.send({
topic: 'user-interactions',
messages: [{
key: ev.sessionId,
value: JSON.stringify(ev),
}],
});
};
export const disconnect = async () => {
if (producer) {
await producer.disconnect();
producer = null;
}
};