class separaiblity significance

This commit is contained in:
2026-02-28 21:38:46 +01:00
parent 8f20359c8c
commit 233ce3be34
5 changed files with 285 additions and 57 deletions

View File

@@ -48,6 +48,7 @@ DEFAULT_CFG = {
"lambda_coi": 0.2,
"robust_radius": 0.15,
"robust_points": 5,
"no_robust": False,
"info_value": 1.0,
"price_low": 10.0,
"price_high": 150.0,
@@ -91,8 +92,10 @@ DEFAULT_CFG = {
def _truthy(value: str | bool | None) -> bool:
if isinstance(value, bool): return value
if value is None: return False
if isinstance(value, bool):
return value
if value is None:
return False
return str(value).strip().lower() in {"1", "true", "yes", "on"}
@@ -104,6 +107,11 @@ def _cfg(raw: dict | None = None) -> dict:
cfg["use_jax"] = _truthy(cfg.get("use_jax")) or _truthy(
os.environ.get("PHANTOM_USE_JAX")
)
cfg["no_robust"] = _truthy(cfg.get("no_robust"))
if cfg["no_robust"]:
cfg["lambda_coi"] = 0.0
cfg["robust_radius"] = 0.0
cfg["robust_points"] = 1
return cfg
@@ -473,6 +481,7 @@ def main():
p.add_argument("--info-value", type=float)
p.add_argument("--robust-radius", type=float)
p.add_argument("--robust-points", type=int)
p.add_argument("--no-robust", action="store_true")
p.add_argument("--learning-rate", type=float)
p.add_argument("--gamma", type=float)
p.add_argument("--gae-lambda", type=float)
@@ -514,6 +523,7 @@ def main():
"info_value": args.info_value,
"robust_radius": args.robust_radius,
"robust_points": args.robust_points,
"no_robust": args.no_robust,
"learning_rate": args.learning_rate,
"gamma": args.gamma,
"gae_lambda": args.gae_lambda,

View File

@@ -297,8 +297,13 @@ To train a robust pricing learner, we need a simulator that can generate realist
\subsubsection{Ground-Truth Separability}
Because sessions are collected under controlled experimental conditions where each actor is assigned a known type at the start of the trial, labels $y_s \in \{H, A\}$ are available as ground truth rather than as the output of a heuristic classifier. We therefore estimate separate transition kernels directly from each labeled partition $\mathcal{D}_H$ and $\mathcal{D}_A$, treating the resulting $\hat{\mathcal{T}}_H$ and $\hat{\mathcal{T}}_A$ as the ground-truth behavioral profiles for each class. We then ask a direct methodological question: are the kernels separable enough to justify downstream pricing control that depends on that separability?
To answer this, we compute average KL divergence between transition probability matrices. This statistic gives global separability and event-level diagnostics at the same time. In our balanced dataset (50\% human, 50\% agent), the average divergence is approximately $1.8$. To contextualize this divergence metric we compare with an intra-class comparison baseline of randomly selected transitions.
% To contextualize this figure a useful intra-class baseline is to randomly split D_H into two equal halves, estimate a kernel from each half, compute the same average KL statistic, and repeat for B bootstrap samples (e.g. B=100). The resulting null distribution (mean +/- std) gives the divergence expected purely from estimation noise at this sample size. A between-class KL substantially above this null confirms the separation is real and not a finite-sample artefact. In practice: for each of B splits, partition D_H 50/50 without replacement, run build_kernel() on each half, average the per-state KL values, and collect the B scores into a reference distribution to compare against the 1.8 figure.
To answer this, we compute average KL divergence between transition probability matrices. This statistic gives global separability and event-level diagnostics at the same time. To test whether the observed between-class value exceeds finite-sample estimation noise, we compute an intra-class bootstrap baseline by repeatedly splitting $\mathcal{D}_H$ and $\mathcal{D}_A$ into two random halves, fitting a transition kernel on each half, and re-computing the same average KL statistic for each split.
Formally, for $B$ bootstrap splits per class we obtain reference samples $\{d_{H,b}^{\text{intra}}\}_{b=1}^B$ and $\{d_{A,b}^{\text{intra}}\}_{b=1}^B$, then compare the between-class divergence $d^{\text{inter}}$ against the pooled null distribution. We report pooled mean and variance, lift ratio $d^{\text{inter}}/\mathbb{E}[d^{\text{intra}}]$, and the empirical one-sided p-value
\begin{equation}
\hat p = \frac{1 + \sum_{j=1}^{2B}\mathbf{1}\{d_j^{\text{intra}} \ge d^{\text{inter}}\}}{2B + 1},
\end{equation}
which gives a direct significance check for separability before using divergence-derived control signals in pricing.
\begin{definition}[Kullback-Leibler Divergence for Transition Distributions]
Let $P_e$ and $Q_e$ be categorical distributions over destination states following event $e$, derived from human and agent trajectories respectively. The KL divergence between these distributions is:

View File

@@ -8,15 +8,48 @@
\subsection{Behavioral Analysis}
Include markov chains of transition matrices, compare distributions (look at Divergence metrics)
The transition-kernel analysis is evaluated with both between-class divergence and an intra-class bootstrap null baseline. This allows us to separate real behavioral differences from finite-sample estimation noise.
\begin{table}[ht]
\centering
\caption{Divergence significance using intra-class bootstrap baseline (B=100 per class).}
\label{tab:divergence_significance}
\begin{tabular}{lcccc}
\toprule
Metric & Mean KL & Std & 5\% quantile & 95\% quantile \\
\midrule
Between-class (Human vs Agent) & 5.3067 & -- & -- & -- \\
Human intra-class split & 2.5271 & 1.2501 & 0.6845 & 4.6015 \\
Agent intra-class split & 1.2065 & 1.2607 & 0.2177 & 4.2345 \\
\bottomrule
\end{tabular}
\end{table}
For this run ($n_H=11$, $n_A=7$, $B=100$), the pooled lift ratio is $2.84\times$ and the empirical one-sided p-value is $0.0149$, both computed as defined in Section~\ref{sec:tpe}. This places the between-class divergence clearly above the intra-class null and supports the use of divergence-derived contamination signals in downstream pricing control.
\subsection{Experimental Outcomes}
Align with defined objectives, show results and statistical significance (or not).
To evaluate robustness contributions, we compare two policies on the same environment family: (i) robust pricing with COI-aware reward and adversarial contamination step, and (ii) non-robust baseline with revenue-only reward (\texttt{--no-robust}).
\begin{table}[ht]
\centering
\caption{Pricing policy benchmark for robust vs non-robust training.}
\label{tab:pricing_benchmark}
\begin{tabular}{lcccc}
\toprule
Policy & Eval reward & Eval revenue & COI leakage & Margin collapse rate \\
\midrule
Robust policy & \textit{TBD} & \textit{TBD} & \textit{TBD} & \textit{TBD} \\
Non-robust baseline (\texttt{--no-robust}) & \textit{TBD} & \textit{TBD} & \textit{TBD} & \textit{TBD} \\
\bottomrule
\end{tabular}
\end{table}
This comparison isolates the effect of robustness terms from model capacity and optimization settings, and provides the benchmark needed for interpreting the value of COI-aware control.
\subsection{Interpretation and Insights}
Inference from given patterns and show key findings.
Between-class divergence substantially above the intra-class null indicates that the two actor classes are behaviorally separable at the transition-kernel level. In pricing experiments, this is the condition required for separability to act as a useful control signal rather than just an auxiliary classifier score.
\subsection{Anomalies}

View File

@@ -2,12 +2,15 @@
from __future__ import annotations
import argparse
import gc
import json
import os
import re
import shlex
import shutil
import subprocess
import time
import resource
from pathlib import Path
import wandb
@@ -23,6 +26,7 @@ CLI_MAP: dict[str, str] = {
"info_value": "--info-value",
"robust_radius": "--robust-radius",
"robust_points": "--robust-points",
"no_robust": "--no-robust",
"learning_rate": "--learning-rate",
"gamma": "--gamma",
"gae_lambda": "--gae-lambda",
@@ -67,6 +71,16 @@ def _to_cli_args(cfg: dict) -> str:
_SENTINEL = "PHANTOM_METRICS:"
def _raise_nofile_limit(min_soft: int = 8192) -> None:
try:
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
target = min(hard, max(soft, min_soft))
if target > soft:
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
except Exception:
return
def _extract_metrics(output: str) -> dict:
# fast path: look for the dedicated sentinel line emitted by run_local
for line in output.splitlines():
@@ -88,6 +102,7 @@ def _extract_metrics(output: str) -> dict:
def main() -> None:
_raise_nofile_limit()
p = argparse.ArgumentParser(
description="Run W&B sweep where each trial uses full TPU pod"
)
@@ -102,6 +117,8 @@ def main() -> None:
workdir = Path(args.workdir).resolve()
env = os.environ.copy()
wandb_root = workdir / ".wandb-agent"
wandb_root.mkdir(parents=True, exist_ok=True)
prepare_cmd = [
"make",
@@ -124,12 +141,17 @@ def main() -> None:
def run_trial() -> None:
run = None
trial_wandb_dir = wandb_root / f"trial-{time.time_ns()}"
trial_wandb_dir.mkdir(parents=True, exist_ok=True)
try:
run = wandb.init()
run = wandb.init(dir=str(trial_wandb_dir))
cfg = dict(wandb.config)
cli_args = _to_cli_args(cfg)
env_trial = dict(env)
env_trial["LOCAL_TRAIN_ARGS"] = cli_args
env_trial["WANDB_DIR"] = str(trial_wandb_dir)
env_trial["WANDB_CACHE_DIR"] = str(trial_wandb_dir / "cache")
env_trial["WANDB_DATA_DIR"] = str(trial_wandb_dir / "data")
cmd = [
"make",
@@ -171,6 +193,8 @@ def main() -> None:
finally:
if run is not None and wandb.run is not None:
wandb.finish()
shutil.rmtree(trial_wandb_dir, ignore_errors=True)
gc.collect()
wandb.agent(
args.sweep_id,

View File

@@ -11,7 +11,7 @@ from pathlib import Path
# import lib utilities for optional use - models keep their own _state_repr for backwards compat
# with the specific event structure (evt.value.payload)
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / 'lib'))
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "lib"))
try:
from lib.state import make_state_repr as lib_make_state_repr
from lib.features import transition_histogram as lib_transition_histogram
@@ -37,7 +37,8 @@ class BehaviorModel:
def _extract_sessions(self) -> List[List[str]]:
trajs = []
for evts in self.data.values():
if len(evts) < 2: continue
if len(evts) < 2:
continue
states = [self._state_repr(e) for e in sorted(evts, key=self._sort_key)]
trajs.append(states)
return trajs
@@ -59,8 +60,10 @@ class BehaviorModel:
return rwd
def _normalize_trans(self, cnts: Dict) -> Dict:
return {s: {s_n: cnt/sum(nxt.values()) for s_n, cnt in nxt.items()}
for s, nxt in cnts.items()}
return {
s: {s_n: cnt / sum(nxt.values()) for s_n, cnt in nxt.items()}
for s, nxt in cnts.items()
}
def build_MDP(self) -> Dict:
trajs = self._extract_sessions()
@@ -69,34 +72,40 @@ class BehaviorModel:
state_rwd = self._calc_rewards(trajs)
self.mdp = {
'states': sorted(states),
'num_states': len(states),
'transitions': trans_prob,
'state_values': {s: np.mean(r) for s, r in state_rwd.items()},
'state_rewards': state_rwd,
'trans_counts': trans_cnt,
"states": sorted(states),
"num_states": len(states),
"transitions": trans_prob,
"state_values": {s: np.mean(r) for s, r in state_rwd.items()},
"state_rewards": state_rwd,
"trans_counts": trans_cnt,
}
return self.mdp
def transition_prob(self, s: str, s_next: str) -> float:
if not self.mdp: raise ValueError("build MDP first")
return self.mdp['transitions'].get(s, {}).get(s_next, 0.0)
if not self.mdp:
raise ValueError("build MDP first")
return self.mdp["transitions"].get(s, {}).get(s_next, 0.0)
def state_value(self, s: str) -> float:
if not self.mdp: raise ValueError("build MDP first")
return self.mdp['state_values'].get(s, 0.0)
if not self.mdp:
raise ValueError("build MDP first")
return self.mdp["state_values"].get(s, 0.0)
def sample_traj(self, start: str, max_len: int = 50) -> List[str]:
if not self.mdp: raise ValueError("build MDP first")
if not self.mdp:
raise ValueError("build MDP first")
path, curr = [start], start
for _ in range(max_len):
nxt = self.mdp['transitions'].get(curr, {})
if not nxt: break
nxt = self.mdp["transitions"].get(curr, {})
if not nxt:
break
curr = np.random.choice(list(nxt.keys()), p=list(nxt.values()))
path.append(curr)
return path
def extract_trajectory_features(self, events: List, max_trans_dim: int = 50) -> np.ndarray:
def extract_trajectory_features(
self, events: List, max_trans_dim: int = 50
) -> np.ndarray:
"""Convert trajectory to feature vector using MDP structure for contrastive learning"""
if not self.mdp:
self.build_MDP()
@@ -108,7 +117,11 @@ class BehaviorModel:
trans_counts = defaultdict(int)
for s, s_next in zip(states, states[1:]):
trans_counts[(s, s_next)] += 1
all_trans = [(s, t) for s in self.mdp['states'] for t in self.mdp['transitions'].get(s, {}).keys()]
all_trans = [
(s, t)
for s in self.mdp["states"]
for t in self.mdp["transitions"].get(s, {}).keys()
]
trans_vec = [trans_counts.get(tr, 0) for tr in all_trans[:max_trans_dim]]
trans_vec = trans_vec + [0] * (max_trans_dim - len(trans_vec)) # pad
total_trans = sum(trans_counts.values()) or 1
@@ -116,11 +129,13 @@ class BehaviorModel:
# state coverage ratio
visited = set(states)
features.append(len(visited) / max(self.mdp['num_states'], 1))
features.append(len(visited) / max(self.mdp["num_states"], 1))
# temporal entropy of transitions
if len(states) > 1:
trans_probs = [self.transition_prob(s, s_n) for s, s_n in zip(states, states[1:])]
trans_probs = [
self.transition_prob(s, s_n) for s, s_n in zip(states, states[1:])
]
entropy = -sum(p * np.log(p + 1e-10) for p in trans_probs if p > 0)
features.append(entropy / max(len(states), 1))
else:
@@ -150,6 +165,7 @@ class AgentBehaviorModel(BehaviorModel):
def _sort_key(self, evt):
return evt.ts
class JointBehaviorModel(BehaviorModel):
def __init__(self, human_dir: str, agent_dir: str):
self.loader = JointLoader(human_dir, agent_dir)
@@ -163,12 +179,13 @@ class JointBehaviorModel(BehaviorModel):
def _sort_key(self, evt):
return evt.ts
def aggregate_event_transitions(mdp: Dict) -> Dict[str, Dict[str, float]]:
evt_trans = defaultdict(lambda: defaultdict(float))
for s, trans in mdp['transitions'].items():
src = s.split('|')[2]
for s, trans in mdp["transitions"].items():
src = s.split("|")[2]
for s_next, prob in trans.items():
dst = s_next.split('|')[2]
dst = s_next.split("|")[2]
evt_trans[src][dst] += prob
for src in evt_trans:
@@ -177,59 +194,149 @@ def aggregate_event_transitions(mdp: Dict) -> Dict[str, Dict[str, float]]:
evt_trans[src] = {dst: p / total for dst, p in evt_trans[src].items()}
return dict(evt_trans)
def visualize_mdp(model: BehaviorModel, threshold: float = 0.05, output: str = "mdp_graph",
fmt: str = "svg", view: bool = False, export_dot: bool = False):
if not model.mdp: raise ValueError("build MDP first")
def visualize_mdp(
model: BehaviorModel,
threshold: float = 0.05,
output: str = "mdp_graph",
fmt: str = "svg",
view: bool = False,
export_dot: bool = False,
):
if not model.mdp:
raise ValueError("build MDP first")
evt_trans = aggregate_event_transitions(model.mdp)
g = graphviz.Digraph(format=fmt)
g.attr(rankdir='LR', size='30')
g.attr('node', shape='circle', width='1', height='1')
g.attr(rankdir="LR", size="30")
g.attr("node", shape="circle", width="1", height="1")
events = set(evt_trans.keys()) | {e for trans in evt_trans.values() for e in trans.keys()}
events = set(evt_trans.keys()) | {
e for trans in evt_trans.values() for e in trans.keys()
}
for evt in events:
g.node(evt)
for src, dsts in evt_trans.items():
for dst, prob in dsts.items():
if prob > threshold:
g.edge(src, dst, label=f'{prob:.2f}')
g.edge(src, dst, label=f"{prob:.2f}")
g.render(output, view=view, cleanup=True)
print(f"Saved MDP graph to {output}.{fmt}")
if export_dot:
with open(f"{output}.dot", 'w') as f:
with open(f"{output}.dot", "w") as f:
f.write(g.source)
print(f"Exported DOT source to {output}.dot")
return g
def kl_divergence(p: Dict[str, float], q: Dict[str, float]) -> float:
eps = 1e-10
# p + log(p / q) summed over all keys in P
return sum((p[k] + eps) * np.log((p[k] + eps) / (q.get(k, 0.0) + eps)) for k in p)
def _build_subset_mdp(model: BehaviorModel, session_ids: List) -> Dict:
trajs = []
for sid in session_ids:
evts = model.data.get(sid, [])
if len(evts) < 2:
continue
states = [model._state_repr(e) for e in sorted(evts, key=model._sort_key)]
trajs.append(states)
trans_cnt, _ = model._calc_transitions(trajs)
return {"transitions": model._normalize_trans(trans_cnt)}
def _avg_event_kl(
src_evt: Dict[str, Dict[str, float]], dst_evt: Dict[str, Dict[str, float]]
) -> float:
common = set(src_evt.keys()) & set(dst_evt.keys())
if not common:
return 0.0
return float(np.mean([kl_divergence(src_evt[e], dst_evt[e]) for e in common]))
def bootstrap_intra_class_divergence(
model: BehaviorModel,
n_bootstrap: int = 100,
seed: int = 42,
) -> Dict[str, float]:
session_ids = list(model.data.keys())
n = len(session_ids)
if n < 2:
return {
"mean": 0.0,
"std": 0.0,
"q05": 0.0,
"q95": 0.0,
"n_bootstrap": 0,
"scores": [],
"available": False,
"num_sessions": int(n),
}
half = n // 2
rng = np.random.default_rng(seed)
scores = []
for _ in range(n_bootstrap):
perm = rng.permutation(session_ids)
split_a, split_b = perm[:half], perm[half:]
mdp_a = _build_subset_mdp(model, list(split_a))
mdp_b = _build_subset_mdp(model, list(split_b))
score = _avg_event_kl(
aggregate_event_transitions(mdp_a),
aggregate_event_transitions(mdp_b),
)
scores.append(score)
arr = np.array(scores, dtype=float)
return {
"mean": float(np.mean(arr)),
"std": float(np.std(arr)),
"q05": float(np.quantile(arr, 0.05)),
"q95": float(np.quantile(arr, 0.95)),
"n_bootstrap": int(n_bootstrap),
"scores": arr.tolist(),
"available": True,
"num_sessions": int(n),
}
if __name__ == "__main__":
base_dir = "/home/velocitatem/Documents/Projects/PHANTOM/experiments"
human_dir, agent_dir = f"{base_dir}/collected_data/", f"{base_dir}/agents/collected_data/"
human_dir, agent_dir = (
f"{base_dir}/collected_data/",
f"{base_dir}/agents/collected_data/",
)
human_model = BehaviorModel(human_dir)
human_mdp = human_model.build_MDP()
print(f"Built MDP: {human_mdp['num_states']} states, "
f"{sum(len(t) for t in human_mdp['transitions'].values())} transitions")
if not human_mdp['states']:
print(
f"Built MDP: {human_mdp['num_states']} states, "
f"{sum(len(t) for t in human_mdp['transitions'].values())} transitions"
)
if not human_mdp["states"]:
exit("No states found")
visualize_mdp(human_model, threshold=0.05, output="human_mdp_viz", fmt="pdf", export_dot=True)
visualize_mdp(
human_model, threshold=0.05, output="human_mdp_viz", fmt="pdf", export_dot=True
)
agent_model = AgentBehaviorModel(agent_dir)
agent_mdp = agent_model.build_MDP()
print(f"AGENT... Built MDP: {agent_mdp['num_states']} states, "
f"{sum(len(t) for t in agent_mdp['transitions'].values())} transitions")
if not agent_mdp['states']:
print(
f"AGENT... Built MDP: {agent_mdp['num_states']} states, "
f"{sum(len(t) for t in agent_mdp['transitions'].values())} transitions"
)
if not agent_mdp["states"]:
exit("No states found")
visualize_mdp(agent_model, threshold=0.05, output="agent_mdp_viz", fmt="pdf", export_dot=True)
visualize_mdp(
agent_model, threshold=0.05, output="agent_mdp_viz", fmt="pdf", export_dot=True
)
human_evt = aggregate_event_transitions(human_mdp)
agent_evt = aggregate_event_transitions(agent_mdp)
@@ -239,8 +346,11 @@ if __name__ == "__main__":
if not common:
exit("No common event types for KL divergence analysis")
kl_divs = sorted([(e, kl_divergence(human_evt[e], agent_evt[e])) for e in common],
key=lambda x: x[1], reverse=True)
kl_divs = sorted(
[(e, kl_divergence(human_evt[e], agent_evt[e])) for e in common],
key=lambda x: x[1],
reverse=True,
)
print(f"Average KL divergence: {np.mean([kl for _, kl in kl_divs]):.4f}")
print("\nMost divergent event types:")
@@ -250,9 +360,55 @@ if __name__ == "__main__":
print("\n=== Joint Model (Human + Agent Combined) ===")
joint_model = JointBehaviorModel(human_dir, agent_dir)
joint_mdp = joint_model.build_MDP()
print(f"Built joint MDP: {joint_mdp['num_states']} states, "
f"{sum(len(t) for t in joint_mdp['transitions'].values())} transitions")
if joint_mdp['states']:
visualize_mdp(joint_model, threshold=0.05, output="joint_mdp_viz", fmt="pdf", export_dot=True)
print(
f"Built joint MDP: {joint_mdp['num_states']} states, "
f"{sum(len(t) for t in joint_mdp['transitions'].values())} transitions"
)
if joint_mdp["states"]:
visualize_mdp(
joint_model,
threshold=0.05,
output="joint_mdp_viz",
fmt="pdf",
export_dot=True,
)
# TODO: setup intra class divergence as baseline for evaluating and adding significance to the divergence which we observe across class
inter_class_avg = float(np.mean([kl for _, kl in kl_divs]))
human_intra = bootstrap_intra_class_divergence(
human_model, n_bootstrap=100, seed=42
)
agent_intra = bootstrap_intra_class_divergence(
agent_model, n_bootstrap=100, seed=43
)
pooled_scores = human_intra["scores"] + agent_intra["scores"]
if not pooled_scores:
pooled_scores = [0.0]
pooled_null = np.array(pooled_scores, dtype=float)
p_empirical = float(
(np.sum(pooled_null >= inter_class_avg) + 1) / (len(pooled_null) + 1)
)
print("\nIntra-class KL bootstrap baseline:")
if human_intra["available"]:
print(
f" Human split KL: {human_intra['mean']:.4f} +- {human_intra['std']:.4f} "
f"(5-95%: {human_intra['q05']:.4f}-{human_intra['q95']:.4f}, n_sessions={human_intra['num_sessions']})"
)
else:
print(
f" Human split KL: unavailable (need >=2 sessions, got {human_intra['num_sessions']})"
)
if agent_intra["available"]:
print(
f" Agent split KL: {agent_intra['mean']:.4f} +- {agent_intra['std']:.4f} "
f"(5-95%: {agent_intra['q05']:.4f}-{agent_intra['q95']:.4f}, n_sessions={agent_intra['num_sessions']})"
)
else:
print(
f" Agent split KL: unavailable (need >=2 sessions, got {agent_intra['num_sessions']})"
)
print(f" Between-class KL: {inter_class_avg:.4f}")
print(
f" Lift vs pooled intra mean: {inter_class_avg / max(float(np.mean(pooled_null)), 1e-10):.2f}x"
)
print(f" Empirical p-value (inter > intra): {p_empirical:.4f}")