From 504adbf8694e38ecd5648a39192ad81d2b6cf1c4 Mon Sep 17 00:00:00 2001
From: Daniel Rosel
Date: Fri, 12 Dec 2025 12:59:11 +0100
Subject: [PATCH] fix: undoing ai slop code

---
 experiments/ml/arch.py | 154 +++++++++++++++++++----------------------
 1 file changed, 72 insertions(+), 82 deletions(-)

diff --git a/experiments/ml/arch.py b/experiments/ml/arch.py
index 35f9878..4f36e18 100644
--- a/experiments/ml/arch.py
+++ b/experiments/ml/arch.py
@@ -1,7 +1,8 @@
 # sklearn compatible models for agent detection
 from sklearn.base import BaseEstimator, ClassifierMixin
 from procesing.context import PipelineContext
-from typing import Any
+from typing import Any, Optional, Tuple
+from abc import ABC, abstractmethod
 import xgboost as xgb
 import lightgbm as lgb
 import numpy as np
@@ -10,11 +11,13 @@ import pandas as pd
 TASK = 'classification'
 LABELS = ['human', 'agent']
 
-class XGBoostAgentClassifier(BaseEstimator, ClassifierMixin):
-    """XGBoost binary classifier for agent detection with class imbalance handling"""
 
-    def __init__(self, context: PipelineContext = None, n_estimators=200, max_depth=6,
-                 learning_rate=0.05, early_stopping_rounds=20):
+class BaseAgentClassifier(BaseEstimator, ClassifierMixin, ABC):
+    """Base class for tree-based agent detection classifiers with common logic"""
+
+    def __init__(self, context: Optional[PipelineContext] = None, n_estimators: int = 200,
+                 max_depth: int = 6, learning_rate: float = 0.05,
+                 early_stopping_rounds: int = 20):
         self.context = context
         self.n_estimators = n_estimators
         self.max_depth = max_depth
@@ -23,19 +26,65 @@ class XGBoostAgentClassifier(BaseEstimator, ClassifierMixin):
         self.model_ = None
         self.feature_names_ = None
 
+    def _to_array(self, X):
+        """Convert pandas structures to numpy arrays"""
+        return X.values if isinstance(X, (pd.DataFrame, pd.Series)) else X
+
+    def _compute_pos_weight(self, y_arr):
+        """Calculate scale_pos_weight for class imbalance handling"""
+        n_neg, n_pos = (y_arr == 0).sum(), (y_arr == 1).sum()
+        return n_neg / n_pos if n_pos > 0 else 1.0
+
+    def _prepare_eval_set(self, eval_set):
+        """Convert eval_set to numpy arrays if needed"""
+        if not eval_set:
+            return None
+        X_val, y_val = eval_set[0]
+        return [(self._to_array(X_val), self._to_array(y_val))]
+
+    @abstractmethod
+    def _build_model(self, scale_pos: float):
+        """Build the underlying model instance (must be implemented by subclasses)"""
+        pass
+
+    @abstractmethod
+    def _fit_with_eval(self, X_arr, y_arr, eval_arr):
+        """Fit model with evaluation set (must be implemented by subclasses)"""
+        pass
+
     def fit(self, X, y, eval_set=None):
-        X_arr = X.values if isinstance(X, pd.DataFrame) else X
-        y_arr = y.values if isinstance(y, pd.Series) else y
+        X_arr, y_arr = self._to_array(X), self._to_array(y)
 
         if isinstance(X, pd.DataFrame):
             self.feature_names_ = X.columns.tolist()
 
-        # class imbalance handling via scale_pos_weight
-        n_neg = (y_arr == 0).sum()
-        n_pos = (y_arr == 1).sum()
-        scale_pos = n_neg / n_pos if n_pos > 0 else 1.0
+        scale_pos = self._compute_pos_weight(y_arr)
+        self.model_ = self._build_model(scale_pos)
 
-        self.model_ = xgb.XGBClassifier(
+        eval_arr = self._prepare_eval_set(eval_set)
+        if eval_arr:
+            self._fit_with_eval(X_arr, y_arr, eval_arr)
+        else:
+            self.model_.fit(X_arr, y_arr)
+
+        return self
+
+    def predict(self, X):
+        return self.model_.predict(self._to_array(X))
+
+    def predict_proba(self, X):
+        return self.model_.predict_proba(self._to_array(X))
+
+    @property
+    def feature_importances_(self):
+        return self.model_.feature_importances_ if self.model_ else None
+
+
+class XGBoostAgentClassifier(BaseAgentClassifier):
+    """XGBoost binary classifier for agent detection with class imbalance handling"""
+
+    def _build_model(self, scale_pos: float):
+        return xgb.XGBClassifier(
             n_estimators=self.n_estimators,
             max_depth=self.max_depth,
             learning_rate=self.learning_rate,
@@ -47,55 +96,15 @@ class XGBoostAgentClassifier(BaseEstimator, ClassifierMixin):
             enable_categorical=False
         )
 
-        if eval_set:
-            X_val, y_val = eval_set[0]
-            X_val_arr = X_val.values if isinstance(X_val, pd.DataFrame) else X_val
-            y_val_arr = y_val.values if isinstance(y_val, pd.Series) else y_val
-            self.model_.fit(X_arr, y_arr, eval_set=[(X_val_arr, y_val_arr)], verbose=False)
-        else:
-            self.model_.fit(X_arr, y_arr)
-
-        return self
-
-    def predict(self, X):
-        X_arr = X.values if isinstance(X, pd.DataFrame) else X
-        return self.model_.predict(X_arr)
-
-    def predict_proba(self, X):
-        X_arr = X.values if isinstance(X, pd.DataFrame) else X
-        return self.model_.predict_proba(X_arr)
-
-    @property
-    def feature_importances_(self):
-        return self.model_.feature_importances_ if self.model_ else None
+    def _fit_with_eval(self, X_arr, y_arr, eval_arr):
+        self.model_.fit(X_arr, y_arr, eval_set=eval_arr, verbose=False)
 
 
-class LightGBMAgentClassifier(BaseEstimator, ClassifierMixin):
+class LightGBMAgentClassifier(BaseAgentClassifier):
     """LightGBM binary classifier for agent detection with class imbalance handling"""
 
-    def __init__(self, context: PipelineContext = None, n_estimators=200, max_depth=6,
-                 learning_rate=0.05, early_stopping_rounds=20):
-        self.context = context
-        self.n_estimators = n_estimators
-        self.max_depth = max_depth
-        self.learning_rate = learning_rate
-        self.early_stopping_rounds = early_stopping_rounds
-        self.model_ = None
-        self.feature_names_ = None
-
-    def fit(self, X, y, eval_set=None):
-        X_arr = X.values if isinstance(X, pd.DataFrame) else X
-        y_arr = y.values if isinstance(y, pd.Series) else y
-
-        if isinstance(X, pd.DataFrame):
-            self.feature_names_ = X.columns.tolist()
-
-        # class imbalance handling via scale_pos_weight
-        n_neg = (y_arr == 0).sum()
-        n_pos = (y_arr == 1).sum()
-        scale_pos = n_neg / n_pos if n_pos > 0 else 1.0
-
-        self.model_ = lgb.LGBMClassifier(
+    def _build_model(self, scale_pos: float):
+        return lgb.LGBMClassifier(
             n_estimators=self.n_estimators,
             max_depth=self.max_depth,
             learning_rate=self.learning_rate,
@@ -105,28 +114,9 @@ class LightGBMAgentClassifier(BaseEstimator, ClassifierMixin):
             verbosity=-1
         )
 
-        if eval_set:
-            X_val, y_val = eval_set[0]
-            X_val_arr = X_val.values if isinstance(X_val, pd.DataFrame) else X_val
-            y_val_arr = y_val.values if isinstance(y_val, pd.Series) else y_val
-            self.model_.fit(
-                X_arr, y_arr,
-                eval_set=[(X_val_arr, y_val_arr)],
-                callbacks=[lgb.early_stopping(self.early_stopping_rounds, verbose=False)]
-            )
-        else:
-            self.model_.fit(X_arr, y_arr)
-
-        return self
-
-    def predict(self, X):
-        X_arr = X.values if isinstance(X, pd.DataFrame) else X
-        return self.model_.predict(X_arr)
-
-    def predict_proba(self, X):
-        X_arr = X.values if isinstance(X, pd.DataFrame) else X
-        return self.model_.predict_proba(X_arr)
-
-    @property
-    def feature_importances_(self):
-        return self.model_.feature_importances_ if self.model_ else None
+    def _fit_with_eval(self, X_arr, y_arr, eval_arr):
+        self.model_.fit(
+            X_arr, y_arr,
+            eval_set=eval_arr,
+            callbacks=[lgb.early_stopping(self.early_stopping_rounds, verbose=False)]
+        )