feat: initial feature engineering of trajectories

This commit is contained in:
2026-01-21 14:05:39 +01:00
parent b05b510f70
commit 440371dba4

View File

@@ -1,4 +1,7 @@
from loader import Loader, AgentLoader, JointLoader
try:
from loader import Loader, AgentLoader, JointLoader
except ImportError:
from sim.rl.behavior_loader.loader import Loader, AgentLoader, JointLoader
from collections import defaultdict
from typing import Dict, List, Tuple, Set
import numpy as np
@@ -80,6 +83,50 @@ class BehaviorModel:
path.append(curr)
return path
def extract_trajectory_features(self, events: List, max_trans_dim: int = 50) -> np.ndarray:
"""Convert trajectory to feature vector using MDP structure for contrastive learning"""
if not self.mdp:
self.build_MDP()
states = [self._state_repr(e) for e in sorted(events, key=self._sort_key)]
features = []
# transition histogram over MDP state space
trans_counts = defaultdict(int)
for s, s_next in zip(states, states[1:]):
trans_counts[(s, s_next)] += 1
all_trans = [(s, t) for s in self.mdp['states'] for t in self.mdp['transitions'].get(s, {}).keys()]
trans_vec = [trans_counts.get(tr, 0) for tr in all_trans[:max_trans_dim]]
trans_vec = trans_vec + [0] * (max_trans_dim - len(trans_vec)) # pad
total_trans = sum(trans_counts.values()) or 1
features.extend([v / total_trans for v in trans_vec])
# state coverage ratio
visited = set(states)
features.append(len(visited) / max(self.mdp['num_states'], 1))
# temporal entropy of transitions
if len(states) > 1:
trans_probs = [self.transition_prob(s, s_n) for s, s_n in zip(states, states[1:])]
entropy = -sum(p * np.log(p + 1e-10) for p in trans_probs if p > 0)
features.append(entropy / max(len(states), 1))
else:
features.append(0.0)
# trajectory length and unique state count
features.append(len(states))
features.append(len(visited))
# state value statistics along trajectory
vals = [self.state_value(s) for s in states]
if vals:
features.extend([np.mean(vals), np.std(vals), np.min(vals), np.max(vals)])
else:
features.extend([0.0, 0.0, 0.0, 0.0])
return np.array(features, dtype=np.float32)
class AgentBehaviorModel(BehaviorModel):
def __init__(self, src_dir: str):
super().__init__(src_dir, AgentLoader)