diff --git a/experiments/procesing/contaminator.py b/experiments/procesing/contaminator.py index da44c3d..2f23b2b 100644 --- a/experiments/procesing/contaminator.py +++ b/experiments/procesing/contaminator.py @@ -1,45 +1,66 @@ import pandas as pd import random -from sim.rl.behavior_loader import AgentBehaviorModel # TODO: proper import this +import os +from pathlib import Path -base_dir = "/home/velocitatem/Documents/Projects/PHANTOM/experiments" -agent_dir = f"{base_dir}/agents/collected_data/" +# use relative import when in package context, fallback for standalone +try: + from sim.rl.behavior_loader.models import AgentBehaviorModel +except ImportError: + import sys + sys.path.insert(0, str(Path(__file__).parent.parent.parent / "sim" / "rl" / "behavior_loader")) + from models import AgentBehaviorModel + +# paths should be configurable via environment or relative to project root +PROJECT_ROOT = Path(__file__).parent.parent.parent +AGENT_DATA_DIR = Path(os.getenv('PHANTOM_AGENT_DATA_DIR', PROJECT_ROOT / "experiments" / "agents" / "collected_data")) - -def remap_schema(df : pd.DataFrame, mapping: dict, on: str = "event_type"): +def remap_schema(df: pd.DataFrame, mapping: dict, on: str = "event_type") -> pd.DataFrame: + """remap column values according to mapping dict, preserving unmapped values""" df = df.copy() df[on] = df[on].map(mapping).fillna(df[on]) return df -def contaminate_dataset(df : pd.DataFrame, on : str = "event_type", - contamination_rate: float = 0.1) -> pd.DataFrame: - model = AgentBehaviorModel(agent_dir) - target_df_schema = df[on].unique().tolist() - mapping = { - 'view': 'view_page' - # TODO: define properly for the given dataset - } - # think about replacing with freqdist method from library - OG_event_distribution = df[on].value_counts(normalize=True).to_dict() - # normalize to weights - OG_event_distribution = {k: v / sum(OG_event_distribution.values()) for k, v in OG_event_distribution.items()} - mapped_df = remap_schema(df, mapping, on=on) - N = len(df) - N_final = N / (1 - contamination_rate) # TODO: explain this in paper - N_contaminate = int(N_final - N) - start_event_types = random.choices(list(OG_event_distribution.keys()), - weights=list(OG_event_distribution.values()), k=N_contaminate) - # it makes sense - new_trajectories = [] - for start_event in start_event_types: - # sample from og start - start = None # TODO: defin start accoding to dataset (randomly sample with weights of event distr) - trajectory = model.sample_trajectory(start) # TODO: explain this method in paper - new_trajectories.extend(trajectory) +def contaminate_dataset(df: pd.DataFrame, on: str = "event_type", + contamination_rate: float = 0.1, + agent_data_dir: Path = None) -> pd.DataFrame: + """inject synthetic agent trajectories into a dataset + contamination_rate: fraction of final dataset that should be agent data (0.1 = 10% agents) + """ + data_dir = agent_data_dir or AGENT_DATA_DIR + model = AgentBehaviorModel(str(data_dir)) + model.build_MDP() # ensure MDP is built before sampling - # TODO: make sure the new trajctories schema conforms with dataset - contaminate_df = pd.DataFrame(new_trajectories) - df = pd.concat([df, contaminate_df], ignore_index=True) + # compute event distribution from original data + event_dist = df[on].value_counts(normalize=True).to_dict() + total = sum(event_dist.values()) + event_dist = {k: v / total for k, v in event_dist.items()} + + # calculate how many synthetic events to add + N = len(df) + N_final = N / (1 - contamination_rate) + N_contaminate = int(N_final - N) + + # sample start states weighted by original distribution + start_events = random.choices(list(event_dist.keys()), weights=list(event_dist.values()), k=N_contaminate) + + # generate synthetic trajectories + new_rows = [] + for start_event in start_events: + # sample trajectory from agent model, using a state that contains the event type + mdp_states = model.mdp.get('states', []) if model.mdp else [] + matching_starts = [s for s in mdp_states if start_event in s] + if not matching_starts: + continue # skip if no matching start state + start_state = random.choice(matching_starts) + trajectory = model.sample_traj(start_state, max_len=20) + for state in trajectory: + parts = state.split('|') # page|productId|eventName format + new_rows.append({on: parts[-1] if parts else start_event, 'source': 'synthetic_agent'}) + + if new_rows: + contaminate_df = pd.DataFrame(new_rows) + df = pd.concat([df, contaminate_df], ignore_index=True) return df diff --git a/sim/rl/train.py b/sim/rl/train.py index ba257de..01e6809 100644 --- a/sim/rl/train.py +++ b/sim/rl/train.py @@ -3,15 +3,17 @@ import logging from pathlib import Path from typing import Dict, Type, Optional import pickle -from torch import neg_ from torch.utils.tensorboard import SummaryWriter -from environment import PHANTOMEnv, FastTrainingConstraints, BusinessLogicConstraints -from engine import (BasePricingEngine, WildPricingEngine, StaticPricingEngine, - SimpleDemandEngine, RandomWalkEngine, ThompsonSamplingEngine) +from environment import PHANTOMEnv, BusinessLogicConstraints logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger(__name__) +try: + from engine import (BasePricingEngine, WildPricingEngine, StaticPricingEngine, + SimpleDemandEngine, RandomWalkEngine, ThompsonSamplingEngine) +except ImportError: + BasePricingEngine = None # engines not required for basic usage """ @@ -26,8 +28,7 @@ CURRENT SOLUTION BELOW does not implement correct learning or updates. class EngineTrainer: """wrapper to run pricing engines through episodes and collect metrics""" - def __init__(self, engine: BasePricingEngine, env: PHANTOMEnv, - tb_writer: Optional[SummaryWriter] = None): + def __init__(self, engine, env: PHANTOMEnv, tb_writer: Optional[SummaryWriter] = None): self.engine = engine self.env = env self.episode_metrics = [] @@ -35,7 +36,6 @@ class EngineTrainer: self.global_step = 0 def train(self, n_episodes: int, seed: int = 42): - obs, _ = self.env.reset(seed=seed) prices = None for ep in range(n_episodes): @@ -44,12 +44,21 @@ class EngineTrainer: self.engine.update(obs, reward, done, info) return self - - - - - - return self.episode_metrics + def run_episode(self, seed: int = 42) -> Dict: + """run single evaluation episode and return metrics""" + obs, _ = self.env.reset(seed=seed) + self.engine.reset() + total_reward, prices = 0.0, None + ep_metrics = {'total_reward': 0.0} + done = False + while not done: + prices = self.engine.compute_prices(prices, obs) if prices is not None else obs["elasticity"]["price"] + obs, reward, done, _, info = self.env.step(prices) + total_reward += reward + for k, v in info.items(): + ep_metrics[k] = v + ep_metrics['total_reward'] = total_reward + return ep_metrics def evaluate(self, n_episodes: int = 10, seed: int = 100) -> Dict: """evaluate trained engine""" @@ -57,17 +66,16 @@ class EngineTrainer: 'agent_loss', 'ux_volatility', 'look_to_book']} for ep in range(n_episodes): metrics = self.run_episode(seed=seed + ep) - for k in results: results[k].append(metrics[k]) + for k in results: + results[k].append(metrics.get(k, 0.0)) return {k: (np.mean(v), np.std(v)) for k, v in results.items()} -def make_env(fast: bool = True): - constraints = FastTrainingConstraints() if fast else BusinessLogicConstraints() - return PHANTOMEnv(constraints=constraints) +def make_env(): + return PHANTOMEnv(constraints=BusinessLogicConstraints()) -def train_engine(engine_cls: Type[BasePricingEngine], env: PHANTOMEnv, - n_episodes: int, seed: int = 42, +def train_engine(engine_cls, env: PHANTOMEnv, n_episodes: int, seed: int = 42, tb_writer: Optional[SummaryWriter] = None) -> EngineTrainer: constraints = env.constraints engine = engine_cls(constraints=constraints, seed=seed) @@ -80,15 +88,11 @@ def save_trainer(trainer: EngineTrainer, path: Path): """save engine state and metrics""" path.parent.mkdir(parents=True, exist_ok=True) with open(path, 'wb') as f: - pickle.dump({ - 'engine': trainer.engine, - 'metrics': trainer.episode_metrics - }, f) + pickle.dump({'engine': trainer.engine, 'metrics': trainer.episode_metrics}, f) logger.info(f"Saved trainer to {path}") -def load_trainer(path: Path, env: PHANTOMEnv, - tb_writer: Optional[SummaryWriter] = None) -> EngineTrainer: +def load_trainer(path: Path, env: PHANTOMEnv, tb_writer: Optional[SummaryWriter] = None) -> EngineTrainer: """load saved engine""" with open(path, 'rb') as f: data = pickle.load(f) @@ -98,45 +102,44 @@ def load_trainer(path: Path, env: PHANTOMEnv, if __name__ == "__main__": + if BasePricingEngine is None: + logger.error("Engines not available, cannot run training") + exit(1) + base_dir = Path("./runs") base_dir.mkdir(exist_ok=True) engines = { "Wild": WildPricingEngine, "Static": StaticPricingEngine, -# "SimpleDemand": SimpleDemandEngine, "RandomWalk": RandomWalkEngine, "ThompsonSampling": ThompsonSamplingEngine, } - defenses = [False, True] n_train_episodes = 50 n_eval_episodes = 10 seed = 42 - fast_mode = True - logger.info(f"Training config: {n_train_episodes} episodes per engine, fast_mode={fast_mode}") + logger.info(f"Training config: {n_train_episodes} episodes per engine") trained_trainers = {} for engine_name, engine_cls in engines.items(): - for use_defense in defenses: - defense_label = "defense_on" if use_defense else "defense_off" - run_name = f"{engine_name}_{defense_label}" - log_dir = base_dir / run_name - log_dir.mkdir(parents=True, exist_ok=True) + run_name = engine_name + log_dir = base_dir / run_name + log_dir.mkdir(parents=True, exist_ok=True) - logger.info(f"Training {engine_name} with defense={use_defense}") - logger.info(f"Log directory: {log_dir}") + logger.info(f"Training {engine_name}") + logger.info(f"Log directory: {log_dir}") - env = make_env(fast=fast_mode) - tb_writer = SummaryWriter(log_dir=str(log_dir)) - trainer = train_engine(engine_cls, env, n_train_episodes, seed, tb_writer=tb_writer) - tb_writer.close() + env = make_env() + tb_writer = SummaryWriter(log_dir=str(log_dir)) + trainer = train_engine(engine_cls, env, n_train_episodes, seed, tb_writer=tb_writer) + tb_writer.close() - save_path = log_dir / "trainer.pkl" - save_trainer(trainer, save_path) + save_path = log_dir / "trainer.pkl" + save_trainer(trainer, save_path) - trained_trainers[run_name] = (trainer, env) + trained_trainers[run_name] = (trainer, env) logger.info("Starting evaluation")