diff --git a/.env.example b/.env.example index 95639c9..c011fd8 100644 --- a/.env.example +++ b/.env.example @@ -7,7 +7,12 @@ NEXT_PUBLIC_API_BASE=http://localhost:3000 # base URL for API endpoints, must b NEXT_PUBLIC_APP_ENV=dev # application environment: 'dev' or 'prod' - controls logging, error handling NEXT_PUBLIC_HOVER_THRESHOLD=1200 # hover threshold in milliseconds for UI interactions +# Backend service +BACKEND_URL=http://localhost:5000 # backend API URL for kafka ingestion (set to railway service URL in prod) + # Service ports - used by docker-compose and service communication +BACKEND_PORT=5000 # backend server port for kafka ingestion API +KAFKA_HOST=localhost # kafka broker hostname - set to remote host in prod (e.g., kafka.example.com) KAFKA_PORT=9092 # kafka broker port for event streaming REDIS_PORT=6377 # redis port for worker queue and caching REDPANDA_CONSOLE_PORT=8084 # redpanda console UI port for kafka monitoring diff --git a/backend/server/app.py b/backend/server/app.py new file mode 100644 index 0000000..8544689 --- /dev/null +++ b/backend/server/app.py @@ -0,0 +1,99 @@ +# boilerplate code +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from typing import Optional, Any +import uvicorn +import os +import json +from datetime import datetime +from kafka import KafkaProducer +from dotenv import load_dotenv +load_dotenv() + +app = FastAPI() + +# kafka producer - lazy init +_producer: Optional[KafkaProducer] = None + +def get_producer() -> KafkaProducer: + global _producer + if _producer is None: + host = os.getenv('KAFKA_HOST', 'localhost') + port = os.getenv('KAFKA_PORT', '9092') + broker = f'{host}:{port}' if port else host + _producer = KafkaProducer( + bootstrap_servers=[broker], + value_serializer=lambda v: json.dumps(v).encode('utf-8'), + key_serializer=lambda k: k.encode('utf-8') if k else None, + ) + return _producer + +class EventPayload(BaseModel): + sessionId: str + eventName: str + page: str + productId: Optional[str] = None + metadata: Optional[dict[str, Any]] = None + storeMode: str + userAgent: Optional[str] = None + ts: Optional[str] = None + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +@app.get("/health") +async def health(): + kafka_status = "unknown" + try: + producer = get_producer() + # attempt to get cluster metadata to verify connection + producer.bootstrap_connected() + kafka_status = "connected" + except Exception as e: + kafka_status = f"error: {str(e)}" + + return { + "status": "healthy", + "kafka": kafka_status, + "kafka_broker": f"{os.getenv('KAFKA_HOST', 'localhost')}:{os.getenv('KAFKA_PORT', '9092')}" + } + + +@app.post("/api/kafka/ingest") +async def ingest_logs(event: EventPayload): + try: + if not event.ts: + event.ts = datetime.utcnow().isoformat() + 'Z' + + producer = get_producer() + producer.send( + 'user-interactions', + key=event.sessionId, + value=event.model_dump() + ) + producer.flush(timeout=5) + + return {"success": True} + except Exception as e: + import traceback + print(f"[ERROR] {e}") + print(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/kafka/dump") +def dump_logs(): + # TODO: implement a dump of logs of time period t_start to t_end (params of get) + # OR: allow for params of last_n logs as a param - creating two modes of the dumping + pass + + + +if __name__ == "__main__": + PORT=int(os.getenv("BACKEND_PORT", 5000)) + uvicorn.run("server:app", host="0.0.0.0", port=PORT, reload=True) diff --git a/backend/server/requirements.txt b/backend/server/requirements.txt new file mode 100644 index 0000000..d9113ed --- /dev/null +++ b/backend/server/requirements.txt @@ -0,0 +1,5 @@ +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +kafka-python==2.0.2 +pydantic==2.5.0 +python-dotenv==1.0.0 diff --git a/docker-compose.yml b/docker-compose.yml index abfb77d..affa437 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,18 @@ services: + backend: + container_name: "PHANTOM-backend" + build: + context: . + dockerfile: docker/backend.Dockerfile + ports: + - "${BACKEND_PORT:-5000}:5000" + environment: + - KAFKA_HOST=kafka + - KAFKA_PORT=29092 + depends_on: + - kafka + restart: unless-stopped + redis: container_name: "PHANTOM-redis" image: redis:7-alpine @@ -7,6 +21,7 @@ services: volumes: - phantom_redis_data:/data restart: unless-stopped + zookeeper: container_name: "PHANTOM-zookeeper" image: confluentinc/cp-zookeeper:latest diff --git a/docker/backend.Dockerfile b/docker/backend.Dockerfile new file mode 100644 index 0000000..32eb28b --- /dev/null +++ b/docker/backend.Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY backend/server/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY backend/server/app.py . + +EXPOSE 5000 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "5000"] diff --git a/web/package.json b/web/package.json index c46656c..0a32603 100644 --- a/web/package.json +++ b/web/package.json @@ -8,7 +8,6 @@ "start": "next start" }, "dependencies": { - "kafkajs": "^2.2.4", "next": "16.0.0", "react": "19.2.0", "react-dom": "19.2.0", diff --git a/web/src/app/api/ingest/route.ts b/web/src/app/api/ingest/route.ts index 83a68e1..70497ac 100644 --- a/web/src/app/api/ingest/route.ts +++ b/web/src/app/api/ingest/route.ts @@ -1,7 +1,8 @@ import { NextRequest, NextResponse } from 'next/server'; -import { sendEvent } from '@/lib/kafka'; import type { EventBase } from '@/lib/events'; +const BACKEND_URL = process.env.BACKEND_URL || 'http://localhost:5000'; + export async function POST(req: NextRequest) { try { const body = await req.json(); @@ -16,7 +17,15 @@ export async function POST(req: NextRequest) { ts: body.ts || new Date().toISOString(), }; - await sendEvent(event); + const res = await fetch(`${BACKEND_URL}/api/kafka/ingest`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(event), + }); + + if (!res.ok) { + throw new Error(`Backend returned ${res.status}`); + } if (process.env.NEXT_PUBLIC_APP_ENV === 'dev') { console.log('[ingest]', event); diff --git a/web/src/lib/kafka.ts b/web/src/lib/kafka.ts deleted file mode 100644 index 4228a15..0000000 --- a/web/src/lib/kafka.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { Kafka, Producer } from 'kafkajs'; -import type { EventBase } from './events'; - -let producer: Producer | null = null; - -const kafka = new Kafka({ - clientId: 'phantom-web', - brokers: [`${process.env.KAFKA_HOST || 'localhost'}:${process.env.KAFKA_PORT || '9092'}`], -}); - -export const getProducer = async (): Promise => { - if (!producer) { - producer = kafka.producer(); - await producer.connect(); - } - return producer; -}; - -export const sendEvent = async (ev: EventBase) => { - const p = await getProducer(); - await p.send({ - topic: 'user-interactions', - messages: [{ - key: ev.sessionId, - value: JSON.stringify(ev), - }], - }); -}; - -export const disconnect = async () => { - if (producer) { - await producer.disconnect(); - producer = null; - } -};