mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 16:43:36 +00:00
unified separability writing
This commit is contained in:
@@ -3,10 +3,13 @@
|
||||
Computes divergence signals delta_H, delta_A from session trajectories using
|
||||
transition kernel estimation and KL divergence to prototype behavioral profiles.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import Dict, List, Tuple, TYPE_CHECKING
|
||||
import numpy as np
|
||||
|
||||
from lib.agent_probability import DEFAULT_AGENT_PRIOR, estimate_agent_probability
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .simplified import Event, Session
|
||||
|
||||
@@ -32,7 +35,10 @@ TRANS_A = {
|
||||
def kl_div(p: Dict[str, float], q: Dict[str, float], eps: float = 1e-10) -> float:
|
||||
"""KL divergence D_KL(p || q) for discrete distributions."""
|
||||
keys = set(p.keys()) | set(q.keys())
|
||||
return sum(p.get(k, eps) * np.log((p.get(k, eps) + eps) / (q.get(k, eps) + eps)) for k in keys)
|
||||
return sum(
|
||||
p.get(k, eps) * np.log((p.get(k, eps) + eps) / (q.get(k, eps) + eps))
|
||||
for k in keys
|
||||
)
|
||||
|
||||
|
||||
def build_kernel(events: List["Event"]) -> Dict[str, Dict[str, float]]:
|
||||
@@ -44,7 +50,11 @@ def build_kernel(events: List["Event"]) -> Dict[str, Dict[str, float]]:
|
||||
trans.setdefault(prev, {})
|
||||
trans[prev][curr] = trans[prev].get(curr, 0) + 1
|
||||
prev = curr
|
||||
return {s: {d: c / sum(dsts.values()) for d, c in dsts.items()} for s, dsts in trans.items() if sum(dsts.values()) > 0}
|
||||
return {
|
||||
s: {d: c / sum(dsts.values()) for d, c in dsts.items()}
|
||||
for s, dsts in trans.items()
|
||||
if sum(dsts.values()) > 0
|
||||
}
|
||||
|
||||
|
||||
def compute_divergence(session: "Session") -> Tuple[float, float]:
|
||||
@@ -55,18 +65,35 @@ def compute_divergence(session: "Session") -> Tuple[float, float]:
|
||||
"""
|
||||
kernel = build_kernel(session.events)
|
||||
if not kernel:
|
||||
return 0.5, 0.5
|
||||
delta_h = sum(kl_div(kernel.get(s, {}), TRANS_H.get(s, {})) for s in kernel) / len(kernel)
|
||||
delta_a = sum(kl_div(kernel.get(s, {}), TRANS_A.get(s, {})) for s in kernel) / len(kernel)
|
||||
return 0.0, 0.0
|
||||
delta_h = sum(kl_div(kernel.get(s, {}), TRANS_H.get(s, {})) for s in kernel) / len(
|
||||
kernel
|
||||
)
|
||||
delta_a = sum(kl_div(kernel.get(s, {}), TRANS_A.get(s, {})) for s in kernel) / len(
|
||||
kernel
|
||||
)
|
||||
return delta_h, delta_a
|
||||
|
||||
|
||||
def estimate_alpha(session: "Session", beta: float = 2.0) -> float:
|
||||
"""Per-session contamination estimate alpha_hat = sigma(beta*(delta_H - delta_A)).
|
||||
def estimate_alpha(
|
||||
session: "Session",
|
||||
beta: float = 2.0,
|
||||
prior_agent: float = DEFAULT_AGENT_PRIOR,
|
||||
) -> float:
|
||||
"""Per-session contamination estimate alpha_hat = sigma((delta_H - delta_A) / T).
|
||||
|
||||
Returns probability session is agent-generated based on behavioral divergence.
|
||||
"""
|
||||
dh, da = compute_divergence(session)
|
||||
if (dh + da) <= 0:
|
||||
return 0.5
|
||||
return 1.0 / (1.0 + np.exp(-beta * (dh - da)))
|
||||
return float(prior_agent)
|
||||
if beta <= 0:
|
||||
return estimate_agent_probability(
|
||||
dh, da, temperature=1.0, prior_agent=prior_agent
|
||||
)
|
||||
return estimate_agent_probability(
|
||||
delta_h=dh,
|
||||
delta_a=da,
|
||||
temperature=1.0 / beta,
|
||||
prior_agent=prior_agent,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user