diff --git a/tests/e2e/helpers/kafka.ts b/tests/e2e/helpers/kafka.ts index 9b4f081..c0a95dd 100644 --- a/tests/e2e/helpers/kafka.ts +++ b/tests/e2e/helpers/kafka.ts @@ -6,58 +6,34 @@ interface InteractionEvent { metadata?: Record; } -export async function dumpKafkaTopic( - backendUrl: string, - topic: string -): Promise { +const dumpKafkaTopic = async (backendUrl: string, topic: string) => { const resp = await fetch(`${backendUrl}/api/kafka/dump?topic=${topic}`); - if (!resp.ok) { - throw new Error(`Kafka dump failed: ${resp.status}`); - } - const data = await resp.json(); - return data.messages || []; -} + if (!resp.ok) throw new Error(`Kafka dump failed: ${resp.status}`); + const { messages = [] } = await resp.json(); + return messages as any[]; +}; -export async function waitForInteractionEvent( +export const waitForInteractionEvent = async ( backendUrl: string, sessionId: string, eventType: string, - maxRetries: number = 10, - pollInterval: number = 500 -): Promise { + maxRetries = 10, + pollInterval = 500 +): Promise => { for (let i = 0; i < maxRetries; i++) { - const msgs = await dumpKafkaTopic(backendUrl, 'user-interactions'); - - for (const msg of msgs) { - if (msg.sessionId === sessionId && msg.event === eventType) { - return msg as InteractionEvent; - } - } - - await new Promise(r => setTimeout(r, pollInterval)); + const msgs = await dumpKafkaTopic(backendUrl, "user-interactions"); + const hit = msgs.find(m => m.sessionId === sessionId && m.event === eventType); + if (hit) return hit as InteractionEvent; + await new Promise(r => setTimeout(r, pollInterval)); } - return null; -} +}; -export async function countProductViews( - backendUrl: string, - productId: string -): Promise { - const msgs = await dumpKafkaTopic(backendUrl, 'user-interactions'); +export const countProductViews = async (backendUrl: string, productId: string) => + (await dumpKafkaTopic(backendUrl, "user-interactions")).reduce( + (n, m) => n + (m.productId === productId && m.event === "view_item_page" ? 1 : 0), + 0 + ); - return msgs.reduce((cnt, msg) => { - if (msg.productId === productId && msg.event === 'view_item_page') { - return cnt + 1; - } - return cnt; - }, 0); -} - -export async function getSessionEvents( - backendUrl: string, - sessionId: string -): Promise { - const msgs = await dumpKafkaTopic(backendUrl, 'user-interactions'); - return msgs.filter(m => m.sessionId === sessionId); -} +export const getSessionEvents = async (backendUrl: string, sessionId: string) => + (await dumpKafkaTopic(backendUrl, "user-interactions")).filter(m => m.sessionId === sessionId);