From 56308ecb1056a9179c34e2f50f0216704b4a47d5 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Wed, 21 Jan 2026 19:12:11 +0100 Subject: [PATCH] chore: export repeated methods into lib --- lib/__init__.py | 41 +++++++++++++++ lib/config.py | 65 +++++++++++++++++++++++ lib/features.py | 125 ++++++++++++++++++++++++++++++++++++++++++++ lib/kafka_client.py | 54 +++++++++++++++++++ lib/state.py | 72 +++++++++++++++++++++++++ 5 files changed, 357 insertions(+) create mode 100644 lib/__init__.py create mode 100644 lib/config.py create mode 100644 lib/features.py create mode 100755 lib/kafka_client.py create mode 100644 lib/state.py diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..7f8ec2d --- /dev/null +++ b/lib/__init__.py @@ -0,0 +1,41 @@ +"""PHANTOM shared library +Exports unified utilities for features, state, config, kafka, and model registry +""" +from .config import ( + PROJECT_ROOT, DATA_DIR, EXPERIMENTS_DIR, + AGENT_DATA_DIR, HUMAN_DATA_DIR, SIM_RUNS_DIR, MODEL_REGISTRY_DIR, + COLLECTED_DATA_DIR, NOTEBOOK_OUTPUT_DIR, + ensure_dir, get_data_path, get_experiments_path, get_sim_path, + KAFKA_HOST, KAFKA_PORT, KAFKA_BROKER, + REDIS_HOST, REDIS_PORT, + SUPABASE_URL, SUPABASE_ANON_KEY, + BACKEND_PORT, PROVIDER_PORT +) +from .state import ( + make_state_repr, event_to_state, parse_state, + get_event_name, get_timestamp, + create_state_fn, create_event_name_fn, create_timestamp_fn +) +from .features import ( + transition_histogram, temporal_signature, state_coverage, transition_entropy, + event_type_distribution, featurize_trajectory, parse_timestamp +) + +__all__ = [ + # config + 'PROJECT_ROOT', 'DATA_DIR', 'EXPERIMENTS_DIR', + 'AGENT_DATA_DIR', 'HUMAN_DATA_DIR', 'SIM_RUNS_DIR', 'MODEL_REGISTRY_DIR', + 'COLLECTED_DATA_DIR', 'NOTEBOOK_OUTPUT_DIR', + 'ensure_dir', 'get_data_path', 'get_experiments_path', 'get_sim_path', + 'KAFKA_HOST', 'KAFKA_PORT', 'KAFKA_BROKER', + 'REDIS_HOST', 'REDIS_PORT', + 'SUPABASE_URL', 'SUPABASE_ANON_KEY', + 'BACKEND_PORT', 'PROVIDER_PORT', + # state + 'make_state_repr', 'event_to_state', 'parse_state', + 'get_event_name', 'get_timestamp', + 'create_state_fn', 'create_event_name_fn', 'create_timestamp_fn', + # features + 'transition_histogram', 'temporal_signature', 'state_coverage', 'transition_entropy', + 'event_type_distribution', 'featurize_trajectory', 'parse_timestamp', +] diff --git a/lib/config.py b/lib/config.py new file mode 100644 index 0000000..a27ffd9 --- /dev/null +++ b/lib/config.py @@ -0,0 +1,65 @@ +"""Unified path configuration for PHANTOM project +All hardcoded paths should reference this module +Paths can be overridden via environment variables +""" +import os +from pathlib import Path + +# project root (directory containing lib/, experiments/, sim/, web/, backend/) +PROJECT_ROOT = Path(__file__).parent.parent.resolve() + +# data directories +DATA_DIR = Path(os.getenv('PHANTOM_DATA_DIR', PROJECT_ROOT / 'data')) +EXPERIMENTS_DIR = Path(os.getenv('PHANTOM_EXPERIMENTS_DIR', PROJECT_ROOT / 'experiments')) + +# agent/human interaction data +AGENT_DATA_DIR = Path(os.getenv('PHANTOM_AGENT_DATA_DIR', DATA_DIR / 'agents')) +HUMAN_DATA_DIR = Path(os.getenv('PHANTOM_HUMAN_DATA_DIR', DATA_DIR / 'humans')) + +# RL simulation runs +SIM_RUNS_DIR = Path(os.getenv('PHANTOM_SIM_RUNS_DIR', PROJECT_ROOT / 'sim' / 'rl' / 'runs')) + +# model artifacts +MODEL_REGISTRY_DIR = Path(os.getenv('PHANTOM_MODEL_REGISTRY_DIR', DATA_DIR / 'models')) + +# collected experiment data +COLLECTED_DATA_DIR = Path(os.getenv('PHANTOM_COLLECTED_DATA_DIR', EXPERIMENTS_DIR / 'agents' / 'collected_data')) + +# notebook outputs +NOTEBOOK_OUTPUT_DIR = Path(os.getenv('PHANTOM_NOTEBOOK_OUTPUT_DIR', EXPERIMENTS_DIR / 'notebooks' / 'outputs')) + + +def ensure_dir(path: Path) -> Path: + """ensure directory exists, create if needed""" + path.mkdir(parents=True, exist_ok=True) + return path + + +def get_data_path(*parts: str) -> Path: + """construct path relative to DATA_DIR""" + return DATA_DIR.joinpath(*parts) + + +def get_experiments_path(*parts: str) -> Path: + """construct path relative to EXPERIMENTS_DIR""" + return EXPERIMENTS_DIR.joinpath(*parts) + + +def get_sim_path(*parts: str) -> Path: + """construct path relative to SIM_RUNS_DIR""" + return SIM_RUNS_DIR.joinpath(*parts) + + +# service configuration (from .env) +KAFKA_HOST = os.getenv('KAFKA_HOST', 'localhost') +KAFKA_PORT = os.getenv('KAFKA_PORT', '9092') +KAFKA_BROKER = f"{KAFKA_HOST}:{KAFKA_PORT}" + +REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') +REDIS_PORT = int(os.getenv('REDIS_PORT', '6379')) + +SUPABASE_URL = os.getenv('NEXT_PUBLIC_SUPABASE_URL', '') +SUPABASE_ANON_KEY = os.getenv('NEXT_PUBLIC_SUPABASE_ANON_KEY', '') + +BACKEND_PORT = int(os.getenv('BACKEND_PORT', '5000')) +PROVIDER_PORT = int(os.getenv('PROVIDER_PORT', '5001')) diff --git a/lib/features.py b/lib/features.py new file mode 100644 index 0000000..f2d88f5 --- /dev/null +++ b/lib/features.py @@ -0,0 +1,125 @@ +"""Unified featurization utilities for trajectory -> feature vector conversion +Used by both experiments/ml/ and sim/rl/ components +""" +import numpy as np +from collections import defaultdict +from typing import List, Dict, Callable, Optional, Any, Set +from datetime import datetime + + +def transition_histogram(events: List, state_fn: Callable, max_states: int = 50) -> np.ndarray: + """compute normalized histogram of state transitions in trajectory + events: list of event objects/dicts + state_fn: function mapping event -> state string + max_states: maximum dimensions for histogram + """ + if len(events) < 2: + return np.zeros(max_states, dtype=np.float32) + states = [state_fn(e) for e in events] + trans_counts = defaultdict(int) + for s, s_next in zip(states, states[1:]): + trans_counts[(s, s_next)] += 1 + total = sum(trans_counts.values()) + hist = np.array(list(trans_counts.values())[:max_states], dtype=np.float32) + hist = np.pad(hist, (0, max(0, max_states - len(hist)))) + return hist / (total + 1e-10) + + +def temporal_signature(events: List, ts_fn: Callable) -> np.ndarray: + """extract temporal features: mean/std/skew of inter-event times plus count + events: list of event objects/dicts + ts_fn: function mapping event -> timestamp (float seconds) + returns: [mean_dt, std_dt, skew, n_intervals] array + """ + if len(events) < 2: + return np.zeros(4, dtype=np.float32) + times = sorted([ts_fn(e) for e in events]) + diffs = np.diff(times).astype(np.float32) + if len(diffs) == 0: + return np.zeros(4, dtype=np.float32) + mean_dt, std_dt = np.mean(diffs), np.std(diffs) + 1e-10 + skew = np.mean(((diffs - mean_dt) / std_dt) ** 3) if std_dt > 1e-8 else 0.0 + return np.array([mean_dt, std_dt, skew, len(diffs)], dtype=np.float32) + + +def state_coverage(events: List, state_fn: Callable, mdp_states: Set[str]) -> float: + """fraction of MDP states visited by trajectory + events: list of event objects/dicts + state_fn: function mapping event -> state string + mdp_states: set of all possible MDP states + """ + if not mdp_states: + return 0.0 + visited = set(state_fn(e) for e in events) + return len(visited & mdp_states) / len(mdp_states) + + +def transition_entropy(events: List, state_fn: Callable) -> float: + """compute entropy of transition distribution (randomness of navigation) + higher entropy = more random browsing pattern + """ + if len(events) < 2: + return 0.0 + states = [state_fn(e) for e in events] + trans_counts = defaultdict(int) + for s, s_next in zip(states, states[1:]): + trans_counts[(s, s_next)] += 1 + total = sum(trans_counts.values()) + probs = [c / total for c in trans_counts.values()] + return -sum(p * np.log(p + 1e-10) for p in probs) + + +def event_type_distribution(events: List, event_name_fn: Callable) -> np.ndarray: + """compute proportions of different event type categories + returns: [page_view_ratio, hover_ratio, cart_ratio, purchase_ratio] + """ + if not events: + return np.zeros(4, dtype=np.float32) + n = len(events) + names = [event_name_fn(e).lower() for e in events] + return np.array([ + sum(1 for nm in names if 'page' in nm or 'view' in nm) / n, + sum(1 for nm in names if 'hover' in nm) / n, + sum(1 for nm in names if 'cart' in nm) / n, + sum(1 for nm in names if 'purchase' in nm or 'checkout' in nm) / n + ], dtype=np.float32) + + +def featurize_trajectory(events: List, state_fn: Callable, ts_fn: Callable, + event_name_fn: Callable, mdp_states: Optional[Set[str]] = None, + output_dim: int = 64) -> np.ndarray: + """convert trajectory to fixed-dimension feature vector + events: list of event objects/dicts + state_fn: function mapping event -> state string + ts_fn: function mapping event -> timestamp (float) + event_name_fn: function mapping event -> event name string + mdp_states: optional set of all MDP states for coverage calculation + output_dim: desired output dimension (will pad/truncate) + """ + feats = [] + feats.extend(transition_histogram(events, state_fn, max_states=40)) # 40 dims + feats.extend(temporal_signature(events, ts_fn)) # 4 dims + feats.append(state_coverage(events, state_fn, mdp_states or set())) # 1 dim + feats.append(transition_entropy(events, state_fn)) # 1 dim + feats.append(float(len(events))) # trajectory length + feats.append(float(len(set(state_fn(e) for e in events)))) # unique states + feats.extend(event_type_distribution(events, event_name_fn)) # 4 dims + + feats = np.array(feats[:output_dim], dtype=np.float32) + if len(feats) < output_dim: + feats = np.pad(feats, (0, output_dim - len(feats))) + return feats + + +def parse_timestamp(ts: Any) -> float: + """parse various timestamp formats to float seconds""" + if ts is None: + return 0.0 + if isinstance(ts, (int, float)): + return float(ts) + if isinstance(ts, str): + try: + return datetime.fromisoformat(ts.replace('Z', '+00:00')).timestamp() + except ValueError: + return 0.0 + return 0.0 diff --git a/lib/kafka_client.py b/lib/kafka_client.py new file mode 100755 index 0000000..d61cd9e --- /dev/null +++ b/lib/kafka_client.py @@ -0,0 +1,54 @@ +from kafka import KafkaConsumer +import json +import os +from dotenv import load_dotenv +load_dotenv() + +def get_interactions( + topic='user-interactions', + bootstrap_servers=None, + from_beginning=True, + max_records=None, + timeout_ms=5000 +): + """Consume interaction events from Kafka. + + Args: + topic: Kafka topic name + bootstrap_servers: Kafka broker address (default from env) + from_beginning: Start from earliest offset if True + max_records: Max number of records to fetch (None = all available) + timeout_ms: Consumer poll timeout + + Returns: + List of parsed interaction event dicts + """ + if not bootstrap_servers: + host = os.getenv('KAFKA_HOST', 'localhost') + port = os.getenv('KAFKA_PORT', '9092') + bootstrap_servers = f'{host}:{port}' + + consumer = KafkaConsumer( + topic, + bootstrap_servers=bootstrap_servers, + auto_offset_reset='earliest' if from_beginning else 'latest', + enable_auto_commit=False, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + consumer_timeout_ms=timeout_ms + ) + + events = [] + try: + for msg in consumer: + events.append(msg.value) + if max_records and len(events) >= max_records: + break + finally: + consumer.close() + + return events + +if __name__ == '__main__': + interactions = get_interactions(max_records=10) + for event in interactions: + print(event) diff --git a/lib/state.py b/lib/state.py new file mode 100644 index 0000000..cfb4251 --- /dev/null +++ b/lib/state.py @@ -0,0 +1,72 @@ +"""Unified state representation utilities for MDP state encoding +Used by both experiments/ and sim/ components for consistent state handling +""" +from typing import Any, Callable + + +def make_state_repr(page: str = None, product_id: str = None, event_name: str = None) -> str: + """create canonical state representation string from components + format: page|productId|eventName + """ + p = page or 'unk' + pid = product_id or 'none' + en = event_name or 'unknown' + return f"{p}|{pid}|{en}" + + +def event_to_state(evt: Any) -> str: + """convert event object/dict to state string + supports both object attributes and dict keys + """ + if isinstance(evt, dict): + return make_state_repr( + page=evt.get('page'), + product_id=evt.get('productId'), + event_name=evt.get('eventName') or evt.get('event_type') + ) + return make_state_repr( + page=getattr(evt, 'page', None), + product_id=getattr(evt, 'productId', None), + event_name=getattr(evt, 'eventName', None) or getattr(evt, 'event_type', None) + ) + + +def parse_state(state_str: str) -> dict: + """parse state string back to components + returns: {'page': str, 'productId': str, 'eventName': str} + """ + parts = state_str.split('|') + return { + 'page': parts[0] if len(parts) > 0 and parts[0] != 'unk' else None, + 'productId': parts[1] if len(parts) > 1 and parts[1] != 'none' else None, + 'eventName': parts[2] if len(parts) > 2 and parts[2] != 'unknown' else None + } + + +def get_event_name(evt: Any) -> str: + """extract event name from event object/dict""" + if isinstance(evt, dict): + return evt.get('eventName') or evt.get('event_type') or '' + return getattr(evt, 'eventName', None) or getattr(evt, 'event_type', None) or '' + + +def get_timestamp(evt: Any) -> Any: + """extract timestamp from event object/dict""" + if isinstance(evt, dict): + return evt.get('ts') or evt.get('timestamp') + return getattr(evt, 'ts', None) or getattr(evt, 'timestamp', None) + + +def create_state_fn() -> Callable: + """factory for state representation function""" + return event_to_state + + +def create_event_name_fn() -> Callable: + """factory for event name extraction function""" + return get_event_name + + +def create_timestamp_fn() -> Callable: + """factory for timestamp extraction function (returns raw value, use features.parse_timestamp to convert)""" + return get_timestamp