Improving interface after experiment01 (#30)

* fix: fixes of backwords

* fixing hotel information with image placeholders

* chore: clean up product display in hotel and cleaner interfacing

* adding loader with historical data loading

* feature: cleaning up pipeline

* chore: simple surge pricer

* created new pricing pipeline

* adding a checkout page to both sites

* fix: fixing stale package

* test: we won't be using elasticity anymore, so it's okay

* chore: cleaning elasticity references

* chore: store sting

* feature: e2e intro pipeline surge pricing

* fix: CVE vulnerability patching
This commit is contained in:
Daniel Alves Rösel
2025-12-06 17:47:14 +01:00
committed by GitHub
parent 59d4fb7891
commit 8751583764
27 changed files with 709 additions and 1096 deletions

View File

@@ -1,348 +0,0 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import pandas as pd
import logging
import sys
import pickle
import io
# add parent dir to path so procesing package can be imported
sys.path.insert(0, '/opt/airflow')
from procesing.context import PipelineContext
from procesing.providers import SupabaseProvider, BackendAPIProvider
from procesing.steps import (
FetchInteractionsStep,
FetchPriceLogsStep,
CreatePriceBucketsStep,
AugmentEventNamesStep,
ChunkByTimeWindowStep,
ComputeDemandForChunksStep,
AggregatePriceLogsStep,
ComputeElasticityStep,
BuildStateSpaceStep,
FitPricingFunctionStep,
PredictPricesStep,
)
# Task-level defaults applied to every operator in the DAG below:
# no e-mail notifications, and two retries spaced five minutes apart.
default_args = dict(
    owner='phantom-research',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
)
def get_provider():
    """Build a provider object that combines both data-source providers.

    Returns:
        An instance of a locally defined ``CompositeProvider`` that inherits
        from ``SupabaseProvider`` and ``BackendAPIProvider``.
    """
    class CompositeProvider(SupabaseProvider, BackendAPIProvider):
        def __init__(self):
            # Initialise each base explicitly, in declaration order.
            for base in (SupabaseProvider, BackendAPIProvider):
                base.__init__(self)

    provider = CompositeProvider()
    return provider
def get_context(**kwargs):
    """Create the ``PipelineContext`` for a task from the Airflow run config.

    Args:
        **kwargs: Airflow task context; only ``dag_run`` is read here.

    Returns:
        A ``PipelineContext`` whose settings come from ``dag_run.conf`` when a
        DAG run is present, otherwise from the defaults below.
    """
    dag_run = kwargs.get('dag_run')
    dag_conf = dag_run.conf if dag_run else {}

    provider = get_provider()
    settings = {
        'store_mode': dag_conf.get('store_mode', 'hotel'),
        'window_size': dag_conf.get('window_size', '30s'),
        'n_price_buckets': dag_conf.get('n_price_buckets', 5),
        'elasticity_method': dag_conf.get('elasticity_method', 'point'),
        'min_observations': dag_conf.get('min_observations', 2),
    }
    return PipelineContext(provider=provider, **settings)
# atomic task functions (each wraps one sklearn step)
def fetch_interactions(**kwargs):
    """Task: fetch raw interaction records and hand them to downstream tasks.

    Runs ``FetchInteractionsStep`` with no input, pickles the result into the
    ``interactions_raw`` XCom key, and returns the number of records.
    """
    pipeline_ctx = get_context(**kwargs)
    records = FetchInteractionsStep(pipeline_ctx).transform(None)
    kwargs['ti'].xcom_push(key='interactions_raw', value=pickle.dumps(records))
    n_records = len(records)
    logging.info(f"Fetched {n_records} interaction records")
    return n_records
def fetch_price_logs(**kwargs):
    """Task: fetch raw price-log records and hand them to downstream tasks.

    Runs ``FetchPriceLogsStep`` with no input, pickles the result into the
    ``price_logs_raw`` XCom key, and returns the number of records.
    """
    pipeline_ctx = get_context(**kwargs)
    records = FetchPriceLogsStep(pipeline_ctx).transform(None)
    kwargs['ti'].xcom_push(key='price_logs_raw', value=pickle.dumps(records))
    n_records = len(records)
    logging.info(f"Fetched {n_records} price records")
    return n_records
def create_price_buckets(**kwargs):
    """Task: attach price buckets to the raw interactions.

    Reads ``interactions_raw`` from XCom, applies ``CreatePriceBucketsStep``,
    stores the result under ``interactions_bucketed`` and returns its length.
    """
    task_instance = kwargs['ti']
    raw_frame = pickle.loads(task_instance.xcom_pull(key='interactions_raw'))
    bucketed = CreatePriceBucketsStep(get_context(**kwargs)).transform(raw_frame)
    task_instance.xcom_push(key='interactions_bucketed', value=pickle.dumps(bucketed))
    n_rows = len(bucketed)
    logging.info(f"Created price buckets for {n_rows} interactions")
    return n_rows
def augment_event_names(**kwargs):
    """Task: run ``AugmentEventNamesStep`` over the bucketed interactions.

    Reads ``interactions_bucketed`` from XCom, stores the augmented result
    under ``interactions_final`` and returns its length.
    """
    task_instance = kwargs['ti']
    bucketed = pickle.loads(task_instance.xcom_pull(key='interactions_bucketed'))
    augmented = AugmentEventNamesStep(get_context(**kwargs)).transform(bucketed)
    task_instance.xcom_push(key='interactions_final', value=pickle.dumps(augmented))
    n_rows = len(augmented)
    logging.info(f"Augmented event names for {n_rows} interactions")
    return n_rows
def chunk_interactions(**kwargs):
    """Task: split the final interactions into time-window chunks.

    Reads ``interactions_final`` from XCom, applies ``ChunkByTimeWindowStep``,
    stores the chunks under ``interaction_chunks`` and returns how many there are.
    """
    task_instance = kwargs['ti']
    interactions = pickle.loads(task_instance.xcom_pull(key='interactions_final'))
    windowed = ChunkByTimeWindowStep(get_context(**kwargs)).transform(interactions)
    task_instance.xcom_push(key='interaction_chunks', value=pickle.dumps(windowed))
    n_chunks = len(windowed)
    logging.info(f"Generated {n_chunks} interaction chunks")
    return n_chunks
def compute_demand(**kwargs):
    """Task: compute demand for every interaction chunk.

    Reads ``interaction_chunks`` from XCom, applies
    ``ComputeDemandForChunksStep``, stores the result under ``demand_chunks``
    and returns the number of chunks produced.
    """
    task_instance = kwargs['ti']
    windowed = pickle.loads(task_instance.xcom_pull(key='interaction_chunks'))
    demand_per_chunk = ComputeDemandForChunksStep(get_context(**kwargs)).transform(windowed)
    task_instance.xcom_push(key='demand_chunks', value=pickle.dumps(demand_per_chunk))
    n_chunks = len(demand_per_chunk)
    logging.info(f"Computed demand for {n_chunks} chunks")
    return n_chunks
def aggregate_price_logs(**kwargs):
    """Task: aggregate the raw price logs into chunks.

    Reads ``price_logs_raw`` from XCom, applies ``AggregatePriceLogsStep``,
    stores the result under ``price_chunks`` and returns its length.
    """
    task_instance = kwargs['ti']
    raw_prices = pickle.loads(task_instance.xcom_pull(key='price_logs_raw'))
    aggregated = AggregatePriceLogsStep(get_context(**kwargs)).transform(raw_prices)
    task_instance.xcom_push(key='price_chunks', value=pickle.dumps(aggregated))
    n_chunks = len(aggregated)
    logging.info(f"Aggregated {n_chunks} price chunks")
    return n_chunks
def compute_elasticity(**kwargs):
    """Task: derive price elasticity from the demand and price chunks.

    Reads ``demand_chunks`` and ``price_chunks`` from XCom, feeds them as a
    tuple to ``ComputeElasticityStep`` and stores the result under
    ``elasticity_results``.

    Returns:
        dict with the product count plus the mean and median of the
        ``elasticity`` column.
    """
    task_instance = kwargs['ti']
    demand_side = pickle.loads(task_instance.xcom_pull(key='demand_chunks'))
    price_side = pickle.loads(task_instance.xcom_pull(key='price_chunks'))

    estimator = ComputeElasticityStep(get_context(**kwargs))
    result = estimator.transform((demand_side, price_side))
    task_instance.xcom_push(key='elasticity_results', value=pickle.dumps(result))

    n_products = len(result)
    logging.info(f"Computed elasticity for {n_products} products")

    summary = {'n_products': len(result)}
    summary['mean_elasticity'] = float(result['elasticity'].mean())
    summary['median_elasticity'] = float(result['elasticity'].median())
    return summary
def build_state_space(**kwargs):
    """Task: build the pricing state space from the elasticity results.

    Reads ``elasticity_results`` from XCom, applies ``BuildStateSpaceStep``
    and stores the output under ``state_space``. Always returns ``True``.
    """
    task_instance = kwargs['ti']
    elasticity = pickle.loads(task_instance.xcom_pull(key='elasticity_results'))
    built = BuildStateSpaceStep(get_context(**kwargs)).transform(elasticity)
    task_instance.xcom_push(key='state_space', value=pickle.dumps(built))
    logging.info("Built state space for pricing")
    return True
def fit_pricing_function(**kwargs):
    """Task: fit the pricing function on the elasticity results.

    Reads ``elasticity_results`` from XCom, applies ``FitPricingFunctionStep``
    and stores the fitted object under ``pricer``. Always returns ``True``.
    """
    task_instance = kwargs['ti']
    elasticity = pickle.loads(task_instance.xcom_pull(key='elasticity_results'))
    fitted = FitPricingFunctionStep(get_context(**kwargs)).transform(elasticity)
    task_instance.xcom_push(key='pricer', value=pickle.dumps(fitted))
    logging.info("Fitted pricing function")
    return True
def predict_prices(**kwargs):
    """Task: predict prices from the fitted pricer and the state space.

    Reads ``pricer`` and ``state_space`` from XCom, passes them as a tuple to
    ``PredictPricesStep``, stores the result under ``predicted_prices`` and
    returns its length.
    """
    task_instance = kwargs['ti']
    fitted = pickle.loads(task_instance.xcom_pull(key='pricer'))
    states = pickle.loads(task_instance.xcom_pull(key='state_space'))

    predictor = PredictPricesStep(get_context(**kwargs))
    predictions = predictor.transform((fitted, states))
    task_instance.xcom_push(key='predicted_prices', value=pickle.dumps(predictions))

    n_products = len(predictions)
    logging.info(f"Predicted prices for {n_products} products")
    return n_products
def publish_results(**kwargs):
    """Task: push elasticity, the pricing model and predicted prices to the registry.

    Reads ``elasticity_results``, ``predicted_prices`` and ``pricer`` from
    XCom and publishes each under the model name ``latest`` with run metadata.

    Returns:
        dict with product/price counts, a status string and the mean elasticity.
    """
    task_instance = kwargs['ti']
    elasticity = pickle.loads(task_instance.xcom_pull(key='elasticity_results'))
    predictions = pickle.loads(task_instance.xcom_pull(key='predicted_prices'))

    # The registry module is resolved relative to /opt/airflow at call time.
    sys.path.insert(0, '/opt/airflow')
    from lib.model_registry import ModelRegistry
    model_registry = ModelRegistry()

    dag_run = kwargs.get('dag_run')
    dag_conf = dag_run.conf if dag_run else {}
    run_metadata = {
        'timestamp': pd.Timestamp.now().isoformat(),
        'window_size': dag_conf.get('window_size', '30s'),
        'store_mode': dag_conf.get('store_mode', 'hotel'),
        'dag_run_id': kwargs['dag_run'].run_id if kwargs.get('dag_run') else 'manual',
    }

    model_registry.publish_elasticity(elasticity, model_name='latest', metadata=run_metadata)

    fitted = pickle.loads(task_instance.xcom_pull(key='pricer'))
    model_metadata = {**run_metadata, 'model_type': type(fitted).__name__}
    model_registry.publish_pricing_model(fitted, model_name='latest', metadata=model_metadata)

    model_registry.publish_prices(predictions, model_name='latest', metadata=run_metadata)

    n_products = len(elasticity)
    logging.info(f"Published elasticity + pricing + prices for {n_products} products")
    return {
        'n_products': len(elasticity),
        'n_prices': len(predictions),
        'registry_status': 'success',
        'elasticity_mean': float(elasticity['elasticity'].mean()),
    }
# DAG definition
with DAG(
    'elasticity_pricing_pipeline',
    default_args=default_args,
    description='E2E refactored pipeline: atomic steps with proper separation',
    schedule_interval='*/15 * * * *',
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    tags=['pricing', 'elasticity', 'research', 'refactored'],
) as dag:
    # NOTE: `provide_context=True` is intentionally omitted. With the
    # Airflow 2 `airflow.operators.python` import used by this file, the task
    # context is passed to the callable's **kwargs automatically and the flag
    # is deprecated.

    # parallel data fetching
    t_fetch_interactions = PythonOperator(
        task_id='fetch_interactions',
        python_callable=fetch_interactions,
    )
    t_fetch_price_logs = PythonOperator(
        task_id='fetch_price_logs',
        python_callable=fetch_price_logs,
    )

    # interaction processing branch
    t_create_buckets = PythonOperator(
        task_id='create_price_buckets',
        python_callable=create_price_buckets,
    )
    t_augment_events = PythonOperator(
        task_id='augment_event_names',
        python_callable=augment_event_names,
    )
    t_chunk_interactions = PythonOperator(
        task_id='chunk_interactions',
        python_callable=chunk_interactions,
    )
    t_compute_demand = PythonOperator(
        task_id='compute_demand',
        python_callable=compute_demand,
    )

    # price processing branch (VECTORIZED)
    t_aggregate_prices = PythonOperator(
        task_id='aggregate_price_logs',
        python_callable=aggregate_price_logs,
    )

    # convergence: compute elasticity
    t_compute_elasticity = PythonOperator(
        task_id='compute_elasticity',
        python_callable=compute_elasticity,
    )

    # pricing tasks
    t_build_state = PythonOperator(
        task_id='build_state_space',
        python_callable=build_state_space,
    )
    t_fit_pricer = PythonOperator(
        task_id='fit_pricing_function',
        python_callable=fit_pricing_function,
    )
    t_predict_prices = PythonOperator(
        task_id='predict_prices',
        python_callable=predict_prices,
    )

    # publish to registry
    t_publish = PythonOperator(
        task_id='publish_results',
        python_callable=publish_results,
    )

    # dependency graph (clear atomic flow)
    # The two fetch tasks have no upstream dependencies, so they can start in
    # parallel; no explicit statement is needed to express that.

    # interaction branch: fetch -> bucket -> augment -> chunk -> demand
    t_fetch_interactions >> t_create_buckets >> t_augment_events >> t_chunk_interactions >> t_compute_demand

    # price branch: fetch -> aggregate (vectorized)
    t_fetch_price_logs >> t_aggregate_prices

    # convergence: both branches -> elasticity
    [t_compute_demand, t_aggregate_prices] >> t_compute_elasticity

    # pricing: elasticity -> state + fit -> predict -> publish
    t_compute_elasticity >> [t_build_state, t_fit_pricer] >> t_predict_prices >> t_publish

View File

@@ -0,0 +1,237 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import pandas as pd
import logging
import sys
import pickle
import io
# add parent dir to path so procesing package can be imported
sys.path.insert(0, '/opt/airflow')
from procesing.context import PipelineContext
from procesing.providers import SupabaseProvider, BackendAPIProvider
from procesing.steps import (
FetchInteractionsStep,
FetchPriceLogsStep,
ComputeDemandStep,
AggregatePriceLogsStep,
JoinProductFeaturesStep,
)
from procesing.pricers.simple import SimpleSurgePricer
# Task-level defaults applied to every operator in the DAG below:
# no e-mail notifications, and two retries spaced five minutes apart.
default_args = dict(
    owner='phantom-research',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
)
def get_provider():
    """Build a provider object that combines both data-source providers.

    Returns:
        An instance of a locally defined ``CompositeProvider`` that inherits
        from ``SupabaseProvider`` and ``BackendAPIProvider``.
    """
    # TODO: replace this with one global provider singleton instead of
    # repeating the multiple-inheritance declaration across the codebase.
    class CompositeProvider(SupabaseProvider, BackendAPIProvider):
        def __init__(self):
            # Initialise each base explicitly, in declaration order.
            for base in (SupabaseProvider, BackendAPIProvider):
                base.__init__(self)

    provider = CompositeProvider()
    return provider
def get_context(**kwargs):
    """Create the ``PipelineContext`` for a task from the Airflow run config.

    Args:
        **kwargs: Airflow task context; only ``dag_run`` is read here.

    Returns:
        A ``PipelineContext`` whose ``store_mode`` comes from ``dag_run.conf``
        when a DAG run is present, otherwise ``'hotel'``.
    """
    dag_run = kwargs.get('dag_run')
    dag_conf = dag_run.conf if dag_run else {}
    provider = get_provider()
    store_mode = dag_conf.get('store_mode', 'hotel')
    return PipelineContext(provider=provider, store_mode=store_mode)
# atomic task functions (each wraps one sklearn step)
def fetch_interactions(**kwargs):
    """Task: fetch raw interaction records and hand them to downstream tasks.

    Runs ``FetchInteractionsStep`` with no input, pickles the result into the
    ``interactions_raw`` XCom key, and returns the number of records.
    """
    pipeline_ctx = get_context(**kwargs)
    records = FetchInteractionsStep(pipeline_ctx).transform(None)
    kwargs['ti'].xcom_push(key='interactions_raw', value=pickle.dumps(records))
    n_records = len(records)
    logging.info(f"Fetched {n_records} interaction records")
    return n_records
def fetch_price_logs(**kwargs):
    """Task: fetch raw price-log records and hand them to downstream tasks.

    Runs ``FetchPriceLogsStep`` with no input, pickles the result into the
    ``price_logs_raw`` XCom key, and returns the number of records.
    """
    pipeline_ctx = get_context(**kwargs)
    records = FetchPriceLogsStep(pipeline_ctx).transform(None)
    kwargs['ti'].xcom_push(key='price_logs_raw', value=pickle.dumps(records))
    n_records = len(records)
    logging.info(f"Fetched {n_records} price records")
    return n_records
def compute_demand(**kwargs):
    """Task: compute demand from the raw interactions.

    Reads ``interactions_raw`` from XCom, applies ``ComputeDemandStep``,
    stores the result under ``demand_data`` and returns its length.
    """
    task_instance = kwargs['ti']
    interactions = pickle.loads(task_instance.xcom_pull(key='interactions_raw'))
    demand_frame = ComputeDemandStep(get_context(**kwargs)).transform(interactions)
    # TODO: clear the xcom
    task_instance.xcom_push(key='demand_data', value=pickle.dumps(demand_frame))
    n_products = len(demand_frame)
    logging.info(f"Computed demand for {n_products} products")
    return n_products
def aggregate_price_logs(**kwargs):
    """Task: aggregate the raw price logs.

    Reads ``price_logs_raw`` from XCom, applies ``AggregatePriceLogsStep``,
    stores the result under ``price_data`` and returns its length.
    """
    task_instance = kwargs['ti']
    raw_prices = pickle.loads(task_instance.xcom_pull(key='price_logs_raw'))
    price_frame = AggregatePriceLogsStep(get_context(**kwargs)).transform(raw_prices)
    task_instance.xcom_push(key='price_data', value=pickle.dumps(price_frame))
    n_products = len(price_frame)
    logging.info(f"Aggregated price logs for {n_products} products")
    return n_products
def join_product_features(**kwargs):
    """Task: join the demand and price data into one feature table.

    Reads ``demand_data`` and ``price_data`` from XCom, passes them as a tuple
    to ``JoinProductFeaturesStep``, stores the result under
    ``product_features`` and returns its length.
    """
    task_instance = kwargs['ti']
    demand_frame = pickle.loads(task_instance.xcom_pull(key='demand_data'))
    price_frame = pickle.loads(task_instance.xcom_pull(key='price_data'))

    joiner = JoinProductFeaturesStep(get_context(**kwargs))
    features = joiner.transform((demand_frame, price_frame))
    task_instance.xcom_push(key='product_features', value=pickle.dumps(features))

    n_products = len(features)
    logging.info(f"Joined features for {n_products} products")
    return n_products
def apply_surge_pricing(**kwargs):
    """Task: run the rule-based surge pricer over the joined product features.

    Reads ``product_features`` from XCom, configures ``SimpleSurgePricer`` from
    ``dag_run.conf`` (falling back to the defaults below), and stores a frame
    with current, base and optimal prices under ``predicted_prices``.

    Returns:
        Number of rows in the published price frame.
    """
    task_instance = kwargs['ti']
    features = pickle.loads(task_instance.xcom_pull(key='product_features'))

    dag_run = kwargs.get('dag_run')
    dag_conf = dag_run.conf if dag_run else {}

    # The pricer reads a `demand` column, the feature table carries `demand_score`.
    pricer_input = features.rename(columns={'demand_score': 'demand'})

    pricer_settings = {
        'high_threshold': dag_conf.get('high_threshold', 10),
        'low_threshold': dag_conf.get('low_threshold', 2),
        'surge_multiplier': dag_conf.get('surge_multiplier', 1.2),
        'discount_multiplier': dag_conf.get('discount_multiplier', 0.9),
    }
    pricer = SimpleSurgePricer(**pricer_settings)
    pricer.fit(pricer_input)
    pricer_input['optimal_price'] = pricer.predict()

    output_columns = ['productId', 'price', 'base_price', 'optimal_price', 'demand']
    published = pricer_input[output_columns].rename(columns={
        'price': 'current_price',
        'demand': 'demand_score',
    })

    task_instance.xcom_push(key='predicted_prices', value=pickle.dumps(published))
    n_products = len(published)
    logging.info(f"Applied surge pricing for {n_products} products")
    return n_products
def publish_results(**kwargs):
    """Task: publish the surge-pricing output to the model registry.

    Reads ``predicted_prices`` from XCom and publishes it under the model name
    ``latest`` together with the run metadata and pricer settings.

    Returns:
        dict with the product count, a status string and the mean of
        ``demand_score`` (``None`` when that column is absent).
    """
    task_instance = kwargs['ti']
    predictions = pickle.loads(task_instance.xcom_pull(key='predicted_prices'))

    # The registry module is resolved relative to /opt/airflow at call time.
    sys.path.insert(0, '/opt/airflow')
    from lib.model_registry import ModelRegistry
    model_registry = ModelRegistry()

    dag_run = kwargs.get('dag_run')
    dag_conf = dag_run.conf if dag_run else {}
    run_metadata = {
        'timestamp': pd.Timestamp.now().isoformat(),
        'store_mode': dag_conf.get('store_mode', 'hotel'),
        'dag_run_id': kwargs['dag_run'].run_id if kwargs.get('dag_run') else 'manual',
        'pricing_method': 'surge',
        'high_threshold': dag_conf.get('high_threshold', 10),
        'low_threshold': dag_conf.get('low_threshold', 2),
        'surge_multiplier': dag_conf.get('surge_multiplier', 1.2),
        'discount_multiplier': dag_conf.get('discount_multiplier', 0.9),
    }

    model_registry.publish_prices(predictions, model_name='latest', metadata=run_metadata)
    n_products = len(predictions)
    logging.info(f"Published surge pricing for {n_products} products")

    if 'demand_score' in predictions.columns:
        mean_demand = float(predictions['demand_score'].mean())
    else:
        mean_demand = None
    return {
        'n_products': len(predictions),
        'registry_status': 'success',
        'mean_demand': mean_demand,
    }
# DAG definition
with DAG(
    'surge_pricing_pipeline',
    default_args=default_args,
    description='Simple surge pricing pipeline: demand aggregation + rule-based pricing',
    schedule_interval='*/15 * * * *',
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    tags=['pricing', 'surge', 'research', 'simplified'],
) as dag:
    # NOTE: `provide_context=True` is intentionally omitted. With the
    # Airflow 2 `airflow.operators.python` import used by this file, the task
    # context is passed to the callable's **kwargs automatically and the flag
    # is deprecated.

    # parallel data fetching
    t_fetch_interactions = PythonOperator(
        task_id='fetch_interactions',
        python_callable=fetch_interactions,
    )
    t_fetch_price_logs = PythonOperator(
        task_id='fetch_price_logs',
        python_callable=fetch_price_logs,
    )

    # compute demand from interactions
    t_compute_demand = PythonOperator(
        task_id='compute_demand',
        python_callable=compute_demand,
    )

    # aggregate price logs
    t_aggregate_prices = PythonOperator(
        task_id='aggregate_price_logs',
        python_callable=aggregate_price_logs,
    )

    # join demand and prices
    t_join_features = PythonOperator(
        task_id='join_product_features',
        python_callable=join_product_features,
    )

    # apply surge pricing
    t_surge_pricing = PythonOperator(
        task_id='apply_surge_pricing',
        python_callable=apply_surge_pricing,
    )

    # publish to registry
    t_publish = PythonOperator(
        task_id='publish_results',
        python_callable=publish_results,
    )

    # dependency graph: parallel fetch -> process -> join -> surge -> publish
    t_fetch_interactions >> t_compute_demand
    t_fetch_price_logs >> t_aggregate_prices
    [t_compute_demand, t_aggregate_prices] >> t_join_features >> t_surge_pricing >> t_publish

View File

@@ -12,7 +12,6 @@ from procesing.steps import (
ComputeDemandStep,
ComputeDemandForChunksStep,
AggregatePriceLogsStep,
ComputeElasticityStep,
# StateSpace,
# BuildStateSpaceStep,
FitPricingFunctionStep,
@@ -21,7 +20,6 @@ from procesing.steps import (
from procesing.pipelines import (
interaction_extraction_pipeline,
price_extraction_pipeline,
elasticity_computation_pipeline,
pricing_pipeline,
full_pipeline,
)
@@ -42,14 +40,12 @@ __all__ = [
'ComputeDemandStep',
'ComputeDemandForChunksStep',
'AggregatePriceLogsStep',
'ComputeElasticityStep',
# 'StateSpace',
# 'BuildStateSpaceStep',
'FitPricingFunctionStep',
'PredictPricesStep',
'interaction_extraction_pipeline',
'price_extraction_pipeline',
'elasticity_computation_pipeline',
'pricing_pipeline',
'full_pipeline',
]

View File

@@ -2,7 +2,6 @@ from sklearn.pipeline import Pipeline
import pandas as pd
from procesing.context import PipelineContext
from procesing.providers import SupabaseProvider, BackendAPIProvider
from typing import Union
from procesing.steps import (
FetchInteractionsStep,
FetchPriceLogsStep,
@@ -13,11 +12,13 @@ from procesing.steps import (
ChunkByTimeWindowStep,
ComputeDemandForChunksStep,
AggregatePriceLogsStep,
ComputeElasticityStep,
# BuildStateSpaceStep,
FitPricingFunctionStep,
PredictPricesStep,
ComputeDemandStep,
JoinProductFeaturesStep
)
from procesing.pricers import SimpleSurgePricer
def interaction_extraction_pipeline(context: PipelineContext):
"""Pipeline for extracting and augmenting interaction data"""
@@ -35,80 +36,76 @@ def price_extraction_pipeline(context: PipelineContext):
])
def elasticity_computation_pipeline(context: PipelineContext,
def product_features_pipeline(context: PipelineContext,
interactions_df: pd.DataFrame,
price_logs_df: pd.DataFrame):
"""
Compute elasticity from interactions and price logs.
Manual orchestration needed for branching logic.
"""
# branch 1: chunk interactions and compute demand
chunk_step = ChunkByTimeWindowStep(context)
interaction_chunks = chunk_step.transform(interactions_df)
demand_step = ComputeDemandForChunksStep(context)
demand_chunks = demand_step.transform(interaction_chunks)
# branch 2: aggregate price logs
demand_step = ComputeDemandStep(context)
price_step = AggregatePriceLogsStep(context)
price_chunks = price_step.transform(price_logs_df)
# convergence: compute elasticity
elasticity_step = ComputeElasticityStep(context)
elasticity_df = elasticity_step.transform((demand_chunks, price_chunks))
return elasticity_df
join_step = JoinProductFeaturesStep(context)
def pricing_pipeline(context: PipelineContext, elasticity_df: pd.DataFrame):
demand_data = demand_step.transform(interactions_df)
price_data= price_step.transform(price_logs_df)
joined_data = join_step.transform((demand_data, price_data))
return joined_data
def pricing_pipeline(context: "PipelineContext",
                     data: pd.DataFrame,
                     high_threshold: int = 10,
                     low_threshold: int = 2,
                     surge_multiplier: float = 1.2,
                     discount_multiplier: float = 0.9) -> pd.DataFrame:
    """Add an ``optimal_price`` column to the product features via surge rules.

    Args:
        context: Pipeline context (not read by this function).
        data: Product feature table; must contain ``productId``.
        high_threshold: Demand level at or above which the surge applies.
        low_threshold: Demand level at or below which the discount applies.
        surge_multiplier: Price multiplier for high demand.
        discount_multiplier: Price multiplier for low demand.

    Returns:
        ``data`` itself with ``optimal_price`` assigned in place, or an empty
        DataFrame when ``data`` is empty or lacks ``productId``.
    """
    if data.empty or 'productId' not in data.columns:
        return pd.DataFrame()

    # Forward the caller's settings; previously they were accepted but the
    # pricer was always built with its own defaults.
    surge_pricer = SimpleSurgePricer(
        high_threshold=high_threshold,
        low_threshold=low_threshold,
        surge_multiplier=surge_multiplier,
        discount_multiplier=discount_multiplier,
    )

    # The pricer reads a `demand` column; the feature table names it
    # `demand_score`. Without this alias the pricer silently treats demand as
    # zero for every product. The alias is applied to a renamed view only, so
    # the returned frame keeps its original column names.
    fit_frame = data
    if 'demand' not in data.columns and 'demand_score' in data.columns:
        fit_frame = data.rename(columns={'demand_score': 'demand'})

    surge_pricer.fit(fit_frame)
    data['optimal_price'] = surge_pricer.predict()
    return data
def full_pipeline(context: PipelineContext,
high_threshold: int = 10,
low_threshold: int = 2,
surge_multiplier: float = 1.2,
discount_multiplier: float = 0.9):
"""
Generate optimal prices from elasticity estimates.
Complete end-to-end pipeline: data extraction -> demand/price aggregation -> surge pricing
Args:
context: Pipeline context
high_threshold: Demand threshold for surge pricing
low_threshold: Demand threshold for discounts
surge_multiplier: Price multiplier for high demand
discount_multiplier: Price multiplier for low demand
Returns:
tuple: (product_features_df, optimal_prices_df)
- product_features_df: [productId, demand_score, price]
- optimal_prices_df: [productId, current_price, optimal_price, demand_score]
"""
# build state space
state_step = BuildStateSpaceStep(context)
state_space = state_step.transform(elasticity_df)
# fit pricing function
fit_step = FitPricingFunctionStep(context)
pricer = fit_step.transform(elasticity_df)
# predict prices
predict_step = PredictPricesStep(context)
prices_df = predict_step.transform((pricer, state_space))
return prices_df
def full_pipeline(context: PipelineContext):
"""
Complete end-to-end pipeline: data extraction -> elasticity -> pricing
Returns: (elasticity_df, prices_df)
"""
# extract interactions
interaction_pipe = interaction_extraction_pipeline(context)
interactions_df = interaction_pipe.fit_transform(None)
# extract price logs
price_pipe = price_extraction_pipeline(context)
interactions_df = interaction_pipe.fit_transform(None)
price_logs_df = price_pipe.fit_transform(None)
product_features_df = product_features_pipeline(context, interactions_df, price_logs_df)
print(product_features_df.to_string())
if interactions_df.empty or price_logs_df.empty:
return None, None
# generate optimal prices using surge rules
optimal_prices_df = pricing_pipeline(context, product_features_df,
high_threshold=high_threshold,
low_threshold=low_threshold,
surge_multiplier=surge_multiplier,
discount_multiplier=discount_multiplier)
# compute elasticity
elasticity_df = elasticity_computation_pipeline(
context,
interactions_df,
price_logs_df
)
return product_features_df, optimal_prices_df
if elasticity_df is None or elasticity_df.empty:
return elasticity_df, None
# generate prices
prices_df = pricing_pipeline(context, elasticity_df)
return elasticity_df, prices_df
if __name__ == '__main__':
@@ -117,24 +114,25 @@ if __name__ == '__main__':
def __init__(self, backend_url: str):
SupabaseProvider.__init__(self)
BackendAPIProvider.__init__(self, backend_url=backend_url)
class HistoricalProvider(SupabaseProvider, BackendAPIProvider):
def fetch_kafka_topic(self, topic: str) -> pd.DataFrame:
path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/858c61ab-0a7f-4595-ae49-33f4365517b9/"
interactions_file = "messages(2).json"
prices_file = "messages(3).json"
data = pd.read_json(path + (interactions_file if topic == "user-interactions" else prices_file))
data = [r['payload'] for r in data['value'].to_list()]
data = pd.DataFrame(data)
return data
# example run
context = PipelineContext(
provider=Provider(backend_url="http://localhost:5000"),
provider=HistoricalProvider(),
store_mode='hotel',
# 15 min not month
window_size='15min',
)
elasticity_df, prices_df = full_pipeline(context)
if elasticity_df is not None and not elasticity_df.empty:
print("Elasticity Estimates:")
print(elasticity_df.to_string(index=False))
else:
print("No elasticity estimates computed.")
if prices_df is not None and not prices_df.empty:
print("\nPredicted Prices:")
print(prices_df.to_string(index=False))
else:
print("No prices predicted.")
product_features, prices = full_pipeline(context)
print(prices.to_string())

View File

@@ -1,6 +1,6 @@
from procesing.pricers.base import PricingFunction
from procesing.pricers.elasticity import ElasticityBasedPricer
from procesing.pricers.simple import StaticPricer, RandomPricer
from procesing.pricers.simple import StaticPricer, RandomPricer, SimpleSurgePricer
from procesing.pricers.session_aware import SessionAwarePricer, ProductSpecificSessionPricer
__all__ = [
@@ -8,6 +8,7 @@ __all__ = [
'ElasticityBasedPricer',
'StaticPricer',
'RandomPricer',
'SimpleSurgePricer',
'SessionAwarePricer',
'ProductSpecificSessionPricer'
]

View File

@@ -25,7 +25,7 @@ class PricingFunction(ABC):
"""
@abstractmethod
def fit(self, historical_data: pd.DataFrame, **kwargs):
def fit(self, *kwargs):
"""
Offline training on historical data.
@@ -36,7 +36,7 @@ class PricingFunction(ABC):
pass
@abstractmethod
def predict(self, state_space) -> np.ndarray:
def predict(self, *kwargs) -> np.ndarray:
"""
Generate optimal prices given current state.

View File

@@ -46,3 +46,46 @@ class RandomPricer(PricingFunction):
if self.n_products is None:
self.n_products = len(state_space.demand)
return self.rng.uniform(self.price_min, self.price_max, size=self.n_products)
class SimpleSurgePricer(PricingFunction):
    """
    Rule-based surge pricer adjusting prices via demand thresholds.

    Logic: demand >= high_threshold -> price * surge_multiplier,
    demand <= low_threshold -> price * discount_multiplier, otherwise the
    price is left unchanged.
    Simpler and more controllable than curve fitting approaches.
    """

    # Price assigned to every product when no base prices are available.
    DEFAULT_PRICE = 99.99

    def __init__(self,
                 base_prices: np.ndarray = None,
                 high_threshold: int = 10,
                 low_threshold: int = 2,
                 surge_multiplier: float = 1.2,
                 discount_multiplier: float = 0.9):
        self.base_prices = base_prices
        self.high_threshold = high_threshold
        self.low_threshold = low_threshold
        self.surge_multiplier = surge_multiplier
        self.discount_multiplier = discount_multiplier
        # Set by fit(); initialised here so predict() can test for it safely.
        self.demand_history = None

    def fit(self, market_data: pd.DataFrame):
        """Read base prices and demand from ``market_data``.

        Prices come from ``base_price`` when present, otherwise from ``price``.
        Demand comes from ``demand`` when present, otherwise it is all zeros.
        """
        price_column = 'base_price' if 'base_price' in market_data.columns else 'price'
        self.base_prices = market_data[price_column].to_numpy()
        if 'demand' in market_data.columns:
            self.demand_history = market_data['demand'].to_numpy()
        else:
            self.demand_history = np.zeros_like(self.base_prices)

    def predict(self) -> np.ndarray:
        """
        Apply the surge/discount rules to the stored prices and demand.

        Returns:
            Float array of adjusted prices, one per product.

        Raises:
            RuntimeError: if neither base prices nor demand are available
                (no ``base_prices`` given and ``fit`` not called).
        """
        demand = getattr(self, 'demand_history', None)
        if self.base_prices is None and demand is None:
            raise RuntimeError(
                "SimpleSurgePricer.predict() needs base_prices or a prior fit() call"
            )

        if self.base_prices is not None:
            # Copy as float: in-place multiplication of an integer price array
            # by a float multiplier is rejected by NumPy's casting rules.
            new_prices = np.array(self.base_prices, dtype=float)
        else:
            new_prices = np.full(np.shape(demand), self.DEFAULT_PRICE, dtype=float)

        if demand is None:
            demand = np.zeros_like(new_prices)
        else:
            demand = np.asarray(demand)

        high_mask = demand >= self.high_threshold
        low_mask = demand <= self.low_threshold
        new_prices[high_mask] *= self.surge_multiplier
        new_prices[low_mask] *= self.discount_multiplier
        return new_prices

View File

@@ -1,13 +1,12 @@
from procesing.steps.base import BaseContextStep
from procesing.steps.fetch import FetchInteractionsStep, FetchPriceLogsStep, FetchExperimentsStep
from procesing.steps.join import JoinExperimentsStep
from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep
from procesing.steps.join import JoinExperimentsStep, JoinProductFeaturesStep
from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep, AugmentInteractionsStep
from procesing.steps.chunk import ChunkByTimeWindowStep
from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep
from procesing.steps.elasticity import AggregatePriceLogsStep, ComputeElasticityStep
from procesing.steps.elasticity import AggregatePriceLogsStep
from procesing.steps.pricing import FitPricingFunctionStep, PredictPricesStep
from procesing.steps.session import ExtractSessionFeaturesStep, _extract_features_for_session
# StateSpace, BuildStateSpaceStep,
__all__ = [
'BaseContextStep',
@@ -15,13 +14,14 @@ __all__ = [
'FetchPriceLogsStep',
'FetchExperimentsStep',
'JoinExperimentsStep',
'JoinProductFeaturesStep',
'CreatePriceBucketsStep',
'AugmentEventNamesStep',
'AugmentInteractionsStep',
'ChunkByTimeWindowStep',
'ComputeDemandStep',
'ComputeDemandForChunksStep',
'AggregatePriceLogsStep',
'ComputeElasticityStep',
'FitPricingFunctionStep',
'PredictPricesStep',
'ExtractSessionFeaturesStep',

View File

@@ -2,6 +2,93 @@ import numpy as np
import pandas as pd
from procesing.steps.base import BaseContextStep
class AugmentInteractionsStep(BaseContextStep):
    """
    Consolidated step: create price buckets, augment event names, join experiments.
    Input: (interactions_df, price_logs_df)
    Output: enriched interactions_df
    """

    def transform(self, data: tuple):
        """Enrich the interactions frame; ``price_logs_df`` is currently unused.

        Works on a copy so the caller's frame is not modified. An empty
        interactions frame is returned unchanged.
        """
        interactions_df, price_logs_df = data
        if interactions_df.empty:
            return interactions_df

        interactions_df = interactions_df.copy()

        # Step 1: Create price buckets
        interactions_df = self._create_price_buckets(interactions_df)
        # Step 2: Augment event names
        interactions_df = self._augment_event_names(interactions_df)
        # Step 3: Join experiments (optional)
        if 'experimentId' in interactions_df.columns:
            interactions_df = self._join_experiments(interactions_df)
        return interactions_df

    def _create_price_buckets(self, df: pd.DataFrame):
        """Add a ``price_bucket`` column derived from ``metadata_price``.

        Without a ``metadata_price`` column, or when every value is null, the
        bucket is the empty string. Otherwise quantile buckets ``PB_1..PB_n``
        are used, falling back to ``P_<int price>`` labels when ``pd.qcut``
        raises ``ValueError``.
        """
        if 'metadata_price' not in df.columns:
            df['price_bucket'] = ""
            return df

        n_buckets = self.context.config.get('n_price_buckets', 5)
        prices = df['metadata_price']
        if prices.notnull().sum() > 0:
            try:
                price_buckets = pd.qcut(
                    prices,
                    q=n_buckets,
                    labels=[f"PB_{i+1}" for i in range(n_buckets)],
                    duplicates='drop'
                )
            except ValueError:
                # fallback for insufficient unique values
                price_buckets = prices.apply(
                    lambda x: f"P_{int(x)}" if pd.notnull(x) else ""
                )
        else:
            price_buckets = pd.Series([""] * len(df), index=df.index)

        df['price_bucket'] = price_buckets
        return df

    def _augment_event_names(self, df: pd.DataFrame):
        """Append a ``_productId@price_bucket`` suffix to ``eventName``.

        The suffix is stored in ``metadata_schema`` and is empty for rows where
        either value is null, or for every row when either column is missing.
        """
        if 'productId' in df.columns and 'price_bucket' in df.columns:
            has_both = df['productId'].notnull() & df['price_bucket'].notnull()
            schema = "_" + df['productId'].astype(str) + "@" + df['price_bucket'].astype(str)
            df['metadata_schema'] = np.where(has_both, schema, "")
        else:
            # Previously a missing productId column raised KeyError here.
            df['metadata_schema'] = ""

        df['eventName'] = df['eventName'] + df['metadata_schema']
        return df

    def _join_experiments(self, df: pd.DataFrame):
        """Left-join experiment metadata on ``experimentId`` == ``id``.

        Returns ``df`` unchanged when there are no experiment ids or the
        provider returns an empty frame.
        """
        exp_ids = df['experimentId'].dropna().unique().tolist()
        if not exp_ids:
            return df

        experiments_df = self.context.provider.fetch_experiments(exp_ids)
        if experiments_df.empty:
            return df

        return df.merge(
            experiments_df,
            left_on='experimentId',
            right_on='id',
            how='left',
            suffixes=('', '_exp')
        )
class CreatePriceBucketsStep(BaseContextStep):
"""Create price bucket labels from price data"""

View File

@@ -16,7 +16,7 @@ class AggregatePriceLogsStep(BaseContextStep):
df = price_logs_df.copy()
ts_col = self.context.config.get('ts_col', 'ts')
window_size = self.context.window_size
#window_size = self.context.window_size WE ARE NOT USING CHUNKS ANYMORE
# ensure datetime
if not pd.api.types.is_datetime64_any_dtype(df[ts_col]):
@@ -24,230 +24,19 @@ class AggregatePriceLogsStep(BaseContextStep):
df = df.sort_values([ts_col, 'productId'])
products = self.context.products
unique_products = products['id'].unique()
# VECTORIZED: group by product, resample by time window, compute mean
df_indexed = df.set_index(ts_col)
windowed = (
df_indexed
.groupby('productId')['price']
.resample(window_size)
.mean()
.reset_index()
# get base price from metadata if available 1) read the metadata col as json and get the base_price
products['base_price'] = products.apply(
lambda row: row['metadata'].get('base_price', 0) if isinstance(row['metadata'], dict) else 0,
axis=1
)
# forward fill missing windows (carry last known price)
windowed = windowed.sort_values([ts_col, 'productId'])
windowed['price'] = windowed.groupby('productId')['price'].ffill()
windowed = windowed.dropna(subset=['price'])
unique_products = products['id'].unique()
# group into chunks by window
chunks = []
for window_start, group in windowed.groupby(ts_col):
price_vector = group[['productId', 'price']].copy()
# fill missing products with last known price before this window
missing_products = set(unique_products) - set(price_vector['productId'])
if missing_products:
for pid in missing_products:
last_price = df_indexed[
(df_indexed['productId'] == pid) &
(df_indexed.index < window_start)
]['price']
if not last_price.empty:
price_vector = pd.concat([
price_vector,
pd.DataFrame({'productId': [pid], 'price': [last_price.iloc[-1]]})
], ignore_index=True)
if not price_vector.empty:
chunks.append({
'window_start': window_start,
'window_end': window_start + pd.Timedelta(window_size),
'price_vector': price_vector
})
return chunks
class ComputeElasticityStep(BaseContextStep):
    """
    Compute price elasticity from demand and price chunks.

    Input: (demand_chunks, price_chunks)
    Output: elasticity_df [productId, elasticity, std_error, n_obs]
    """

    # column order of the frame returned by transform()
    _COLUMNS = ['productId', 'elasticity', 'std_error', 'n_obs']

    def transform(self, chunk_tuple: tuple):
        """Estimate one elasticity per product.

        Reads ``elasticity_method`` (default ``'point'``) and
        ``min_observations`` (default 2) from the context config. Every id in
        ``self.context.products['id']`` appears in the result; products with
        no aligned observations get a zero row with ``n_obs`` 0, and products
        with fewer than ``min_observations`` points get elasticity 0.0.

        Raises:
            ValueError: if ``elasticity_method`` is neither ``'point'`` nor
                ``'arc'`` and at least one product has enough observations.
        """
        demand_chunks, price_chunks = chunk_tuple
        method = self.context.config.get('elasticity_method', 'point')
        min_obs = self.context.config.get('min_observations', 2)
        all_product_ids = self.context.products['id'].unique()

        # align chunks by window_start
        aligned = self._align_chunks(demand_chunks, price_chunks)
        if not aligned:
            return self._zero_rows(all_product_ids)

        # build time series per product
        product_series = self._build_timeseries(aligned)

        # compute elasticity per product
        rows = []
        for pid, series in product_series.items():
            if len(series) < min_obs:
                estimate = {'value': 0.0, 'std_error': 0.0}
            else:
                estimate = self._compute_elasticity(series, method)
            rows.append({
                'productId': pid,
                'elasticity': estimate['value'],
                'std_error': estimate.get('std_error', 0.0),
                'n_obs': len(series),
            })

        if not rows:
            # windows aligned but demand and prices shared no product
            return self._zero_rows(all_product_ids)

        result_df = pd.DataFrame(rows, columns=self._COLUMNS)

        # fill missing products with zero elasticity
        observed_pids = set(result_df['productId'])
        missing_pids = [p for p in all_product_ids if p not in observed_pids]
        if missing_pids:
            result_df = pd.concat(
                [result_df, self._zero_rows(missing_pids)], ignore_index=True
            )
        return result_df

    @staticmethod
    def _zero_rows(product_ids):
        """Build zero-elasticity rows (``n_obs`` 0) for ``product_ids``."""
        return pd.DataFrame({
            'productId': product_ids,
            'elasticity': 0.0,
            'std_error': 0.0,
            'n_obs': 0,
        })

    def _align_chunks(self, demand_chunks: List[Dict], price_chunks: List[Dict]):
        """Align demand and price chunks by window_start.

        Demand chunks without a price chunk at the same ``window_start`` are
        dropped; ``window_end`` is taken from the demand chunk.
        """
        price_lookup = {c['window_start']: c for c in price_chunks}
        return [
            {
                'window_start': dc['window_start'],
                'window_end': dc['window_end'],
                'demand': dc['demand_vector'],
                'prices': price_lookup[dc['window_start']]['price_vector'],
            }
            for dc in demand_chunks
            if dc['window_start'] in price_lookup
        ]

    def _build_timeseries(self, aligned: List[Dict]):
        """Build time series [timestamp, price, quantity] per product.

        Only products present in both the demand and the price vector of a
        window contribute a point for that window (inner join on productId).
        """
        series_by_product = {}
        for chunk in aligned:
            merged = chunk['demand'].merge(chunk['prices'], on='productId', how='inner')
            observations = zip(merged['productId'], merged['price'], merged['demand_score'])
            for pid, price, quantity in observations:
                series_by_product.setdefault(pid, []).append({
                    'timestamp': chunk['window_start'],
                    'price': price,
                    'quantity': quantity,
                })
        return series_by_product

    def _compute_elasticity(self, series: List[Dict], method: str):
        """Compute point or arc elasticity.

        Observations with a non-positive price or quantity are discarded; with
        fewer than two usable points the result is zero.

        Returns:
            dict with keys ``'value'`` and ``'std_error'``.

        Raises:
            ValueError: for an unknown ``method``.
        """
        prices = np.array([s['price'] for s in series])
        quantities = np.array([s['quantity'] for s in series])

        # filter out zero/negative values
        valid = (prices > 0) & (quantities > 0)
        if valid.sum() < 2:
            return {'value': 0.0, 'std_error': 0.0}
        prices = prices[valid]
        quantities = quantities[valid]

        if method == 'point':
            return self._point_elasticity(prices, quantities)
        if method == 'arc':
            return self._arc_elasticity(prices, quantities)
        raise ValueError(f"Unknown elasticity method: {method}")

    def _point_elasticity(self, prices: np.ndarray, quantities: np.ndarray):
        """Point elasticity via log-log regression: log(Q) = a + b*log(P), elasticity = b"""
        n = len(prices)
        if n < 2:
            return {'value': 0.0, 'std_error': 0.0}

        log_p = np.log(prices)
        log_q = np.log(quantities)
        dev_p = log_p - log_p.mean()
        dev_q = log_q - log_q.mean()

        # Slope from sums of squares, so numerator and denominator share one
        # normalisation (mixing np.cov's ddof=1 with np.var's ddof=0 would
        # scale the slope by n / (n - 1)).
        sxx = (dev_p ** 2).sum()
        if sxx == 0:
            # price never changed: slope is undefined
            return {'value': 0.0, 'std_error': 0.0}
        b = (dev_p * dev_q).sum() / sxx

        # std error estimate (needs at least one residual degree of freedom)
        if n > 2:
            residuals = dev_q - b * dev_p
            mse = (residuals ** 2).sum() / (n - 2)
            se_b = np.sqrt(mse / sxx)
        else:
            se_b = 0.0
        return {'value': b, 'std_error': se_b}

    def _arc_elasticity(self, prices: np.ndarray, quantities: np.ndarray):
        """Arc elasticity: average period-over-period elasticity.

        Consecutive pairs with an unchanged price, or a zero midpoint price or
        quantity, are skipped.
        """
        elasticities = []
        for i in range(1, len(prices)):
            p1, p2 = prices[i - 1], prices[i]
            q1, q2 = quantities[i - 1], quantities[i]
            p_avg = (p1 + p2) / 2
            q_avg = (q1 + q2) / 2
            delta_p = p2 - p1
            if p_avg == 0 or q_avg == 0 or delta_p == 0:
                continue
            elasticities.append(((q2 - q1) / q_avg) / (delta_p / p_avg))

        if not elasticities:
            return {'value': 0.0, 'std_error': 0.0}
        return {
            'value': np.mean(elasticities),
            'std_error': np.std(elasticities) / np.sqrt(len(elasticities)),
        }
df_indexed = df.set_index(ts_col)
# we return a df of average price per product over the entire period
# TODO: maybe consider a different operation to handle price aggregation over time
avg_prices = df_indexed.groupby('productId')['price'].mean().reindex(unique_products, fill_value=0).reset_index()
avg_prices.columns = ['productId', 'price']
# fill 0s with base_price from products
base_price_map = products.set_index('id')['base_price'].to_dict()
return avg_prices

View File

@@ -2,7 +2,11 @@ import pandas as pd
from procesing.steps.base import BaseContextStep
class FetchInteractionsStep(BaseContextStep):
"""Fetch raw interaction data from Kafka topic"""
"""Fetch raw interaction data from Kafka topic with optional time filtering"""
def __init__(self, context, lookback: str = None):
    """Store the pipeline context and an optional lookback window.

    Args:
        context: forwarded unchanged to the base step's ``__init__``.
        lookback: optional duration; ``transform`` passes it to
            ``pd.Timedelta`` to derive the cutoff applied to the ``ts``
            column. A falsy value (the default ``None``) disables the
            time filter.
    """
    super().__init__(context)
    self.lookback = lookback
def transform(self, X=None):
df = self.context.provider.fetch_kafka_topic('user-interactions')
@@ -24,14 +28,35 @@ class FetchInteractionsStep(BaseContextStep):
if 'metadata_dateIndex' in df.columns:
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
# Apply time filtering if lookback specified
if self.lookback and 'ts' in df.columns:
df['ts'] = pd.to_datetime(df['ts'])
cutoff = pd.Timestamp.now() - pd.Timedelta(self.lookback)
df = df[df['ts'] >= cutoff]
return df
class FetchPriceLogsStep(BaseContextStep):
    """Fetch price log data from Kafka topic with optional time filtering"""

    def __init__(self, context, lookback: str = None):
        """Store the context and an optional lookback window.

        Args:
            context: forwarded unchanged to the base step's ``__init__``.
            lookback: optional duration passed to ``pd.Timedelta``; a falsy
                value (the default ``None``) disables time filtering.
        """
        super().__init__(context)
        self.lookback = lookback

    def transform(self, X=None):
        """Return the ``price-logs`` topic, limited to the lookback window.

        ``X`` is ignored. An empty frame is returned as fetched. When
        ``lookback`` is set and a ``ts`` column exists, ``ts`` is parsed to
        datetimes and only rows with ``ts >= now - lookback`` are kept.
        """
        df = self.context.provider.fetch_kafka_topic('price-logs')
        if df.empty:
            return df

        # Apply time filtering if lookback specified
        if self.lookback and 'ts' in df.columns:
            df['ts'] = pd.to_datetime(df['ts'])
            # Compare like with like: a tz-aware column needs a tz-aware
            # cutoff, otherwise pandas raises TypeError on the comparison.
            # NOTE(review): for naive timestamps this still uses local
            # wall-clock time; confirm the producer does not emit naive UTC.
            tz = df['ts'].dt.tz if isinstance(df['ts'].dtype, pd.DatetimeTZDtype) else None
            cutoff = pd.Timestamp.now(tz=tz) - pd.Timedelta(self.lookback)
            df = df[df['ts'] >= cutoff]
        return df
class FetchExperimentsStep(BaseContextStep):

View File

@@ -32,3 +32,27 @@ class JoinExperimentsStep(BaseContextStep):
})
return interactions_df.merge(experiments_df, on='experimentId', how='left')
class JoinProductFeaturesStep(BaseContextStep):
    """Join aggregated prices and product base prices onto demand data"""

    def transform(self, data: tuple):
        """
        Args:
            data: (demand_df, price_df), both keyed by ``productId``
        Returns:
            ``demand_df`` unchanged when ``price_df`` is empty; otherwise
            ``demand_df`` left-joined with ``price_df`` and with a
            ``base_price`` column read from each product's ``metadata``
        """
        demand_df, price_df = data
        if price_df.empty:
            return demand_df

        # get base prices from products if available; build a separate frame
        # so the shared context.products is not modified by this step
        products = self.context.products
        base_prices = pd.DataFrame({
            'productId': products['id'].to_numpy(),
            'base_price': [self._base_price(meta) for meta in products['metadata']],
        })

        return (
            demand_df
            .merge(price_df, on='productId', how='left')
            .merge(base_prices, on='productId', how='left')
        )

    @staticmethod
    def _base_price(metadata):
        """Read ``base_price`` from a metadata dict, defaulting to 0.0."""
        if not isinstance(metadata, dict):
            return 0.0
        value = metadata.get('base_price', 0.0)
        # an explicit null in the metadata is treated like a missing key
        return 0.0 if value is None else float(value)

View File

@@ -1,353 +0,0 @@
import pytest
import pandas as pd
import numpy as np
from procesing.steps import (
AggregatePriceLogsStep,
ComputeElasticityStep
)
def test_aggregate_price_logs_basic(pipeline_context):
    """Test basic price aggregation into time windows"""
    step = AggregatePriceLogsStep(pipeline_context)

    # Seeded generator: the input must be identical on every run so that a
    # failure can be reproduced.
    rng = np.random.default_rng(0)

    # Create price logs with known window structure
    df = pd.DataFrame({
        'ts': pd.date_range(start='2023-01-01 10:00:00', periods=100, freq='10s'),
        'productId': np.tile([
            'd018efc1-25e9-4284-b276-80386e048b25',
            '51266ddb-5b07-47b7-89ee-5b5cae94bb11',
            '2cd7f756-fc65-4ba0-ab01-74521c1fff43',
        ], 34)[:100],
        'price': rng.uniform(100, 200, 100),
    })

    result = step.transform(df)

    assert isinstance(result, list)
    assert len(result) > 0

    # each chunk should have window metadata and price vector
    for chunk in result:
        assert 'window_start' in chunk
        assert 'window_end' in chunk
        assert 'price_vector' in chunk
        assert isinstance(chunk['price_vector'], pd.DataFrame)
        assert 'productId' in chunk['price_vector'].columns
        assert 'price' in chunk['price_vector'].columns
def test_aggregate_price_logs_handles_gaps(pipeline_context):
    """Sparse price logs with a gap must still aggregate into several windows"""
    first_pid = 'd018efc1-25e9-4284-b276-80386e048b25'
    second_pid = '51266ddb-5b07-47b7-89ee-5b5cae94bb11'

    # (timestamp, product, price); the third row follows a gap of ~2 mins
    observations = [
        ('2023-01-01 10:00:00', first_pid, 100),
        ('2023-01-01 10:00:05', first_pid, 102),
        ('2023-01-01 10:02:00', second_pid, 150),
        ('2023-01-01 10:02:30', second_pid, 153),
    ]
    stamps, product_ids, prices = (list(column) for column in zip(*observations))
    sparse_logs = pd.DataFrame({
        'ts': pd.to_datetime(stamps),
        'productId': product_ids,
        'price': prices,
    })

    chunks = AggregatePriceLogsStep(pipeline_context).transform(sparse_logs)

    assert isinstance(chunks, list)
    # should have multiple windows despite gaps
    assert len(chunks) >= 2
def test_compute_elasticity_with_known_relationship(pipeline_context):
    """Test elasticity computation with known price-demand relationship"""
    step = ComputeElasticityStep(pipeline_context)
    pid = 'd018efc1-25e9-4284-b276-80386e048b25'

    # simulate elastic demand: when price rises 10%, demand falls 15%
    # (elasticity ~ -1.5)
    base_price = 100
    base_demand = 50

    # one entry per 30s window: (window_start, demand, price)
    windows = [
        (pd.Timestamp('2023-01-01 10:00:00'), base_demand, base_price),
        (pd.Timestamp('2023-01-01 10:00:30'), base_demand * 0.85, base_price * 1.10),
        (pd.Timestamp('2023-01-01 10:01:00'), base_demand * 0.70, base_price * 1.20),
    ]
    window_size = pd.Timedelta('30s')

    demand_chunks = [
        {
            'window_start': start,
            'window_end': start + window_size,
            'demand_vector': pd.DataFrame({'productId': [pid], 'demand_score': [demand]}),
        }
        for start, demand, _ in windows
    ]
    price_chunks = [
        {
            'window_start': start,
            'window_end': start + window_size,
            'price_vector': pd.DataFrame({'productId': [pid], 'price': [price]}),
        }
        for start, _, price in windows
    ]

    result = step.transform((demand_chunks, price_chunks))

    assert isinstance(result, pd.DataFrame)
    assert not result.empty
    assert 'productId' in result.columns
    assert 'elasticity' in result.columns
    assert 'n_obs' in result.columns

    # check elasticity is negative (normal good)
    product_elast = result[result['productId'] == pid]
    assert len(product_elast) == 1
    assert product_elast.iloc[0]['elasticity'] < 0
    # should be roughly elastic (< -1)
    assert product_elast.iloc[0]['elasticity'] < -1
    assert product_elast.iloc[0]['n_obs'] == 3
def test_compute_elasticity_inelastic_product(pipeline_context):
    """Inelastic demand: a large price change barely moves demand"""
    pid = '51266ddb-5b07-47b7-89ee-5b5cae94bb11'
    base_price = 150
    base_demand = 40

    window_bounds = [
        (pd.Timestamp('2023-01-01 10:00:00'), pd.Timestamp('2023-01-01 10:00:30')),
        (pd.Timestamp('2023-01-01 10:00:30'), pd.Timestamp('2023-01-01 10:01:00')),
    ]
    demand_levels = [base_demand, base_demand * 0.98]  # tiny 2% decrease
    price_levels = [base_price, base_price * 1.20]  # 20% increase

    demand_chunks = [
        {
            'window_start': start,
            'window_end': end,
            'demand_vector': pd.DataFrame({'productId': [pid], 'demand_score': [level]}),
        }
        for (start, end), level in zip(window_bounds, demand_levels)
    ]
    price_chunks = [
        {
            'window_start': start,
            'window_end': end,
            'price_vector': pd.DataFrame({'productId': [pid], 'price': [level]}),
        }
        for (start, end), level in zip(window_bounds, price_levels)
    ]

    outcome = ComputeElasticityStep(pipeline_context).transform((demand_chunks, price_chunks))
    matching = outcome[outcome['productId'] == pid]

    assert len(matching) == 1
    # inelastic: elasticity between 0 and -1
    assert -1 < matching.iloc[0]['elasticity'] < 0
def test_compute_elasticity_multiple_products(pipeline_context):
    """Elasticities are computed for several products in a single pass"""
    products = [
        'd018efc1-25e9-4284-b276-80386e048b25',
        '51266ddb-5b07-47b7-89ee-5b5cae94bb11',
        '2cd7f756-fc65-4ba0-ab01-74521c1fff43',
    ]
    origin = pd.Timestamp('2023-01-01 10:00:00')
    # 5 consecutive 30s windows, each covering all 3 products
    starts = [origin + pd.Timedelta(f'{i*30}s') for i in range(5)]

    demand_chunks = [
        {
            'window_start': ts,
            'window_end': ts + pd.Timedelta('30s'),
            'demand_vector': pd.DataFrame({
                'productId': products,
                'demand_score': [
                    50 * (0.9 ** i),   # elastic: decreases as price rises
                    40 * (0.98 ** i),  # inelastic: barely changes
                    30 * (0.85 ** i),  # very elastic
                ],
            }),
        }
        for i, ts in enumerate(starts)
    ]
    price_chunks = [
        {
            'window_start': ts,
            'window_end': ts + pd.Timedelta('30s'),
            'price_vector': pd.DataFrame({
                'productId': products,
                'price': [
                    100 * (1.05 ** i),
                    150 * (1.10 ** i),
                    120 * (1.08 ** i),
                ],
            }),
        }
        for i, ts in enumerate(starts)
    ]

    outcome = ComputeElasticityStep(pipeline_context).transform((demand_chunks, price_chunks))

    assert isinstance(outcome, pd.DataFrame)
    assert len(outcome) == 3  # all products should have elasticity
    assert set(outcome['productId']) == set(products)
    assert all(outcome['n_obs'] == 5)
    assert all(outcome['elasticity'] < 0)  # all normal goods
def test_compute_elasticity_insufficient_data(pipeline_context):
    """A single observation is not enough to estimate an elasticity"""
    pid = 'd018efc1-25e9-4284-b276-80386e048b25'
    # the one and only window shared by demand and price
    window = {
        'window_start': pd.Timestamp('2023-01-01 10:00:00'),
        'window_end': pd.Timestamp('2023-01-01 10:00:30'),
    }
    demand_vector = pd.DataFrame({'productId': [pid], 'demand_score': [50]})
    price_vector = pd.DataFrame({'productId': [pid], 'price': [100]})

    demand_chunks = [{**window, 'demand_vector': demand_vector}]
    price_chunks = [{**window, 'price_vector': price_vector}]

    outcome = ComputeElasticityStep(pipeline_context).transform((demand_chunks, price_chunks))

    # should still return result but with low n_obs
    matching = outcome[outcome['productId'] == pid]
    assert len(matching) == 1
    row = matching.iloc[0]
    assert row['n_obs'] == 1
    assert row['elasticity'] == 0.0  # not enough data
def test_compute_elasticity_misaligned_chunks(pipeline_context):
    """Demand and price windows that never overlap yield no observations"""
    pid = 'd018efc1-25e9-4284-b276-80386e048b25'

    demand_window = {
        'window_start': pd.Timestamp('2023-01-01 10:00:00'),
        'window_end': pd.Timestamp('2023-01-01 10:00:30'),
    }
    # different time: one hour after the demand window
    price_window = {
        'window_start': pd.Timestamp('2023-01-01 11:00:00'),
        'window_end': pd.Timestamp('2023-01-01 11:00:30'),
    }

    demand_chunks = [{
        **demand_window,
        'demand_vector': pd.DataFrame({'productId': [pid], 'demand_score': [50]}),
    }]
    price_chunks = [{
        **price_window,
        'price_vector': pd.DataFrame({'productId': [pid], 'price': [100]}),
    }]

    outcome = ComputeElasticityStep(pipeline_context).transform((demand_chunks, price_chunks))

    # should handle gracefully with no aligned data
    assert isinstance(outcome, pd.DataFrame)
    assert all(outcome['n_obs'] == 0)
def test_elasticity_arc_method(pipeline_context):
    """Test arc elasticity computation method"""
    pid = 'd018efc1-25e9-4284-b276-80386e048b25'
    config = pipeline_context.config

    # configure context for arc method, remembering what was there before
    had_method = 'elasticity_method' in config
    previous_method = config.get('elasticity_method')
    config['elasticity_method'] = 'arc'

    try:
        step = ComputeElasticityStep(pipeline_context)

        # one entry per 30s window: (window_start, window_end, demand, price)
        windows = [
            (pd.Timestamp('2023-01-01 10:00:00'), pd.Timestamp('2023-01-01 10:00:30'), 100, 100),
            (pd.Timestamp('2023-01-01 10:00:30'), pd.Timestamp('2023-01-01 10:01:00'), 80, 110),
        ]
        demand_chunks = [
            {
                'window_start': start,
                'window_end': end,
                'demand_vector': pd.DataFrame({'productId': [pid], 'demand_score': [demand]}),
            }
            for start, end, demand, _ in windows
        ]
        price_chunks = [
            {
                'window_start': start,
                'window_end': end,
                'price_vector': pd.DataFrame({'productId': [pid], 'price': [price]}),
            }
            for start, end, _, price in windows
        ]

        result = step.transform((demand_chunks, price_chunks))
        product_elast = result[result['productId'] == pid]

        assert len(product_elast) == 1
        assert product_elast.iloc[0]['elasticity'] < 0
    finally:
        # reset config even when an assertion fails, so the shared context
        # does not leak the 'arc' setting into other tests
        if had_method:
            config['elasticity_method'] = previous_method
        else:
            config.pop('elasticity_method', None)