feature: cleaning up pipeline

This commit is contained in:
2025-12-05 12:43:36 +01:00
parent a351af1dbe
commit 951b08d65e
8 changed files with 257 additions and 122 deletions

View File

@@ -1,13 +1,12 @@
from procesing.steps.base import BaseContextStep
from procesing.steps.fetch import FetchInteractionsStep, FetchPriceLogsStep, FetchExperimentsStep
from procesing.steps.join import JoinExperimentsStep
from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep
from procesing.steps.join import JoinExperimentsStep, JoinProductFeaturesStep
from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep, AugmentInteractionsStep
from procesing.steps.chunk import ChunkByTimeWindowStep
from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep
from procesing.steps.elasticity import AggregatePriceLogsStep, ComputeElasticityStep
from procesing.steps.pricing import FitPricingFunctionStep, PredictPricesStep
from procesing.steps.session import ExtractSessionFeaturesStep, _extract_features_for_session
# StateSpace, BuildStateSpaceStep,
__all__ = [
'BaseContextStep',
@@ -15,8 +14,10 @@ __all__ = [
'FetchPriceLogsStep',
'FetchExperimentsStep',
'JoinExperimentsStep',
'JoinProductFeaturesStep',
'CreatePriceBucketsStep',
'AugmentEventNamesStep',
'AugmentInteractionsStep',
'ChunkByTimeWindowStep',
'ComputeDemandStep',
'ComputeDemandForChunksStep',

View File

@@ -2,6 +2,93 @@ import numpy as np
import pandas as pd
from procesing.steps.base import BaseContextStep
class AugmentInteractionsStep(BaseContextStep):
    """
    Consolidated step: create price buckets, augment event names, join experiments.

    Input: (interactions_df, price_logs_df)
    Output: enriched interactions_df
    """

    def transform(self, data: tuple):
        """
        Run the three enrichment stages on the interactions frame.

        Args:
            data: two-element tuple ``(interactions_df, price_logs_df)``.
                Only ``interactions_df`` is read by this step; the price
                logs are accepted to keep the pipeline's tuple shape.

        Returns:
            The enriched interactions DataFrame. An empty input frame is
            returned unchanged.
        """
        interactions_df, _price_logs_df = data

        if interactions_df.empty:
            return interactions_df

        # The helpers below assign new columns; work on a copy so the
        # caller's frame is not modified as a side effect.
        interactions_df = interactions_df.copy()

        # Step 1: Create price buckets
        interactions_df = self._create_price_buckets(interactions_df)

        # Step 2: Augment event names
        interactions_df = self._augment_event_names(interactions_df)

        # Step 3: Join experiments (optional)
        if 'experimentId' in interactions_df.columns:
            interactions_df = self._join_experiments(interactions_df)

        return interactions_df

    def _create_price_buckets(self, df: pd.DataFrame):
        """
        Add a ``price_bucket`` column derived from ``metadata_price``.

        The number of quantile buckets is read from the context config key
        ``n_price_buckets`` (default 5). Assigns the column on ``df`` in
        place and returns ``df``.
        """
        if 'metadata_price' not in df.columns:
            df['price_bucket'] = ""
            return df

        n_buckets = self.context.config.get('n_price_buckets', 5)

        if df['metadata_price'].notnull().sum() > 0:
            try:
                price_buckets = pd.qcut(
                    df['metadata_price'],
                    q=n_buckets,
                    labels=[f"PB_{i+1}" for i in range(n_buckets)],
                    duplicates='drop',
                )
            except ValueError:
                # Fallback for insufficient unique values: qcut rejects the
                # label list when it cannot form n_buckets distinct bins, so
                # label each row by its integer price instead.
                price_buckets = df['metadata_price'].apply(
                    lambda x: f"P_{int(x)}" if pd.notnull(x) else ""
                )
        else:
            price_buckets = pd.Series([""] * len(df), index=df.index)

        df['price_bucket'] = price_buckets
        return df

    def _augment_event_names(self, df: pd.DataFrame):
        """
        Append a ``_<productId>@<price_bucket>`` suffix to ``eventName``.

        The suffix is stored in ``metadata_schema`` and is empty for rows
        where either ``productId`` or ``price_bucket`` is null. If either
        column is absent, every suffix is empty and ``eventName`` is left
        untouched. Assigns on ``df`` in place and returns ``df``.
        """
        # Without both columns no suffix can be built; bail out instead of
        # raising KeyError on the column lookups below.
        if 'productId' not in df.columns or 'price_bucket' not in df.columns:
            df['metadata_schema'] = ""
            return df

        # Create schema: _productId@price_bucket
        has_product = df['productId'].notnull()
        has_bucket = df['price_bucket'].notnull()

        df['metadata_schema'] = np.where(
            has_product & has_bucket,
            "_" + df['productId'].astype(str) + "@" + df['price_bucket'].astype(str),
            "",
        )

        df['eventName'] = df['eventName'] + df['metadata_schema']
        return df

    def _join_experiments(self, df: pd.DataFrame):
        """
        Left-join experiment metadata onto rows that carry an ``experimentId``.

        Experiments are fetched through ``self.context.provider.fetch_experiments``
        for the distinct non-null ids in ``df``. Returns ``df`` unchanged
        when there are no ids or the fetch yields an empty frame.
        """
        exp_ids = df['experimentId'].dropna().unique().tolist()

        if not exp_ids:
            return df

        experiments_df = self.context.provider.fetch_experiments(exp_ids)

        if experiments_df.empty:
            return df

        # Columns that exist on both sides keep their name on the left and
        # get an '_exp' suffix on the experiment side.
        return df.merge(
            experiments_df,
            left_on='experimentId',
            right_on='id',
            how='left',
            suffixes=('', '_exp'),
        )
class CreatePriceBucketsStep(BaseContextStep):
"""Create price bucket labels from price data"""

View File

@@ -16,7 +16,7 @@ class AggregatePriceLogsStep(BaseContextStep):
df = price_logs_df.copy()
ts_col = self.context.config.get('ts_col', 'ts')
window_size = self.context.window_size
#window_size = self.context.window_size WE ARE NOT USING CHUNKS ANYMORE
# ensure datetime
if not pd.api.types.is_datetime64_any_dtype(df[ts_col]):
@@ -24,52 +24,23 @@ class AggregatePriceLogsStep(BaseContextStep):
df = df.sort_values([ts_col, 'productId'])
products = self.context.products
unique_products = products['id'].unique()
# VECTORIZED: group by product, resample by time window, compute mean
df_indexed = df.set_index(ts_col)
windowed = (
df_indexed
.groupby('productId')['price']
.resample(window_size)
.mean()
.reset_index()
# Get the base price from metadata if available: when the metadata column holds a dict, take its 'base_price' entry; otherwise default to 0
products['base_price'] = products.apply(
lambda row: row['metadata'].get('base_price', 0) if isinstance(row['metadata'], dict) else 0,
axis=1
)
# forward fill missing windows (carry last known price)
windowed = windowed.sort_values([ts_col, 'productId'])
windowed['price'] = windowed.groupby('productId')['price'].ffill()
windowed = windowed.dropna(subset=['price'])
unique_products = products['id'].unique()
# group into chunks by window
chunks = []
for window_start, group in windowed.groupby(ts_col):
price_vector = group[['productId', 'price']].copy()
df_indexed = df.set_index(ts_col)
# we return a df of average price per product over the entire period
# TODO: maybe consider a different operation to handle price aggregation over time
avg_prices = df_indexed.groupby('productId')['price'].mean().reindex(unique_products, fill_value=0).reset_index()
avg_prices.columns = ['productId', 'price']
# fill 0s with base_price from products
base_price_map = products.set_index('id')['base_price'].to_dict()
return avg_prices
# fill missing products with last known price before this window
missing_products = set(unique_products) - set(price_vector['productId'])
if missing_products:
for pid in missing_products:
last_price = df_indexed[
(df_indexed['productId'] == pid) &
(df_indexed.index < window_start)
]['price']
if not last_price.empty:
price_vector = pd.concat([
price_vector,
pd.DataFrame({'productId': [pid], 'price': [last_price.iloc[-1]]})
], ignore_index=True)
if not price_vector.empty:
chunks.append({
'window_start': window_start,
'window_end': window_start + pd.Timedelta(window_size),
'price_vector': price_vector
})
return chunks
class ComputeElasticityStep(BaseContextStep):
@@ -89,9 +60,9 @@ class ComputeElasticityStep(BaseContextStep):
all_product_ids = products['id'].unique()
# align chunks by window_start
aligned = self._align_chunks(demand_chunks, price_chunks)
# aligned = self._align_chunks(demand_chunks, price_chunks)
if not aligned:
if None:
return pd.DataFrame({
'productId': all_product_ids,
'elasticity': 0.0,

View File

@@ -2,7 +2,11 @@ import pandas as pd
from procesing.steps.base import BaseContextStep
class FetchInteractionsStep(BaseContextStep):
"""Fetch raw interaction data from Kafka topic"""
"""Fetch raw interaction data from Kafka topic with optional time filtering"""
def __init__(self, context, lookback: str = None):
super().__init__(context)
self.lookback = lookback
def transform(self, X=None):
df = self.context.provider.fetch_kafka_topic('user-interactions')
@@ -24,14 +28,35 @@ class FetchInteractionsStep(BaseContextStep):
if 'metadata_dateIndex' in df.columns:
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
# Apply time filtering if lookback specified
if self.lookback and 'ts' in df.columns:
df['ts'] = pd.to_datetime(df['ts'])
cutoff = pd.Timestamp.now() - pd.Timedelta(self.lookback)
df = df[df['ts'] >= cutoff]
return df
class FetchPriceLogsStep(BaseContextStep):
    """Fetch price log data from Kafka topic with optional time filtering"""

    def __init__(self, context, lookback: str = None):
        """
        Args:
            context: pipeline context, forwarded to ``BaseContextStep``.
            lookback: optional duration string parsed by ``pd.Timedelta``;
                when set, only rows whose ``ts`` falls within that window
                before "now" are kept.
        """
        super().__init__(context)
        self.lookback = lookback

    def transform(self, X=None):
        """
        Read the ``price-logs`` topic and optionally trim it to the lookback window.

        Args:
            X: unused; accepted so the step fits the pipeline's transform signature.

        Returns:
            DataFrame of price logs. Returned as fetched when it is empty,
            when no lookback is configured, or when it has no ``ts`` column.
        """
        df = self.context.provider.fetch_kafka_topic('price-logs')

        if df.empty:
            return df

        # Apply time filtering if lookback specified
        if self.lookback and 'ts' in df.columns:
            df['ts'] = pd.to_datetime(df['ts'])
            # Build "now" in the same timezone as the column: comparing a
            # naive cutoff against tz-aware timestamps raises TypeError.
            tz = getattr(df['ts'].dtype, 'tz', None)
            cutoff = pd.Timestamp.now(tz=tz) - pd.Timedelta(self.lookback)
            df = df[df['ts'] >= cutoff]

        return df
class FetchExperimentsStep(BaseContextStep):

View File

@@ -32,3 +32,19 @@ class JoinExperimentsStep(BaseContextStep):
})
return interactions_df.merge(experiments_df, on='experimentId', how='left')
class JoinProductFeaturesStep(BaseContextStep):
    """Left-join a per-product feature frame onto another frame by ``productId``"""

    def transform(self, data: tuple):
        """
        Merge the second frame of ``data`` into the first on ``productId``.

        Args:
            data: two-element tuple ``(base_df, features_df)``.
                NOTE(review): the step name suggests (interactions, products),
                while the original locals were named demand/price - confirm
                which pair callers actually pass.

        Returns:
            ``base_df`` unchanged when ``features_df`` is empty; otherwise the
            left merge of ``base_df`` with ``features_df`` on ``productId``.
        """
        base_df, features_df = data

        if not features_df.empty:
            return base_df.merge(features_df, how='left', on='productId')

        # Nothing to join.
        return base_df