fix: undoing ai slop code

This commit is contained in:
2025-12-12 12:59:11 +01:00
parent 0846ebd8c2
commit 504adbf869

View File

@@ -1,7 +1,8 @@
# sklearn compatible models for agent detection
from sklearn.base import BaseEstimator, ClassifierMixin
from procesing.context import PipelineContext
from typing import Any
from typing import Any, Optional, Tuple
from abc import ABC, abstractmethod
import xgboost as xgb
import lightgbm as lgb
import numpy as np
@@ -10,11 +11,13 @@ import pandas as pd
TASK = 'classification'
LABELS = ['human', 'agent']
class XGBoostAgentClassifier(BaseEstimator, ClassifierMixin):
"""XGBoost binary classifier for agent detection with class imbalance handling"""
def __init__(self, context: PipelineContext = None, n_estimators=200, max_depth=6,
learning_rate=0.05, early_stopping_rounds=20):
class BaseAgentClassifier(BaseEstimator, ClassifierMixin, ABC):
"""Base class for tree-based agent detection classifiers with common logic"""
def __init__(self, context: Optional[PipelineContext] = None, n_estimators: int = 200,
max_depth: int = 6, learning_rate: float = 0.05,
early_stopping_rounds: int = 20):
self.context = context
self.n_estimators = n_estimators
self.max_depth = max_depth
@@ -23,19 +26,65 @@ class XGBoostAgentClassifier(BaseEstimator, ClassifierMixin):
self.model_ = None
self.feature_names_ = None
def _to_array(self, X):
"""Convert pandas structures to numpy arrays"""
return X.values if isinstance(X, (pd.DataFrame, pd.Series)) else X
def _compute_pos_weight(self, y_arr):
"""Calculate scale_pos_weight for class imbalance handling"""
n_neg, n_pos = (y_arr == 0).sum(), (y_arr == 1).sum()
return n_neg / n_pos if n_pos > 0 else 1.0
def _prepare_eval_set(self, eval_set):
"""Convert eval_set to numpy arrays if needed"""
if not eval_set:
return None
X_val, y_val = eval_set[0]
return [(self._to_array(X_val), self._to_array(y_val))]
@abstractmethod
def _build_model(self, scale_pos: float):
"""Build the underlying model instance (must be implemented by subclasses)"""
pass
@abstractmethod
def _fit_with_eval(self, X_arr, y_arr, eval_arr):
"""Fit model with evaluation set (must be implemented by subclasses)"""
pass
def fit(self, X, y, eval_set=None):
    """Build and fit the underlying model, optionally with a validation set.

    Args:
        X: Training features. When X is a DataFrame its column names are
            stored in ``feature_names_``.
        y: Training labels, used to derive the positive-class weight.
        eval_set: Optional sequence whose first item is an
            ``(X_val, y_val)`` pair. When falsy, the model is fitted
            without an evaluation set.

    Returns:
        self, so calls can be chained.
    """
    X_arr, y_arr = self._to_array(X), self._to_array(y)
    if isinstance(X, pd.DataFrame):
        self.feature_names_ = X.columns.tolist()

    # class imbalance handling via scale_pos_weight
    scale_pos = self._compute_pos_weight(y_arr)
    self.model_ = self._build_model(scale_pos)

    eval_arr = self._prepare_eval_set(eval_set)
    if eval_arr:
        # Library-specific fitting with an eval set lives in the subclass hook.
        self._fit_with_eval(X_arr, y_arr, eval_arr)
    else:
        self.model_.fit(X_arr, y_arr)
    return self
def predict(self, X):
    """Return the underlying model's ``predict`` output for X.

    X is passed through ``_to_array`` before being handed to the model.
    """
    features = self._to_array(X)
    return self.model_.predict(features)
def predict_proba(self, X):
    """Return the underlying model's ``predict_proba`` output for X.

    X is passed through ``_to_array`` before being handed to the model.
    """
    features = self._to_array(X)
    return self.model_.predict_proba(features)
@property
def feature_importances_(self):
    """Feature importances of the underlying model.

    Returns:
        ``self.model_.feature_importances_``, or None while ``model_`` is
        still None.
    """
    # Compare against None explicitly: truth-testing the model would call
    # its __bool__/__len__, so a model object that evaluates falsy would
    # be misreported as missing.
    if self.model_ is None:
        return None
    return self.model_.feature_importances_
class XGBoostAgentClassifier(BaseAgentClassifier):
"""XGBoost binary classifier for agent detection with class imbalance handling"""
def _build_model(self, scale_pos: float):
return xgb.XGBClassifier(
n_estimators=self.n_estimators,
max_depth=self.max_depth,
learning_rate=self.learning_rate,
@@ -47,55 +96,15 @@ class XGBoostAgentClassifier(BaseEstimator, ClassifierMixin):
enable_categorical=False
)
if eval_set:
X_val, y_val = eval_set[0]
X_val_arr = X_val.values if isinstance(X_val, pd.DataFrame) else X_val
y_val_arr = y_val.values if isinstance(y_val, pd.Series) else y_val
self.model_.fit(X_arr, y_arr, eval_set=[(X_val_arr, y_val_arr)], verbose=False)
else:
self.model_.fit(X_arr, y_arr)
return self
def predict(self, X):
X_arr = X.values if isinstance(X, pd.DataFrame) else X
return self.model_.predict(X_arr)
def predict_proba(self, X):
X_arr = X.values if isinstance(X, pd.DataFrame) else X
return self.model_.predict_proba(X_arr)
@property
def feature_importances_(self):
return self.model_.feature_importances_ if self.model_ else None
def _fit_with_eval(self, X_arr, y_arr, eval_arr):
self.model_.fit(X_arr, y_arr, eval_set=eval_arr, verbose=False)
class LightGBMAgentClassifier(BaseEstimator, ClassifierMixin):
class LightGBMAgentClassifier(BaseAgentClassifier):
"""LightGBM binary classifier for agent detection with class imbalance handling"""
def __init__(self, context: PipelineContext = None, n_estimators=200, max_depth=6,
learning_rate=0.05, early_stopping_rounds=20):
self.context = context
self.n_estimators = n_estimators
self.max_depth = max_depth
self.learning_rate = learning_rate
self.early_stopping_rounds = early_stopping_rounds
self.model_ = None
self.feature_names_ = None
def fit(self, X, y, eval_set=None):
X_arr = X.values if isinstance(X, pd.DataFrame) else X
y_arr = y.values if isinstance(y, pd.Series) else y
if isinstance(X, pd.DataFrame):
self.feature_names_ = X.columns.tolist()
# class imbalance handling via scale_pos_weight
n_neg = (y_arr == 0).sum()
n_pos = (y_arr == 1).sum()
scale_pos = n_neg / n_pos if n_pos > 0 else 1.0
self.model_ = lgb.LGBMClassifier(
def _build_model(self, scale_pos: float):
return lgb.LGBMClassifier(
n_estimators=self.n_estimators,
max_depth=self.max_depth,
learning_rate=self.learning_rate,
@@ -105,28 +114,9 @@ class LightGBMAgentClassifier(BaseEstimator, ClassifierMixin):
verbosity=-1
)
if eval_set:
X_val, y_val = eval_set[0]
X_val_arr = X_val.values if isinstance(X_val, pd.DataFrame) else X_val
y_val_arr = y_val.values if isinstance(y_val, pd.Series) else y_val
def _fit_with_eval(self, X_arr, y_arr, eval_arr):
    """Fit ``self.model_`` on the training arrays with eval_arr as ``eval_set``.

    Registers LightGBM's ``early_stopping`` callback, configured with
    ``self.early_stopping_rounds`` and ``verbose=False``.
    """
    # eval_arr already holds converted (X_val, y_val) pairs, so it is
    # passed straight through as the single eval_set argument.
    stopper = lgb.early_stopping(self.early_stopping_rounds, verbose=False)
    self.model_.fit(
        X_arr, y_arr,
        eval_set=eval_arr,
        callbacks=[stopper],
    )
else:
self.model_.fit(X_arr, y_arr)
return self
def predict(self, X):
X_arr = X.values if isinstance(X, pd.DataFrame) else X
return self.model_.predict(X_arr)
def predict_proba(self, X):
X_arr = X.values if isinstance(X, pd.DataFrame) else X
return self.model_.predict_proba(X_arr)
@property
def feature_importances_(self):
return self.model_.feature_importances_ if self.model_ else None