fixing prod

This commit is contained in:
2025-11-13 18:21:07 +01:00
parent 707ce032cf
commit a80ee0e80f

View File

@@ -26,6 +26,11 @@ def get_producer() -> KafkaProducer:
bootstrap_servers=[broker], bootstrap_servers=[broker],
value_serializer=lambda v: json.dumps(v).encode('utf-8'), value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None, key_serializer=lambda k: k.encode('utf-8') if k else None,
acks=1, # wait for leader ack only
retries=3,
max_in_flight_requests_per_connection=5,
request_timeout_ms=30000,
api_version_auto_timeout_ms=10000,
) )
return _producer return _producer
@@ -72,12 +77,13 @@ async def ingest_logs(event: EventPayload):
event.ts = datetime.utcnow().isoformat() + 'Z' event.ts = datetime.utcnow().isoformat() + 'Z'
producer = get_producer() producer = get_producer()
producer.send( future = producer.send(
'user-interactions', 'user-interactions',
key=event.sessionId, key=event.sessionId,
value=event.model_dump() value=event.model_dump()
) )
producer.flush(timeout=5) # add callback for error logging but don't block
future.add_errback(lambda e: print(f"[KAFKA_SEND_ERROR] {e}"))
return {"success": True} return {"success": True}
except Exception as e: except Exception as e: