From 9bb6f842f4e5d6272b192a72c8b8b19e8b641fe7 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Thu, 13 Nov 2025 18:41:37 +0100 Subject: [PATCH] topic auto create --- backend/server/app.py | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/backend/server/app.py b/backend/server/app.py index c91a88c..d57d1de 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -7,7 +7,9 @@ import uvicorn import os import json from datetime import datetime -from kafka import KafkaProducer +from kafka import KafkaProducer, KafkaAdminClient +from kafka.admin import NewTopic +from kafka.errors import TopicAlreadyExistsError from dotenv import load_dotenv load_dotenv() @@ -20,7 +22,7 @@ def get_producer() -> KafkaProducer: global _producer if _producer is None: host = os.getenv('KAFKA_HOST', 'localhost') - port = os.getenv('KAFKA_PORT', '9092') + port = os.getenv('KAFKA_PORT', '29092') # use internal broker port broker = f'{host}:{port}' if port else host print(f"[KAFKA_INIT] Connecting to broker: {broker}") _producer = KafkaProducer( @@ -32,6 +34,7 @@ def get_producer() -> KafkaProducer: max_in_flight_requests_per_connection=5, request_timeout_ms=30000, api_version_auto_timeout_ms=10000, + max_block_ms=5000, # don't block send() for more than 5s ) print(f"[KAFKA_INIT] Producer created successfully") return _producer @@ -54,6 +57,33 @@ app.add_middleware( allow_headers=["*"], ) +@app.on_event("startup") +async def startup_event(): + """create kafka topics on startup""" + host = os.getenv('KAFKA_HOST', 'localhost') + port = os.getenv('KAFKA_PORT', '29092') + broker = f'{host}:{port}' + + try: + print(f"[STARTUP] Creating Kafka topics on {broker}") + admin = KafkaAdminClient( + bootstrap_servers=[broker], + request_timeout_ms=10000, + ) + + topics = [ + NewTopic(name='user-interactions', num_partitions=3, replication_factor=1) + ] + + admin.create_topics(new_topics=topics, validate_only=False) + print(f"[STARTUP] Topics created successfully") + admin.close() + except TopicAlreadyExistsError: + print(f"[STARTUP] Topics already exist, skipping creation") + except Exception as e: + print(f"[STARTUP] Failed to create topics: {e}") + print(f"[STARTUP] Will rely on auto-creation on first message") + @app.get("/health") async def health(): kafka_status = "unknown"