mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
feat: initial feature engineering of trajectories
This commit is contained in:
@@ -1,4 +1,7 @@
|
|||||||
from loader import Loader, AgentLoader, JointLoader
|
try:
|
||||||
|
from loader import Loader, AgentLoader, JointLoader
|
||||||
|
except ImportError:
|
||||||
|
from sim.rl.behavior_loader.loader import Loader, AgentLoader, JointLoader
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import Dict, List, Tuple, Set
|
from typing import Dict, List, Tuple, Set
|
||||||
import numpy as np
|
import numpy as np
|
||||||
@@ -80,6 +83,50 @@ class BehaviorModel:
|
|||||||
path.append(curr)
|
path.append(curr)
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
def extract_trajectory_features(self, events: List, max_trans_dim: int = 50) -> np.ndarray:
|
||||||
|
"""Convert trajectory to feature vector using MDP structure for contrastive learning"""
|
||||||
|
if not self.mdp:
|
||||||
|
self.build_MDP()
|
||||||
|
|
||||||
|
states = [self._state_repr(e) for e in sorted(events, key=self._sort_key)]
|
||||||
|
features = []
|
||||||
|
|
||||||
|
# transition histogram over MDP state space
|
||||||
|
trans_counts = defaultdict(int)
|
||||||
|
for s, s_next in zip(states, states[1:]):
|
||||||
|
trans_counts[(s, s_next)] += 1
|
||||||
|
all_trans = [(s, t) for s in self.mdp['states'] for t in self.mdp['transitions'].get(s, {}).keys()]
|
||||||
|
trans_vec = [trans_counts.get(tr, 0) for tr in all_trans[:max_trans_dim]]
|
||||||
|
trans_vec = trans_vec + [0] * (max_trans_dim - len(trans_vec)) # pad
|
||||||
|
total_trans = sum(trans_counts.values()) or 1
|
||||||
|
features.extend([v / total_trans for v in trans_vec])
|
||||||
|
|
||||||
|
# state coverage ratio
|
||||||
|
visited = set(states)
|
||||||
|
features.append(len(visited) / max(self.mdp['num_states'], 1))
|
||||||
|
|
||||||
|
# temporal entropy of transitions
|
||||||
|
if len(states) > 1:
|
||||||
|
trans_probs = [self.transition_prob(s, s_n) for s, s_n in zip(states, states[1:])]
|
||||||
|
entropy = -sum(p * np.log(p + 1e-10) for p in trans_probs if p > 0)
|
||||||
|
features.append(entropy / max(len(states), 1))
|
||||||
|
else:
|
||||||
|
features.append(0.0)
|
||||||
|
|
||||||
|
# trajectory length and unique state count
|
||||||
|
features.append(len(states))
|
||||||
|
features.append(len(visited))
|
||||||
|
|
||||||
|
# state value statistics along trajectory
|
||||||
|
vals = [self.state_value(s) for s in states]
|
||||||
|
if vals:
|
||||||
|
features.extend([np.mean(vals), np.std(vals), np.min(vals), np.max(vals)])
|
||||||
|
else:
|
||||||
|
features.extend([0.0, 0.0, 0.0, 0.0])
|
||||||
|
|
||||||
|
return np.array(features, dtype=np.float32)
|
||||||
|
|
||||||
|
|
||||||
class AgentBehaviorModel(BehaviorModel):
|
class AgentBehaviorModel(BehaviorModel):
|
||||||
def __init__(self, src_dir: str):
|
def __init__(self, src_dir: str):
|
||||||
super().__init__(src_dir, AgentLoader)
|
super().__init__(src_dir, AgentLoader)
|
||||||
|
|||||||
Reference in New Issue
Block a user