migrated api of ingestion

This commit is contained in:
2025-11-06 19:03:12 +01:00
parent 0cec8487ba
commit d3d5f39ec5
7 changed files with 817 additions and 391 deletions

View File

@@ -1,42 +1,35 @@
import { Kafka, Producer } from 'kafkajs';
import type { EventBase } from './events';
let producer: Producer | null = null;
const kafka = new Kafka({
clientId: 'phantom-web',
brokers: [`${process.env.KAFKA_HOST || 'localhost'}:${process.env.KAFKA_PORT || '9092'}`],
clientId: 'phantom-web',
brokers: [`${process.env.KAFKA_HOST || 'localhost'}:${process.env.KAFKA_PORT || '9092'}`],
});
export const getProducer = async (): Promise<Producer> => {
if (!producer) {
producer = kafka.producer();
await producer.connect();
}
return producer;
if (!producer) {
producer = kafka.producer();
await producer.connect();
}
return producer;
};
export const sendInteractionEvent = async (ev: {
sessionId: string;
eventType: string;
targetEl?: string;
targetUrl?: string;
metadata?: Record<string, any>;
ts: number;
}) => {
const p = await getProducer();
// add to the metadata
await p.send({
topic: 'user-interactions',
messages: [{
key: ev.sessionId,
value: JSON.stringify(ev),
}],
});
export const sendEvent = async (ev: EventBase) => {
const p = await getProducer();
await p.send({
topic: 'user-interactions',
messages: [{
key: ev.sessionId,
value: JSON.stringify(ev),
}],
});
};
export const disconnect = async () => {
if (producer) {
await producer.disconnect();
producer = null;
}
if (producer) {
await producer.disconnect();
producer = null;
}
};