diff --git a/backend/provider/app.py b/backend/provider/app.py index 654aca3..fb72a9d 100644 --- a/backend/provider/app.py +++ b/backend/provider/app.py @@ -79,7 +79,7 @@ def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Opti elasticity=None ) - optimal_price = float(product_price_row['predicted_price'].iloc[0]) + optimal_price = float(product_price_row['optimal_price'].iloc[0]) # TODO: use optimal_price everywhere as aresult # get elasticity if available product_elasticity = None diff --git a/backend/provider/requirements.txt b/backend/provider/requirements.txt index 4169911..2b3c9ac 100644 --- a/backend/provider/requirements.txt +++ b/backend/provider/requirements.txt @@ -12,4 +12,5 @@ graphviz python-dotenv>=1.0.0 requests>=2.31.0 typing-extensions>=4.8.0 -pickle5>=0.0.11; python_version < '3.8' +pypickle +pymc diff --git a/backend/server/app.py b/backend/server/app.py index 482eee1..d338408 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -290,6 +290,7 @@ async def get_products( query = supabase.table(table).select('*') # filter by exact date_index if provided + # dateIndex from frontend is days from today, convert to days since epoch if dateIndex is not None: query = query.eq('date_index', dateIndex) diff --git a/experiments/airflow/dags/elasticity_pricing_dag.py b/experiments/airflow/dags/elasticity_pricing_dag.py deleted file mode 100644 index 4d6830d..0000000 --- a/experiments/airflow/dags/elasticity_pricing_dag.py +++ /dev/null @@ -1,348 +0,0 @@ -from airflow import DAG -from airflow.operators.python import PythonOperator -from airflow.utils.dates import days_ago -from datetime import timedelta -import pandas as pd -import logging -import sys -import pickle -import io - -# add parent dir to path so procesing package can be imported -sys.path.insert(0, '/opt/airflow') - -from procesing.context import PipelineContext -from procesing.providers import SupabaseProvider, BackendAPIProvider -from procesing.steps import ( - FetchInteractionsStep, - FetchPriceLogsStep, - CreatePriceBucketsStep, - AugmentEventNamesStep, - ChunkByTimeWindowStep, - ComputeDemandForChunksStep, - AggregatePriceLogsStep, - ComputeElasticityStep, - BuildStateSpaceStep, - FitPricingFunctionStep, - PredictPricesStep, -) - -default_args = { - 'owner': 'phantom-research', - 'depends_on_past': False, - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 2, - 'retry_delay': timedelta(minutes=5), -} - -def get_provider(): - """Factory to create composite provider""" - class CompositeProvider(SupabaseProvider, BackendAPIProvider): - def __init__(self): - SupabaseProvider.__init__(self) - BackendAPIProvider.__init__(self) - return CompositeProvider() - -def get_context(**kwargs): - """Build pipeline context from Airflow config""" - dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} - return PipelineContext( - provider=get_provider(), - store_mode=dag_conf.get('store_mode', 'hotel'), - window_size=dag_conf.get('window_size', '30s'), - n_price_buckets=dag_conf.get('n_price_buckets', 5), - elasticity_method=dag_conf.get('elasticity_method', 'point'), - min_observations=dag_conf.get('min_observations', 2), - ) - -# atomic task functions (each wraps one sklearn step) -def fetch_interactions(**kwargs): - """Task: Fetch interaction data from Kafka""" - context = get_context(**kwargs) - step = FetchInteractionsStep(context) - df = step.transform(None) - - kwargs['ti'].xcom_push(key='interactions_raw', value=pickle.dumps(df)) - logging.info(f"Fetched {len(df)} interaction records") - return len(df) - -def fetch_price_logs(**kwargs): - """Task: Fetch price logs from Kafka""" - context = get_context(**kwargs) - step = FetchPriceLogsStep(context) - df = step.transform(None) - - kwargs['ti'].xcom_push(key='price_logs_raw', value=pickle.dumps(df)) - logging.info(f"Fetched {len(df)} price records") - return len(df) - -def create_price_buckets(**kwargs): - """Task: Create price buckets for interactions""" - ti = kwargs['ti'] - df = pickle.loads(ti.xcom_pull(key='interactions_raw')) - - context = get_context(**kwargs) - step = CreatePriceBucketsStep(context) - df = step.transform(df) - - ti.xcom_push(key='interactions_bucketed', value=pickle.dumps(df)) - logging.info(f"Created price buckets for {len(df)} interactions") - return len(df) - -def augment_event_names(**kwargs): - """Task: Augment event names with product and price schema""" - ti = kwargs['ti'] - df = pickle.loads(ti.xcom_pull(key='interactions_bucketed')) - - context = get_context(**kwargs) - step = AugmentEventNamesStep(context) - df = step.transform(df) - - ti.xcom_push(key='interactions_final', value=pickle.dumps(df)) - logging.info(f"Augmented event names for {len(df)} interactions") - return len(df) - -def chunk_interactions(**kwargs): - """Task: Chunk interactions into time windows""" - ti = kwargs['ti'] - df = pickle.loads(ti.xcom_pull(key='interactions_final')) - - context = get_context(**kwargs) - step = ChunkByTimeWindowStep(context) - chunks = step.transform(df) - - ti.xcom_push(key='interaction_chunks', value=pickle.dumps(chunks)) - logging.info(f"Generated {len(chunks)} interaction chunks") - return len(chunks) - -def compute_demand(**kwargs): - """Task: Compute demand vectors for all chunks""" - ti = kwargs['ti'] - chunks = pickle.loads(ti.xcom_pull(key='interaction_chunks')) - - context = get_context(**kwargs) - step = ComputeDemandForChunksStep(context) - demand_chunks = step.transform(chunks) - - ti.xcom_push(key='demand_chunks', value=pickle.dumps(demand_chunks)) - logging.info(f"Computed demand for {len(demand_chunks)} chunks") - return len(demand_chunks) - -def aggregate_price_logs(**kwargs): - """Task: Aggregate price logs into time windows """ - ti = kwargs['ti'] - df = pickle.loads(ti.xcom_pull(key='price_logs_raw')) - - context = get_context(**kwargs) - step = AggregatePriceLogsStep(context) - price_chunks = step.transform(df) - - ti.xcom_push(key='price_chunks', value=pickle.dumps(price_chunks)) - logging.info(f"Aggregated {len(price_chunks)} price chunks") - return len(price_chunks) - -def compute_elasticity(**kwargs): - """Task: Compute price elasticity from demand and price chunks""" - ti = kwargs['ti'] - demand_chunks = pickle.loads(ti.xcom_pull(key='demand_chunks')) - price_chunks = pickle.loads(ti.xcom_pull(key='price_chunks')) - - context = get_context(**kwargs) - step = ComputeElasticityStep(context) - elasticity_df = step.transform((demand_chunks, price_chunks)) - - ti.xcom_push(key='elasticity_results', value=pickle.dumps(elasticity_df)) - logging.info(f"Computed elasticity for {len(elasticity_df)} products") - - return { - 'n_products': len(elasticity_df), - 'mean_elasticity': float(elasticity_df['elasticity'].mean()), - 'median_elasticity': float(elasticity_df['elasticity'].median()) - } - -def build_state_space(**kwargs): - """Task: Build state space from elasticity""" - ti = kwargs['ti'] - elasticity_df = pickle.loads(ti.xcom_pull(key='elasticity_results')) - - context = get_context(**kwargs) - step = BuildStateSpaceStep(context) - state_space = step.transform(elasticity_df) - - ti.xcom_push(key='state_space', value=pickle.dumps(state_space)) - logging.info("Built state space for pricing") - return True - -def fit_pricing_function(**kwargs): - """Task: Fit pricing function using elasticity""" - ti = kwargs['ti'] - elasticity_df = pickle.loads(ti.xcom_pull(key='elasticity_results')) - - context = get_context(**kwargs) - step = FitPricingFunctionStep(context) - pricer = step.transform(elasticity_df) - - ti.xcom_push(key='pricer', value=pickle.dumps(pricer)) - logging.info("Fitted pricing function") - return True - -def predict_prices(**kwargs): - """Task: Predict optimal prices""" - ti = kwargs['ti'] - pricer = pickle.loads(ti.xcom_pull(key='pricer')) - state_space = pickle.loads(ti.xcom_pull(key='state_space')) - - context = get_context(**kwargs) - step = PredictPricesStep(context) - prices_df = step.transform((pricer, state_space)) - - ti.xcom_push(key='predicted_prices', value=pickle.dumps(prices_df)) - logging.info(f"Predicted prices for {len(prices_df)} products") - return len(prices_df) - -def publish_results(**kwargs): - """Task: Publish elasticity, pricing model, and predicted prices to registry""" - ti = kwargs['ti'] - elasticity_df = pickle.loads(ti.xcom_pull(key='elasticity_results')) - prices_df = pickle.loads(ti.xcom_pull(key='predicted_prices')) - - sys.path.insert(0, '/opt/airflow') - from lib.model_registry import ModelRegistry - - registry = ModelRegistry() - dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} - - metadata = { - 'timestamp': pd.Timestamp.now().isoformat(), - 'window_size': dag_conf.get('window_size', '30s'), - 'store_mode': dag_conf.get('store_mode', 'hotel'), - 'dag_run_id': kwargs['dag_run'].run_id if kwargs.get('dag_run') else 'manual' - } - - registry.publish_elasticity(elasticity_df, model_name='latest', metadata=metadata) - - pricer = pickle.loads(ti.xcom_pull(key='pricer')) - registry.publish_pricing_model( - pricer, - model_name='latest', - metadata={**metadata, 'model_type': type(pricer).__name__} - ) - - registry.publish_prices(prices_df, model_name='latest', metadata=metadata) - - logging.info(f"Published elasticity + pricing + prices for {len(elasticity_df)} products") - - return { - 'n_products': len(elasticity_df), - 'n_prices': len(prices_df), - 'registry_status': 'success', - 'elasticity_mean': float(elasticity_df['elasticity'].mean()) - } - - -# DAG definition -with DAG( - 'elasticity_pricing_pipeline', - default_args=default_args, - description='E2E refactored pipeline: atomic steps with proper separation', - schedule_interval='*/15 * * * *', - start_date=days_ago(1), - catchup=False, - max_active_runs=1, - tags=['pricing', 'elasticity', 'research', 'refactored'], -) as dag: - - # parallel data fetching - t_fetch_interactions = PythonOperator( - task_id='fetch_interactions', - python_callable=fetch_interactions, - provide_context=True, - ) - - t_fetch_price_logs = PythonOperator( - task_id='fetch_price_logs', - python_callable=fetch_price_logs, - provide_context=True, - ) - - # interaction processing branch - t_create_buckets = PythonOperator( - task_id='create_price_buckets', - python_callable=create_price_buckets, - provide_context=True, - ) - - t_augment_events = PythonOperator( - task_id='augment_event_names', - python_callable=augment_event_names, - provide_context=True, - ) - - t_chunk_interactions = PythonOperator( - task_id='chunk_interactions', - python_callable=chunk_interactions, - provide_context=True, - ) - - t_compute_demand = PythonOperator( - task_id='compute_demand', - python_callable=compute_demand, - provide_context=True, - ) - - # price processing branch (VECTORIZED) - t_aggregate_prices = PythonOperator( - task_id='aggregate_price_logs', - python_callable=aggregate_price_logs, - provide_context=True, - ) - - # convergence: compute elasticity - t_compute_elasticity = PythonOperator( - task_id='compute_elasticity', - python_callable=compute_elasticity, - provide_context=True, - ) - - # pricing tasks - t_build_state = PythonOperator( - task_id='build_state_space', - python_callable=build_state_space, - provide_context=True, - ) - - t_fit_pricer = PythonOperator( - task_id='fit_pricing_function', - python_callable=fit_pricing_function, - provide_context=True, - ) - - t_predict_prices = PythonOperator( - task_id='predict_prices', - python_callable=predict_prices, - provide_context=True, - ) - - # publish to registry - t_publish = PythonOperator( - task_id='publish_results', - python_callable=publish_results, - provide_context=True, - ) - - # dependency graph (clear atomic flow) - # parallel fetches - [t_fetch_interactions, t_fetch_price_logs] - - # interaction branch: fetch -> bucket -> augment -> chunk -> demand - t_fetch_interactions >> t_create_buckets >> t_augment_events >> t_chunk_interactions >> t_compute_demand - - # price branch: fetch -> aggregate (vectorized) - t_fetch_price_logs >> t_aggregate_prices - - # convergence: both branches -> elasticity - [t_compute_demand, t_aggregate_prices] >> t_compute_elasticity - - # pricing: elasticity -> state + fit -> predict -> publish - t_compute_elasticity >> [t_build_state, t_fit_pricer] >> t_predict_prices >> t_publish diff --git a/experiments/airflow/dags/surge_pricing_pipeline.py b/experiments/airflow/dags/surge_pricing_pipeline.py new file mode 100644 index 0000000..b1d7c61 --- /dev/null +++ b/experiments/airflow/dags/surge_pricing_pipeline.py @@ -0,0 +1,237 @@ +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.utils.dates import days_ago +from datetime import timedelta +import pandas as pd +import logging +import sys +import pickle +import io + +# add parent dir to path so procesing package can be imported +sys.path.insert(0, '/opt/airflow') + +from procesing.context import PipelineContext +from procesing.providers import SupabaseProvider, BackendAPIProvider +from procesing.steps import ( + FetchInteractionsStep, + FetchPriceLogsStep, + ComputeDemandStep, + AggregatePriceLogsStep, + JoinProductFeaturesStep, +) +from procesing.pricers.simple import SimpleSurgePricer + +default_args = { + 'owner': 'phantom-research', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 2, + 'retry_delay': timedelta(minutes=5), +} + +def get_provider(): + """Factory to create composite provider""" + class CompositeProvider(SupabaseProvider, BackendAPIProvider): # TODO: Fix this into one global provider singelton instead of multiple inheritance declarations acoss the codebase + def __init__(self): + SupabaseProvider.__init__(self) + BackendAPIProvider.__init__(self) + return CompositeProvider() + +def get_context(**kwargs): + """Build pipeline context from Airflow config""" + dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} + return PipelineContext( + provider=get_provider(), + store_mode=dag_conf.get('store_mode', 'hotel'), + ) + +# atomic task functions (each wraps one sklearn step) +def fetch_interactions(**kwargs): + """Task: Fetch interaction data from Kafka""" + context = get_context(**kwargs) + step = FetchInteractionsStep(context) + df = step.transform(None) + + kwargs['ti'].xcom_push(key='interactions_raw', value=pickle.dumps(df)) + logging.info(f"Fetched {len(df)} interaction records") + return len(df) + +def fetch_price_logs(**kwargs): + """Task: Fetch price logs from Kafka""" + context = get_context(**kwargs) + step = FetchPriceLogsStep(context) + df = step.transform(None) + + kwargs['ti'].xcom_push(key='price_logs_raw', value=pickle.dumps(df)) + logging.info(f"Fetched {len(df)} price records") + return len(df) + +def compute_demand(**kwargs): + """Task: Compute demand scores from interactions""" + ti = kwargs['ti'] + df = pickle.loads(ti.xcom_pull(key='interactions_raw')) + + context = get_context(**kwargs) + step = ComputeDemandStep(context) + demand_df = step.transform(df) + # TODO: clear the xcom + + + ti.xcom_push(key='demand_data', value=pickle.dumps(demand_df)) + logging.info(f"Computed demand for {len(demand_df)} products") + return len(demand_df) + +def aggregate_price_logs(**kwargs): + """Task: Aggregate price logs""" + ti = kwargs['ti'] + df = pickle.loads(ti.xcom_pull(key='price_logs_raw')) + + context = get_context(**kwargs) + step = AggregatePriceLogsStep(context) + price_df = step.transform(df) + + ti.xcom_push(key='price_data', value=pickle.dumps(price_df)) + logging.info(f"Aggregated price logs for {len(price_df)} products") + return len(price_df) + +def join_product_features(**kwargs): + """Task: Join demand and price data""" + ti = kwargs['ti'] + demand_df = pickle.loads(ti.xcom_pull(key='demand_data')) + price_df = pickle.loads(ti.xcom_pull(key='price_data')) + + context = get_context(**kwargs) + step = JoinProductFeaturesStep(context) + joined_df = step.transform((demand_df, price_df)) + + ti.xcom_push(key='product_features', value=pickle.dumps(joined_df)) + logging.info(f"Joined features for {len(joined_df)} products") + return len(joined_df) + +def apply_surge_pricing(**kwargs): + """Task: Apply surge pricing rules to generate optimal prices""" + ti = kwargs['ti'] + product_features = pickle.loads(ti.xcom_pull(key='product_features')) + + dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} + + # rename demand_score to demand for pricer compatibility + data = product_features.rename(columns={'demand_score': 'demand'}) + + surge_pricer = SimpleSurgePricer( + high_threshold=dag_conf.get('high_threshold', 10), + low_threshold=dag_conf.get('low_threshold', 2), + surge_multiplier=dag_conf.get('surge_multiplier', 1.2), + discount_multiplier=dag_conf.get('discount_multiplier', 0.9) + ) + surge_pricer.fit(data) + data['optimal_price'] = surge_pricer.predict() + + prices_df = data[['productId', 'price', 'base_price', 'optimal_price', 'demand']].rename(columns={ + 'price': 'current_price', + 'demand': 'demand_score' + }) + + ti.xcom_push(key='predicted_prices', value=pickle.dumps(prices_df)) + logging.info(f"Applied surge pricing for {len(prices_df)} products") + return len(prices_df) + +def publish_results(**kwargs): + """Task: Publish surge pricing results to registry""" + ti = kwargs['ti'] + prices_df = pickle.loads(ti.xcom_pull(key='predicted_prices')) + + sys.path.insert(0, '/opt/airflow') + from lib.model_registry import ModelRegistry + + registry = ModelRegistry() + dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} + + metadata = { + 'timestamp': pd.Timestamp.now().isoformat(), + 'store_mode': dag_conf.get('store_mode', 'hotel'), + 'dag_run_id': kwargs['dag_run'].run_id if kwargs.get('dag_run') else 'manual', + 'pricing_method': 'surge', + 'high_threshold': dag_conf.get('high_threshold', 10), + 'low_threshold': dag_conf.get('low_threshold', 2), + 'surge_multiplier': dag_conf.get('surge_multiplier', 1.2), + 'discount_multiplier': dag_conf.get('discount_multiplier', 0.9) + } + + registry.publish_prices(prices_df, model_name='latest', metadata=metadata) + + logging.info(f"Published surge pricing for {len(prices_df)} products") + + return { + 'n_products': len(prices_df), + 'registry_status': 'success', + 'mean_demand': float(prices_df['demand_score'].mean()) if 'demand_score' in prices_df.columns else None + } + + +# DAG definition +with DAG( + 'surge_pricing_pipeline', + default_args=default_args, + description='Simple surge pricing pipeline: demand aggregation + rule-based pricing', + schedule_interval='*/15 * * * *', + start_date=days_ago(1), + catchup=False, + max_active_runs=1, + tags=['pricing', 'surge', 'research', 'simplified'], +) as dag: + + # parallel data fetching + t_fetch_interactions = PythonOperator( + task_id='fetch_interactions', + python_callable=fetch_interactions, + provide_context=True, + ) + + t_fetch_price_logs = PythonOperator( + task_id='fetch_price_logs', + python_callable=fetch_price_logs, + provide_context=True, + ) + + # compute demand from interactions + t_compute_demand = PythonOperator( + task_id='compute_demand', + python_callable=compute_demand, + provide_context=True, + ) + + # aggregate price logs + t_aggregate_prices = PythonOperator( + task_id='aggregate_price_logs', + python_callable=aggregate_price_logs, + provide_context=True, + ) + + # join demand and prices + t_join_features = PythonOperator( + task_id='join_product_features', + python_callable=join_product_features, + provide_context=True, + ) + + # apply surge pricing + t_surge_pricing = PythonOperator( + task_id='apply_surge_pricing', + python_callable=apply_surge_pricing, + provide_context=True, + ) + + # publish to registry + t_publish = PythonOperator( + task_id='publish_results', + python_callable=publish_results, + provide_context=True, + ) + + # dependency graph: parallel fetch -> process -> join -> surge -> publish + t_fetch_interactions >> t_compute_demand + t_fetch_price_logs >> t_aggregate_prices + [t_compute_demand, t_aggregate_prices] >> t_join_features >> t_surge_pricing >> t_publish diff --git a/experiments/procesing/__init__.py b/experiments/procesing/__init__.py index c5f4051..adcf340 100644 --- a/experiments/procesing/__init__.py +++ b/experiments/procesing/__init__.py @@ -12,7 +12,6 @@ from procesing.steps import ( ComputeDemandStep, ComputeDemandForChunksStep, AggregatePriceLogsStep, - ComputeElasticityStep, # StateSpace, # BuildStateSpaceStep, FitPricingFunctionStep, @@ -21,7 +20,6 @@ from procesing.steps import ( from procesing.pipelines import ( interaction_extraction_pipeline, price_extraction_pipeline, - elasticity_computation_pipeline, pricing_pipeline, full_pipeline, ) @@ -42,14 +40,12 @@ __all__ = [ 'ComputeDemandStep', 'ComputeDemandForChunksStep', 'AggregatePriceLogsStep', - 'ComputeElasticityStep', # 'StateSpace', # 'BuildStateSpaceStep', 'FitPricingFunctionStep', 'PredictPricesStep', 'interaction_extraction_pipeline', 'price_extraction_pipeline', - 'elasticity_computation_pipeline', 'pricing_pipeline', 'full_pipeline', ] diff --git a/experiments/procesing/pipelines.py b/experiments/procesing/pipelines.py index 3b86654..2742254 100644 --- a/experiments/procesing/pipelines.py +++ b/experiments/procesing/pipelines.py @@ -2,7 +2,6 @@ from sklearn.pipeline import Pipeline import pandas as pd from procesing.context import PipelineContext from procesing.providers import SupabaseProvider, BackendAPIProvider -from typing import Union from procesing.steps import ( FetchInteractionsStep, FetchPriceLogsStep, @@ -13,11 +12,13 @@ from procesing.steps import ( ChunkByTimeWindowStep, ComputeDemandForChunksStep, AggregatePriceLogsStep, - ComputeElasticityStep, # BuildStateSpaceStep, FitPricingFunctionStep, PredictPricesStep, + ComputeDemandStep, + JoinProductFeaturesStep ) +from procesing.pricers import SimpleSurgePricer def interaction_extraction_pipeline(context: PipelineContext): """Pipeline for extracting and augmenting interaction data""" @@ -35,80 +36,76 @@ def price_extraction_pipeline(context: PipelineContext): ]) -def elasticity_computation_pipeline(context: PipelineContext, +def product_features_pipeline(context: PipelineContext, interactions_df: pd.DataFrame, price_logs_df: pd.DataFrame): - """ - Compute elasticity from interactions and price logs. - Manual orchestration needed for branching logic. - """ - # branch 1: chunk interactions and compute demand - chunk_step = ChunkByTimeWindowStep(context) - interaction_chunks = chunk_step.transform(interactions_df) - - demand_step = ComputeDemandForChunksStep(context) - demand_chunks = demand_step.transform(interaction_chunks) - - # branch 2: aggregate price logs + demand_step = ComputeDemandStep(context) price_step = AggregatePriceLogsStep(context) - price_chunks = price_step.transform(price_logs_df) - - # convergence: compute elasticity - elasticity_step = ComputeElasticityStep(context) - elasticity_df = elasticity_step.transform((demand_chunks, price_chunks)) - - return elasticity_df + join_step = JoinProductFeaturesStep(context) -def pricing_pipeline(context: PipelineContext, elasticity_df: pd.DataFrame): + demand_data = demand_step.transform(interactions_df) + price_data= price_step.transform(price_logs_df) + joined_data = join_step.transform((demand_data, price_data)) + + return joined_data + + + +def pricing_pipeline(context: "PipelineContext", + data: pd.DataFrame, + high_threshold: int = 10, + low_threshold: int = 2, + surge_multiplier: float = 1.2, + discount_multiplier: float = 0.9) -> pd.DataFrame: + + if data.empty or 'productId' not in data.columns: + return pd.DataFrame() + + surge_pricer = SimpleSurgePricer() + surge_pricer.fit(data) + data['optimal_price'] = surge_pricer.predict() + return data + + +def full_pipeline(context: PipelineContext, + high_threshold: int = 10, + low_threshold: int = 2, + surge_multiplier: float = 1.2, + discount_multiplier: float = 0.9): """ - Generate optimal prices from elasticity estimates. + Complete end-to-end pipeline: data extraction -> demand/price aggregation -> surge pricing + + Args: + context: Pipeline context + high_threshold: Demand threshold for surge pricing + low_threshold: Demand threshold for discounts + surge_multiplier: Price multiplier for high demand + discount_multiplier: Price multiplier for low demand + + Returns: + tuple: (product_features_df, optimal_prices_df) + - product_features_df: [productId, demand_score, price] + - optimal_prices_df: [productId, current_price, optimal_price, demand_score] """ - # build state space - state_step = BuildStateSpaceStep(context) - state_space = state_step.transform(elasticity_df) - - # fit pricing function - fit_step = FitPricingFunctionStep(context) - pricer = fit_step.transform(elasticity_df) - - # predict prices - predict_step = PredictPricesStep(context) - prices_df = predict_step.transform((pricer, state_space)) - - return prices_df - - -def full_pipeline(context: PipelineContext): - """ - Complete end-to-end pipeline: data extraction -> elasticity -> pricing - Returns: (elasticity_df, prices_df) - """ - # extract interactions interaction_pipe = interaction_extraction_pipeline(context) - interactions_df = interaction_pipe.fit_transform(None) - - # extract price logs price_pipe = price_extraction_pipeline(context) + + interactions_df = interaction_pipe.fit_transform(None) price_logs_df = price_pipe.fit_transform(None) + product_features_df = product_features_pipeline(context, interactions_df, price_logs_df) + print(product_features_df.to_string()) - if interactions_df.empty or price_logs_df.empty: - return None, None + # generate optimal prices using surge rules + optimal_prices_df = pricing_pipeline(context, product_features_df, + high_threshold=high_threshold, + low_threshold=low_threshold, + surge_multiplier=surge_multiplier, + discount_multiplier=discount_multiplier) - # compute elasticity - elasticity_df = elasticity_computation_pipeline( - context, - interactions_df, - price_logs_df - ) + return product_features_df, optimal_prices_df - if elasticity_df is None or elasticity_df.empty: - return elasticity_df, None - # generate prices - prices_df = pricing_pipeline(context, elasticity_df) - - return elasticity_df, prices_df if __name__ == '__main__': @@ -117,24 +114,25 @@ if __name__ == '__main__': def __init__(self, backend_url: str): SupabaseProvider.__init__(self) BackendAPIProvider.__init__(self, backend_url=backend_url) + + + class HistoricalProvider(SupabaseProvider, BackendAPIProvider): + def fetch_kafka_topic(self, topic: str) -> pd.DataFrame: + path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/858c61ab-0a7f-4595-ae49-33f4365517b9/" + interactions_file = "messages(2).json" + prices_file = "messages(3).json" + + data = pd.read_json(path + (interactions_file if topic == "user-interactions" else prices_file)) + data = [r['payload'] for r in data['value'].to_list()] + data = pd.DataFrame(data) + return data + + # example run context = PipelineContext( - provider=Provider(backend_url="http://localhost:5000"), + provider=HistoricalProvider(), store_mode='hotel', - # 15 min not month - window_size='15min', ) - elasticity_df, prices_df = full_pipeline(context) - - if elasticity_df is not None and not elasticity_df.empty: - print("Elasticity Estimates:") - print(elasticity_df.to_string(index=False)) - else: - print("No elasticity estimates computed.") - - if prices_df is not None and not prices_df.empty: - print("\nPredicted Prices:") - print(prices_df.to_string(index=False)) - else: - print("No prices predicted.") + product_features, prices = full_pipeline(context) + print(prices.to_string()) diff --git a/experiments/procesing/pricers/__init__.py b/experiments/procesing/pricers/__init__.py index a812d8f..73a13f4 100644 --- a/experiments/procesing/pricers/__init__.py +++ b/experiments/procesing/pricers/__init__.py @@ -1,6 +1,6 @@ from procesing.pricers.base import PricingFunction from procesing.pricers.elasticity import ElasticityBasedPricer -from procesing.pricers.simple import StaticPricer, RandomPricer +from procesing.pricers.simple import StaticPricer, RandomPricer, SimpleSurgePricer from procesing.pricers.session_aware import SessionAwarePricer, ProductSpecificSessionPricer __all__ = [ @@ -8,6 +8,7 @@ __all__ = [ 'ElasticityBasedPricer', 'StaticPricer', 'RandomPricer', + 'SimpleSurgePricer', 'SessionAwarePricer', 'ProductSpecificSessionPricer' ] diff --git a/experiments/procesing/pricers/base.py b/experiments/procesing/pricers/base.py index b2ada0c..6569556 100644 --- a/experiments/procesing/pricers/base.py +++ b/experiments/procesing/pricers/base.py @@ -25,7 +25,7 @@ class PricingFunction(ABC): """ @abstractmethod - def fit(self, historical_data: pd.DataFrame, **kwargs): + def fit(self, *kwargs): """ Offline training on historical data. @@ -36,7 +36,7 @@ class PricingFunction(ABC): pass @abstractmethod - def predict(self, state_space) -> np.ndarray: + def predict(self, *kwargs) -> np.ndarray: """ Generate optimal prices given current state. diff --git a/experiments/procesing/pricers/simple.py b/experiments/procesing/pricers/simple.py index e98b9be..39be37a 100644 --- a/experiments/procesing/pricers/simple.py +++ b/experiments/procesing/pricers/simple.py @@ -46,3 +46,46 @@ class RandomPricer(PricingFunction): if self.n_products is None: self.n_products = len(state_space.demand) return self.rng.uniform(self.price_min, self.price_max, size=self.n_products) + + +class SimpleSurgePricer(PricingFunction): + """ + Rule-based surge pricer adjusting prices via demand thresholds. + Logic: if demand > high_threshold -> surge, if demand < low_threshold -> discount. + Simpler and more controllable than curve fitting approaches. + """ + + def __init__(self, + base_prices: np.ndarray = None, + high_threshold: int = 10, + low_threshold: int = 2, + surge_multiplier: float = 1.2, + discount_multiplier: float = 0.9): + self.base_prices = base_prices + self.high_threshold = high_threshold + self.low_threshold = low_threshold + self.surge_multiplier = surge_multiplier + self.discount_multiplier = discount_multiplier + + def fit(self, market_data : pd.DataFrame): + """Extract base prices from product catalog or historical averages""" + self.base_prices = market_data['base_price'].to_numpy() if 'base_price' in market_data.columns else market_data['price'].values + self.demand_history = market_data['demand'].to_numpy() if 'demand' in market_data.columns else np.zeros_like(self.base_prices) + + def predict(self) -> np.ndarray: + """ + Adjust prices based on current demand using surge rules. + state_space.demand: demand counts per product + state_space.prices: current prices (fallback if base_prices not set) + """ + current_prices = self.base_prices if self.base_prices is not None else np.ones_like(demand_vector) * 99.99 + demand = self.demand_history if self.demand_history is not None else np.zeros_like(current_prices) + new_prices = current_prices.copy() + + high_mask = demand >= self.high_threshold + new_prices[high_mask] *= self.surge_multiplier + + low_mask = demand <= self.low_threshold + new_prices[low_mask] *= self.discount_multiplier + + return new_prices diff --git a/experiments/procesing/steps/__init__.py b/experiments/procesing/steps/__init__.py index d982269..d788d01 100755 --- a/experiments/procesing/steps/__init__.py +++ b/experiments/procesing/steps/__init__.py @@ -1,13 +1,12 @@ from procesing.steps.base import BaseContextStep from procesing.steps.fetch import FetchInteractionsStep, FetchPriceLogsStep, FetchExperimentsStep -from procesing.steps.join import JoinExperimentsStep -from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep +from procesing.steps.join import JoinExperimentsStep, JoinProductFeaturesStep +from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep, AugmentInteractionsStep from procesing.steps.chunk import ChunkByTimeWindowStep from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep -from procesing.steps.elasticity import AggregatePriceLogsStep, ComputeElasticityStep +from procesing.steps.elasticity import AggregatePriceLogsStep from procesing.steps.pricing import FitPricingFunctionStep, PredictPricesStep from procesing.steps.session import ExtractSessionFeaturesStep, _extract_features_for_session -# StateSpace, BuildStateSpaceStep, __all__ = [ 'BaseContextStep', @@ -15,13 +14,14 @@ __all__ = [ 'FetchPriceLogsStep', 'FetchExperimentsStep', 'JoinExperimentsStep', + 'JoinProductFeaturesStep', 'CreatePriceBucketsStep', 'AugmentEventNamesStep', + 'AugmentInteractionsStep', 'ChunkByTimeWindowStep', 'ComputeDemandStep', 'ComputeDemandForChunksStep', 'AggregatePriceLogsStep', - 'ComputeElasticityStep', 'FitPricingFunctionStep', 'PredictPricesStep', 'ExtractSessionFeaturesStep', diff --git a/experiments/procesing/steps/augment.py b/experiments/procesing/steps/augment.py index a8b6506..a04e20b 100755 --- a/experiments/procesing/steps/augment.py +++ b/experiments/procesing/steps/augment.py @@ -2,6 +2,93 @@ import numpy as np import pandas as pd from procesing.steps.base import BaseContextStep + +class AugmentInteractionsStep(BaseContextStep): + """ + Consolidated step: create price buckets, augment event names, join experiments. + Input: (interactions_df, price_logs_df) + Output: enriched interactions_df + """ + + def transform(self, data: tuple): + interactions_df, price_logs_df = data + + if interactions_df.empty: + return interactions_df + + # Step 1: Create price buckets + interactions_df = self._create_price_buckets(interactions_df) + + # Step 2: Augment event names + interactions_df = self._augment_event_names(interactions_df) + + # Step 3: Join experiments (optional) + if 'experimentId' in interactions_df.columns: + interactions_df = self._join_experiments(interactions_df) + + return interactions_df + + def _create_price_buckets(self, df: pd.DataFrame): + """Create price bucket labels from price data""" + if 'metadata_price' not in df.columns: + df['price_bucket'] = "" + return df + + n_buckets = self.context.config.get('n_price_buckets', 5) + + if df['metadata_price'].notnull().sum() > 0: + try: + price_buckets = pd.qcut( + df['metadata_price'], + q=n_buckets, + labels=[f"PB_{i+1}" for i in range(n_buckets)], + duplicates='drop' + ) + except ValueError: + # fallback for insufficient unique values + price_buckets = df['metadata_price'].apply( + lambda x: f"P_{int(x)}" if pd.notnull(x) else "" + ) + else: + price_buckets = pd.Series([""] * len(df), index=df.index) + + df['price_bucket'] = price_buckets + return df + + def _augment_event_names(self, df: pd.DataFrame): + """Augment event names with product and price bucket schema""" + # Create schema: _productId@price_bucket + has_product = df.get('productId', pd.Series()).notnull() + has_bucket = df.get('price_bucket', pd.Series()).notnull() + + df['metadata_schema'] = np.where( + has_product & has_bucket, + "_" + df['productId'].astype(str) + "@" + df['price_bucket'].astype(str), + "" + ) + + df['eventName'] = df['eventName'] + df['metadata_schema'] + return df + + def _join_experiments(self, df: pd.DataFrame): + """Join experiment metadata if experimentId present""" + exp_ids = df['experimentId'].dropna().unique().tolist() + if not exp_ids: + return df + + experiments_df = self.context.provider.fetch_experiments(exp_ids) + if experiments_df.empty: + return df + + return df.merge( + experiments_df, + left_on='experimentId', + right_on='id', + how='left', + suffixes=('', '_exp') + ) + + class CreatePriceBucketsStep(BaseContextStep): """Create price bucket labels from price data""" diff --git a/experiments/procesing/steps/elasticity.py b/experiments/procesing/steps/elasticity.py index d65bc43..66168f4 100755 --- a/experiments/procesing/steps/elasticity.py +++ b/experiments/procesing/steps/elasticity.py @@ -16,7 +16,7 @@ class AggregatePriceLogsStep(BaseContextStep): df = price_logs_df.copy() ts_col = self.context.config.get('ts_col', 'ts') - window_size = self.context.window_size + #window_size = self.context.window_size WE ARE NOT USING CHUNKS ANYMORE # ensure datetime if not pd.api.types.is_datetime64_any_dtype(df[ts_col]): @@ -24,230 +24,19 @@ class AggregatePriceLogsStep(BaseContextStep): df = df.sort_values([ts_col, 'productId']) products = self.context.products - unique_products = products['id'].unique() - - # VECTORIZED: group by product, resample by time window, compute mean - df_indexed = df.set_index(ts_col) - - windowed = ( - df_indexed - .groupby('productId')['price'] - .resample(window_size) - .mean() - .reset_index() + # get base price from metadata if available 1) read the metadata col as json and get the base_price + products['base_price'] = products.apply( + lambda row: row['metadata'].get('base_price', 0) if isinstance(row['metadata'], dict) else 0, + axis=1 ) - # forward fill missing windows (carry last known price) - windowed = windowed.sort_values([ts_col, 'productId']) - windowed['price'] = windowed.groupby('productId')['price'].ffill() - windowed = windowed.dropna(subset=['price']) + unique_products = products['id'].unique() - # group into chunks by window - chunks = [] - for window_start, group in windowed.groupby(ts_col): - price_vector = group[['productId', 'price']].copy() - - # fill missing products with last known price before this window - missing_products = set(unique_products) - set(price_vector['productId']) - if missing_products: - for pid in missing_products: - last_price = df_indexed[ - (df_indexed['productId'] == pid) & - (df_indexed.index < window_start) - ]['price'] - - if not last_price.empty: - price_vector = pd.concat([ - price_vector, - pd.DataFrame({'productId': [pid], 'price': [last_price.iloc[-1]]}) - ], ignore_index=True) - - if not price_vector.empty: - chunks.append({ - 'window_start': window_start, - 'window_end': window_start + pd.Timedelta(window_size), - 'price_vector': price_vector - }) - - return chunks - - -class ComputeElasticityStep(BaseContextStep): - """ - Compute price elasticity from demand and price chunks. - Input: (demand_chunks, price_chunks) - Output: elasticity_df [productId, elasticity, std_error, n_obs] - """ - - def transform(self, chunk_tuple: tuple): - demand_chunks, price_chunks = chunk_tuple - - method = self.context.config.get('elasticity_method', 'point') - min_obs = self.context.config.get('min_observations', 2) - - products = self.context.products - all_product_ids = products['id'].unique() - - # align chunks by window_start - aligned = self._align_chunks(demand_chunks, price_chunks) - - if not aligned: - return pd.DataFrame({ - 'productId': all_product_ids, - 'elasticity': 0.0, - 'std_error': 0.0, - 'n_obs': 0 - }) - - # build time series per product - product_series = self._build_timeseries(aligned) - - # compute elasticity per product - elasticities = [] - for pid, series in product_series.items(): - if len(series) < min_obs: - elasticities.append({ - 'productId': pid, - 'elasticity': 0.0, - 'std_error': 0.0, - 'n_obs': len(series) - }) - continue - - elast = self._compute_elasticity(series, method) - elasticities.append({ - 'productId': pid, - 'elasticity': elast['value'], - 'std_error': elast.get('std_error', 0.0), - 'n_obs': len(series) - }) - - result_df = pd.DataFrame(elasticities) - - # fill missing products with zero elasticity - observed_pids = set(result_df['productId']) - missing_pids = [p for p in all_product_ids if p not in observed_pids] - - if missing_pids: - missing_df = pd.DataFrame({ - 'productId': missing_pids, - 'elasticity': 0.0, - 'std_error': 0.0, - 'n_obs': 0 - }) - result_df = pd.concat([result_df, missing_df], ignore_index=True) - - return result_df - - def _align_chunks(self, demand_chunks: List[Dict], price_chunks: List[Dict]): - """Align demand and price chunks by window_start""" - price_lookup = {c['window_start']: c for c in price_chunks} - aligned = [] - - for dc in demand_chunks: - ws = dc['window_start'] - if ws in price_lookup: - aligned.append({ - 'window_start': ws, - 'window_end': dc['window_end'], - 'demand': dc['demand_vector'], - 'prices': price_lookup[ws]['price_vector'] - }) - - return aligned - - def _build_timeseries(self, aligned: List[Dict]): - """Build time series [timestamp, price, quantity] per product""" - series_by_product = {} - - for chunk in aligned: - merged = chunk['demand'].merge(chunk['prices'], on='productId', how='inner') - - for _, row in merged.iterrows(): - pid = row['productId'] - if pid not in series_by_product: - series_by_product[pid] = [] - - series_by_product[pid].append({ - 'timestamp': chunk['window_start'], - 'price': row['price'], - 'quantity': row['demand_score'] - }) - - return series_by_product - - def _compute_elasticity(self, series: List[Dict], method: str): - """Compute point or arc elasticity""" - prices = np.array([s['price'] for s in series]) - quantities = np.array([s['quantity'] for s in series]) - - # filter out zero/negative values - valid = (prices > 0) & (quantities > 0) - if valid.sum() < 2: - return {'value': 0.0, 'std_error': 0.0} - - prices = prices[valid] - quantities = quantities[valid] - - if method == 'point': - return self._point_elasticity(prices, quantities) - elif method == 'arc': - return self._arc_elasticity(prices, quantities) - else: - raise ValueError(f"Unknown elasticity method: {method}") - - def _point_elasticity(self, prices: np.ndarray, quantities: np.ndarray): - """Point elasticity via log-log regression: log(Q) = a + b*log(P), elasticity = b""" - if len(prices) < 2: - return {'value': 0.0, 'std_error': 0.0} - - log_p = np.log(prices) - log_q = np.log(quantities) - - if log_p.std() == 0: - return {'value': 0.0, 'std_error': 0.0} - - cov = np.cov(log_p, log_q)[0, 1] - var = np.var(log_p) - b = cov / var - - # std error estimate - if len(prices) > 2: - residuals = log_q - (log_q.mean() + b * (log_p - log_p.mean())) - mse = (residuals ** 2).sum() / (len(prices) - 2) - se_b = np.sqrt(mse / (len(prices) * var)) - else: - se_b = 0.0 - - return {'value': b, 'std_error': se_b} - - def _arc_elasticity(self, prices: np.ndarray, quantities: np.ndarray): - """Arc elasticity: average period-over-period elasticity""" - elasticities = [] - - for i in range(1, len(prices)): - p1, p2 = prices[i-1], prices[i] - q1, q2 = quantities[i-1], quantities[i] - - p_avg = (p1 + p2) / 2 - q_avg = (q1 + q2) / 2 - - if p_avg == 0 or q_avg == 0: - continue - - delta_p = p2 - p1 - delta_q = q2 - q1 - - if delta_p == 0: - continue - - e = (delta_q / q_avg) / (delta_p / p_avg) - elasticities.append(e) - - if not elasticities: - return {'value': 0.0, 'std_error': 0.0} - - return { - 'value': np.mean(elasticities), - 'std_error': np.std(elasticities) / np.sqrt(len(elasticities)) - } + df_indexed = df.set_index(ts_col) + # we return a df of average price per product over the entire period + # TODO: maybe consider different opration to handle price aggregation over time + avg_prices = df_indexed.groupby('productId')['price'].mean().reindex(unique_products, fill_value=0).reset_index() + avg_prices.columns = ['productId', 'price'] + # fill 0s with base_price from products + base_price_map = products.set_index('id')['base_price'].to_dict() + return avg_prices diff --git a/experiments/procesing/steps/fetch.py b/experiments/procesing/steps/fetch.py index 8d9c311..87c306e 100755 --- a/experiments/procesing/steps/fetch.py +++ b/experiments/procesing/steps/fetch.py @@ -2,7 +2,11 @@ import pandas as pd from procesing.steps.base import BaseContextStep class FetchInteractionsStep(BaseContextStep): - """Fetch raw interaction data from Kafka topic""" + """Fetch raw interaction data from Kafka topic with optional time filtering""" + + def __init__(self, context, lookback: str = None): + super().__init__(context) + self.lookback = lookback def transform(self, X=None): df = self.context.provider.fetch_kafka_topic('user-interactions') @@ -24,14 +28,35 @@ class FetchInteractionsStep(BaseContextStep): if 'metadata_dateIndex' in df.columns: df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') + # Apply time filtering if lookback specified + if self.lookback and 'ts' in df.columns: + df['ts'] = pd.to_datetime(df['ts']) + cutoff = pd.Timestamp.now() - pd.Timedelta(self.lookback) + df = df[df['ts'] >= cutoff] + return df class FetchPriceLogsStep(BaseContextStep): - """Fetch price log data from Kafka topic""" + """Fetch price log data from Kafka topic with optional time filtering""" + + def __init__(self, context, lookback: str = None): + super().__init__(context) + self.lookback = lookback def transform(self, X=None): - return self.context.provider.fetch_kafka_topic('price-logs') + df = self.context.provider.fetch_kafka_topic('price-logs') + + if df.empty: + return df + + # Apply time filtering if lookback specified + if self.lookback and 'ts' in df.columns: + df['ts'] = pd.to_datetime(df['ts']) + cutoff = pd.Timestamp.now() - pd.Timedelta(self.lookback) + df = df[df['ts'] >= cutoff] + + return df class FetchExperimentsStep(BaseContextStep): diff --git a/experiments/procesing/steps/join.py b/experiments/procesing/steps/join.py index 5567f0f..be6fcdf 100755 --- a/experiments/procesing/steps/join.py +++ b/experiments/procesing/steps/join.py @@ -32,3 +32,27 @@ class JoinExperimentsStep(BaseContextStep): }) return interactions_df.merge(experiments_df, on='experimentId', how='left') + +class JoinProductFeaturesStep(BaseContextStep): + """Join product features to interactions""" + + def transform(self, data: tuple): + """ + Args: + data: (interactions_df, products_df) + Returns: + merged interactions dataframe + """ + demand_df, price_df = data + + # get base prices from products if available + products = self.context.products + products['base_price'] = products.apply( + lambda row: float(row['metadata'].get('base_price', 0.0)) if isinstance(row['metadata'], dict) else 0, + axis=1 + ) + products = products[['id', 'base_price']].rename(columns={'id': 'productId'}) + + if price_df.empty: + return demand_df + return demand_df.merge(price_df, on='productId', how='left').merge(products, on='productId', how='left') diff --git a/experiments/procesing/tests/test_elasticity.py b/experiments/procesing/tests/test_elasticity.py deleted file mode 100644 index 2172c78..0000000 --- a/experiments/procesing/tests/test_elasticity.py +++ /dev/null @@ -1,353 +0,0 @@ -import pytest -import pandas as pd -import numpy as np -from procesing.steps import ( - AggregatePriceLogsStep, - ComputeElasticityStep -) - - -def test_aggregate_price_logs_basic(pipeline_context): - """Test basic price aggregation into time windows""" - step = AggregatePriceLogsStep(pipeline_context) - - # Create price logs with known window structure - df = pd.DataFrame({ - 'ts': pd.date_range(start='2023-01-01 10:00:00', periods=100, freq='10s'), - 'productId': np.tile([ - 'd018efc1-25e9-4284-b276-80386e048b25', - '51266ddb-5b07-47b7-89ee-5b5cae94bb11', - '2cd7f756-fc65-4ba0-ab01-74521c1fff43' - ], 34)[:100], - 'price': np.random.uniform(100, 200, 100) - }) - - result = step.transform(df) - assert isinstance(result, list) - assert len(result) > 0 - # each chunk should have window metadata and price vector - for chunk in result: - assert 'window_start' in chunk - assert 'window_end' in chunk - assert 'price_vector' in chunk - assert isinstance(chunk['price_vector'], pd.DataFrame) - assert 'productId' in chunk['price_vector'].columns - assert 'price' in chunk['price_vector'].columns - - -def test_aggregate_price_logs_handles_gaps(pipeline_context): - """Test that price aggregation forward-fills missing windows""" - step = AggregatePriceLogsStep(pipeline_context) - - # create sparse data with gaps - df = pd.DataFrame({ - 'ts': pd.to_datetime([ - '2023-01-01 10:00:00', - '2023-01-01 10:00:05', - '2023-01-01 10:02:00', # gap of ~2 mins - '2023-01-01 10:02:30' - ]), - 'productId': [ - 'd018efc1-25e9-4284-b276-80386e048b25', - 'd018efc1-25e9-4284-b276-80386e048b25', - '51266ddb-5b07-47b7-89ee-5b5cae94bb11', - '51266ddb-5b07-47b7-89ee-5b5cae94bb11' - ], - 'price': [100, 102, 150, 153] - }) - - result = step.transform(df) - assert isinstance(result, list) - # should have multiple windows despite gaps - assert len(result) >= 2 - - -def test_compute_elasticity_with_known_relationship(pipeline_context): - """Test elasticity computation with known price-demand relationship""" - step = ComputeElasticityStep(pipeline_context) - - # simulate elastic demand: when price ↑10%, demand ↓15% (elasticity ~ -1.5) - base_price = 100 - base_demand = 50 - - demand_chunks = [ - { - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'demand_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'demand_score': [base_demand] - }) - }, - { - 'window_start': pd.Timestamp('2023-01-01 10:00:30'), - 'window_end': pd.Timestamp('2023-01-01 10:01:00'), - 'demand_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'demand_score': [base_demand * 0.85] # 15% decrease - }) - }, - { - 'window_start': pd.Timestamp('2023-01-01 10:01:00'), - 'window_end': pd.Timestamp('2023-01-01 10:01:30'), - 'demand_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'demand_score': [base_demand * 0.70] # further decrease - }) - } - ] - - price_chunks = [ - { - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'price_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'price': [base_price] - }) - }, - { - 'window_start': pd.Timestamp('2023-01-01 10:00:30'), - 'window_end': pd.Timestamp('2023-01-01 10:01:00'), - 'price_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'price': [base_price * 1.10] # 10% increase - }) - }, - { - 'window_start': pd.Timestamp('2023-01-01 10:01:00'), - 'window_end': pd.Timestamp('2023-01-01 10:01:30'), - 'price_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'price': [base_price * 1.20] # 20% increase - }) - } - ] - - result = step.transform((demand_chunks, price_chunks)) - assert isinstance(result, pd.DataFrame) - assert not result.empty - assert 'productId' in result.columns - assert 'elasticity' in result.columns - assert 'n_obs' in result.columns - - # check elasticity is negative (normal good) - product_elast = result[result['productId'] == 'd018efc1-25e9-4284-b276-80386e048b25'] - assert len(product_elast) == 1 - assert product_elast.iloc[0]['elasticity'] < 0 - # should be roughly elastic (< -1) - assert product_elast.iloc[0]['n_obs'] == 3 - - -def test_compute_elasticity_inelastic_product(pipeline_context): - """Test with inelastic demand: price changes, demand barely moves""" - step = ComputeElasticityStep(pipeline_context) - - base_price = 150 - base_demand = 40 - - demand_chunks = [ - { - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'demand_vector': pd.DataFrame({ - 'productId': ['51266ddb-5b07-47b7-89ee-5b5cae94bb11'], - 'demand_score': [base_demand] - }) - }, - { - 'window_start': pd.Timestamp('2023-01-01 10:00:30'), - 'window_end': pd.Timestamp('2023-01-01 10:01:00'), - 'demand_vector': pd.DataFrame({ - 'productId': ['51266ddb-5b07-47b7-89ee-5b5cae94bb11'], - 'demand_score': [base_demand * 0.98] # tiny 2% decrease - }) - } - ] - - price_chunks = [ - { - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'price_vector': pd.DataFrame({ - 'productId': ['51266ddb-5b07-47b7-89ee-5b5cae94bb11'], - 'price': [base_price] - }) - }, - { - 'window_start': pd.Timestamp('2023-01-01 10:00:30'), - 'window_end': pd.Timestamp('2023-01-01 10:01:00'), - 'price_vector': pd.DataFrame({ - 'productId': ['51266ddb-5b07-47b7-89ee-5b5cae94bb11'], - 'price': [base_price * 1.20] # 20% increase - }) - } - ] - - result = step.transform((demand_chunks, price_chunks)) - product_elast = result[result['productId'] == '51266ddb-5b07-47b7-89ee-5b5cae94bb11'] - assert len(product_elast) == 1 - # inelastic: elasticity between 0 and -1 - assert -1 < product_elast.iloc[0]['elasticity'] < 0 - - -def test_compute_elasticity_multiple_products(pipeline_context): - """Test elasticity computation across multiple products simultaneously""" - step = ComputeElasticityStep(pipeline_context) - - products = [ - 'd018efc1-25e9-4284-b276-80386e048b25', - '51266ddb-5b07-47b7-89ee-5b5cae94bb11', - '2cd7f756-fc65-4ba0-ab01-74521c1fff43' - ] - - # create 5 time windows with all 3 products - demand_chunks = [] - price_chunks = [] - - for i in range(5): - ts = pd.Timestamp('2023-01-01 10:00:00') + pd.Timedelta(f'{i*30}s') - - demand_chunks.append({ - 'window_start': ts, - 'window_end': ts + pd.Timedelta('30s'), - 'demand_vector': pd.DataFrame({ - 'productId': products, - 'demand_score': [ - 50 * (0.9 ** i), # elastic: decreases as price rises - 40 * (0.98 ** i), # inelastic: barely changes - 30 * (0.85 ** i) # very elastic - ] - }) - }) - - price_chunks.append({ - 'window_start': ts, - 'window_end': ts + pd.Timedelta('30s'), - 'price_vector': pd.DataFrame({ - 'productId': products, - 'price': [ - 100 * (1.05 ** i), - 150 * (1.10 ** i), - 120 * (1.08 ** i) - ] - }) - }) - - result = step.transform((demand_chunks, price_chunks)) - assert isinstance(result, pd.DataFrame) - assert len(result) == 3 # all products should have elasticity - assert set(result['productId']) == set(products) - assert all(result['n_obs'] == 5) - assert all(result['elasticity'] < 0) # all normal goods - - -def test_compute_elasticity_insufficient_data(pipeline_context): - """Test behavior with insufficient observations""" - step = ComputeElasticityStep(pipeline_context) - - # only 1 observation - demand_chunks = [{ - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'demand_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'demand_score': [50] - }) - }] - - price_chunks = [{ - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'price_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'price': [100] - }) - }] - - result = step.transform((demand_chunks, price_chunks)) - # should still return result but with low n_obs - product_elast = result[result['productId'] == 'd018efc1-25e9-4284-b276-80386e048b25'] - assert len(product_elast) == 1 - assert product_elast.iloc[0]['n_obs'] == 1 - assert product_elast.iloc[0]['elasticity'] == 0.0 # not enough data - - -def test_compute_elasticity_misaligned_chunks(pipeline_context): - """Test with non-overlapping demand and price windows""" - step = ComputeElasticityStep(pipeline_context) - - demand_chunks = [{ - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'demand_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'demand_score': [50] - }) - }] - - price_chunks = [{ - 'window_start': pd.Timestamp('2023-01-01 11:00:00'), # different time - 'window_end': pd.Timestamp('2023-01-01 11:00:30'), - 'price_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'price': [100] - }) - }] - - result = step.transform((demand_chunks, price_chunks)) - # should handle gracefully with no aligned data - assert isinstance(result, pd.DataFrame) - assert all(result['n_obs'] == 0) - - -def test_elasticity_arc_method(pipeline_context): - """Test arc elasticity computation method""" - # configure context for arc method - pipeline_context.config['elasticity_method'] = 'arc' - step = ComputeElasticityStep(pipeline_context) - - demand_chunks = [ - { - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'demand_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'demand_score': [100] - }) - }, - { - 'window_start': pd.Timestamp('2023-01-01 10:00:30'), - 'window_end': pd.Timestamp('2023-01-01 10:01:00'), - 'demand_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'demand_score': [80] - }) - } - ] - - price_chunks = [ - { - 'window_start': pd.Timestamp('2023-01-01 10:00:00'), - 'window_end': pd.Timestamp('2023-01-01 10:00:30'), - 'price_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'price': [100] - }) - }, - { - 'window_start': pd.Timestamp('2023-01-01 10:00:30'), - 'window_end': pd.Timestamp('2023-01-01 10:01:00'), - 'price_vector': pd.DataFrame({ - 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], - 'price': [110] - }) - } - ] - - result = step.transform((demand_chunks, price_chunks)) - product_elast = result[result['productId'] == 'd018efc1-25e9-4284-b276-80386e048b25'] - assert len(product_elast) == 1 - assert product_elast.iloc[0]['elasticity'] < 0 - # reset config - pipeline_context.config['elasticity_method'] = 'point' diff --git a/requirements.txt b/requirements.txt index 22d3fcb..e7eaf09 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ pytest-asyncio uv scikit-learn supabase +pymc diff --git a/web/package-lock.json b/web/package-lock.json index 207a105..32d2192 100644 --- a/web/package-lock.json +++ b/web/package-lock.json @@ -10,7 +10,7 @@ "dependencies": { "@supabase/ssr": "^0.7.0", "@supabase/supabase-js": "^2.81.1", - "next": "16.0.0", + "next": "^16.0.0", "react": "19.2.0", "react-dom": "19.2.0", "zod": "^4.1.12" @@ -526,15 +526,15 @@ } }, "node_modules/@next/env": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/env/-/env-16.0.0.tgz", - "integrity": "sha512-s5j2iFGp38QsG1LWRQaE2iUY3h1jc014/melHFfLdrsMJPqxqDQwWNwyQTcNoUSGZlCVZuM7t7JDMmSyRilsnA==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/env/-/env-16.0.7.tgz", + "integrity": "sha512-gpaNgUh5nftFKRkRQGnVi5dpcYSKGcZZkQffZ172OrG/XkrnS7UBTQ648YY+8ME92cC4IojpI2LqTC8sTDhAaw==", "license": "MIT" }, "node_modules/@next/swc-darwin-arm64": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/swc-darwin-arm64/-/swc-darwin-arm64-16.0.0.tgz", - "integrity": "sha512-/CntqDCnk5w2qIwMiF0a9r6+9qunZzFmU0cBX4T82LOflE72zzH6gnOjCwUXYKOBlQi8OpP/rMj8cBIr18x4TA==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/swc-darwin-arm64/-/swc-darwin-arm64-16.0.7.tgz", + "integrity": "sha512-LlDtCYOEj/rfSnEn/Idi+j1QKHxY9BJFmxx7108A6D8K0SB+bNgfYQATPk/4LqOl4C0Wo3LACg2ie6s7xqMpJg==", "cpu": [ "arm64" ], @@ -548,9 +548,9 @@ } }, "node_modules/@next/swc-darwin-x64": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/swc-darwin-x64/-/swc-darwin-x64-16.0.0.tgz", - "integrity": "sha512-hB4GZnJGKa8m4efvTGNyii6qs76vTNl+3dKHTCAUaksN6KjYy4iEO3Q5ira405NW2PKb3EcqWiRaL9DrYJfMHg==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/swc-darwin-x64/-/swc-darwin-x64-16.0.7.tgz", + "integrity": "sha512-rtZ7BhnVvO1ICf3QzfW9H3aPz7GhBrnSIMZyr4Qy6boXF0b5E3QLs+cvJmg3PsTCG2M1PBoC+DANUi4wCOKXpA==", "cpu": [ "x64" ], @@ -564,9 +564,9 @@ } }, "node_modules/@next/swc-linux-arm64-gnu": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-gnu/-/swc-linux-arm64-gnu-16.0.0.tgz", - "integrity": "sha512-E2IHMdE+C1k+nUgndM13/BY/iJY9KGCphCftMh7SXWcaQqExq/pJU/1Hgn8n/tFwSoLoYC/yUghOv97tAsIxqg==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-gnu/-/swc-linux-arm64-gnu-16.0.7.tgz", + "integrity": "sha512-mloD5WcPIeIeeZqAIP5c2kdaTa6StwP4/2EGy1mUw8HiexSHGK/jcM7lFuS3u3i2zn+xH9+wXJs6njO7VrAqww==", "cpu": [ "arm64" ], @@ -580,9 +580,9 @@ } }, "node_modules/@next/swc-linux-arm64-musl": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-musl/-/swc-linux-arm64-musl-16.0.0.tgz", - "integrity": "sha512-xzgl7c7BVk4+7PDWldU+On2nlwnGgFqJ1siWp3/8S0KBBLCjonB6zwJYPtl4MUY7YZJrzzumdUpUoquu5zk8vg==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-musl/-/swc-linux-arm64-musl-16.0.7.tgz", + "integrity": "sha512-+ksWNrZrthisXuo9gd1XnjHRowCbMtl/YgMpbRvFeDEqEBd523YHPWpBuDjomod88U8Xliw5DHhekBC3EOOd9g==", "cpu": [ "arm64" ], @@ -596,9 +596,9 @@ } }, "node_modules/@next/swc-linux-x64-gnu": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/swc-linux-x64-gnu/-/swc-linux-x64-gnu-16.0.0.tgz", - "integrity": "sha512-sdyOg4cbiCw7YUr0F/7ya42oiVBXLD21EYkSwN+PhE4csJH4MSXUsYyslliiiBwkM+KsuQH/y9wuxVz6s7Nstg==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/swc-linux-x64-gnu/-/swc-linux-x64-gnu-16.0.7.tgz", + "integrity": "sha512-4WtJU5cRDxpEE44Ana2Xro1284hnyVpBb62lIpU5k85D8xXxatT+rXxBgPkc7C1XwkZMWpK5rXLXTh9PFipWsA==", "cpu": [ "x64" ], @@ -612,9 +612,9 @@ } }, "node_modules/@next/swc-linux-x64-musl": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/swc-linux-x64-musl/-/swc-linux-x64-musl-16.0.0.tgz", - "integrity": "sha512-IAXv3OBYqVaNOgyd3kxR4L3msuhmSy1bcchPHxDOjypG33i2yDWvGBwFD94OuuTjjTt/7cuIKtAmoOOml6kfbg==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/swc-linux-x64-musl/-/swc-linux-x64-musl-16.0.7.tgz", + "integrity": "sha512-HYlhqIP6kBPXalW2dbMTSuB4+8fe+j9juyxwfMwCe9kQPPeiyFn7NMjNfoFOfJ2eXkeQsoUGXg+O2SE3m4Qg2w==", "cpu": [ "x64" ], @@ -628,9 +628,9 @@ } }, "node_modules/@next/swc-win32-arm64-msvc": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/swc-win32-arm64-msvc/-/swc-win32-arm64-msvc-16.0.0.tgz", - "integrity": "sha512-bmo3ncIJKUS9PWK1JD9pEVv0yuvp1KPuOsyJTHXTv8KDrEmgV/K+U0C75rl9rhIaODcS7JEb6/7eJhdwXI0XmA==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/swc-win32-arm64-msvc/-/swc-win32-arm64-msvc-16.0.7.tgz", + "integrity": "sha512-EviG+43iOoBRZg9deGauXExjRphhuYmIOJ12b9sAPy0eQ6iwcPxfED2asb/s2/yiLYOdm37kPaiZu8uXSYPs0Q==", "cpu": [ "arm64" ], @@ -644,9 +644,9 @@ } }, "node_modules/@next/swc-win32-x64-msvc": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@next/swc-win32-x64-msvc/-/swc-win32-x64-msvc-16.0.0.tgz", - "integrity": "sha512-O1cJbT+lZp+cTjYyZGiDwsOjO3UHHzSqobkPNipdlnnuPb1swfcuY6r3p8dsKU4hAIEO4cO67ZCfVVH/M1ETXA==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/@next/swc-win32-x64-msvc/-/swc-win32-x64-msvc-16.0.7.tgz", + "integrity": "sha512-gniPjy55zp5Eg0896qSrf3yB1dw4F/3s8VK1ephdsZZ129j2n6e1WqCbE2YgcKhW9hPB9TVZENugquWJD5x0ug==", "cpu": [ "x64" ], @@ -1447,12 +1447,12 @@ } }, "node_modules/next": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/next/-/next-16.0.0.tgz", - "integrity": "sha512-nYohiNdxGu4OmBzggxy9rczmjIGI+TpR5vbKTsE1HqYwNm1B+YSiugSrFguX6omMOKnDHAmBPY4+8TNJk0Idyg==", + "version": "16.0.7", + "resolved": "https://registry.npmjs.org/next/-/next-16.0.7.tgz", + "integrity": "sha512-3mBRJyPxT4LOxAJI6IsXeFtKfiJUbjCLgvXO02fV8Wy/lIhPvP94Fe7dGhUgHXcQy4sSuYwQNcOLhIfOm0rL0A==", "license": "MIT", "dependencies": { - "@next/env": "16.0.0", + "@next/env": "16.0.7", "@swc/helpers": "0.5.15", "caniuse-lite": "^1.0.30001579", "postcss": "8.4.31", @@ -1465,14 +1465,14 @@ "node": ">=20.9.0" }, "optionalDependencies": { - "@next/swc-darwin-arm64": "16.0.0", - "@next/swc-darwin-x64": "16.0.0", - "@next/swc-linux-arm64-gnu": "16.0.0", - "@next/swc-linux-arm64-musl": "16.0.0", - "@next/swc-linux-x64-gnu": "16.0.0", - "@next/swc-linux-x64-musl": "16.0.0", - "@next/swc-win32-arm64-msvc": "16.0.0", - "@next/swc-win32-x64-msvc": "16.0.0", + "@next/swc-darwin-arm64": "16.0.7", + "@next/swc-darwin-x64": "16.0.7", + "@next/swc-linux-arm64-gnu": "16.0.7", + "@next/swc-linux-arm64-musl": "16.0.7", + "@next/swc-linux-x64-gnu": "16.0.7", + "@next/swc-linux-x64-musl": "16.0.7", + "@next/swc-win32-arm64-msvc": "16.0.7", + "@next/swc-win32-x64-msvc": "16.0.7", "sharp": "^0.34.4" }, "peerDependencies": { diff --git a/web/package.json b/web/package.json index e5452f6..22a330e 100644 --- a/web/package.json +++ b/web/package.json @@ -10,7 +10,7 @@ "dependencies": { "@supabase/ssr": "^0.7.0", "@supabase/supabase-js": "^2.81.1", - "next": "16.0.0", + "next": "^16.0.0", "react": "19.2.0", "react-dom": "19.2.0", "zod": "^4.1.12" diff --git a/web/src/app/airline/checkout/page.tsx b/web/src/app/airline/checkout/page.tsx new file mode 100644 index 0000000..2052570 --- /dev/null +++ b/web/src/app/airline/checkout/page.tsx @@ -0,0 +1,11 @@ +export default function AirlineCheckout() { + return ( +