mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
chore: export repeated methods into lib
This commit is contained in:
125
lib/features.py
Normal file
125
lib/features.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""Unified featurization utilities for trajectory -> feature vector conversion
|
||||
Used by both experiments/ml/ and sim/rl/ components
|
||||
"""
|
||||
import numpy as np
|
||||
from collections import defaultdict
|
||||
from typing import List, Dict, Callable, Optional, Any, Set
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def transition_histogram(events: List, state_fn: Callable, max_states: int = 50) -> np.ndarray:
    """fixed-length vector of normalized consecutive-state-pair counts

    events: list of event objects/dicts
    state_fn: function mapping event -> state string
    max_states: length of the returned vector

    returns: float32 array of length max_states. slot i holds the share of the
    i-th distinct transition, in order of first appearance, zero-padded on the
    right. shares are taken over ALL transitions, so transitions beyond the
    first max_states distinct ones are dropped without renormalizing.
    """
    if len(events) < 2:
        return np.zeros(max_states, dtype=np.float32)

    path = [state_fn(ev) for ev in events]

    # plain dicts keep insertion order, which fixes the slot of each transition
    pair_counts: Dict = {}
    for pair in zip(path, path[1:]):
        pair_counts[pair] = pair_counts.get(pair, 0) + 1

    n_transitions = sum(pair_counts.values())
    shares = np.array(list(pair_counts.values())[:max_states], dtype=np.float32)
    missing = max(0, max_states - len(shares))
    shares = np.pad(shares, (0, missing))
    # small offset guards the division; it is not needed for >= 2 events but kept as-is
    return shares / (n_transitions + 1e-10)
|
||||
|
||||
|
||||
def temporal_signature(events: List, ts_fn: Callable) -> np.ndarray:
    """summarize the gaps between consecutive event timestamps

    events: list of event objects/dicts
    ts_fn: function mapping event -> timestamp (float seconds)

    returns: float32 array [mean_dt, std_dt, skew, n_intervals]; all zeros when
    fewer than two events are given. timestamps are sorted before differencing,
    so the order of events does not matter. std_dt includes a 1e-10 offset and
    skew is 0.0 whenever that offset value is not above 1e-8.
    """
    if len(events) < 2:
        return np.zeros(4, dtype=np.float32)

    stamps = [ts_fn(ev) for ev in events]
    stamps.sort()
    # difference first, then downcast; two or more stamps always give >= 1 gap
    gaps = np.diff(stamps).astype(np.float32)

    mean_dt = np.mean(gaps)
    std_dt = np.std(gaps) + 1e-10  # offset keeps the standardization below finite
    if std_dt > 1e-8:
        skew = np.mean(((gaps - mean_dt) / std_dt) ** 3)
    else:
        skew = 0.0
    return np.array([mean_dt, std_dt, skew, len(gaps)], dtype=np.float32)
|
||||
|
||||
|
||||
def state_coverage(events: List, state_fn: Callable, mdp_states: Set[str]) -> float:
    """share of the known MDP states that the trajectory touches

    events: list of event objects/dicts
    state_fn: function mapping event -> state string
    mdp_states: set of all possible MDP states

    returns: value in [0, 1]; 0.0 when mdp_states is empty. states produced by
    state_fn that are not in mdp_states are ignored.
    """
    if mdp_states:
        seen = {state_fn(ev) for ev in events}
        return len(seen & mdp_states) / len(mdp_states)
    return 0.0
|
||||
|
||||
|
||||
def transition_entropy(events: List, state_fn: Callable) -> float:
    """entropy (natural log) of the observed state-transition distribution

    events: list of event objects/dicts
    state_fn: function mapping event -> state string

    returns: 0.0 for fewer than two events; otherwise -sum(p * log(p + 1e-10))
    over the distinct consecutive state pairs. higher values mean the
    navigation is spread over more transitions, i.e. more random browsing.
    """
    if len(events) < 2:
        return 0.0

    path = [state_fn(ev) for ev in events]
    pair_counts: Dict = {}
    for pair in zip(path, path[1:]):
        pair_counts[pair] = pair_counts.get(pair, 0) + 1

    n_transitions = sum(pair_counts.values())
    weighted_logs = 0
    for count in pair_counts.values():
        p = count / n_transitions
        weighted_logs += p * np.log(p + 1e-10)  # offset avoids log(0)
    return -weighted_logs
|
||||
|
||||
|
||||
def event_type_distribution(events: List, event_name_fn: Callable) -> np.ndarray:
    """share of events falling into each of four name-based categories

    events: list of event objects/dicts
    event_name_fn: function mapping event -> event name string

    returns: float32 array [page_view_ratio, hover_ratio, cart_ratio, purchase_ratio];
    all zeros for an empty trajectory. matching is a case-insensitive substring
    test, so one event can count toward several categories and the ratios need
    not sum to 1.
    """
    if not events:
        return np.zeros(4, dtype=np.float32)

    # one keyword group per output slot, in output order
    keyword_groups = (
        ('page', 'view'),
        ('hover',),
        ('cart',),
        ('purchase', 'checkout'),
    )
    labels = [event_name_fn(ev).lower() for ev in events]

    hits = [0] * len(keyword_groups)
    for label in labels:
        for slot, keywords in enumerate(keyword_groups):
            if any(kw in label for kw in keywords):
                hits[slot] += 1

    total = len(events)
    return np.array([count / total for count in hits], dtype=np.float32)
|
||||
|
||||
|
||||
def featurize_trajectory(events: List, state_fn: Callable, ts_fn: Callable,
                         event_name_fn: Callable, mdp_states: Optional[Set[str]] = None,
                         output_dim: int = 64) -> np.ndarray:
    """turn a trajectory into a fixed-length float32 feature vector

    events: list of event objects/dicts
    state_fn: function mapping event -> state string
    ts_fn: function mapping event -> timestamp (float)
    event_name_fn: function mapping event -> event name string
    mdp_states: optional set of all MDP states for coverage calculation
                (coverage is 0.0 when omitted or empty)
    output_dim: desired output dimension (will pad/truncate)

    layout before padding/truncation (52 values):
        0-39   transition histogram
        40-43  temporal signature [mean_dt, std_dt, skew, n_intervals]
        44     state coverage
        45     transition entropy
        46     trajectory length
        47     number of unique states
        48-51  event type ratios [page_view, hover, cart, purchase]
    """
    known_states = mdp_states or set()

    # groups are evaluated top to bottom, which is also the output order
    groups = [
        transition_histogram(events, state_fn, max_states=40),
        temporal_signature(events, ts_fn),
        [
            state_coverage(events, state_fn, known_states),
            transition_entropy(events, state_fn),
            float(len(events)),
            float(len({state_fn(ev) for ev in events})),
        ],
        event_type_distribution(events, event_name_fn),
    ]
    flat = [value for group in groups for value in group]

    vector = np.array(flat[:output_dim], dtype=np.float32)
    shortfall = output_dim - len(vector)
    if shortfall > 0:
        vector = np.pad(vector, (0, shortfall))
    return vector
|
||||
|
||||
|
||||
def parse_timestamp(ts: Any) -> float:
    """parse various timestamp formats to float seconds

    ts: one of
        - None
        - a number: python int/float or a numpy integer/floating scalar
        - a datetime object
        - an ISO-8601 string; 'Z' is rewritten to '+00:00' before parsing

    returns: seconds as a python float. None, unparseable strings and
    unsupported types all map to 0.0 instead of raising.
    """
    if ts is None:
        return 0.0
    # numpy scalars such as np.int64 / np.float32 are not python int/float
    # subclasses, so they are listed explicitly instead of falling through to 0.0
    if isinstance(ts, (int, float, np.integer, np.floating)):
        return float(ts)
    # already-parsed datetimes are converted directly rather than dropped to 0.0
    if isinstance(ts, datetime):
        return ts.timestamp()
    if isinstance(ts, str):
        try:
            return datetime.fromisoformat(ts.replace('Z', '+00:00')).timestamp()
        except ValueError:
            return 0.0
    return 0.0
|
||||
Reference in New Issue
Block a user