From 87a35fad2c9c0954de5332edf4a55b53ca6b7049 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Tue, 13 Jan 2026 16:42:50 +0100 Subject: [PATCH] feat: joint loader --- sim/rl/behavior_loader/loader.py | 47 ++++++++++++++++++++++++++------ sim/rl/behavior_loader/models.py | 32 +++++++++++++++++++++- 2 files changed, 70 insertions(+), 9 deletions(-) diff --git a/sim/rl/behavior_loader/loader.py b/sim/rl/behavior_loader/loader.py index bd18442..620576c 100644 --- a/sim/rl/behavior_loader/loader.py +++ b/sim/rl/behavior_loader/loader.py @@ -71,13 +71,44 @@ class AgentLoader(Loader): sessions[entry] = [i for i in ints if not self._is_admin_page_simple(i)] return sessions -if __name__ == "__main__": - DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/agents/collected_data/" - loader = AgentLoader(DIR) - _, n = loader.get_entries() - print(f"Loaded {n} sessions from {DIR}") +class JointLoader: + """Loader for combined human (Kafka) and agent (direct) data without discrimination""" - DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/" - loader = Loader(DIR) + def __init__(self, human_dir: str, agent_dir: str): + self.human_dir = human_dir + self.agent_dir = agent_dir + self.human_loader = Loader(human_dir) + self.agent_loader = AgentLoader(agent_dir) + self.data = self._load_joint_sessions() + self.entries = list(self.data.keys()) + + def _load_joint_sessions(self) -> dict: + sessions = {} + # load human sessions (unwrap from Kafka format to PayloadModel) + for sid, evts in self.human_loader.get_data().items(): + sessions[f"human_{sid}"] = [evt.value.payload for evt in evts] + # load agent sessions (already PayloadModel) + for sid, evts in self.agent_loader.get_data().items(): + sessions[f"agent_{sid}"] = evts + return sessions + + def get_data(self) -> dict: + return self.data + + def get_entries(self) -> tuple[list[str], int]: + return self.entries, len(self.entries) + +if __name__ == "__main__": + AGENT_DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/agents/collected_data/" + loader = AgentLoader(AGENT_DIR) _, n = loader.get_entries() - print(f"Loaded {n} sessions from {DIR}") + print(f"Loaded {n} agent sessions from {AGENT_DIR}") + + HUMAN_DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/" + loader = Loader(HUMAN_DIR) + _, n = loader.get_entries() + print(f"Loaded {n} human sessions from {HUMAN_DIR}") + + joint_loader = JointLoader(HUMAN_DIR, AGENT_DIR) + _, n = joint_loader.get_entries() + print(f"Loaded {n} total sessions (combined) from joint loader") diff --git a/sim/rl/behavior_loader/models.py b/sim/rl/behavior_loader/models.py index 7254606..46ac99d 100644 --- a/sim/rl/behavior_loader/models.py +++ b/sim/rl/behavior_loader/models.py @@ -1,5 +1,5 @@ from experiments.agents.base import Agent -from loader import Loader, AgentLoader +from loader import Loader, AgentLoader, JointLoader from collections import defaultdict from typing import Dict, List, Tuple, Set import numpy as np @@ -109,6 +109,28 @@ class AgentBehaviorModel(BehaviorModel): trajectories.append(states) return trajectories +class JointBehaviorModel(BehaviorModel): + """behavior model for combined human+agent data (flat PayloadModel distribution)""" + + def __init__(self, human_dir: str = DIR, agent_dir: str = AGENT_DIR): + self.loader = JointLoader(human_dir, agent_dir) + self.data = self.loader.get_data() + self.entries, self.num_entries = self.loader.get_entries() + self.mdp = None + + def _state_repr(self, evt) -> str: + # direct access to PayloadModel fields (JointLoader unwraps to PayloadModel) + return f"{evt.page or 'unk'}|{evt.productId or 'none'}|{evt.eventName}" + + def _extract_sessions(self): + trajectories = [] + for sid, evts in self.data.items(): + if len(evts) < 2: continue + # sort by timestamp string (ISO format sorts lexicographically) + states = [self._state_repr(e) for e in sorted(evts, key=lambda x: x.ts)] + trajectories.append(states) + return trajectories + def aggregate_event_transitions(mdp: Dict) -> Dict[str, Dict[str, float]]: """aggregate state transitions by event type and normalize""" evt_trans = defaultdict(lambda: defaultdict(float)) @@ -209,3 +231,11 @@ if __name__ == "__main__": print(f"\nMost divergent event types:") for evt, kl in kl_divs: print(f" {evt}: {kl:.4f}") + + # build joint model (combined distribution) + print("\n=== Joint Model (Human + Agent Combined) ===") + joint_model = JointBehaviorModel() + joint_mdp = joint_model.build_MDP() + print(f"Built joint MDP: {joint_mdp['num_states']} states, {sum(len(t) for t in joint_mdp['transitions'].values())} transitions") + if joint_mdp['states']: + visualize_mdp(joint_model, threshold=0.05, output="joint_mdp_viz", fmt="pdf", export_dot=True)