mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 16:43:36 +00:00
topic auto create
This commit is contained in:
@@ -7,7 +7,9 @@ import uvicorn
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
from kafka import KafkaProducer
|
||||
from kafka import KafkaProducer, KafkaAdminClient
|
||||
from kafka.admin import NewTopic
|
||||
from kafka.errors import TopicAlreadyExistsError
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
@@ -20,7 +22,7 @@ def get_producer() -> KafkaProducer:
|
||||
global _producer
|
||||
if _producer is None:
|
||||
host = os.getenv('KAFKA_HOST', 'localhost')
|
||||
port = os.getenv('KAFKA_PORT', '9092')
|
||||
port = os.getenv('KAFKA_PORT', '29092') # use internal broker port
|
||||
broker = f'{host}:{port}' if port else host
|
||||
print(f"[KAFKA_INIT] Connecting to broker: {broker}")
|
||||
_producer = KafkaProducer(
|
||||
@@ -32,6 +34,7 @@ def get_producer() -> KafkaProducer:
|
||||
max_in_flight_requests_per_connection=5,
|
||||
request_timeout_ms=30000,
|
||||
api_version_auto_timeout_ms=10000,
|
||||
max_block_ms=5000, # don't block send() for more than 5s
|
||||
)
|
||||
print(f"[KAFKA_INIT] Producer created successfully")
|
||||
return _producer
|
||||
@@ -54,6 +57,33 @@ app.add_middleware(
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
"""create kafka topics on startup"""
|
||||
host = os.getenv('KAFKA_HOST', 'localhost')
|
||||
port = os.getenv('KAFKA_PORT', '29092')
|
||||
broker = f'{host}:{port}'
|
||||
|
||||
try:
|
||||
print(f"[STARTUP] Creating Kafka topics on {broker}")
|
||||
admin = KafkaAdminClient(
|
||||
bootstrap_servers=[broker],
|
||||
request_timeout_ms=10000,
|
||||
)
|
||||
|
||||
topics = [
|
||||
NewTopic(name='user-interactions', num_partitions=3, replication_factor=1)
|
||||
]
|
||||
|
||||
admin.create_topics(new_topics=topics, validate_only=False)
|
||||
print(f"[STARTUP] Topics created successfully")
|
||||
admin.close()
|
||||
except TopicAlreadyExistsError:
|
||||
print(f"[STARTUP] Topics already exist, skipping creation")
|
||||
except Exception as e:
|
||||
print(f"[STARTUP] Failed to create topics: {e}")
|
||||
print(f"[STARTUP] Will rely on auto-creation on first message")
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
kafka_status = "unknown"
|
||||
|
||||
Reference in New Issue
Block a user