diff --git a/engine/lib/coi.py b/engine/lib/coi.py index ed18672..7b6ecd2 100644 --- a/engine/lib/coi.py +++ b/engine/lib/coi.py @@ -1,12 +1,15 @@ import numpy as np from typing import Dict +from lib.agent_probability import DEFAULT_AGENT_PRIOR, estimate_agent_probability + def compute_agent_probability( trajectory: list, human_transitions: Dict, agent_transitions: Dict, temperature: float = 1.0, + prior_agent: float = DEFAULT_AGENT_PRIOR, ) -> float: """estimate agent probability via KL divergence between trajectory transitions and reference models @@ -18,10 +21,10 @@ def compute_agent_probability( agent_transitions: reference transition dict from agent MDP (event->event->prob) returns: - agent probability in [0, 1] via softmax over KL divergences + agent probability in [0, 1] via sigma((delta_h - delta_a) / T) """ if len(trajectory) < 2: - return 0.0 # insufficient data, assume human + return float(prior_agent) # build empirical transition distribution from trajectory trans_counts = {} @@ -54,11 +57,12 @@ def compute_agent_probability( kl_human = kl_div(empirical, human_transitions) kl_agent = kl_div(empirical, agent_transitions) - # convert to probability via softmax (lower KL = higher prob) - t = float(max(temperature, 1e-6)) - exp_h = np.exp(-kl_human / t) - exp_a = np.exp(-kl_agent / t) - return float(exp_a / (exp_h + exp_a + 1e-10)) + return estimate_agent_probability( + delta_h=kl_human, + delta_a=kl_agent, + temperature=temperature, + prior_agent=prior_agent, + ) def extract_purchases(trajectories: list) -> Dict[int, int]: diff --git a/lib/separability.py b/lib/separability.py index a93ddeb..410d7db 100644 --- a/lib/separability.py +++ b/lib/separability.py @@ -7,10 +7,9 @@ from dataclasses import dataclass from pathlib import Path from typing import Dict, Iterable, List, Sequence -import joblib import numpy as np -from experiments.ml.arch import featurize_trajectory +from lib.agent_probability import DEFAULT_AGENT_PRIOR, estimate_agent_probability DEFAULT_ARTIFACT_DIR = Path("data/separability") @@ -18,11 +17,7 @@ DEFAULT_ARTIFACT_DIR = Path("data/separability") @dataclass class SeparabilityArtifacts: - scaler: object - classifier: object - states: List[str] event_transitions: Dict[str, Dict[str, float]] - feature_dim: int def _normalize_events(raw_events: Sequence[object]) -> List[object]: @@ -36,7 +31,9 @@ def _normalize_events(raw_events: Sequence[object]) -> List[object]: return events -def _event_transition_distribution(events: Sequence[object]) -> Dict[str, Dict[str, float]]: +def _event_transition_distribution( + events: Sequence[object], +) -> Dict[str, Dict[str, float]]: counts: Dict[str, Dict[str, int]] = {} for src_evt, dst_evt in zip(events, events[1:]): src_name = getattr(src_evt, "eventName", "unknown") @@ -47,11 +44,15 @@ def _event_transition_distribution(events: Sequence[object]) -> Dict[str, Dict[s distribution: Dict[str, Dict[str, float]] = {} for src, dsts in counts.items(): total = float(sum(dsts.values())) - distribution[src] = {dst: val / total for dst, val in dsts.items()} if total else {} + distribution[src] = ( + {dst: val / total for dst, val in dsts.items()} if total else {} + ) return distribution -def _kl_divergence(p: Dict[str, Dict[str, float]], q: Dict[str, Dict[str, float]]) -> float: +def _kl_divergence( + p: Dict[str, Dict[str, float]], q: Dict[str, Dict[str, float]] +) -> float: eps = 1e-10 total = 0.0 for src, dsts in p.items(): @@ -61,28 +62,28 @@ def _kl_divergence(p: Dict[str, Dict[str, float]], q: Dict[str, Dict[str, float] return float(total) -def load_artifacts(artifact_dir: Path | str = DEFAULT_ARTIFACT_DIR) -> SeparabilityArtifacts: +def load_artifacts( + artifact_dir: Path | str = DEFAULT_ARTIFACT_DIR, +) -> SeparabilityArtifacts: artifact_dir = Path(artifact_dir) - scaler_path = artifact_dir / "scaler.joblib" - model_path = artifact_dir / "classifier.joblib" metadata_path = artifact_dir / "metadata.json" - if not (scaler_path.exists() and model_path.exists() and metadata_path.exists()): + if not metadata_path.exists(): raise FileNotFoundError( - f"Separability artifacts not found in {artifact_dir}. Run sim.strong_learner.train first." + f"Separability metadata not found in {artifact_dir}. Provide metadata.json with event transitions." ) - scaler = joblib.load(scaler_path) - classifier = joblib.load(model_path) with open(metadata_path, "r", encoding="utf-8") as fin: metadata = json.load(fin) + transitions = metadata.get("event_transitions") + if not isinstance(transitions, dict): + raise ValueError( + "metadata.json must contain an 'event_transitions' object with 'human' and 'agent' kernels" + ) + return SeparabilityArtifacts( - scaler=scaler, - classifier=classifier, - states=list(metadata["reference_states"]), - event_transitions=metadata["event_transitions"], - feature_dim=int(metadata["feature_dim"]), + event_transitions=transitions, ) @@ -92,37 +93,44 @@ def score_session( ) -> dict: events = _normalize_events(raw_events) if not events: - return {"prob_agent": 0.0, "delta_h": 0.0, "delta_a": 0.0} - - reference_mdp = {"states": artifacts.states} - features = featurize_trajectory(events, mdp=reference_mdp, input_dim=artifacts.feature_dim) - scaled = artifacts.scaler.transform(features.reshape(1, -1)) - prob_agent = float(artifacts.classifier.predict_proba(scaled)[0, 1]) + return { + "prob_agent": float(DEFAULT_AGENT_PRIOR), + "delta_h": 0.0, + "delta_a": 0.0, + "gap": 0.0, + } session_dist = _event_transition_distribution(events) delta_h = _kl_divergence(session_dist, artifacts.event_transitions.get("human", {})) delta_a = _kl_divergence(session_dist, artifacts.event_transitions.get("agent", {})) + gap = float(delta_h - delta_a) + prob_agent = estimate_agent_probability(delta_h=delta_h, delta_a=delta_a) return { "prob_agent": prob_agent, "delta_h": delta_h, "delta_a": delta_a, + "gap": gap, } -def estimate_alpha(prob_agent: float, delta_h: float, delta_a: float, temperature: float = 1.0) -> float: - divergence_mass = delta_h + delta_a - if divergence_mass <= 1e-8: - return float(prob_agent) - - ratio = delta_a / divergence_mass - blended = 0.5 * prob_agent + 0.5 * ratio - if temperature <= 0: - return float(np.clip(blended, 0.0, 1.0)) - - scaled = 1.0 / (1.0 + np.exp(-temperature * (blended - 0.5))) - return float(np.clip(scaled, 0.0, 1.0)) +def estimate_alpha( + prob_agent: float, + delta_h: float, + delta_a: float, + temperature: float = 1.0, + prior_agent: float = DEFAULT_AGENT_PRIOR, +) -> float: + _ = prob_agent + return estimate_agent_probability( + delta_h=delta_h, + delta_a=delta_a, + temperature=temperature, + prior_agent=prior_agent, + ) -def score_sessions(raw_sessions: Iterable[Sequence[object]], artifacts: SeparabilityArtifacts) -> List[dict]: +def score_sessions( + raw_sessions: Iterable[Sequence[object]], artifacts: SeparabilityArtifacts +) -> List[dict]: return [score_session(events, artifacts) for events in raw_sessions] diff --git a/sim/case/thesis_simplified/separability.py b/sim/case/thesis_simplified/separability.py index eaabaa3..74ece46 100644 --- a/sim/case/thesis_simplified/separability.py +++ b/sim/case/thesis_simplified/separability.py @@ -3,10 +3,13 @@ Computes divergence signals delta_H, delta_A from session trajectories using transition kernel estimation and KL divergence to prototype behavioral profiles. """ + from __future__ import annotations from typing import Dict, List, Tuple, TYPE_CHECKING import numpy as np +from lib.agent_probability import DEFAULT_AGENT_PRIOR, estimate_agent_probability + if TYPE_CHECKING: from .simplified import Event, Session @@ -32,7 +35,10 @@ TRANS_A = { def kl_div(p: Dict[str, float], q: Dict[str, float], eps: float = 1e-10) -> float: """KL divergence D_KL(p || q) for discrete distributions.""" keys = set(p.keys()) | set(q.keys()) - return sum(p.get(k, eps) * np.log((p.get(k, eps) + eps) / (q.get(k, eps) + eps)) for k in keys) + return sum( + p.get(k, eps) * np.log((p.get(k, eps) + eps) / (q.get(k, eps) + eps)) + for k in keys + ) def build_kernel(events: List["Event"]) -> Dict[str, Dict[str, float]]: @@ -44,7 +50,11 @@ def build_kernel(events: List["Event"]) -> Dict[str, Dict[str, float]]: trans.setdefault(prev, {}) trans[prev][curr] = trans[prev].get(curr, 0) + 1 prev = curr - return {s: {d: c / sum(dsts.values()) for d, c in dsts.items()} for s, dsts in trans.items() if sum(dsts.values()) > 0} + return { + s: {d: c / sum(dsts.values()) for d, c in dsts.items()} + for s, dsts in trans.items() + if sum(dsts.values()) > 0 + } def compute_divergence(session: "Session") -> Tuple[float, float]: @@ -55,18 +65,35 @@ def compute_divergence(session: "Session") -> Tuple[float, float]: """ kernel = build_kernel(session.events) if not kernel: - return 0.5, 0.5 - delta_h = sum(kl_div(kernel.get(s, {}), TRANS_H.get(s, {})) for s in kernel) / len(kernel) - delta_a = sum(kl_div(kernel.get(s, {}), TRANS_A.get(s, {})) for s in kernel) / len(kernel) + return 0.0, 0.0 + delta_h = sum(kl_div(kernel.get(s, {}), TRANS_H.get(s, {})) for s in kernel) / len( + kernel + ) + delta_a = sum(kl_div(kernel.get(s, {}), TRANS_A.get(s, {})) for s in kernel) / len( + kernel + ) return delta_h, delta_a -def estimate_alpha(session: "Session", beta: float = 2.0) -> float: - """Per-session contamination estimate alpha_hat = sigma(beta*(delta_H - delta_A)). +def estimate_alpha( + session: "Session", + beta: float = 2.0, + prior_agent: float = DEFAULT_AGENT_PRIOR, +) -> float: + """Per-session contamination estimate alpha_hat = sigma((delta_H - delta_A) / T). Returns probability session is agent-generated based on behavioral divergence. """ dh, da = compute_divergence(session) if (dh + da) <= 0: - return 0.5 - return 1.0 / (1.0 + np.exp(-beta * (dh - da))) + return float(prior_agent) + if beta <= 0: + return estimate_agent_probability( + dh, da, temperature=1.0, prior_agent=prior_agent + ) + return estimate_agent_probability( + delta_h=dh, + delta_a=da, + temperature=1.0 / beta, + prior_agent=prior_agent, + ) diff --git a/sim/rl/jax_core/separability.py b/sim/rl/jax_core/separability.py index c0c0293..c4165a7 100644 --- a/sim/rl/jax_core/separability.py +++ b/sim/rl/jax_core/separability.py @@ -1,14 +1,24 @@ """Vectorized KL divergence for separability scoring.""" + import numpy as np from typing import Tuple +from lib.agent_probability import ( + DEFAULT_AGENT_PRIOR, + estimate_agent_probability_batch, +) + try: import jax.numpy as jnp from jax import jit + JAX_AVAILABLE = True except ImportError: jnp, JAX_AVAILABLE = np, False - def jit(f): return f + + def jit(f): + return f + @jit def batch_kl(P, Q_human, Q_agent, eps=1e-10): @@ -20,10 +30,15 @@ def batch_kl(P, Q_human, Q_agent, eps=1e-10): delta_a = jnp.sum(p * jnp.log(p / qa), axis=(1, 2)) return delta_h, delta_a -def compute_divergences(session_trans: np.ndarray, ref_human: np.ndarray, ref_agent: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + +def compute_divergences( + session_trans: np.ndarray, ref_human: np.ndarray, ref_agent: np.ndarray +) -> Tuple[np.ndarray, np.ndarray]: """Compute KL divergence of each session from human/agent prototypes.""" if JAX_AVAILABLE: - dh, da = batch_kl(jnp.array(session_trans), jnp.array(ref_human), jnp.array(ref_agent)) + dh, da = batch_kl( + jnp.array(session_trans), jnp.array(ref_human), jnp.array(ref_agent) + ) return np.asarray(dh), np.asarray(da) # numpy fallback eps = 1e-10 @@ -34,10 +49,19 @@ def compute_divergences(session_trans: np.ndarray, ref_human: np.ndarray, ref_ag delta_a = np.sum(p * np.log(p / qa), axis=(1, 2)) return delta_h, delta_a -def estimate_alpha_batch(prob_agent: np.ndarray, delta_h: np.ndarray, delta_a: np.ndarray, temp: float = 1.0) -> np.ndarray: - """Vectorized alpha estimation from classifier probs and divergences.""" - mass = delta_h + delta_a - ratio = np.where(mass > 1e-8, delta_a / mass, 0.5) - blended = 0.5 * prob_agent + 0.5 * ratio - if temp <= 0: return np.clip(blended, 0.0, 1.0) - return np.clip(1.0 / (1.0 + np.exp(-temp * (blended - 0.5))), 0.0, 1.0) + +def estimate_alpha_batch( + prob_agent: np.ndarray, + delta_h: np.ndarray, + delta_a: np.ndarray, + temp: float = 1.0, + prior_agent: float = DEFAULT_AGENT_PRIOR, +) -> np.ndarray: + """Vectorized alpha estimation using divergence gap mapping.""" + _ = prob_agent + return estimate_agent_probability_batch( + delta_h=np.asarray(delta_h, dtype=float), + delta_a=np.asarray(delta_a, dtype=float), + temperature=temp, + prior_agent=prior_agent, + )