mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 16:43:36 +00:00
* feat: training pipeline + tensorboard * tesnorboard forgot * chore: ml basic boilerplate * feat: naive architecture as start * eval setup * chore: parquet exporting of data * chore: updating requirements necesary * feat: separating modules and adding training logs paths * Update experiments/ml/train.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fix: new path for runs * fix: undoing ai slop code * chore: modules and reqs --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
175 lines
6.3 KiB
Python
175 lines
6.3 KiB
Python
from sklearn.pipeline import Pipeline
|
|
import pandas as pd
|
|
from procesing.context import PipelineContext
|
|
from procesing.providers import SupabaseProvider, BackendAPIProvider
|
|
import os
|
|
from procesing.steps import (
|
|
FetchInteractionsStep,
|
|
FetchPriceLogsStep,
|
|
FetchExperimentsStep,
|
|
JoinExperimentsStep,
|
|
CreatePriceBucketsStep,
|
|
AugmentEventNamesStep,
|
|
ChunkByTimeWindowStep,
|
|
ComputeDemandForChunksStep,
|
|
AggregatePriceLogsStep,
|
|
FitPricingFunctionStep,
|
|
PredictPricesStep,
|
|
ComputeDemandStep,
|
|
JoinProductFeaturesStep,
|
|
ExtractSessionFeaturesStep,
|
|
JoinLabelsStep,
|
|
ValidateDataStep,
|
|
)
|
|
from procesing.pricers import SimpleSurgePricer
|
|
|
|
def interaction_extraction_pipeline(context: PipelineContext):
    """Build the pipeline that extracts and augments interaction data.

    Stages, applied in order:
      1. ``fetch``          -- FetchInteractionsStep
      2. ``create_buckets`` -- CreatePriceBucketsStep
      3. ``augment_events`` -- AugmentEventNamesStep

    Args:
        context: Shared pipeline context handed to every step's constructor.

    Returns:
        A freshly constructed (unfitted) ``sklearn.pipeline.Pipeline``.
    """
    stages = [
        ('fetch', FetchInteractionsStep(context)),
        ('create_buckets', CreatePriceBucketsStep(context)),
        ('augment_events', AugmentEventNamesStep(context)),
    ]
    return Pipeline(stages)
|
|
|
|
|
|
def price_extraction_pipeline(context: PipelineContext):
    """Build the pipeline that extracts raw price logs.

    The pipeline has a single ``fetch`` stage backed by FetchPriceLogsStep.

    Args:
        context: Shared pipeline context passed to the fetch step.

    Returns:
        A freshly constructed (unfitted) ``sklearn.pipeline.Pipeline``.
    """
    fetch_step = FetchPriceLogsStep(context)
    return Pipeline([('fetch', fetch_step)])
|
|
|
|
|
|
def product_features_pipeline(context: PipelineContext,
                              interactions_df: pd.DataFrame,
                              price_logs_df: pd.DataFrame):
    """Compute per-product demand and price aggregates and join them.

    The steps are called directly via ``transform`` rather than wrapped in a
    sklearn ``Pipeline``. The join step receives a ``(demand, prices)``
    tuple assembled from two independent inputs.

    Args:
        context: Shared pipeline context passed to each step.
        interactions_df: Interaction data fed to ComputeDemandStep.
        price_logs_df: Price-log data fed to AggregatePriceLogsStep.

    Returns:
        Whatever JoinProductFeaturesStep.transform produces for the
        ``(demand, prices)`` pair (presumably a per-product DataFrame --
        confirm against the step implementation).
    """
    # All three steps are constructed up front, in the same order as before,
    # in case their constructors touch the shared context.
    demand_step, price_step, join_step = (
        ComputeDemandStep(context),
        AggregatePriceLogsStep(context),
        JoinProductFeaturesStep(context),
    )

    demand = demand_step.transform(interactions_df)
    prices = price_step.transform(price_logs_df)
    return join_step.transform((demand, prices))
|
|
|
|
|
|
|
|
def pricing_pipeline(context: "PipelineContext",
                     data: pd.DataFrame,
                     high_threshold: int = 10,
                     low_threshold: int = 2,
                     surge_multiplier: float = 1.2,
                     discount_multiplier: float = 0.9) -> pd.DataFrame:
    """Return a copy of ``data`` with an ``optimal_price`` column from SimpleSurgePricer.

    The caller's DataFrame is never modified; the new column is written to a
    copy.

    Args:
        context: Pipeline context (not used by the current implementation).
        data: Product feature frame; must contain a ``productId`` column.
        high_threshold: Accepted for API compatibility; NOT currently forwarded.
        low_threshold: Accepted for API compatibility; NOT currently forwarded.
        surge_multiplier: Accepted for API compatibility; NOT currently forwarded.
        discount_multiplier: Accepted for API compatibility; NOT currently forwarded.
            NOTE(review): SimpleSurgePricer is built with no arguments, so
            these four knobs have no effect. Wire them in once the pricer's
            constructor/fit signature is confirmed.

    Returns:
        A new DataFrame equal to ``data`` plus an ``optimal_price`` column.
        An empty DataFrame is returned if ``data`` is None, empty, or lacks
        ``productId``.
    """
    if data is None or data.empty or 'productId' not in data.columns:
        return pd.DataFrame()

    # Work on a copy: writing into ``data`` directly would leak the new column
    # into the caller's frame. full_pipeline returns the input frame alongside
    # this result, so an in-place write would make both results the same
    # object.
    priced = data.copy()

    surge_pricer = SimpleSurgePricer()
    surge_pricer.fit(priced)
    priced['optimal_price'] = surge_pricer.predict()
    return priced
|
|
|
|
|
|
def full_pipeline(context: PipelineContext,
                  high_threshold: int = 10,
                  low_threshold: int = 2,
                  surge_multiplier: float = 1.2,
                  discount_multiplier: float = 0.9):
    """Run the end-to-end flow: extraction -> demand/price aggregation -> surge pricing.

    The function builds the interaction and price-log extraction pipelines
    and runs each one with ``fit_transform(None)``. It then joins the results
    into product features, prints them, and passes them to ``pricing_pipeline``.

    Args:
        context: Pipeline context shared by every stage.
        high_threshold: Demand threshold for surge pricing (forwarded to pricing_pipeline).
        low_threshold: Demand threshold for discounts (forwarded to pricing_pipeline).
        surge_multiplier: Price multiplier for high demand (forwarded to pricing_pipeline).
        discount_multiplier: Price multiplier for low demand (forwarded to pricing_pipeline).

    Returns:
        tuple: ``(product_features_df, optimal_prices_df)``.
            - product_features_df: output of product_features_pipeline
              (documented as [productId, demand_score, price] -- verify).
            - optimal_prices_df: output of pricing_pipeline on those features.
    """
    # Construct both pipelines first, then run them in the same order.
    extraction_pipes = [
        interaction_extraction_pipeline(context),
        price_extraction_pipeline(context),
    ]
    interactions_df, price_logs_df = [pipe.fit_transform(None) for pipe in extraction_pipes]

    product_features_df = product_features_pipeline(context, interactions_df, price_logs_df)
    print(product_features_df.to_string())

    # Generate optimal prices using the surge rules.
    pricing_options = {
        'high_threshold': high_threshold,
        'low_threshold': low_threshold,
        'surge_multiplier': surge_multiplier,
        'discount_multiplier': discount_multiplier,
    }
    optimal_prices_df = pricing_pipeline(context, product_features_df, **pricing_options)

    return product_features_df, optimal_prices_df
|
|
|
|
|
|
def ml_training_pipeline(context: PipelineContext) -> pd.DataFrame:
    """Build the labeled, session-level feature matrix used for ML training.

    Flow: fetch interactions -> validate -> extract session features -> join labels.
    An empty DataFrame is returned as soon as validation or feature extraction
    yields no rows.

    Args:
        context: Pipeline context shared by every step. Per the original notes,
            the validation report is cached on it.

    Returns:
        The frame produced by JoinLabelsStep. Per the original notes this holds
        roughly 25 features per session (sessionId, experimentId,
        temporal/behavioral/product/UA features) plus an ``is_agent`` label
        derived from ``xp_human_only`` -- confirm against the step code.
    """
    raw = FetchInteractionsStep(context).transform(None)
    validated = ValidateDataStep(context).transform(raw)
    if validated.empty:
        return pd.DataFrame()

    session_features = ExtractSessionFeaturesStep(context).transform(validated)
    if session_features.empty:
        return pd.DataFrame()

    return JoinLabelsStep(context).transform(session_features)
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':

    class ExperimentsProvider(SupabaseProvider, BackendAPIProvider):
        """Provider that serves "Kafka topics" from JSON dumps on local disk.

        The data root holds one sub-directory per collection run. Each run may
        contain ``int.json`` (user interactions) and/or ``price.json`` (price
        logs).
        """

        def fetch_kafka_topic(self, topic: str) -> pd.DataFrame:
            """Load every run's dump for ``topic`` and concatenate the payloads.

            Args:
                topic: ``"user-interactions"`` or ``"price-logs"``. Any other
                    value falls back to the interactions dump.

            Returns:
                One row per record ``payload`` across all runs, in sorted
                run-directory order. An empty DataFrame is returned if the data
                root is missing or no file could be read.
            """
            # The PHANTOM_COLLECTED_DATA env var overrides the data root, so the
            # demo runs on other machines. The default stays the original path.
            base_path = os.environ.get(
                "PHANTOM_COLLECTED_DATA",
                "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/",
            )
            if not os.path.isdir(base_path):
                return pd.DataFrame()

            files = {"user-interactions": "int.json", "price-logs": "price.json"}
            file_to_read = files.get(topic, files["user-interactions"])
            frames = []

            # os.listdir order is arbitrary; sort so concatenation is deterministic.
            for d in sorted(os.listdir(base_path)):
                full_path = os.path.join(base_path, d, file_to_read)
                if not os.path.isfile(full_path):
                    continue
                # Best effort: a malformed dump is reported and skipped rather
                # than aborting the whole load.
                try:
                    data = pd.read_json(full_path)
                    payloads = pd.DataFrame([r['payload'] for r in data['value'].to_list()])
                    frames.append(payloads)
                except Exception as e:
                    print(f"Warning: Could not process {full_path}: {e}")

            return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    # demo: run ML training pipeline
    context = PipelineContext(provider=ExperimentsProvider(), store_mode='hotel')
    features = ml_training_pipeline(context)
    print(f"Feature matrix: {features.shape}")
    print(features.head())
    # DataFrame.info() writes its summary to stdout itself and returns None.
    # Wrapping it in print() emitted a stray "None".
    features.info()

    features.to_parquet("features.parquet")
|