refactor models computations

This commit is contained in:
2026-01-13 16:51:00 +01:00
parent a1e3166322
commit 3072e5f46e

View File

@@ -1,16 +1,12 @@
from experiments.agents.base import Agent
from loader import Loader, AgentLoader, JointLoader from loader import Loader, AgentLoader, JointLoader
from collections import defaultdict from collections import defaultdict
from typing import Dict, List, Tuple, Set from typing import Dict, List, Tuple, Set
import numpy as np import numpy as np
import graphviz import graphviz
DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/"
AGENT_DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/agents/collected_data/"
class BehaviorModel: class BehaviorModel:
def __init__(self, src_dir: str, loader_cls=Loader):
    """Load the event data that the MDP will be built from.

    Args:
        src_dir: directory handed to the loader.
        loader_cls: loader class to instantiate with ``src_dir``; subclasses
            pass their own loader here instead of duplicating this method.
    """
    self.loader = loader_cls(src_dir)
    # mapping whose values are per-session event lists (see _extract_sessions)
    self.data = self.loader.get_data()
    self.entries, self.num_entries = self.loader.get_entries()
    # stays None until build_MDP() has been called
    self.mdp = None
@@ -19,50 +15,48 @@ class BehaviorModel:
p = evt.value.payload p = evt.value.payload
return f"{p.page or 'unk'}|{p.productId or 'none'}|{p.eventName}" return f"{p.page or 'unk'}|{p.productId or 'none'}|{p.eventName}"
def _sort_key(self, evt):
    """Return the value used to order the events of one session.

    Subclasses override this when their events carry the timestamp under a
    different attribute.
    """
    return evt.timestamp

def _extract_sessions(self) -> List[List[str]]:
    """Turn raw events into one chronologically ordered state list per session.

    Sessions with fewer than two events are dropped because they cannot
    contribute a transition.
    """
    trajs = []
    for evts in self.data.values():
        if len(evts) < 2:
            continue
        ordered = sorted(evts, key=self._sort_key)
        trajs.append([self._state_repr(e) for e in ordered])
    return trajs

def _calc_transitions(self, trajs: List[List[str]]) -> Tuple[Dict, Set]:
    """Count consecutive state pairs over all trajectories.

    Returns:
        ``(trans, states)`` where ``trans[s][s_next]`` is the number of times
        ``s_next`` directly followed ``s`` and ``states`` is the set of every
        state that took part in at least one transition.
    """
    trans = defaultdict(lambda: defaultdict(int))
    states = set()
    for traj in trajs:
        # pair each state with its successor; a one-element trajectory yields nothing
        for s, s_next in zip(traj, traj[1:]):
            trans[s][s_next] += 1
            states.update((s, s_next))
    return trans, states
def _calc_rewards(self, trajs: List[List[str]]) -> Dict:
    """Collect a progression-depth reward sample for every state visit.

    A state seen at index ``i`` of a trajectory of length ``n`` contributes
    ``i / n``, so states reached later in a session score higher.

    Returns:
        Mapping from state to the list of its reward samples, one per visit.
    """
    rwd = defaultdict(list)
    for traj in trajs:
        n = len(traj)
        for i, s in enumerate(traj):
            rwd[s].append(i / n)
    return rwd
def _normalize_trans(self, cnts: Dict) -> Dict:
    """Convert per-state successor counts into transition probabilities.

    Args:
        cnts: mapping ``state -> {next_state: count}``.

    Returns:
        Mapping of the same shape where each inner dict sums to 1.
    """
    probs = {}
    for s, nxt in cnts.items():
        # compute the row total once instead of once per successor
        total = sum(nxt.values())
        probs[s] = {s_n: cnt / total for s_n, cnt in nxt.items()}
    return probs
def build_MDP(self) -> Dict: def build_MDP(self) -> Dict:
trajs = self._extract_sessions() trajs = self._extract_sessions()
trans_cnt, states = self._calc_transitions(trajs) trans_cnt, states = self._calc_transitions(trajs)
trans_prob = self._normalize_trans(trans_cnt) trans_prob = self._normalize_trans(trans_cnt)
state_rwd = self._calc_rewards(trajs) state_rwd = self._calc_rewards(trajs)
state_val = {s: np.mean(r) for s, r in state_rwd.items()}
self.mdp = { self.mdp = {
'states': sorted(list(states)), 'states': sorted(states),
'num_states': len(states), 'num_states': len(states),
'transitions': trans_prob, 'transitions': trans_prob,
'state_values': state_val, 'state_values': {s: np.mean(r) for s, r in state_rwd.items()},
'state_rewards': state_rwd, 'state_rewards': state_rwd,
'trans_counts': trans_cnt, 'trans_counts': trans_cnt,
} }
@@ -78,8 +72,7 @@ class BehaviorModel:
def sample_traj(self, start: str, max_len: int = 50) -> List[str]: def sample_traj(self, start: str, max_len: int = 50) -> List[str]:
if not self.mdp: raise ValueError("build MDP first") if not self.mdp: raise ValueError("build MDP first")
path = [start] path, curr = [start], start
curr = start
for _ in range(max_len): for _ in range(max_len):
nxt = self.mdp['transitions'].get(curr, {}) nxt = self.mdp['transitions'].get(curr, {})
if not nxt: break if not nxt: break
@@ -88,154 +81,113 @@ class BehaviorModel:
return path return path
class AgentBehaviorModel(BehaviorModel):
    """Behavior model for agent interaction data (simplified PayloadModel schema)."""

    def __init__(self, src_dir: str):
        """Load agent events from ``src_dir`` through ``AgentLoader``."""
        super().__init__(src_dir, AgentLoader)

    def _state_repr(self, evt) -> str:
        """Return the ``page|productId|eventName`` key for one agent event."""
        # fields are read directly from the event (no .value.payload nesting)
        return f"{evt.page or 'unk'}|{evt.productId or 'none'}|{evt.eventName}"

    def _sort_key(self, evt):
        """Order events by their ``ts`` attribute."""
        # NOTE(review): relies on ts being an ISO-format string, which sorts
        # lexicographically in chronological order; confirm against the loader
        return evt.ts
class JointBehaviorModel(BehaviorModel):
    """Behavior model for combined human+agent data (flat PayloadModel distribution)."""

    def __init__(self, human_dir: str, agent_dir: str):
        """Load both data sets through ``JointLoader``.

        The base initializer is not reused because it passes a single
        directory to its loader, while ``JointLoader`` takes two.
        """
        self.loader = JointLoader(human_dir, agent_dir)
        self.data = self.loader.get_data()
        self.entries, self.num_entries = self.loader.get_entries()
        # stays None until build_MDP() has been called
        self.mdp = None

    def _state_repr(self, evt) -> str:
        """Return the ``page|productId|eventName`` key for one event."""
        # fields are read directly from the event; presumably JointLoader has
        # already unwrapped human events to the flat schema; verify in loader
        return f"{evt.page or 'unk'}|{evt.productId or 'none'}|{evt.eventName}"

    def _sort_key(self, evt):
        """Order events by their ``ts`` attribute."""
        # NOTE(review): relies on ts being an ISO-format string, which sorts
        # lexicographically in chronological order; confirm against the loader
        return evt.ts
def aggregate_event_transitions(mdp: Dict) -> Dict[str, Dict[str, float]]:
    """Aggregate state transitions by event type and normalize them.

    State keys have the form ``page|productId|eventName``; only the third
    field is kept, so every state sharing an event name is merged.

    NOTE: the per-state probabilities are summed before normalizing, so each
    source state carries equal weight regardless of how often it was visited.

    Args:
        mdp: dict with a ``'transitions'`` entry mapping
            ``state -> {next_state: probability}``.

    Returns:
        Plain dicts ``src_event -> {dst_event: probability}``. Each inner dict
        sums to 1 unless its raw total was 0, in which case the raw values
        are returned unchanged.
    """
    sums = defaultdict(lambda: defaultdict(float))
    for s, trans in mdp['transitions'].items():
        src = s.split('|')[2]
        for s_next, prob in trans.items():
            sums[src][s_next.split('|')[2]] += prob

    # build a fresh result so callers never receive a live defaultdict that
    # would silently create entries on lookup
    result = {}
    for src, dsts in sums.items():
        total = sum(dsts.values())
        if total > 0:
            result[src] = {dst: p / total for dst, p in dsts.items()}
        else:
            result[src] = dict(dsts)
    return result
def visualize_mdp(model: BehaviorModel, threshold: float = 0.05, output: str = "mdp_graph",
                  fmt: str = "svg", view: bool = False, export_dot: bool = False):
    """Render the model's MDP as a directed graph aggregated by event type.

    Args:
        model: behavior model whose ``mdp`` has already been built.
        threshold: only transitions with probability strictly above this
            value get an edge; every event type still gets a node.
        output: output path without extension.
        fmt: graphviz output format, also used as the file extension.
        view: forwarded to ``Digraph.render``.
        export_dot: additionally write the DOT source to ``{output}.dot``.

    Returns:
        The ``graphviz.Digraph`` that was rendered.

    Raises:
        ValueError: if ``model.mdp`` has not been built yet.
    """
    if not model.mdp:
        raise ValueError("build MDP first")

    evt_trans = aggregate_event_transitions(model.mdp)
    g = graphviz.Digraph(format=fmt)
    g.attr(rankdir='LR', size='30')
    g.attr('node', shape='circle', width='1', height='1')

    # every event type that appears as a source or as a destination
    events = set(evt_trans) | {e for dsts in evt_trans.values() for e in dsts}
    # sorted so the generated DOT source is identical from run to run
    for evt in sorted(events):
        g.node(evt)

    for src, dsts in evt_trans.items():
        for dst, prob in dsts.items():
            if prob > threshold:
                g.edge(src, dst, label=f'{prob:.2f}')

    # cleanup=True is passed to render; presumably it removes the intermediate
    # source file, which is why the DOT export below is a separate step
    g.render(output, view=view, cleanup=True)
    print(f"Saved MDP graph to {output}.{fmt}")

    if export_dot:
        dot_file = f"{output}.dot"
        with open(dot_file, 'w', encoding='utf-8') as f:
            f.write(g.source)
        print(f"Exported DOT source to {dot_file}")
    return g
def kl_divergence(p: Dict[str, float], q: Dict[str, float]) -> float:
    """Compute the KL divergence D_KL(P || Q) of two discrete distributions.

    Args:
        p: reference distribution as ``outcome -> probability``.
        q: compared distribution; outcomes missing from ``q`` count as 0.

    Returns:
        The divergence as a Python ``float`` (``0.0`` when ``p`` is empty).
        Outcomes present only in ``q`` do not contribute.
    """
    eps = 1e-10  # small constant added to both sides to avoid log(0) and x/0
    total = 0.0
    for k, p_k in p.items():
        p_val = p_k + eps
        total += p_val * np.log(p_val / (q.get(k, 0.0) + eps))
    # always a plain float, including for empty p
    return float(total)
if __name__ == "__main__":
    # sys.exit is used instead of the site-injected exit(), which is meant for
    # the interactive shell and is not always available
    import sys

    # the data root defaults to the author's checkout; an optional first
    # command-line argument overrides it
    default_base = "/home/velocitatem/Documents/Projects/PHANTOM/experiments"
    base_dir = sys.argv[1].rstrip("/") if len(sys.argv) > 1 else default_base
    human_dir, agent_dir = f"{base_dir}/collected_data/", f"{base_dir}/agents/collected_data/"

    # human model
    human_model = BehaviorModel(human_dir)
    human_mdp = human_model.build_MDP()
    print(f"Built MDP: {human_mdp['num_states']} states, "
          f"{sum(len(t) for t in human_mdp['transitions'].values())} transitions")
    if not human_mdp['states']:
        sys.exit("No states found")
    visualize_mdp(human_model, threshold=0.05, output="human_mdp_viz", fmt="pdf", export_dot=True)

    # agent model
    agent_model = AgentBehaviorModel(agent_dir)
    agent_mdp = agent_model.build_MDP()
    print(f"AGENT... Built MDP: {agent_mdp['num_states']} states, "
          f"{sum(len(t) for t in agent_mdp['transitions'].values())} transitions")
    if not agent_mdp['states']:
        sys.exit("No states found")
    visualize_mdp(agent_model, threshold=0.05, output="agent_mdp_viz", fmt="pdf", export_dot=True)

    # compare the two models per source event type shared by both
    human_evt = aggregate_event_transitions(human_mdp)
    agent_evt = aggregate_event_transitions(agent_mdp)
    common = set(human_evt) & set(agent_evt)
    if not common:
        sys.exit("No common event types for KL divergence analysis")

    kl_divs = sorted(((e, kl_divergence(human_evt[e], agent_evt[e])) for e in common),
                     key=lambda x: x[1], reverse=True)

    print(f"Average KL divergence: {np.mean([kl for _, kl in kl_divs]):.4f}")
    print("\nMost divergent event types:")
    for evt, kl in kl_divs:
        print(f"  {evt}: {kl:.4f}")

    # joint model built from both data sets together
    print("\n=== Joint Model (Human + Agent Combined) ===")
    joint_model = JointBehaviorModel(human_dir, agent_dir)
    joint_mdp = joint_model.build_MDP()
    print(f"Built joint MDP: {joint_mdp['num_states']} states, "
          f"{sum(len(t) for t in joint_mdp['transitions'].values())} transitions")
    if joint_mdp['states']:
        visualize_mdp(joint_model, threshold=0.05, output="joint_mdp_viz", fmt="pdf", export_dot=True)