mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
feature: telemetry logging
This commit is contained in:
@@ -83,20 +83,24 @@ def _run_eval_episode(env, policy) -> dict:
|
||||
}
|
||||
|
||||
|
||||
def _build_tier(name: str, cfg: dict, alpha: float):
|
||||
def _build_tier(name: str, cfg: dict, alpha: float, *, step_offset: int = 0):
|
||||
from .backends.common import make_env
|
||||
|
||||
tier = name.lower().strip()
|
||||
run_cfg = dict(cfg)
|
||||
run_cfg["alpha"] = float(alpha)
|
||||
run_cfg["wandb_step_offset"] = int(step_offset)
|
||||
|
||||
if tier == "static":
|
||||
return StaticPolicy(int(run_cfg["action_levels"]))
|
||||
return StaticPolicy(int(run_cfg["action_levels"])), []
|
||||
|
||||
if tier == "surge":
|
||||
return SurgePolicy(
|
||||
n_actions=int(run_cfg["action_levels"]),
|
||||
n_products=int(run_cfg["n_products"]),
|
||||
return (
|
||||
SurgePolicy(
|
||||
n_actions=int(run_cfg["action_levels"]),
|
||||
n_products=int(run_cfg["n_products"]),
|
||||
),
|
||||
[],
|
||||
)
|
||||
|
||||
if tier == "linear":
|
||||
@@ -113,27 +117,72 @@ def _build_tier(name: str, cfg: dict, alpha: float):
|
||||
seed=int(run_cfg["seed"]),
|
||||
)
|
||||
warmup_env.close()
|
||||
return policy
|
||||
return policy, []
|
||||
|
||||
if tier == "qtable":
|
||||
from .backends.qtable import train_qtable
|
||||
|
||||
run_cfg["console_progress"] = True
|
||||
agent, _ = train_qtable(run_cfg)
|
||||
return agent
|
||||
agent, metrics = train_qtable(run_cfg)
|
||||
events = metrics.get("_train_events", [])
|
||||
return agent, events if isinstance(events, list) else []
|
||||
|
||||
if tier in {"ppo", "a2c", "dqn"}:
|
||||
from .backends.sb3 import train_sb3
|
||||
|
||||
run_cfg["algo"] = tier
|
||||
agent, _ = train_sb3(run_cfg)
|
||||
return agent
|
||||
agent, metrics = train_sb3(run_cfg)
|
||||
events = metrics.get("_train_events", [])
|
||||
return agent, events if isinstance(events, list) else []
|
||||
|
||||
raise ValueError(f"unsupported tier '{name}'")
|
||||
|
||||
|
||||
def _log_train_events(
    events: list[dict],
    *,
    tier_name: str,
    mode_label: str,
    alpha: float,
    step_offset: int,
) -> int:
    """Replay buffered training events into the active wandb run.

    Non-dict entries in ``events`` are ignored.  The remaining events are
    logged in ascending order of their ``"train/global_step"`` value
    (missing -> 0), each at wandb step ``step_offset + max(1, global_step)``,
    with the benchmark tagging keys (backend, mode, alpha) merged on top of
    the event's own keys.

    Args:
        events: Training event payloads to replay.
        tier_name: Value stored under ``"runtime/backend"``.
        mode_label: Value stored under ``"study/mode"``; also drives the
            ``"study/no_robust"`` flag.
        alpha: Value stored under ``"study/alpha"``.
        step_offset: wandb step that the relative training steps are added to.

    Returns:
        The next free step cursor: ``step_offset`` unchanged when nothing was
        logged (wandb unavailable, no active run, or no usable events),
        otherwise ``step_offset + largest relative step + 1``.
    """
    wandb_active = HAS_WANDB and wandb.run is not None
    if not wandb_active or not events:
        return int(step_offset)

    # Pair every usable event with its parsed step so the integer conversion
    # happens exactly once; the sort is stable, so ties keep input order.
    stepped = sorted(
        (
            (int(evt.get("train/global_step", 0)), evt)
            for evt in events
            if isinstance(evt, dict)
        ),
        key=lambda pair: pair[0],
    )
    if not stepped:
        return int(step_offset)

    base = int(step_offset)
    tags = {
        "run.kind": "benchmark",
        "runtime/backend": tier_name,
        "study/mode": mode_label,
        "study/no_robust": float(mode_label == "no_robust"),
        "study/alpha": float(alpha),
    }
    for raw_step, evt in stepped:
        # Relative steps are clamped to >= 1 so a logged event never lands on
        # the offset itself.
        wandb.log({**evt, **tags}, step=base + max(1, raw_step))

    # The list is sorted ascending, so the last entry carries the largest step.
    return base + max(1, stepped[-1][0]) + 1
|
||||
|
||||
|
||||
def run_benchmark(
|
||||
cfg: dict, tiers: list[str], alpha_values: list[float], n_episodes: int
|
||||
cfg: dict,
|
||||
tiers: list[str],
|
||||
alpha_values: list[float],
|
||||
n_episodes: int,
|
||||
mode_label: str,
|
||||
step_cursor_start: int = 0,
|
||||
):
|
||||
from .backends.common import make_env
|
||||
|
||||
@@ -141,6 +190,7 @@ def run_benchmark(
|
||||
traces: list[dict] = []
|
||||
total_runs = max(1, len(alpha_values) * len(tiers))
|
||||
run_index = 0
|
||||
wandb_step_cursor = int(step_cursor_start)
|
||||
|
||||
for alpha in alpha_values:
|
||||
for tier_name in tiers:
|
||||
@@ -148,13 +198,34 @@ def run_benchmark(
|
||||
_log(
|
||||
f"[{run_index}/{total_runs}] alpha={float(alpha):.2f} tier={tier_name}: training"
|
||||
)
|
||||
policy = _build_tier(tier_name, cfg, alpha)
|
||||
policy, train_events = _build_tier(
|
||||
tier_name,
|
||||
cfg,
|
||||
alpha,
|
||||
step_offset=wandb_step_cursor,
|
||||
)
|
||||
prev_cursor = int(wandb_step_cursor)
|
||||
wandb_step_cursor = _log_train_events(
|
||||
train_events,
|
||||
tier_name=tier_name,
|
||||
mode_label=mode_label,
|
||||
alpha=float(alpha),
|
||||
step_offset=wandb_step_cursor,
|
||||
)
|
||||
if wandb_step_cursor == prev_cursor and tier_name in {
|
||||
"qtable",
|
||||
"ppo",
|
||||
"a2c",
|
||||
"dqn",
|
||||
}:
|
||||
wandb_step_cursor += max(1, int(cfg.get("total_timesteps", 1))) + 1
|
||||
env = make_env({**cfg, "alpha": float(alpha)})
|
||||
eps = [_run_eval_episode(env, policy) for _ in range(int(n_episodes))]
|
||||
env.close()
|
||||
|
||||
row = {
|
||||
"tier": tier_name,
|
||||
"mode": mode_label,
|
||||
"alpha": float(alpha),
|
||||
"episodes": int(n_episodes),
|
||||
"mean_reward": float(np.mean([e["reward"] for e in eps])),
|
||||
@@ -163,10 +234,7 @@ def run_benchmark(
|
||||
"mean_coi": float(np.mean([e["mean_coi"] for e in eps])),
|
||||
"std_revenue": float(np.std([e["revenue"] for e in eps])),
|
||||
}
|
||||
row["objective_score"] = (
|
||||
row["mean_reward"]
|
||||
+ float(cfg.get("revenue_weight", 0.01)) * row["mean_revenue"]
|
||||
)
|
||||
row["objective_score"] = row["mean_reward"]
|
||||
rows.append(row)
|
||||
_log(
|
||||
f"[{run_index}/{total_runs}] alpha={float(alpha):.2f} tier={tier_name}: "
|
||||
@@ -192,16 +260,23 @@ def run_benchmark(
|
||||
if HAS_WANDB and wandb.run is not None:
|
||||
wandb.log(
|
||||
{
|
||||
"run.kind": "benchmark",
|
||||
"runtime/backend": tier_name,
|
||||
"study/mode": mode_label,
|
||||
"study/no_robust": float(mode_label == "no_robust"),
|
||||
"study/alpha": float(alpha),
|
||||
"eval/reward_mean": row["mean_reward"],
|
||||
"eval/revenue_mean": row["mean_revenue"],
|
||||
"eval/margin_mean": row["mean_margin"],
|
||||
"eval/coi_level_mean": row["mean_coi"],
|
||||
"objective/score": row["objective_score"],
|
||||
"objective/coi_preserved": row["mean_coi"],
|
||||
}
|
||||
},
|
||||
step=wandb_step_cursor,
|
||||
)
|
||||
wandb_step_cursor += 1
|
||||
|
||||
return pd.DataFrame(rows), traces
|
||||
return pd.DataFrame(rows), traces, int(wandb_step_cursor)
|
||||
|
||||
|
||||
def _plot_outputs(df: pd.DataFrame, traces: list[dict], out_dir: Path, stamp: str):
|
||||
@@ -277,8 +352,12 @@ def _plot_outputs(df: pd.DataFrame, traces: list[dict], out_dir: Path, stamp: st
|
||||
return rev_path, coi_path, price_path
|
||||
|
||||
|
||||
def _run_with_args(args):
|
||||
compare_robust = _truthy(os.environ.get("PHANTOM_BENCHMARK_COMPARE_ROBUST"))
|
||||
def _run_with_args(args, compare_robust_override: bool | None = None):
|
||||
compare_robust = (
|
||||
bool(compare_robust_override)
|
||||
if compare_robust_override is not None
|
||||
else _truthy(os.environ.get("PHANTOM_BENCHMARK_COMPARE_ROBUST"))
|
||||
)
|
||||
robust_modes = [False, True] if compare_robust else [bool(args.no_robust)]
|
||||
|
||||
base_overrides = {
|
||||
@@ -289,6 +368,9 @@ def _run_with_args(args):
|
||||
"lambda_coi": args.lambda_coi,
|
||||
"robust_radius": args.robust_radius,
|
||||
"robust_points": args.robust_points,
|
||||
"robust_rollouts": args.robust_rollouts,
|
||||
"eta_ux": args.eta_ux,
|
||||
"reward_profit_weight": args.reward_profit_weight,
|
||||
"price_low": args.price_low,
|
||||
"price_high": args.price_high,
|
||||
"action_levels": args.action_levels,
|
||||
@@ -318,6 +400,7 @@ def _run_with_args(args):
|
||||
|
||||
all_frames: list[pd.DataFrame] = []
|
||||
all_traces: list[dict] = []
|
||||
wandb_step_cursor = 0
|
||||
for no_robust in robust_modes:
|
||||
overrides = dict(base_overrides)
|
||||
overrides["no_robust"] = bool(no_robust)
|
||||
@@ -327,9 +410,15 @@ def _run_with_args(args):
|
||||
cfg["linear_warmup_steps"] = int(args.linear_warmup_steps)
|
||||
mode_label = "no_robust" if no_robust else "robust"
|
||||
_log(f"mode={mode_label}: begin")
|
||||
df_mode, traces_mode = run_benchmark(cfg, tiers, alpha_values, args.episodes)
|
||||
df_mode, traces_mode, wandb_step_cursor = run_benchmark(
|
||||
cfg,
|
||||
tiers,
|
||||
alpha_values,
|
||||
args.episodes,
|
||||
mode_label=mode_label,
|
||||
step_cursor_start=wandb_step_cursor,
|
||||
)
|
||||
_log(f"mode={mode_label}: complete ({len(df_mode)} rows)")
|
||||
df_mode["mode"] = mode_label
|
||||
for trace in traces_mode:
|
||||
trace["mode"] = mode_label
|
||||
all_frames.append(df_mode)
|
||||
@@ -349,7 +438,7 @@ def _run_with_args(args):
|
||||
_log(f"artifacts written in {out_dir}")
|
||||
|
||||
if not df.empty:
|
||||
best_idx = int(df["mean_revenue"].idxmax())
|
||||
best_idx = int(df["objective_score"].idxmax())
|
||||
best = df.iloc[best_idx]
|
||||
_log(
|
||||
"BEST_TIER="
|
||||
@@ -358,6 +447,7 @@ def _run_with_args(args):
|
||||
"tier": best["tier"],
|
||||
"mode": best.get("mode", "robust"),
|
||||
"alpha": float(best["alpha"]),
|
||||
"objective_score": float(best["objective_score"]),
|
||||
"mean_revenue": float(best["mean_revenue"]),
|
||||
"mean_coi": float(best["mean_coi"]),
|
||||
}
|
||||
@@ -385,6 +475,9 @@ def run_cli(raw_args: list[str] | None = None):
|
||||
parser.add_argument("--lambda-coi", type=float, default=0.2)
|
||||
parser.add_argument("--robust-radius", type=float, default=0.15)
|
||||
parser.add_argument("--robust-points", type=int, default=5)
|
||||
parser.add_argument("--robust-rollouts", type=int, default=1)
|
||||
parser.add_argument("--eta-ux", type=float, default=0.5)
|
||||
parser.add_argument("--reward-profit-weight", type=float, default=1.0)
|
||||
parser.add_argument("--price-low", type=float, default=10.0)
|
||||
parser.add_argument("--price-high", type=float, default=150.0)
|
||||
parser.add_argument("--action-levels", type=int, default=9)
|
||||
@@ -421,6 +514,9 @@ def run_cli(raw_args: list[str] | None = None):
|
||||
"lambda_coi": "lambda_coi",
|
||||
"robust_radius": "robust_radius",
|
||||
"robust_points": "robust_points",
|
||||
"robust_rollouts": "robust_rollouts",
|
||||
"eta_ux": "eta_ux",
|
||||
"reward_profit_weight": "reward_profit_weight",
|
||||
"learning_rate": "learning_rate",
|
||||
"batch_size": "batch_size",
|
||||
"n_steps": "n_steps",
|
||||
@@ -435,6 +531,9 @@ def run_cli(raw_args: list[str] | None = None):
|
||||
"lambda_coi",
|
||||
"robust_radius",
|
||||
"robust_points",
|
||||
"robust_rollouts",
|
||||
"eta_ux",
|
||||
"reward_profit_weight",
|
||||
"learning_rate",
|
||||
"batch_size",
|
||||
"n_steps",
|
||||
@@ -459,34 +558,57 @@ def run_cli(raw_args: list[str] | None = None):
|
||||
_run_with_args(args)
|
||||
return
|
||||
|
||||
run = wandb.init(
|
||||
project=args.project,
|
||||
name=f"benchmark-{datetime.now(UTC).strftime('%m%d-%H%M%S')}",
|
||||
tags=[
|
||||
"benchmark",
|
||||
"robust-compare"
|
||||
if _truthy(os.environ.get("PHANTOM_BENCHMARK_COMPARE_ROBUST"))
|
||||
else "single-mode",
|
||||
],
|
||||
config={
|
||||
"run.kind": "benchmark",
|
||||
"tiers": args.tiers,
|
||||
"alpha_values": args.alpha_values,
|
||||
"episodes": args.episodes,
|
||||
"total_timesteps": args.total_timesteps,
|
||||
"lambda_coi": args.lambda_coi,
|
||||
"robust_radius": args.robust_radius,
|
||||
"robust_points": args.robust_points,
|
||||
"learning_rate": args.learning_rate,
|
||||
"device": args.device,
|
||||
},
|
||||
mode="offline" if args.offline else "online",
|
||||
tiers = _parse_list(args.tiers)
|
||||
run_stamp = datetime.now(UTC).strftime("%m%d-%H%M%S")
|
||||
compare_enabled = _truthy(os.environ.get("PHANTOM_BENCHMARK_COMPARE_ROBUST"))
|
||||
compare_tag = "robust-compare" if compare_enabled else "single-mode"
|
||||
modes = (
|
||||
[("no_robust", True), ("robust", False)]
|
||||
if compare_enabled
|
||||
else [("no_robust" if bool(args.no_robust) else "robust", bool(args.no_robust))]
|
||||
)
|
||||
try:
|
||||
_run_with_args(args)
|
||||
finally:
|
||||
if run is not None:
|
||||
wandb.finish()
|
||||
|
||||
run_idx = 0
|
||||
for tier in tiers:
|
||||
for mode_label, no_robust in modes:
|
||||
run_idx += 1
|
||||
tier_args = argparse.Namespace(**vars(args))
|
||||
tier_args.tiers = tier
|
||||
tier_args.no_robust = bool(no_robust)
|
||||
run = wandb.init(
|
||||
project=args.project,
|
||||
name=f"benchmark-{tier}-{mode_label}-{run_stamp}-{run_idx}",
|
||||
tags=[
|
||||
"benchmark",
|
||||
compare_tag,
|
||||
f"backend:{tier}",
|
||||
f"mode:{mode_label}",
|
||||
],
|
||||
config={
|
||||
"run.kind": "benchmark",
|
||||
"runtime/backend": tier,
|
||||
"study/mode": mode_label,
|
||||
"study/no_robust": float(no_robust),
|
||||
"tiers": tier,
|
||||
"alpha_values": args.alpha_values,
|
||||
"episodes": args.episodes,
|
||||
"total_timesteps": args.total_timesteps,
|
||||
"lambda_coi": args.lambda_coi,
|
||||
"robust_radius": args.robust_radius,
|
||||
"robust_points": args.robust_points,
|
||||
"robust_rollouts": args.robust_rollouts,
|
||||
"eta_ux": args.eta_ux,
|
||||
"reward_profit_weight": args.reward_profit_weight,
|
||||
"learning_rate": args.learning_rate,
|
||||
"device": args.device,
|
||||
},
|
||||
mode="offline" if args.offline else "online",
|
||||
)
|
||||
try:
|
||||
_run_with_args(tier_args, compare_robust_override=False)
|
||||
finally:
|
||||
if run is not None:
|
||||
wandb.finish()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user