feat: weak train scaffold

This commit is contained in:
2026-01-21 11:27:03 +01:00
parent b2f0746c01
commit 04907df393
2 changed files with 39 additions and 108 deletions

View File

@@ -12,111 +12,12 @@ TASK = 'classification'
LABELS = ['human', 'agent'] LABELS = ['human', 'agent']
class BaseAgentClassifier(BaseEstimator, ClassifierMixin, ABC): class WeakClassifier(BaseEstimator, ClassifierMixin, ABC):
"""Base class for tree-based agent detection classifiers with common logic""" # a simple contrastive machine learning model
# this model should learn to distinguish between human and agent behavior
def __init__(self, context: Optional[PipelineContext] = None, n_estimators: int = 200, # using a weakly supervised approach and contrastive learning + augmentation
max_depth: int = 6, learning_rate: float = 0.05, #
early_stopping_rounds: int = 20): def __init__(self, **kwargs):
self.context = context super().__init__()
self.n_estimators = n_estimators self.model = None
self.max_depth = max_depth self.kwargs = kwargs
self.learning_rate = learning_rate
self.early_stopping_rounds = early_stopping_rounds
self.model_ = None
self.feature_names_ = None
def _to_array(self, X):
"""Convert pandas structures to numpy arrays"""
return X.values if isinstance(X, (pd.DataFrame, pd.Series)) else X
def _compute_pos_weight(self, y_arr):
"""Calculate scale_pos_weight for class imbalance handling"""
n_neg, n_pos = (y_arr == 0).sum(), (y_arr == 1).sum()
return n_neg / n_pos if n_pos > 0 else 1.0
def _prepare_eval_set(self, eval_set):
"""Convert eval_set to numpy arrays if needed"""
if not eval_set:
return None
X_val, y_val = eval_set[0]
return [(self._to_array(X_val), self._to_array(y_val))]
@abstractmethod
def _build_model(self, scale_pos: float):
"""Build the underlying model instance (must be implemented by subclasses)"""
pass
@abstractmethod
def _fit_with_eval(self, X_arr, y_arr, eval_arr):
"""Fit model with evaluation set (must be implemented by subclasses)"""
pass
def fit(self, X, y, eval_set=None):
    """Fit the underlying model on ``X`` / ``y``.

    Args:
        X: Training features; a pandas DataFrame/Series or an array-like
            accepted by the underlying model.
        y: Training labels, encoded as 0 (negative) / 1 (positive) for the
            class-imbalance weight computation.
        eval_set: Optional sequence whose first item is an
            ``(X_val, y_val)`` pair. When given, fitting is delegated to
            ``_fit_with_eval``; otherwise the model is fitted directly.

    Returns:
        ``self``, to allow call chaining.
    """
    X_arr, y_arr = self._to_array(X), self._to_array(y)

    # Reset on every call: a previous DataFrame fit must not leave stale
    # column names behind when the estimator is refitted on a plain array.
    self.feature_names_ = X.columns.tolist() if isinstance(X, pd.DataFrame) else None

    scale_pos = self._compute_pos_weight(y_arr)
    self.model_ = self._build_model(scale_pos)

    eval_arr = self._prepare_eval_set(eval_set)
    if eval_arr:
        self._fit_with_eval(X_arr, y_arr, eval_arr)
    else:
        self.model_.fit(X_arr, y_arr)
    return self
def predict(self, X):
    """Return the underlying model's predictions for ``X``.

    ``X`` is first passed through ``_to_array``; the call is then delegated
    to ``self.model_.predict``.
    """
    run_predict = self.model_.predict
    return run_predict(self._to_array(X))
def predict_proba(self, X):
    """Return the underlying model's class probabilities for ``X``.

    ``X`` is first passed through ``_to_array``; the call is then delegated
    to ``self.model_.predict_proba``.
    """
    run_predict_proba = self.model_.predict_proba
    return run_predict_proba(self._to_array(X))
@property
def feature_importances_(self):
    """Feature importances of the underlying model, or ``None`` if no model is set.

    The check is an explicit ``is not None`` rather than a truthiness test,
    so a model object that happens to evaluate as falsy (for example one
    defining ``__len__``) is still queried.
    """
    if self.model_ is not None:
        return self.model_.feature_importances_
    return None
class XGBoostAgentClassifier(BaseAgentClassifier):
    """XGBoost binary classifier for agent detection with class imbalance handling.

    Early stopping is enabled only when an evaluation set is supplied:
    XGBoost rejects early stopping without a validation set, and the base
    ``fit`` calls ``model_.fit(X, y)`` with no ``eval_set`` when none is given.
    """

    def _build_model(self, scale_pos: float):
        """Build the unfitted ``XGBClassifier``.

        ``scale_pos`` is forwarded as ``scale_pos_weight``. Early stopping is
        deliberately not configured here; see ``_fit_with_eval``.
        """
        return xgb.XGBClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            learning_rate=self.learning_rate,
            scale_pos_weight=scale_pos,
            eval_metric='auc',
            random_state=42,
            tree_method='hist',
            enable_categorical=False,
        )

    def _fit_with_eval(self, X_arr, y_arr, eval_arr):
        """Fit with early stopping monitored on ``eval_arr``."""
        # Turn early stopping on only now that a validation set is guaranteed.
        self.model_.set_params(early_stopping_rounds=self.early_stopping_rounds)
        self.model_.fit(X_arr, y_arr, eval_set=eval_arr, verbose=False)
class LightGBMAgentClassifier(BaseAgentClassifier):
    """LightGBM binary classifier for agent detection with class imbalance handling"""

    def _build_model(self, scale_pos: float):
        """Build the unfitted ``LGBMClassifier``; ``scale_pos`` becomes ``scale_pos_weight``."""
        params = dict(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            learning_rate=self.learning_rate,
            scale_pos_weight=scale_pos,
            metric='auc',
            random_state=42,
            verbosity=-1,
        )
        return lgb.LGBMClassifier(**params)

    def _fit_with_eval(self, X_arr, y_arr, eval_arr):
        """Fit with an early-stopping callback evaluated on ``eval_arr``."""
        stopper = lgb.early_stopping(self.early_stopping_rounds, verbose=False)
        self.model_.fit(X_arr, y_arr, eval_set=eval_arr, callbacks=[stopper])

View File

@@ -0,0 +1,30 @@
from sim.rl.behavior_loader.loader import AgentLoader, Loader, JointLoader
from sim.rl.behavior_loader.loader import PayloadModel
from arch import WeakClassifier
agent_dir = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/agents/collected_data/"
human_dir = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/"
def augment_trajectory(trajectory: list[PayloadModel], augmentation_rate: float = 0.1) -> list[PayloadModel]:
    """Placeholder for trajectory augmentation; returns the input unchanged.

    ``augmentation_rate`` is accepted but not used yet. Planned augmentations
    (none implemented so far):

    * return a sub-trajectory window of the original trajectory
    * insert random noise events
    * shuffle a few events (pick a few indices and swap each with its i+1 neighbor)
    * adjust metadata
    """
    return trajectory
def train():
    """Training entry point; not implemented yet (a no-op that returns ``None``)."""
    return None
if __name__ == "__main__":
    # Script entry point: load the data from both collection directories,
    # report how many entries were found, then fit the classifier.
    loader = JointLoader(human_dir, agent_dir)
    data = loader.get_data()
    _entries, num_entries = loader.get_entries()  # only the count is used below
    print(f"Loaded {num_entries} entries")

    # TODO: augment

    # fit model
    # NOTE(review): WeakClassifier as shown in this commit defines only
    # __init__; unless a base class supplies fit(), this call will fail - confirm.
    model = WeakClassifier()
    model.fit(data)