2 nextjs scaffold with store mode shop and admin session experiment wiring event emission v1 (#17)

* chore: cleaning gitignore

* formating and env documentation

* feat: context switching of hotel/airline depndent on env var via middleware

* fixed alignment and building

* wrong file

* prods

* fixed applying style

* better session cookie management

* tentative session storage with maybe using airtable

* migrated api of ingestion

* events and products apge

* fixing build

* 13 create outline for research paper draft (#18)

* updated outline for paper from issue

* extra paper sections and some formalization of series data

* algorithms and acknowledgements

* updated outline for paper from issue

* upadted text formating

* event unification

* refactor tracking to ues callbacks instead of refs

* implement a pricing display api with session passing

* moved middleware to proxy according to new changes in Nextjs

* refactoed kafka ingestion to go via backend not web-db

* Refactor docker-compose services to use individual Dockerfiles (#20)

* Initial plan

* Refactor services into individual Dockerfiles

Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

* Add EXPOSE directives to all Dockerfiles with port documentation

Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

* fixing small bugs and adding exepriments to tracking

* added some doc
This commit is contained in:
Daniel Alves Rösel
2025-11-13 18:07:27 +01:00
committed by Daniel Rosel
parent ea11539f7d
commit 707ce032cf
50 changed files with 2862 additions and 447 deletions

99
backend/server/app.py Normal file
View File

@@ -0,0 +1,99 @@
# boilerplate code
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Any
import uvicorn
import os
import json
from datetime import datetime
from kafka import KafkaProducer
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# kafka producer - lazy init
_producer: Optional[KafkaProducer] = None
def get_producer() -> KafkaProducer:
global _producer
if _producer is None:
host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '9092')
broker = f'{host}:{port}' if port else host
_producer = KafkaProducer(
bootstrap_servers=[broker],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
)
return _producer
class EventPayload(BaseModel):
sessionId: str
eventName: str
page: str
productId: Optional[str] = None
metadata: Optional[dict[str, Any]] = None
storeMode: str
userAgent: Optional[str] = None
ts: Optional[str] = None
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health")
async def health():
kafka_status = "unknown"
try:
producer = get_producer()
# attempt to get cluster metadata to verify connection
producer.bootstrap_connected()
kafka_status = "connected"
except Exception as e:
kafka_status = f"error: {str(e)}"
return {
"status": "healthy",
"kafka": kafka_status,
"kafka_broker": f"{os.getenv('KAFKA_HOST', 'localhost')}:{os.getenv('KAFKA_PORT', '9092')}"
}
@app.post("/api/kafka/ingest")
async def ingest_logs(event: EventPayload):
try:
if not event.ts:
event.ts = datetime.utcnow().isoformat() + 'Z'
producer = get_producer()
producer.send(
'user-interactions',
key=event.sessionId,
value=event.model_dump()
)
producer.flush(timeout=5)
return {"success": True}
except Exception as e:
import traceback
print(f"[ERROR] {e}")
print(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/kafka/dump")
def dump_logs():
# TODO: implement a dump of logs of time period t_start to t_end (params of get)
# OR: allow for params of last_n logs as a param - creating two modes of the dumping
pass
if __name__ == "__main__":
PORT=int(os.getenv("BACKEND_PORT", 5000))
uvicorn.run("server:app", host="0.0.0.0", port=PORT, reload=True)

View File

@@ -0,0 +1,5 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
kafka-python==2.0.2
pydantic==2.5.0
python-dotenv==1.0.0