Files
PHANTOM/backend/server/app.py
Daniel Alves Rösel ab8b8787a8 13 agentic behavior runner v1 (#14)
* baseline setup of agent abstract

* feat: new implementation of simple AI agent that can follow a goal and return

* refactored import structure and created full tests

* pytest setup a github workflow to run tests + more ignores

* singularity for pushing

* fixing builds of PDFs

* initial structure of docs

* init styles and docs

* basic style implementation

* 13 create outline for research paper draft (#18)

* updated outline for paper from issue

* extra paper sections and some formalization of series data

* algorithms and acknowledgements

* updated outline for paper from issue

* Refactor docker-compose services to use individual Dockerfiles (#20)

* Initial plan

* Refactor services into individual Dockerfiles

Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

* Add EXPOSE directives to all Dockerfiles with port documentation

Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

* 2 nextjs scaffold with store mode shop and admin session experiment wiring event emission v1 (#17)

* chore: cleaning gitignore

* formatting and env documentation

* feat: context switching of hotel/airline dependent on env var via middleware

* fixed alignment and building

* wrong file

* prods

* fixed applying style

* better session cookie management

* tentative session storage with maybe using airtable

* migrated api of ingestion

* events and products page

* fixing build

* 13 create outline for research paper draft (#18)

* updated outline for paper from issue

* extra paper sections and some formalization of series data

* algorithms and acknowledgements

* updated outline for paper from issue

* updated text formatting

* event unification

* refactor tracking to use callbacks instead of refs

* implement a pricing display api with session passing

* moved middleware to proxy according to new changes in Nextjs

* refactored kafka ingestion to go via backend not web-db

* Refactor docker-compose services to use individual Dockerfiles (#20)

* Initial plan

* Refactor services into individual Dockerfiles

Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

* Add EXPOSE directives to all Dockerfiles with port documentation

Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: velocitatem <60182044+velocitatem@users.noreply.github.com>

* fixing small bugs and adding experiments to tracking

* added some doc

* fixing prod

* prod kafka server logging

* topic auto create

* pytest setup a github workflow to run tests + more ignores

* getting data from agents properly

* proper pipeline to handle data and build matrices

* fixing backend dumping

* fixing agents and ignore

* fixing import for tests

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2025-11-15 16:16:01 +01:00

190 lines
5.6 KiB
Python

# boilerplate code
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Any
import uvicorn
import os
import json
from datetime import datetime
from kafka import KafkaProducer, KafkaAdminClient, KafkaConsumer
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from dotenv import load_dotenv
# Load variables from a .env file (if present) into the process environment,
# so the os.getenv() lookups in the handlers below can see them.
load_dotenv()
# The FastAPI application; all middleware and routes in this module attach to it.
app = FastAPI()
# kafka producer - built on first use, then reused for every later call
_producer: Optional[KafkaProducer] = None


def get_producer() -> KafkaProducer:
    """Return the module-wide Kafka producer, creating it on the first call.

    The broker address is read from the KAFKA_HOST (default 'localhost') and
    KAFKA_PORT (default '9092') environment variables. If KAFKA_PORT is set
    to an empty string, the host value is used on its own.

    Returns:
        The shared KafkaProducer instance.
    """
    global _producer

    # Fast path: the producer already exists.
    if _producer is not None:
        return _producer

    kafka_host = os.getenv('KAFKA_HOST', 'localhost')
    kafka_port = os.getenv('KAFKA_PORT', '9092')
    if kafka_port:
        address = f'{kafka_host}:{kafka_port}'
    else:
        address = kafka_host
    print(f"[KAFKA_INIT] Connecting to broker: {address}")

    def _encode_value(payload):
        # Message values are sent as UTF-8 encoded JSON.
        return json.dumps(payload).encode('utf-8')

    def _encode_key(key):
        # Keys are optional; a missing/empty key is sent as None.
        return key.encode('utf-8') if key else None

    _producer = KafkaProducer(
        bootstrap_servers=[address],
        value_serializer=_encode_value,
        key_serializer=_encode_key,
        acks=1,
        retries=3,
        max_in_flight_requests_per_connection=5,
        request_timeout_ms=30000,
        api_version_auto_timeout_ms=10000,
        max_block_ms=5000,  # don't block send() for more than 5s
    )
    print(f"[KAFKA_INIT] Producer created successfully")
    return _producer
class EventPayload(BaseModel):
    """Request body accepted by POST /api/kafka/ingest.

    The validated payload is published (via model_dump()) to the
    'user-interactions' Kafka topic, keyed by sessionId.
    """
    sessionId: str  # also used as the Kafka message key
    eventName: str
    page: str
    productId: Optional[str] = None
    metadata: Optional[dict[str, Any]] = None  # free-form extra data attached to the event
    storeMode: str  # presumably the storefront variant (hotel/airline) — confirm against the frontend
    userAgent: Optional[str] = None
    ts: Optional[str] = None  # timestamp string; the ingest endpoint fills in the current UTC time when missing
# CORS policy: accept requests from any origin, with any method and any header.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True lets
# any site make credentialed cross-origin requests. FastAPI's CORS docs
# recommend listing origins explicitly when credentials are allowed — confirm
# whether credentials are actually needed, and restrict origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
async def startup_event():
    """Create the Kafka topics this service uses, on a best-effort basis.

    Connects to the broker given by KAFKA_HOST / KAFKA_PORT and tries to
    create the 'user-interactions' topic (3 partitions, replication factor 1).
    Failures never abort application startup: an existing topic is skipped,
    and any other error is logged so the service can fall back to broker-side
    auto-creation on the first message.

    The admin client is always closed, including when topic creation raises
    (e.g. the topic already exists on every restart after the first).
    """
    host = os.getenv('KAFKA_HOST', 'localhost')
    port = os.getenv('KAFKA_PORT', '9092')
    # Same rule as get_producer(): an empty KAFKA_PORT means "host only".
    broker = f'{host}:{port}' if port else host

    admin = None
    try:
        print(f"[STARTUP] Creating Kafka topics on {broker}")
        admin = KafkaAdminClient(
            bootstrap_servers=[broker],
            request_timeout_ms=10000,
        )
        topics = [
            NewTopic(name='user-interactions', num_partitions=3, replication_factor=1)
        ]
        admin.create_topics(new_topics=topics, validate_only=False)
        print("[STARTUP] Topics created successfully")
    except TopicAlreadyExistsError:
        print("[STARTUP] Topics already exist, skipping creation")
    except Exception as e:
        print(f"[STARTUP] Failed to create topics: {e}")
        print("[STARTUP] Will rely on auto-creation on first message")
    finally:
        # admin stays None when the client constructor itself failed.
        if admin is not None:
            try:
                admin.close()
            except Exception as e:
                # Closing is cleanup only; never let it break startup.
                print(f"[STARTUP] Failed to close Kafka admin client: {e}")
@app.get("/health")
async def health():
    """Report service liveness and the state of the Kafka connection.

    Returns:
        A dict with:
          - "status": always "healthy" (the HTTP service itself is up);
          - "kafka": "connected" or "disconnected" according to the
            producer's bootstrap connection, or "error: <message>" if the
            producer could not be obtained or queried;
          - "kafka_broker": the broker address derived from KAFKA_HOST and
            KAFKA_PORT.
    """
    host = os.getenv('KAFKA_HOST', 'localhost')
    port = os.getenv('KAFKA_PORT', '9092')
    # Same rule as get_producer(), so the reported address is the one in use.
    broker = f'{host}:{port}' if port else host

    try:
        producer = get_producer()
        # bootstrap_connected() returns a bool; it does not raise when the
        # connection is down, so its result must be inspected.
        kafka_status = "connected" if producer.bootstrap_connected() else "disconnected"
    except Exception as e:
        kafka_status = f"error: {str(e)}"

    return {
        "status": "healthy",
        "kafka": kafka_status,
        "kafka_broker": broker,
    }
@app.post("/api/kafka/ingest")
async def ingest_logs(event: EventPayload):
    """Publish one tracking event to the 'user-interactions' Kafka topic.

    If the event carries no timestamp, the current UTC time is filled in as
    an ISO-8601 string ending in 'Z'. The message is keyed by sessionId.

    Args:
        event: The validated event payload from the request body.

    Returns:
        {"success": True} once the message has been handed to the producer.
        Delivery is asynchronous: a later broker-side failure is only logged
        by the errback, not reported to the caller.

    Raises:
        HTTPException: status 500 with the error text if the event could not
            be handed to the producer.
    """
    # Local imports: the module only imports `datetime` from the datetime package.
    from datetime import timezone
    import traceback

    try:
        if not event.ts:
            # Timezone-aware replacement for the deprecated datetime.utcnow();
            # '+00:00' is rewritten to 'Z' to keep the original string format.
            event.ts = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        producer = get_producer()
        future = producer.send(
            'user-interactions',
            key=event.sessionId,
            value=event.model_dump()
        )
        # add callback for error logging but don't block
        future.add_errback(lambda e: print(f"[KAFKA_SEND_ERROR] {e}"))
        return {"success": True}
    except Exception as e:
        print(f"[ERROR] {e}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/kafka/dump")
def dump_logs(
    last_n: Optional[int] = None,
    t_start: Optional[str] = None,
    t_end: Optional[str] = None
):
    """Dump messages from the 'user-interactions' topic.

    A fresh consumer reads the topic from the earliest offset and stops once
    no new message arrives for 5 seconds.

    Args:
        last_n: Return only the last n messages (after time filtering).
            None, zero or a negative value returns all of them.
        t_start: Inclusive lower bound on the event's 'ts' field.
        t_end: Inclusive upper bound on the event's 'ts' field.
            Timestamps are compared as plain strings, so bounds should use
            the same ISO-8601 format as the stored events. Events without a
            'ts' value are never filtered out.

    Returns:
        {"success": True, "count": <number of events>, "data": [<events>]}

    Raises:
        HTTPException: status 500 with the error text if reading fails.
    """
    host = os.getenv('KAFKA_HOST', 'localhost')
    port = os.getenv('KAFKA_PORT', '9092')
    # Same rule as get_producer(): an empty KAFKA_PORT means "host only".
    broker = f'{host}:{port}' if port else host

    def _in_range(event):
        # Events lacking a timestamp cannot be compared and are kept.
        ts = event.get('ts')
        if not ts:
            return True
        if t_start and ts < t_start:
            return False
        if t_end and ts > t_end:
            return False
        return True

    try:
        consumer = KafkaConsumer(
            'user-interactions',
            bootstrap_servers=[broker],
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            consumer_timeout_ms=5000
        )
        try:
            # Iteration ends when consumer_timeout_ms elapses without a message.
            events = [msg.value for msg in consumer]
        finally:
            # Release the broker connection even if reading/decoding failed.
            consumer.close()

        # apply filters
        if t_start or t_end:
            events = [event for event in events if _in_range(event)]
        if last_n and last_n > 0:
            events = events[-last_n:]
        return {"success": True, "count": len(events), "data": events}
    except Exception as e:
        import traceback
        print(f"[DUMP_ERROR] {e}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e)) from e
# Script entry point: start the uvicorn server when this file is run directly.
if __name__ == "__main__":
    # Listening port comes from BACKEND_PORT, defaulting to 5000.
    PORT=int(os.getenv("BACKEND_PORT", 5000))
    # NOTE(review): "server:app" asks uvicorn to import `app` from a module or
    # package named `server`; this file's path ends in server/app.py — confirm
    # the import string resolves from the intended working directory.
    # reload=True enables auto-reload and is normally a development setting.
    uvicorn.run("server:app", host="0.0.0.0", port=PORT, reload=True)