refactor models computations

This commit is contained in:
2026-01-13 16:51:00 +01:00
parent 82b54428b7
commit e9cf5f0736

View File

@@ -1,16 +1,12 @@
from experiments.agents.base import Agent
from loader import Loader, AgentLoader, JointLoader
from collections import defaultdict
from typing import Dict, List, Tuple, Set
import numpy as np
import graphviz
DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/"
AGENT_DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/agents/collected_data/"
class BehaviorModel:
def __init__(self, src_dir: str = DIR):
self.loader = Loader(src_dir)
def __init__(self, src_dir: str, loader_cls=Loader):
self.loader = loader_cls(src_dir)
self.data = self.loader.get_data()
self.entries, self.num_entries = self.loader.get_entries()
self.mdp = None
@@ -19,50 +15,48 @@ class BehaviorModel:
p = evt.value.payload
return f"{p.page or 'unk'}|{p.productId or 'none'}|{p.eventName}"
def _extract_sessions(self):
# transform raw events into sequential state trajectories per session
trajectories = []
for sid, evts in self.data.items():
if len(evts) < 2: continue
states = [self._state_repr(e) for e in sorted(evts, key=lambda x: x.timestamp)]
trajectories.append(states)
return trajectories
def _sort_key(self, evt):
return evt.timestamp
def _calc_transitions(self, trajectories: List[List[str]]) -> Tuple[Dict, Set]:
trans = defaultdict(lambda: defaultdict(int))
states = set()
for traj in trajectories:
for i in range(len(traj) - 1):
s, s_next = traj[i], traj[i+1]
def _extract_sessions(self) -> List[List[str]]:
trajs = []
for evts in self.data.values():
if len(evts) < 2: continue
states = [self._state_repr(e) for e in sorted(evts, key=self._sort_key)]
trajs.append(states)
return trajs
def _calc_transitions(self, trajs: List[List[str]]) -> Tuple[Dict, Set]:
trans, states = defaultdict(lambda: defaultdict(int)), set()
for traj in trajs:
for s, s_next in zip(traj, traj[1:]):
trans[s][s_next] += 1
states.update([s, s_next])
return trans, states
def _calc_rewards(self, trajectories: List[List[str]]) -> Dict:
# reward based on session progression depth
def _calc_rewards(self, trajs: List[List[str]]) -> Dict:
rwd = defaultdict(list)
for traj in trajectories:
for traj in trajs:
n = len(traj)
for i, s in enumerate(traj):
rwd[s].append(i / n)
return rwd
def _normalize_trans(self, counts: Dict) -> Dict:
def _normalize_trans(self, cnts: Dict) -> Dict:
return {s: {s_n: cnt/sum(nxt.values()) for s_n, cnt in nxt.items()}
for s, nxt in counts.items()}
for s, nxt in cnts.items()}
def build_MDP(self) -> Dict:
trajs = self._extract_sessions()
trans_cnt, states = self._calc_transitions(trajs)
trans_prob = self._normalize_trans(trans_cnt)
state_rwd = self._calc_rewards(trajs)
state_val = {s: np.mean(r) for s, r in state_rwd.items()}
self.mdp = {
'states': sorted(list(states)),
'states': sorted(states),
'num_states': len(states),
'transitions': trans_prob,
'state_values': state_val,
'state_values': {s: np.mean(r) for s, r in state_rwd.items()},
'state_rewards': state_rwd,
'trans_counts': trans_cnt,
}
@@ -78,8 +72,7 @@ class BehaviorModel:
def sample_traj(self, start: str, max_len: int = 50) -> List[str]:
if not self.mdp: raise ValueError("build MDP first")
path = [start]
curr = start
path, curr = [start], start
for _ in range(max_len):
nxt = self.mdp['transitions'].get(curr, {})
if not nxt: break
@@ -88,154 +81,113 @@ class BehaviorModel:
return path
class AgentBehaviorModel(BehaviorModel):
"""behavior model for agent interaction data (simplified PayloadModel schema)"""
def __init__(self, src_dir: str = AGENT_DIR):
self.loader = AgentLoader(src_dir)
self.data = self.loader.get_data()
self.entries, self.num_entries = self.loader.get_entries()
self.mdp = None
def __init__(self, src_dir: str):
super().__init__(src_dir, AgentLoader)
def _state_repr(self, evt) -> str:
# direct access to PayloadModel fields (no .value.payload nesting)
return f"{evt.page or 'unk'}|{evt.productId or 'none'}|{evt.eventName}"
def _extract_sessions(self):
trajectories = []
for sid, evts in self.data.items():
if len(evts) < 2: continue
# sort by timestamp string (ISO format sorts lexicographically)
states = [self._state_repr(e) for e in sorted(evts, key=lambda x: x.ts)]
trajectories.append(states)
return trajectories
def _sort_key(self, evt):
return evt.ts
class JointBehaviorModel(BehaviorModel):
"""behavior model for combined human+agent data (flat PayloadModel distribution)"""
def __init__(self, human_dir: str = DIR, agent_dir: str = AGENT_DIR):
def __init__(self, human_dir: str, agent_dir: str):
self.loader = JointLoader(human_dir, agent_dir)
self.data = self.loader.get_data()
self.entries, self.num_entries = self.loader.get_entries()
self.mdp = None
def _state_repr(self, evt) -> str:
# direct access to PayloadModel fields (JointLoader unwraps to PayloadModel)
return f"{evt.page or 'unk'}|{evt.productId or 'none'}|{evt.eventName}"
def _extract_sessions(self):
trajectories = []
for sid, evts in self.data.items():
if len(evts) < 2: continue
# sort by timestamp string (ISO format sorts lexicographically)
states = [self._state_repr(e) for e in sorted(evts, key=lambda x: x.ts)]
trajectories.append(states)
return trajectories
def _sort_key(self, evt):
return evt.ts
def aggregate_event_transitions(mdp: Dict) -> Dict[str, Dict[str, float]]:
"""aggregate state transitions by event type and normalize"""
evt_trans = defaultdict(lambda: defaultdict(float))
for s, trans in mdp['transitions'].items():
evt_src = s.split('|')[2]
src = s.split('|')[2]
for s_next, prob in trans.items():
evt_dst = s_next.split('|')[2]
evt_trans[evt_src][evt_dst] += prob
dst = s_next.split('|')[2]
evt_trans[src][dst] += prob
# normalize aggregated transitions
for evt_src in evt_trans:
total = sum(evt_trans[evt_src].values())
for src in evt_trans:
total = sum(evt_trans[src].values())
if total > 0:
for evt_dst in evt_trans[evt_src]:
evt_trans[evt_src][evt_dst] /= total
evt_trans[src] = {dst: p/total for dst, p in evt_trans[src].items()}
return dict(evt_trans)
def visualize_mdp(model: BehaviorModel, threshold: float = 0.05, output: str = "mdp_graph", fmt: str = "svg", view: bool = False, export_dot: bool = False):
"""visualize MDP as directed graph using graphviz, aggregated by event type"""
def visualize_mdp(model: BehaviorModel, threshold: float = 0.05, output: str = "mdp_graph",
fmt: str = "svg", view: bool = False, export_dot: bool = False):
if not model.mdp: raise ValueError("build MDP first")
evt_trans = aggregate_event_transitions(model.mdp)
g = graphviz.Digraph(format=fmt)
g.attr(rankdir='LR', size='30')
g.attr('node', shape='circle', width='1', height='1')
# collect all event types
events = set(evt_trans.keys())
for trans in evt_trans.values():
events.update(trans.keys())
# add nodes for each event type
events = set(evt_trans.keys()) | {e for trans in evt_trans.values() for e in trans.keys()}
for evt in events:
g.node(evt)
# add edges above threshold
for evt_src in evt_trans:
for evt_dst, prob in evt_trans[evt_src].items():
for src, dsts in evt_trans.items():
for dst, prob in dsts.items():
if prob > threshold:
g.edge(evt_src, evt_dst, label=f'{prob:.2f}')
g.edge(src, dst, label=f'{prob:.2f}')
g.render(output, view=view, cleanup=True)
print(f"Saved MDP graph to {output}.{fmt}")
if export_dot:
dot_file = f"{output}.dot"
with open(dot_file, 'w') as f:
with open(f"{output}.dot", 'w') as f:
f.write(g.source)
print(f"Exported DOT source to {dot_file}")
print(f"Exported DOT source to {output}.dot")
return g
def kl_divergence(p: Dict[str, float], q: Dict[str, float]) -> float:
"""Compute KL divergence D_KL(P || Q) for discrete distributions P and Q."""
epsilon = 1e-10 # small constant to avoid log(0)
kl_div = 0.0
for key in p:
p_val = p[key] + epsilon
q_val = q.get(key, 0.0) + epsilon
kl_div += p_val * np.log(p_val / q_val)
return kl_div
eps = 1e-10
return sum((p[k] + eps) * np.log((p[k] + eps) / (q.get(k, 0.0) + eps)) for k in p)
if __name__ == "__main__":
human_model = BehaviorModel(DIR)
base_dir = "/home/velocitatem/Documents/Projects/PHANTOM/experiments"
human_dir, agent_dir = f"{base_dir}/collected_data/", f"{base_dir}/agents/collected_data/"
human_model = BehaviorModel(human_dir)
human_mdp = human_model.build_MDP()
print(f"Built MDP: {human_mdp['num_states']} states, {sum(len(t) for t in human_mdp['transitions'].values())} transitions")
print(f"Built MDP: {human_mdp['num_states']} states, "
f"{sum(len(t) for t in human_mdp['transitions'].values())} transitions")
if not human_mdp['states']:
print("No states found")
exit(1)
exit("No states found")
visualize_mdp(human_model, threshold=0.05, output="human_mdp_viz", fmt="pdf", export_dot=True)
agent_model = AgentBehaviorModel()
agent_model = AgentBehaviorModel(agent_dir)
agent_mdp = agent_model.build_MDP()
print(f"AGENT... Built MDP: {agent_mdp['num_states']} states, {sum(len(t) for t in agent_mdp['transitions'].values())} transitions")
print(f"AGENT... Built MDP: {agent_mdp['num_states']} states, "
f"{sum(len(t) for t in agent_mdp['transitions'].values())} transitions")
if not agent_mdp['states']:
print("No states found")
exit(1)
exit("No states found")
visualize_mdp(agent_model, threshold=0.05, output="agent_mdp_viz", fmt="pdf", export_dot=True)
# aggregate transitions by event type for both models
human_evt_trans = aggregate_event_transitions(human_mdp)
agent_evt_trans = aggregate_event_transitions(agent_mdp)
human_evt = aggregate_event_transitions(human_mdp)
agent_evt = aggregate_event_transitions(agent_mdp)
common = set(human_evt.keys()) & set(agent_evt.keys())
common_evts = set(human_evt_trans.keys()) & set(agent_evt_trans.keys())
if not common_evts: import sys; sys.exit("No common event types for KL divergence analysis")
if not common:
exit("No common event types for KL divergence analysis")
kl_divs = []
for evt in common_evts:
kl = kl_divergence(human_evt_trans[evt], agent_evt_trans[evt])
kl_divs.append((evt, kl))
kl_divs = sorted([(e, kl_divergence(human_evt[e], agent_evt[e])) for e in common],
key=lambda x: x[1], reverse=True)
kl_divs.sort(key=lambda x: x[1], reverse=True)
avg_kl = np.mean([kl for _, kl in kl_divs])
print(f"Average KL divergence: {avg_kl:.4f}")
print(f"\nMost divergent event types:")
print(f"Average KL divergence: {np.mean([kl for _, kl in kl_divs]):.4f}")
print("\nMost divergent event types:")
for evt, kl in kl_divs:
print(f" {evt}: {kl:.4f}")
# build joint model (combined distribution)
print("\n=== Joint Model (Human + Agent Combined) ===")
joint_model = JointBehaviorModel()
joint_model = JointBehaviorModel(human_dir, agent_dir)
joint_mdp = joint_model.build_MDP()
print(f"Built joint MDP: {joint_mdp['num_states']} states, {sum(len(t) for t in joint_mdp['transitions'].values())} transitions")
print(f"Built joint MDP: {joint_mdp['num_states']} states, "
f"{sum(len(t) for t in joint_mdp['transitions'].values())} transitions")
if joint_mdp['states']:
visualize_mdp(joint_model, threshold=0.05, output="joint_mdp_viz", fmt="pdf", export_dot=True)