mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
2 nextjs scaffold with store mode shop and admin session experiment wiring event emission v1 (#17)
* chore: cleaning gitignore * formating and env documentation * feat: context switching of hotel/airline depndent on env var via middleware * fixed alignment and building * wrong file * prods * fixed applying style * better session cookie management * tentative session storage with maybe using airtable * migrated api of ingestion * events and products apge * fixing build * 13 create outline for research paper draft (#18) * updated outline for paper from issue * extra paper sections and some formalization of series data * algorithms and acknowledgements * updated outline for paper from issue * upadted text formating * event unification * refactor tracking to ues callbacks instead of refs * implement a pricing display api with session passing * moved middleware to proxy according to new changes in Nextjs * refactoed kafka ingestion to go via backend not web-db * Refactor docker-compose services to use individual Dockerfiles (#20) * Initial plan * Refactor services into individual Dockerfiles Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com> * Add EXPOSE directives to all Dockerfiles with port documentation Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com> * fixing small bugs and adding exepriments to tracking * added some doc
This commit is contained in:
committed by
GitHub
parent
7ece6e82cb
commit
37b2099ee0
99
backend/server/app.py
Normal file
99
backend/server/app.py
Normal file
@@ -0,0 +1,99 @@
|
||||
# boilerplate code
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional, Any
|
||||
import uvicorn
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
from kafka import KafkaProducer
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
# kafka producer - lazy init
|
||||
_producer: Optional[KafkaProducer] = None
|
||||
|
||||
def get_producer() -> KafkaProducer:
|
||||
global _producer
|
||||
if _producer is None:
|
||||
host = os.getenv('KAFKA_HOST', 'localhost')
|
||||
port = os.getenv('KAFKA_PORT', '9092')
|
||||
broker = f'{host}:{port}' if port else host
|
||||
_producer = KafkaProducer(
|
||||
bootstrap_servers=[broker],
|
||||
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
|
||||
key_serializer=lambda k: k.encode('utf-8') if k else None,
|
||||
)
|
||||
return _producer
|
||||
|
||||
class EventPayload(BaseModel):
|
||||
sessionId: str
|
||||
eventName: str
|
||||
page: str
|
||||
productId: Optional[str] = None
|
||||
metadata: Optional[dict[str, Any]] = None
|
||||
storeMode: str
|
||||
userAgent: Optional[str] = None
|
||||
ts: Optional[str] = None
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
kafka_status = "unknown"
|
||||
try:
|
||||
producer = get_producer()
|
||||
# attempt to get cluster metadata to verify connection
|
||||
producer.bootstrap_connected()
|
||||
kafka_status = "connected"
|
||||
except Exception as e:
|
||||
kafka_status = f"error: {str(e)}"
|
||||
|
||||
return {
|
||||
"status": "healthy",
|
||||
"kafka": kafka_status,
|
||||
"kafka_broker": f"{os.getenv('KAFKA_HOST', 'localhost')}:{os.getenv('KAFKA_PORT', '9092')}"
|
||||
}
|
||||
|
||||
|
||||
@app.post("/api/kafka/ingest")
|
||||
async def ingest_logs(event: EventPayload):
|
||||
try:
|
||||
if not event.ts:
|
||||
event.ts = datetime.utcnow().isoformat() + 'Z'
|
||||
|
||||
producer = get_producer()
|
||||
producer.send(
|
||||
'user-interactions',
|
||||
key=event.sessionId,
|
||||
value=event.model_dump()
|
||||
)
|
||||
producer.flush(timeout=5)
|
||||
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
import traceback
|
||||
print(f"[ERROR] {e}")
|
||||
print(traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@app.get("/api/kafka/dump")
|
||||
def dump_logs():
|
||||
# TODO: implement a dump of logs of time period t_start to t_end (params of get)
|
||||
# OR: allow for params of last_n logs as a param - creating two modes of the dumping
|
||||
pass
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
PORT=int(os.getenv("BACKEND_PORT", 5000))
|
||||
uvicorn.run("server:app", host="0.0.0.0", port=PORT, reload=True)
|
||||
5
backend/server/requirements.txt
Normal file
5
backend/server/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
fastapi==0.104.1
|
||||
uvicorn[standard]==0.24.0
|
||||
kafka-python==2.0.2
|
||||
pydantic==2.5.0
|
||||
python-dotenv==1.0.0
|
||||
Reference in New Issue
Block a user