Files
PHANTOM/engine/backends/common.py

182 lines
6.9 KiB
Python

from __future__ import annotations
from typing import Any, Mapping
import numpy as np
def make_env(cfg: Mapping[str, Any]):
    """Build a flattened, metrics-wrapped PHANTOM environment from ``cfg``.

    Required keys (a missing one raises ``KeyError``): ``n_products``,
    ``alpha``, ``N``, ``price_low``, ``price_high``, ``lambda_coi``,
    ``robust_radius``, ``robust_points``, ``info_value``, ``action_levels``,
    ``action_scale_low`` and ``action_scale_high``.

    Optional keys and their fallbacks: ``agent_mu`` (45.0), ``agent_std``
    (15.0), ``robust_rollouts`` (1), ``eta_ux`` (0.5),
    ``reward_profit_weight`` (1.0), ``max_steps`` (100), ``margin_floor``
    (0.05) and ``margin_floor_patience`` (5).

    Every value is coerced to ``int`` or ``float`` before it is handed to the
    environment constructor.

    Returns:
        The ``PHANTOM`` environment wrapped first in
        ``EconomicMetricsWrapper`` and then in ``FlattenObservation``.
    """
    # Imports are kept function-local, exactly as in the original layout.
    from gymnasium.wrappers import FlattenObservation
    from ..lib.wrappers import EconomicMetricsWrapper
    from ..wrapper import PHANTOM

    # The dict literal is evaluated top to bottom, so a missing required key
    # surfaces in the same order as with direct keyword arguments.
    phantom_kwargs = {
        "n_products": int(cfg["n_products"]),
        "alpha": float(cfg["alpha"]),
        "N": int(cfg["N"]),
        "agent_params": (
            float(cfg.get("agent_mu", 45.0)),
            float(cfg.get("agent_std", 15.0)),
        ),
        "price_bounds": (float(cfg["price_low"]), float(cfg["price_high"])),
        "lambda_coi": float(cfg["lambda_coi"]),
        "robust_radius": float(cfg["robust_radius"]),
        "robust_points": int(cfg["robust_points"]),
        "robust_rollouts": int(cfg.get("robust_rollouts", 1)),
        "info_value": float(cfg["info_value"]),
        "eta_ux": float(cfg.get("eta_ux", 0.5)),
        "reward_profit_weight": float(cfg.get("reward_profit_weight", 1.0)),
        "action_levels": int(cfg["action_levels"]),
        "action_scale_low": float(cfg["action_scale_low"]),
        "action_scale_high": float(cfg["action_scale_high"]),
        "max_steps": int(cfg.get("max_steps", 100)),
        "margin_floor": float(cfg.get("margin_floor", 0.05)),
        "margin_floor_patience": int(cfg.get("margin_floor_patience", 5)),
        "render_mode": None,
    }
    return FlattenObservation(EconomicMetricsWrapper(PHANTOM(**phantom_kwargs)))
def _action(agent: Any, obs: Any, deterministic: bool = True):
    """Query ``agent.predict`` and normalise its result into an action.

    ``agent.predict(obs, deterministic=...)`` may return either the action
    itself or a tuple whose first element is the action. A NumPy array that
    holds exactly one element is collapsed to a plain ``int``; anything else
    is returned unchanged.
    """
    prediction = agent.predict(obs, deterministic=deterministic)
    if isinstance(prediction, tuple):
        chosen = prediction[0]
    else:
        chosen = prediction

    holds_single_value = isinstance(chosen, np.ndarray) and chosen.size == 1
    if not holds_single_value:
        return chosen
    # Flatten first so arrays of any rank (e.g. shape (1, 1)) are handled.
    return int(chosen.ravel()[0])
def _evaluate_env(agent: Any, env: Any, episodes: int) -> dict[str, float]:
    """Roll out ``agent`` in ``env`` for ``episodes`` episodes and summarise.

    Each episode starts with ``env.reset()`` and steps with deterministic
    actions until ``terminated`` or ``truncated`` is truthy. Reward and
    revenue are summed per episode; every other metric is averaged over the
    episode's step count. Per-step values are read from ``info["economics"]``
    and/or ``info`` itself, defaulting to 0.0 when absent.

    Returns:
        A dict of ``eval/*`` floats: mean and standard deviation across
        episodes for reward and revenue, and the mean across episodes for the
        remaining metrics. Every value is 0.0 when no episode was run.
    """
    # How each per-step metric is looked up, listed in accumulation order:
    #   "econ"      -> info["economics"] only
    #   "econ_info" -> info["economics"] first, then info
    #   "info_econ" -> info first, then info["economics"]
    step_fields = (
        ("revenue", "econ_info"),
        ("margin", "econ"),
        ("coi_level", "econ"),
        ("coi_leakage", "econ"),
        ("volatility", "econ"),
        ("upward_volatility", "info_econ"),
        ("supra_share", "info_econ"),
        ("supra_penalty", "info_econ"),
        ("agent_prob", "econ_info"),
    )
    summed_per_episode = ("reward", "revenue")
    history: dict[str, list[float]] = {"reward": []}
    for field_name, _mode in step_fields:
        history[field_name] = []

    for _episode in range(int(episodes)):
        obs, _reset_info = env.reset()
        totals = dict.fromkeys(history, 0.0)
        n_steps = 0
        finished = False
        while not finished:
            obs, reward, term, trunc, info = env.step(_action(agent, obs, True))
            finished = bool(term or trunc)
            econ = info.get("economics", {})
            totals["reward"] += float(reward)
            for field_name, mode in step_fields:
                if mode == "econ":
                    raw = econ.get(field_name, 0.0)
                elif mode == "econ_info":
                    raw = econ.get(field_name, info.get(field_name, 0.0))
                else:
                    raw = info.get(field_name, econ.get(field_name, 0.0))
                totals[field_name] += float(raw)
            n_steps += 1

        denom = max(n_steps, 1)
        for field_name, series in history.items():
            if field_name in summed_per_episode:
                series.append(totals[field_name])
            else:
                series.append(totals[field_name] / denom)

    def _mean(series: list[float]) -> float:
        return float(np.mean(series)) if series else 0.0

    def _std(series: list[float]) -> float:
        return float(np.std(series)) if series else 0.0

    return {
        "eval/reward_mean": _mean(history["reward"]),
        "eval/reward_std": _std(history["reward"]),
        "eval/revenue_mean": _mean(history["revenue"]),
        "eval/revenue_std": _std(history["revenue"]),
        "eval/margin_mean": _mean(history["margin"]),
        "eval/coi_level_mean": _mean(history["coi_level"]),
        "eval/coi_leakage_mean": _mean(history["coi_leakage"]),
        "eval/volatility_mean": _mean(history["volatility"]),
        "eval/upward_volatility_mean": _mean(history["upward_volatility"]),
        "eval/supra_share_mean": _mean(history["supra_share"]),
        "eval/supra_penalty_mean": _mean(history["supra_penalty"]),
        "eval/agent_prob_mean": _mean(history["agent_prob"]),
    }
def evaluate(
    agent: Any,
    env: Any,
    episodes: int,
    cfg: Mapping[str, Any] | None = None,
) -> dict[str, float]:
    """Evaluate ``agent`` on ``env`` and optionally under shifted ``alpha``.

    The base metrics come from ``_evaluate_env(agent, env, episodes)``. When
    ``cfg`` is given and ``cfg["robust_eval_enabled"]`` is truthy (default
    ``True``), three extra environments are built with ``make_env`` at a low,
    nominal and high ``alpha`` and evaluated for ``ceil(episodes / 2)``
    episodes each (at least one).

    Args:
        agent: Object passed through to ``_evaluate_env``.
        env: Environment used for the base evaluation; it is not closed here.
        episodes: Number of episodes for the base evaluation.
        cfg: Environment configuration. ``None`` skips the stress evaluation.

    Returns:
        The base ``eval/*`` metrics, plus (when the stress evaluation runs)
        ``eval/stress_*`` worst-case entries and per-tag
        ``eval/{low,nominal,high}_*`` entries.
    """
    metrics = _evaluate_env(agent, env, episodes)
    if cfg is None or not bool(cfg.get("robust_eval_enabled", True)):
        return metrics

    nominal_alpha = float(cfg.get("alpha", 0.0))
    # The shift is never smaller than 0.15, even if robust_radius is smaller.
    eval_radius = max(float(cfg.get("robust_radius", 0.0)), 0.15)
    low_alpha = float(np.clip(nominal_alpha - eval_radius, 0.0, 1.0))
    high_alpha = float(np.clip(nominal_alpha + eval_radius, 0.0, 1.0))
    shifted_episodes = max(1, int(np.ceil(int(episodes) / 2)))

    shifted_rows = []
    for tag, alpha in (
        ("low", low_alpha),
        ("nominal", nominal_alpha),
        ("high", high_alpha),
    ):
        eval_cfg = dict(cfg)
        eval_cfg["alpha"] = float(alpha)
        shifted_env = make_env(eval_cfg)
        try:
            shifted_metrics = _evaluate_env(agent, shifted_env, shifted_episodes)
        finally:
            # Release the temporary environment even if the rollout raises.
            shifted_env.close()
        shifted_rows.append((tag, alpha, shifted_metrics))

    metrics["eval/stress_alpha_low"] = low_alpha
    metrics["eval/stress_alpha_high"] = high_alpha
    # Worst case is the minimum for reward and revenue, the maximum for leakage.
    metrics["eval/stress_reward_worst"] = float(
        min(row[2]["eval/reward_mean"] for row in shifted_rows)
    )
    metrics["eval/stress_revenue_worst"] = float(
        min(row[2]["eval/revenue_mean"] for row in shifted_rows)
    )
    metrics["eval/stress_coi_leakage_worst"] = float(
        max(row[2]["eval/coi_leakage_mean"] for row in shifted_rows)
    )
    for tag, alpha, shifted_metrics in shifted_rows:
        metrics[f"eval/{tag}_alpha"] = float(alpha)
        metrics[f"eval/{tag}_reward_mean"] = float(shifted_metrics["eval/reward_mean"])
        metrics[f"eval/{tag}_revenue_mean"] = float(
            shifted_metrics["eval/revenue_mean"]
        )
        metrics[f"eval/{tag}_coi_leakage_mean"] = float(
            shifted_metrics["eval/coi_leakage_mean"]
        )
    return metrics