Files
PHANTOM/experiments/procesing/steps/augment.py
Daniel Alves Rösel ad9423bf59 Airflow addition (#28)
* introducing airflow to run pipeline

* chore: updating dag with upload to registry

* introducing complete provider (non refactored and noisy)

* chore: removing old shit

* generic pricing baselines

* feature: super simple model registry (to be updated maybe third party OS software)

* chore: refactoring the providers docker config and requirements

* chore: refactored and broke down components (braking

* exporting all

* local pipeline excution working

* fix: fixing import structures from nonrelativistic

* chore: enables cross comm pickling with fully e2e pipeline compilation

* docs: what the pipeline is like now

* pipelines local running and pipeline high level definition

* cleaning old pipeline and vectorization

* leaked but fixing, not so important

* test: started with pipeline step testing

* chore: cleaning up provider of prices

* test: extra tests wit hsemantic meaning checks

* migrating pricers

* feature: introducing pricing predictors (pricers)

* chore: e2e is done with new pipeline

* extra session feature extraction

* feature: experiemntal sessin pricer and metrics(vibe)

* chore: redefined and connected pricers (#29)
2025-11-29 17:50:16 +01:00

54 lines
1.7 KiB
Python
Executable File

import numpy as np
import pandas as pd
from procesing.steps.base import BaseContextStep
class CreatePriceBucketsStep(BaseContextStep):
"""Create price bucket labels from price data"""
def transform(self, df: pd.DataFrame):
if df.empty or 'metadata_price' not in df.columns:
df['price_bucket'] = ""
return df
n_buckets = self.context.config.get('n_price_buckets', 5)
if df['metadata_price'].notnull().sum() > 0:
try:
price_buckets = pd.qcut(
df['metadata_price'],
q=n_buckets,
labels=[f"PB_{i+1}" for i in range(n_buckets)],
duplicates='drop'
)
except ValueError:
# fallback for insufficient unique values
price_buckets = df['metadata_price'].apply(
lambda x: f"P_{int(x)}" if pd.notnull(x) else ""
)
else:
price_buckets = pd.Series([""] * len(df), index=df.index)
df['price_bucket'] = price_buckets
return df
class AugmentEventNamesStep(BaseContextStep):
"""Augment event names with product and price bucket schema"""
def transform(self, df: pd.DataFrame):
if df.empty:
return df
# Create schema: _productId@price_bucket
has_product = df.get('productId', pd.Series()).notnull()
has_bucket = df.get('price_bucket', pd.Series()).notnull()
df['metadata_schema'] = np.where(
has_product & has_bucket,
"_" + df['productId'].astype(str) + "@" + df['price_bucket'].astype(str),
""
)
df['eventName'] = df['eventName'] + df['metadata_schema']
return df