mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
topic auto create
This commit is contained in:
@@ -7,7 +7,9 @@ import uvicorn
|
|||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from kafka import KafkaProducer
|
from kafka import KafkaProducer, KafkaAdminClient
|
||||||
|
from kafka.admin import NewTopic
|
||||||
|
from kafka.errors import TopicAlreadyExistsError
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
@@ -20,7 +22,7 @@ def get_producer() -> KafkaProducer:
|
|||||||
global _producer
|
global _producer
|
||||||
if _producer is None:
|
if _producer is None:
|
||||||
host = os.getenv('KAFKA_HOST', 'localhost')
|
host = os.getenv('KAFKA_HOST', 'localhost')
|
||||||
port = os.getenv('KAFKA_PORT', '9092')
|
port = os.getenv('KAFKA_PORT', '29092') # use internal broker port
|
||||||
broker = f'{host}:{port}' if port else host
|
broker = f'{host}:{port}' if port else host
|
||||||
print(f"[KAFKA_INIT] Connecting to broker: {broker}")
|
print(f"[KAFKA_INIT] Connecting to broker: {broker}")
|
||||||
_producer = KafkaProducer(
|
_producer = KafkaProducer(
|
||||||
@@ -32,6 +34,7 @@ def get_producer() -> KafkaProducer:
|
|||||||
max_in_flight_requests_per_connection=5,
|
max_in_flight_requests_per_connection=5,
|
||||||
request_timeout_ms=30000,
|
request_timeout_ms=30000,
|
||||||
api_version_auto_timeout_ms=10000,
|
api_version_auto_timeout_ms=10000,
|
||||||
|
max_block_ms=5000, # don't block send() for more than 5s
|
||||||
)
|
)
|
||||||
print(f"[KAFKA_INIT] Producer created successfully")
|
print(f"[KAFKA_INIT] Producer created successfully")
|
||||||
return _producer
|
return _producer
|
||||||
@@ -54,6 +57,33 @@ app.add_middleware(
|
|||||||
allow_headers=["*"],
|
allow_headers=["*"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@app.on_event("startup")
|
||||||
|
async def startup_event():
|
||||||
|
"""create kafka topics on startup"""
|
||||||
|
host = os.getenv('KAFKA_HOST', 'localhost')
|
||||||
|
port = os.getenv('KAFKA_PORT', '29092')
|
||||||
|
broker = f'{host}:{port}'
|
||||||
|
|
||||||
|
try:
|
||||||
|
print(f"[STARTUP] Creating Kafka topics on {broker}")
|
||||||
|
admin = KafkaAdminClient(
|
||||||
|
bootstrap_servers=[broker],
|
||||||
|
request_timeout_ms=10000,
|
||||||
|
)
|
||||||
|
|
||||||
|
topics = [
|
||||||
|
NewTopic(name='user-interactions', num_partitions=3, replication_factor=1)
|
||||||
|
]
|
||||||
|
|
||||||
|
admin.create_topics(new_topics=topics, validate_only=False)
|
||||||
|
print(f"[STARTUP] Topics created successfully")
|
||||||
|
admin.close()
|
||||||
|
except TopicAlreadyExistsError:
|
||||||
|
print(f"[STARTUP] Topics already exist, skipping creation")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[STARTUP] Failed to create topics: {e}")
|
||||||
|
print(f"[STARTUP] Will rely on auto-creation on first message")
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
async def health():
|
async def health():
|
||||||
kafka_status = "unknown"
|
kafka_status = "unknown"
|
||||||
|
|||||||
Reference in New Issue
Block a user