diff --git a/experiments/procesing/__init__.py b/experiments/procesing/__init__.py index c5f4051..dc5c594 100644 --- a/experiments/procesing/__init__.py +++ b/experiments/procesing/__init__.py @@ -21,7 +21,6 @@ from procesing.steps import ( from procesing.pipelines import ( interaction_extraction_pipeline, price_extraction_pipeline, - elasticity_computation_pipeline, pricing_pipeline, full_pipeline, ) diff --git a/experiments/procesing/pipelines.py b/experiments/procesing/pipelines.py index 5c0d704..1d8dc2d 100644 --- a/experiments/procesing/pipelines.py +++ b/experiments/procesing/pipelines.py @@ -2,7 +2,6 @@ from sklearn.pipeline import Pipeline import pandas as pd from procesing.context import PipelineContext from procesing.providers import SupabaseProvider, BackendAPIProvider -from typing import Union from procesing.steps import ( FetchInteractionsStep, FetchPriceLogsStep, @@ -17,6 +16,8 @@ from procesing.steps import ( # BuildStateSpaceStep, FitPricingFunctionStep, PredictPricesStep, + ComputeDemandStep, + JoinProductFeaturesStep ) def interaction_extraction_pipeline(context: PipelineContext): @@ -35,80 +36,127 @@ def price_extraction_pipeline(context: PipelineContext): ]) -def elasticity_computation_pipeline(context: PipelineContext, +def product_features_pipeline(context: PipelineContext, interactions_df: pd.DataFrame, price_logs_df: pd.DataFrame): - """ - Compute elasticity from interactions and price logs. - Manual orchestration needed for branching logic. 
- """ - # branch 1: chunk interactions and compute demand - chunk_step = ChunkByTimeWindowStep(context) - interaction_chunks = chunk_step.transform(interactions_df) - - demand_step = ComputeDemandForChunksStep(context) - demand_chunks = demand_step.transform(interaction_chunks) - - # branch 2: aggregate price logs + # elasticity_step = ComputeElasticityStep(context) + demand_step = ComputeDemandStep(context) price_step = AggregatePriceLogsStep(context) - price_chunks = price_step.transform(price_logs_df) - - # convergence: compute elasticity - elasticity_step = ComputeElasticityStep(context) - elasticity_df = elasticity_step.transform((demand_chunks, price_chunks)) - - return elasticity_df + join_step = JoinProductFeaturesStep(context) -def pricing_pipeline(context: PipelineContext, elasticity_df: pd.DataFrame): + demand_data = demand_step.transform(interactions_df) + price_data= price_step.transform(price_logs_df) + joined_data = join_step.transform((demand_data, price_data)) + + return joined_data + + + +def pricing_pipeline(context: "PipelineContext", + data: pd.DataFrame, + high_threshold: int = 10, + low_threshold: int = 2, + surge_multiplier: float = 1.2, + discount_multiplier: float = 0.9) -> pd.DataFrame: """ - Generate optimal prices from elasticity estimates. + Generate product-level optimal prices using simple surge pricing rules. + Replaces complex Bayesian curve fitting with threshold-based adjustments. 
+ + Args: + context: Pipeline context + data: DataFrame with [productId, demand_score, price] + high_threshold: Demand threshold for surge pricing (default 10) + low_threshold: Demand threshold for discounts (default 2) + surge_multiplier: Price multiplier for high demand (default 1.2 = +20%) + discount_multiplier: Price multiplier for low demand (default 0.9 = -10%) + + Returns: + DataFrame with [productId, current_price, base_price, optimal_price, demand_score] """ - # build state space - state_step = BuildStateSpaceStep(context) - state_space = state_step.transform(elasticity_df) - # fit pricing function - fit_step = FitPricingFunctionStep(context) - pricer = fit_step.transform(elasticity_df) + if data.empty or 'productId' not in data.columns: + return pd.DataFrame() - # predict prices - predict_step = PredictPricesStep(context) - prices_df = predict_step.transform((pricer, state_space)) + products = context.products + results = [] - return prices_df + for pid in data['productId'].unique(): + prod_data = data[data['productId'] == pid] + + if prod_data.empty: + continue + + demand = prod_data["demand_score"].mean() + current_price = prod_data["price"].mean() + + # get base price from metadata or use current price + prod_meta = products[products['id'] == pid] + if not prod_meta.empty: + meta = prod_meta.iloc[0]['metadata'] + base_price = meta.get('base_price', current_price) if isinstance(meta, dict) else current_price + else: + base_price = current_price + + # apply surge rules + if demand >= high_threshold: + optimal_price = base_price * surge_multiplier + elif demand <= low_threshold: + optimal_price = base_price * discount_multiplier + else: + optimal_price = base_price + + results.append({ + 'productId': pid, + 'current_price': current_price, + 'base_price': base_price, + 'optimal_price': optimal_price, + 'demand_score': demand + }) + + return pd.DataFrame(results) -def full_pipeline(context: PipelineContext): + + + +def full_pipeline(context: PipelineContext, + 
high_threshold: int = 10, + low_threshold: int = 2, + surge_multiplier: float = 1.2, + discount_multiplier: float = 0.9): """ - Complete end-to-end pipeline: data extraction -> elasticity -> pricing - Returns: (elasticity_df, prices_df) + Complete end-to-end pipeline: data extraction -> demand/price aggregation -> surge pricing + + Args: + context: Pipeline context + high_threshold: Demand threshold for surge pricing + low_threshold: Demand threshold for discounts + surge_multiplier: Price multiplier for high demand + discount_multiplier: Price multiplier for low demand + + Returns: + tuple: (product_features_df, optimal_prices_df) + - product_features_df: [productId, demand_score, price] + - optimal_prices_df: [productId, current_price, base_price, optimal_price, demand_score] """ - # extract interactions interaction_pipe = interaction_extraction_pipeline(context) - interactions_df = interaction_pipe.fit_transform(None) - - # extract price logs price_pipe = price_extraction_pipeline(context) + + interactions_df = interaction_pipe.fit_transform(None) price_logs_df = price_pipe.fit_transform(None) + product_features_df = product_features_pipeline(context, interactions_df, price_logs_df) + print(product_features_df.to_string()) - if interactions_df.empty or price_logs_df.empty: - return None, None + # generate optimal prices using surge rules + optimal_prices_df = pricing_pipeline(context, product_features_df, + high_threshold=high_threshold, + low_threshold=low_threshold, + surge_multiplier=surge_multiplier, + discount_multiplier=discount_multiplier) - # compute elasticity - elasticity_df = elasticity_computation_pipeline( - context, - interactions_df, - price_logs_df - ) + return product_features_df, optimal_prices_df - if elasticity_df is None or elasticity_df.empty: - return elasticity_df, None - - # generate prices - prices_df = pricing_pipeline(context, elasticity_df) - - return elasticity_df, prices_df @@ -140,20 +188,7 @@ if __name__ == '__main__': context = 
PipelineContext( provider=HistoricalProvider(), store_mode='hotel', - # 15 min not month - window_size='15min', ) - elasticity_df, prices_df = full_pipeline(context) - - if elasticity_df is not None and not elasticity_df.empty: - print("Elasticity Estimates:") - print(elasticity_df.to_string(index=False)) - else: - print("No elasticity estimates computed.") - - if prices_df is not None and not prices_df.empty: - print("\nPredicted Prices:") - print(prices_df.to_string(index=False)) - else: - print("No prices predicted.") + product_features, prices = full_pipeline(context) + print(prices.to_string()) diff --git a/experiments/procesing/steps/__init__.py b/experiments/procesing/steps/__init__.py index d982269..4c31628 100755 --- a/experiments/procesing/steps/__init__.py +++ b/experiments/procesing/steps/__init__.py @@ -1,13 +1,12 @@ from procesing.steps.base import BaseContextStep from procesing.steps.fetch import FetchInteractionsStep, FetchPriceLogsStep, FetchExperimentsStep -from procesing.steps.join import JoinExperimentsStep -from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep +from procesing.steps.join import JoinExperimentsStep, JoinProductFeaturesStep +from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep, AugmentInteractionsStep from procesing.steps.chunk import ChunkByTimeWindowStep from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep from procesing.steps.elasticity import AggregatePriceLogsStep, ComputeElasticityStep from procesing.steps.pricing import FitPricingFunctionStep, PredictPricesStep from procesing.steps.session import ExtractSessionFeaturesStep, _extract_features_for_session -# StateSpace, BuildStateSpaceStep, __all__ = [ 'BaseContextStep', @@ -15,8 +14,10 @@ __all__ = [ 'FetchPriceLogsStep', 'FetchExperimentsStep', 'JoinExperimentsStep', + 'JoinProductFeaturesStep', 'CreatePriceBucketsStep', 'AugmentEventNamesStep', + 'AugmentInteractionsStep', 
'ChunkByTimeWindowStep', 'ComputeDemandStep', 'ComputeDemandForChunksStep', diff --git a/experiments/procesing/steps/augment.py b/experiments/procesing/steps/augment.py index a8b6506..a04e20b 100755 --- a/experiments/procesing/steps/augment.py +++ b/experiments/procesing/steps/augment.py @@ -2,6 +2,93 @@ import numpy as np import pandas as pd from procesing.steps.base import BaseContextStep + +class AugmentInteractionsStep(BaseContextStep): + """ + Consolidated step: create price buckets, augment event names, join experiments. + Input: (interactions_df, price_logs_df) + Output: enriched interactions_df + """ + + def transform(self, data: tuple): + interactions_df, price_logs_df = data + + if interactions_df.empty: + return interactions_df + + # Step 1: Create price buckets + interactions_df = self._create_price_buckets(interactions_df) + + # Step 2: Augment event names + interactions_df = self._augment_event_names(interactions_df) + + # Step 3: Join experiments (optional) + if 'experimentId' in interactions_df.columns: + interactions_df = self._join_experiments(interactions_df) + + return interactions_df + + def _create_price_buckets(self, df: pd.DataFrame): + """Create price bucket labels from price data""" + if 'metadata_price' not in df.columns: + df['price_bucket'] = "" + return df + + n_buckets = self.context.config.get('n_price_buckets', 5) + + if df['metadata_price'].notnull().sum() > 0: + try: + price_buckets = pd.qcut( + df['metadata_price'], + q=n_buckets, + labels=[f"PB_{i+1}" for i in range(n_buckets)], + duplicates='drop' + ) + except ValueError: + # fallback for insufficient unique values + price_buckets = df['metadata_price'].apply( + lambda x: f"P_{int(x)}" if pd.notnull(x) else "" + ) + else: + price_buckets = pd.Series([""] * len(df), index=df.index) + + df['price_bucket'] = price_buckets + return df + + def _augment_event_names(self, df: pd.DataFrame): + """Augment event names with product and price bucket schema""" + # Create schema: 
_productId@price_bucket + has_product = df.get('productId', pd.Series()).notnull() + has_bucket = df.get('price_bucket', pd.Series()).notnull() + + df['metadata_schema'] = np.where( + has_product & has_bucket, + "_" + df['productId'].astype(str) + "@" + df['price_bucket'].astype(str), + "" + ) + + df['eventName'] = df['eventName'] + df['metadata_schema'] + return df + + def _join_experiments(self, df: pd.DataFrame): + """Join experiment metadata if experimentId present""" + exp_ids = df['experimentId'].dropna().unique().tolist() + if not exp_ids: + return df + + experiments_df = self.context.provider.fetch_experiments(exp_ids) + if experiments_df.empty: + return df + + return df.merge( + experiments_df, + left_on='experimentId', + right_on='id', + how='left', + suffixes=('', '_exp') + ) + + class CreatePriceBucketsStep(BaseContextStep): """Create price bucket labels from price data""" diff --git a/experiments/procesing/steps/elasticity.py b/experiments/procesing/steps/elasticity.py index d65bc43..28fc0ef 100755 --- a/experiments/procesing/steps/elasticity.py +++ b/experiments/procesing/steps/elasticity.py @@ -16,7 +16,7 @@ class AggregatePriceLogsStep(BaseContextStep): df = price_logs_df.copy() ts_col = self.context.config.get('ts_col', 'ts') - window_size = self.context.window_size + #window_size = self.context.window_size WE ARE NOT USING CHUNKS ANYMORE # ensure datetime if not pd.api.types.is_datetime64_any_dtype(df[ts_col]): @@ -24,52 +24,23 @@ class AggregatePriceLogsStep(BaseContextStep): df = df.sort_values([ts_col, 'productId']) products = self.context.products - unique_products = products['id'].unique() - - # VECTORIZED: group by product, resample by time window, compute mean - df_indexed = df.set_index(ts_col) - - windowed = ( - df_indexed - .groupby('productId')['price'] - .resample(window_size) - .mean() - .reset_index() + # get base price from metadata if available 1) read the metadata col as json and get the base_price + products['base_price'] = 
products.apply( + lambda row: row['metadata'].get('base_price', 0) if isinstance(row['metadata'], dict) else 0, + axis=1 ) - # forward fill missing windows (carry last known price) - windowed = windowed.sort_values([ts_col, 'productId']) - windowed['price'] = windowed.groupby('productId')['price'].ffill() - windowed = windowed.dropna(subset=['price']) + unique_products = products['id'].unique() - # group into chunks by window - chunks = [] - for window_start, group in windowed.groupby(ts_col): - price_vector = group[['productId', 'price']].copy() + df_indexed = df.set_index(ts_col) + # we return a df of average price per product over the entire period + # TODO: maybe consider a different operation to handle price aggregation over time + avg_prices = df_indexed.groupby('productId')['price'].mean().reindex(unique_products, fill_value=0).reset_index() + avg_prices.columns = ['productId', 'price'] + # TODO: fill 0s with base_price from products (base_price_map is built here but not applied yet) + base_price_map = products.set_index('id')['base_price'].to_dict() + return avg_prices - # fill missing products with last known price before this window - missing_products = set(unique_products) - set(price_vector['productId']) - if missing_products: - for pid in missing_products: - last_price = df_indexed[ - (df_indexed['productId'] == pid) & - (df_indexed.index < window_start) - ]['price'] - - if not last_price.empty: - price_vector = pd.concat([ - price_vector, - pd.DataFrame({'productId': [pid], 'price': [last_price.iloc[-1]]}) - ], ignore_index=True) - - if not price_vector.empty: - chunks.append({ - 'window_start': window_start, - 'window_end': window_start + pd.Timedelta(window_size), - 'price_vector': price_vector - }) - - return chunks class ComputeElasticityStep(BaseContextStep): @@ -89,9 +60,9 @@ class ComputeElasticityStep(BaseContextStep): all_product_ids = products['id'].unique() # align chunks by window_start - aligned = self._align_chunks(demand_chunks, price_chunks) + # aligned = self._align_chunks(demand_chunks, 
price_chunks) - if not aligned: + if None: return pd.DataFrame({ 'productId': all_product_ids, 'elasticity': 0.0, diff --git a/experiments/procesing/steps/fetch.py b/experiments/procesing/steps/fetch.py index 8d9c311..87c306e 100755 --- a/experiments/procesing/steps/fetch.py +++ b/experiments/procesing/steps/fetch.py @@ -2,7 +2,11 @@ import pandas as pd from procesing.steps.base import BaseContextStep class FetchInteractionsStep(BaseContextStep): - """Fetch raw interaction data from Kafka topic""" + """Fetch raw interaction data from Kafka topic with optional time filtering""" + + def __init__(self, context, lookback: str = None): + super().__init__(context) + self.lookback = lookback def transform(self, X=None): df = self.context.provider.fetch_kafka_topic('user-interactions') @@ -24,14 +28,35 @@ class FetchInteractionsStep(BaseContextStep): if 'metadata_dateIndex' in df.columns: df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') + # Apply time filtering if lookback specified + if self.lookback and 'ts' in df.columns: + df['ts'] = pd.to_datetime(df['ts']) + cutoff = pd.Timestamp.now() - pd.Timedelta(self.lookback) + df = df[df['ts'] >= cutoff] + return df class FetchPriceLogsStep(BaseContextStep): - """Fetch price log data from Kafka topic""" + """Fetch price log data from Kafka topic with optional time filtering""" + + def __init__(self, context, lookback: str = None): + super().__init__(context) + self.lookback = lookback def transform(self, X=None): - return self.context.provider.fetch_kafka_topic('price-logs') + df = self.context.provider.fetch_kafka_topic('price-logs') + + if df.empty: + return df + + # Apply time filtering if lookback specified + if self.lookback and 'ts' in df.columns: + df['ts'] = pd.to_datetime(df['ts']) + cutoff = pd.Timestamp.now() - pd.Timedelta(self.lookback) + df = df[df['ts'] >= cutoff] + + return df class FetchExperimentsStep(BaseContextStep): diff --git a/experiments/procesing/steps/join.py 
b/experiments/procesing/steps/join.py index 5567f0f..1d1c446 100755 --- a/experiments/procesing/steps/join.py +++ b/experiments/procesing/steps/join.py @@ -32,3 +32,19 @@ class JoinExperimentsStep(BaseContextStep): }) return interactions_df.merge(experiments_df, on='experimentId', how='left') + +class JoinProductFeaturesStep(BaseContextStep): + """Left-join per-product price data onto per-product demand data""" + + def transform(self, data: tuple): + """ + Args: + data: (demand_df, price_df) + Returns: + demand_df left-merged with price_df on productId (demand_df unchanged if price_df is empty) + """ + demand_df, price_df = data + + if price_df.empty: + return demand_df + return demand_df.merge(price_df, on='productId', how='left') diff --git a/requirements.txt b/requirements.txt index 22d3fcb..93fd0eb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ pytest-asyncio uv scikit-learn supabase +pymc3