Files
PHANTOM/lib/kafka_client.py

55 lines
1.5 KiB
Python
Executable File

from kafka import KafkaConsumer
import json
import os
from dotenv import load_dotenv
load_dotenv()
def get_interactions(
topic='user-interactions',
bootstrap_servers=None,
from_beginning=True,
max_records=None,
timeout_ms=5000
):
"""Consume interaction events from Kafka.
Args:
topic: Kafka topic name
bootstrap_servers: Kafka broker address (default from env)
from_beginning: Start from earliest offset if True
max_records: Max number of records to fetch (None = all available)
timeout_ms: Consumer poll timeout
Returns:
List of parsed interaction event dicts
"""
if not bootstrap_servers:
host = os.getenv('KAFKA_HOST', 'localhost')
port = os.getenv('KAFKA_PORT', '9092')
bootstrap_servers = f'{host}:{port}'
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest' if from_beginning else 'latest',
enable_auto_commit=False,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
consumer_timeout_ms=timeout_ms
)
events = []
try:
for msg in consumer:
events.append(msg.value)
if max_records and len(events) >= max_records:
break
finally:
consumer.close()
return events
if __name__ == '__main__':
interactions = get_interactions(max_records=10)
for event in interactions:
print(event)