diff --git a/backend/server/app.py b/backend/server/app.py index 8544689..0443250 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -26,6 +26,11 @@ def get_producer() -> KafkaProducer: bootstrap_servers=[broker], value_serializer=lambda v: json.dumps(v).encode('utf-8'), key_serializer=lambda k: k.encode('utf-8') if k else None, + acks=1, # wait for leader ack only + retries=3, + max_in_flight_requests_per_connection=5, + request_timeout_ms=30000, + api_version_auto_timeout_ms=10000, ) return _producer @@ -72,12 +77,13 @@ async def ingest_logs(event: EventPayload): event.ts = datetime.utcnow().isoformat() + 'Z' producer = get_producer() - producer.send( + future = producer.send( 'user-interactions', key=event.sessionId, value=event.model_dump() ) - producer.flush(timeout=5) + # add callback for error logging but don't block + future.add_errback(lambda e: print(f"[KAFKA_SEND_ERROR] {e}")) return {"success": True} except Exception as e: