test: extra tests with semantic meaning checks

This commit is contained in:
2025-11-28 17:38:11 +01:00
parent e9d9c0e319
commit 73e46200c7
3 changed files with 489 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
import pytest
import random
import pandas as pd
from procesing.steps import (
ComputeDemandStep
)
def test_compute_demand(pipeline_context):
    """Check ComputeDemandStep output for uniformly distributed interactions.

    Builds 100 hourly interaction events whose product ids and event names
    are drawn uniformly at random, runs the step, and asserts that the
    result is a non-empty DataFrame covering exactly the ids in
    ``pipeline_context.products['id']`` with every ``demand_score`` above
    ``100 / 3 - 10``.
    """
    # A private, seeded generator keeps the sample reproducible. With the
    # global, unseeded RNG the per-product share changes on every run, so the
    # threshold assertion below may pass or fail depending on the draw.
    rng = random.Random(20251128)
    n_events = 100
    product_ids = [
        'd018efc1-25e9-4284-b276-80386e048b25',
        '51266ddb-5b07-47b7-89ee-5b5cae94bb11',
        '2cd7f756-fc65-4ba0-ab01-74521c1fff43',
    ]

    step = ComputeDemandStep(context=pipeline_context)
    df = pd.DataFrame({
        'ts': pd.date_range(start='2023-01-01', periods=n_events, freq='h'),
        'productId': rng.choices(product_ids, k=n_events),
        'eventName': rng.choices(['view', 'click', 'purchase'], k=n_events),
    })
    result = step.transform(df)

    # isinstance also accepts DataFrame subclasses, unlike an exact type match.
    assert isinstance(result, pd.DataFrame)
    assert not result.empty
    assert set(result['productId']) == set(pipeline_context.products['id'])
    # NOTE(review): presumably demand_score tracks each product's share of the
    # events, hence "one third of the events minus a margin" - confirm.
    assert all(result['demand_score'] > n_events / 3 - 10)
def test_compute_demand_skewed(pipeline_context):
    """Check that skewed interaction volume yields correspondingly ordered scores.

    Builds 100 hourly interaction events whose product ids are drawn with
    weights 0.7 / 0.2 / 0.1, runs ComputeDemandStep, and asserts that the
    result is a non-empty DataFrame covering exactly the ids in
    ``pipeline_context.products['id']`` and that the ``demand_score`` values
    are strictly decreasing in the same order as the sampling weights.
    """
    # A private, seeded generator keeps the sample reproducible. With the
    # global, unseeded RNG the realised counts change on every run, so the
    # strict ordering asserted below depends on the draw.
    rng = random.Random(20251128)
    n_events = 100
    heavy = 'd018efc1-25e9-4284-b276-80386e048b25'
    medium = '51266ddb-5b07-47b7-89ee-5b5cae94bb11'
    light = '2cd7f756-fc65-4ba0-ab01-74521c1fff43'

    step = ComputeDemandStep(context=pipeline_context)
    # Test with skewed interaction data
    df = pd.DataFrame({
        'ts': pd.date_range(start='2023-01-01', periods=n_events, freq='h'),
        'productId': rng.choices(
            [heavy, medium, light], weights=[0.7, 0.2, 0.1], k=n_events
        ),
        'eventName': rng.choices(['view', 'click', 'purchase'], k=n_events),
    })
    result = step.transform(df)

    # isinstance also accepts DataFrame subclasses, unlike an exact type match.
    assert isinstance(result, pd.DataFrame)
    assert not result.empty
    assert set(result['productId']) == set(pipeline_context.products['id'])

    # test for skewness: more interactions must translate into a higher score
    scores = result.set_index('productId')['demand_score'].to_dict()
    assert scores[heavy] > scores[medium] > scores[light]

View File

@@ -0,0 +1,353 @@
import pytest
import pandas as pd
import numpy as np
from procesing.steps import (
AggregatePriceLogsStep,
ComputeElasticityStep
)
def test_aggregate_price_logs_basic(pipeline_context):
    """Price logs are turned into a non-empty list of well-formed window chunks."""
    step = AggregatePriceLogsStep(pipeline_context)

    n_rows = 100
    catalogue = [
        'd018efc1-25e9-4284-b276-80386e048b25',
        '51266ddb-5b07-47b7-89ee-5b5cae94bb11',
        '2cd7f756-fc65-4ba0-ab01-74521c1fff43',
    ]
    # One log row every 10 seconds, cycling through the three products;
    # 34 repetitions give 102 ids, trimmed to the row count.
    price_logs = pd.DataFrame({
        'ts': pd.date_range(start='2023-01-01 10:00:00', periods=n_rows, freq='10s'),
        'productId': np.tile(catalogue, 34)[:n_rows],
        'price': np.random.uniform(100, 200, n_rows),
    })

    chunks = step.transform(price_logs)

    assert isinstance(chunks, list)
    assert len(chunks) > 0

    # Every chunk carries its window bounds plus a per-product price table.
    for chunk in chunks:
        for key in ('window_start', 'window_end', 'price_vector'):
            assert key in chunk
        price_vector = chunk['price_vector']
        assert isinstance(price_vector, pd.DataFrame)
        for column in ('productId', 'price'):
            assert column in price_vector.columns
def test_aggregate_price_logs_handles_gaps(pipeline_context):
    """Sparse price logs with a roughly two-minute gap still yield several windows."""
    step = AggregatePriceLogsStep(pipeline_context)

    early_product = 'd018efc1-25e9-4284-b276-80386e048b25'
    late_product = '51266ddb-5b07-47b7-89ee-5b5cae94bb11'
    # (timestamp, product, price); nothing is logged between 10:00:05 and 10:02:00.
    rows = [
        ('2023-01-01 10:00:00', early_product, 100),
        ('2023-01-01 10:00:05', early_product, 102),
        ('2023-01-01 10:02:00', late_product, 150),
        ('2023-01-01 10:02:30', late_product, 153),
    ]
    stamps, product_ids, prices = (list(column) for column in zip(*rows))
    sparse_logs = pd.DataFrame({
        'ts': pd.to_datetime(stamps),
        'productId': product_ids,
        'price': prices,
    })

    chunks = step.transform(sparse_logs)

    assert isinstance(chunks, list)
    # The gap must not collapse the output into a single window.
    assert len(chunks) >= 2
def test_compute_elasticity_with_known_relationship(pipeline_context):
    """Rising price with falling demand must produce one negative elasticity row.

    Scenario: price goes 100 -> 110 -> 120 while demand goes 50 -> 42.5 -> 35
    over three consecutive 30-second windows for a single product.
    """
    step = ComputeElasticityStep(pipeline_context)

    product_id = 'd018efc1-25e9-4284-b276-80386e048b25'
    base_price = 100
    base_demand = 50

    # (window_start, window_end) for the three aligned windows.
    windows = [
        ('2023-01-01 10:00:00', '2023-01-01 10:00:30'),
        ('2023-01-01 10:00:30', '2023-01-01 10:01:00'),
        ('2023-01-01 10:01:00', '2023-01-01 10:01:30'),
    ]
    # baseline, 15% decrease, further decrease
    demand_levels = [base_demand, base_demand * 0.85, base_demand * 0.70]
    # baseline, 10% increase, 20% increase
    price_levels = [base_price, base_price * 1.10, base_price * 1.20]

    demand_chunks = [
        {
            'window_start': pd.Timestamp(start),
            'window_end': pd.Timestamp(end),
            'demand_vector': pd.DataFrame({
                'productId': [product_id],
                'demand_score': [level],
            }),
        }
        for (start, end), level in zip(windows, demand_levels)
    ]
    price_chunks = [
        {
            'window_start': pd.Timestamp(start),
            'window_end': pd.Timestamp(end),
            'price_vector': pd.DataFrame({
                'productId': [product_id],
                'price': [level],
            }),
        }
        for (start, end), level in zip(windows, price_levels)
    ]

    result = step.transform((demand_chunks, price_chunks))

    assert isinstance(result, pd.DataFrame)
    assert not result.empty
    for column in ('productId', 'elasticity', 'n_obs'):
        assert column in result.columns

    matches = result[result['productId'] == product_id]
    assert len(matches) == 1
    estimate = matches.iloc[0]
    # Negative sign = demand falls as price rises (normal good).
    # NOTE(review): the scenario is built to be elastic (< -1), but only the
    # sign is asserted here, not the magnitude.
    assert estimate['elasticity'] < 0
    # All three aligned windows should count as observations.
    assert estimate['n_obs'] == 3
def test_compute_elasticity_inelastic_product(pipeline_context):
    """A 20% price rise with only a 2% demand drop must score between -1 and 0."""
    step = ComputeElasticityStep(pipeline_context)

    product_id = '51266ddb-5b07-47b7-89ee-5b5cae94bb11'
    base_price = 150
    base_demand = 40

    # Two back-to-back 30-second windows as (window_start, window_end).
    windows = [
        ('2023-01-01 10:00:00', '2023-01-01 10:00:30'),
        ('2023-01-01 10:00:30', '2023-01-01 10:01:00'),
    ]
    demand_levels = [base_demand, base_demand * 0.98]  # tiny 2% decrease
    price_levels = [base_price, base_price * 1.20]     # 20% increase

    demand_chunks = [
        {
            'window_start': pd.Timestamp(start),
            'window_end': pd.Timestamp(end),
            'demand_vector': pd.DataFrame({
                'productId': [product_id],
                'demand_score': [level],
            }),
        }
        for (start, end), level in zip(windows, demand_levels)
    ]
    price_chunks = [
        {
            'window_start': pd.Timestamp(start),
            'window_end': pd.Timestamp(end),
            'price_vector': pd.DataFrame({
                'productId': [product_id],
                'price': [level],
            }),
        }
        for (start, end), level in zip(windows, price_levels)
    ]

    result = step.transform((demand_chunks, price_chunks))

    matches = result[result['productId'] == product_id]
    assert len(matches) == 1
    # Inelastic demand: negative, but smaller than 1 in magnitude.
    elasticity = matches.iloc[0]['elasticity']
    assert -1 < elasticity < 0
def test_compute_elasticity_multiple_products(pipeline_context):
    """Three products over five aligned windows each get a negative elasticity."""
    step = ComputeElasticityStep(pipeline_context)

    products = [
        'd018efc1-25e9-4284-b276-80386e048b25',
        '51266ddb-5b07-47b7-89ee-5b5cae94bb11',
        '2cd7f756-fc65-4ba0-ab01-74521c1fff43',
    ]
    # (starting level, per-window multiplier), listed in `products` order.
    demand_profiles = [
        (50, 0.9),    # elastic: decreases as price rises
        (40, 0.98),   # inelastic: barely changes
        (30, 0.85),   # very elastic
    ]
    price_profiles = [(100, 1.05), (150, 1.10), (120, 1.08)]

    n_windows = 5
    demand_chunks, price_chunks = [], []
    for i in range(n_windows):
        # Consecutive 30-second windows starting at 10:00:00.
        window_start = pd.Timestamp('2023-01-01 10:00:00') + pd.Timedelta(f'{i*30}s')
        demand_chunks.append({
            'window_start': window_start,
            'window_end': window_start + pd.Timedelta('30s'),
            'demand_vector': pd.DataFrame({
                'productId': products,
                'demand_score': [
                    level * (rate ** i) for level, rate in demand_profiles
                ],
            }),
        })
        price_chunks.append({
            'window_start': window_start,
            'window_end': window_start + pd.Timedelta('30s'),
            'price_vector': pd.DataFrame({
                'productId': products,
                'price': [level * (rate ** i) for level, rate in price_profiles],
            }),
        })

    result = step.transform((demand_chunks, price_chunks))

    assert isinstance(result, pd.DataFrame)
    assert len(result) == 3  # one row per product
    assert set(result['productId']) == set(products)
    assert (result['n_obs'] == n_windows).all()
    assert (result['elasticity'] < 0).all()  # prices rise, demand falls
def test_compute_elasticity_insufficient_data(pipeline_context):
    """A single observation still yields a row, with n_obs 1 and elasticity 0.0."""
    step = ComputeElasticityStep(pipeline_context)

    product_id = 'd018efc1-25e9-4284-b276-80386e048b25'
    window_start = pd.Timestamp('2023-01-01 10:00:00')
    window_end = pd.Timestamp('2023-01-01 10:00:30')

    # Exactly one window on each side: there is no change to measure.
    demand_vector = pd.DataFrame({
        'productId': [product_id],
        'demand_score': [50],
    })
    price_vector = pd.DataFrame({
        'productId': [product_id],
        'price': [100],
    })
    demand_chunks = [{
        'window_start': window_start,
        'window_end': window_end,
        'demand_vector': demand_vector,
    }]
    price_chunks = [{
        'window_start': window_start,
        'window_end': window_end,
        'price_vector': price_vector,
    }]

    result = step.transform((demand_chunks, price_chunks))

    matches = result[result['productId'] == product_id]
    assert len(matches) == 1
    estimate = matches.iloc[0]
    assert estimate['n_obs'] == 1
    # Exact comparison: 0.0 is expected as the "not enough data" value.
    assert estimate['elasticity'] == 0.0
def test_compute_elasticity_misaligned_chunks(pipeline_context):
    """Demand and price windows an hour apart must yield zero observations."""
    step = ComputeElasticityStep(pipeline_context)

    product_id = 'd018efc1-25e9-4284-b276-80386e048b25'

    demand_window = {
        'window_start': pd.Timestamp('2023-01-01 10:00:00'),
        'window_end': pd.Timestamp('2023-01-01 10:00:30'),
        'demand_vector': pd.DataFrame({
            'productId': [product_id],
            'demand_score': [50],
        }),
    }
    # The price window starts one hour later, so it never overlaps the demand window.
    price_window = {
        'window_start': pd.Timestamp('2023-01-01 11:00:00'),
        'window_end': pd.Timestamp('2023-01-01 11:00:30'),
        'price_vector': pd.DataFrame({
            'productId': [product_id],
            'price': [100],
        }),
    }

    result = step.transform(([demand_window], [price_window]))

    # No crash, and no row may claim an observation (an empty frame also passes).
    assert isinstance(result, pd.DataFrame)
    assert (result['n_obs'] == 0).all()
def test_elasticity_arc_method(pipeline_context):
    """Check that the 'arc' elasticity method yields a negative elasticity.

    Scenario: one product over two consecutive 30-second windows, with price
    going 100 -> 110 and demand going 100 -> 80.

    The test switches ``pipeline_context.config['elasticity_method']`` to
    ``'arc'`` and always puts the previous setting back, even when the step
    raises or an assertion fails, so the shared context does not leak the
    override into other tests.
    """
    product_id = 'd018efc1-25e9-4284-b276-80386e048b25'
    windows = [
        ('2023-01-01 10:00:00', '2023-01-01 10:00:30'),
        ('2023-01-01 10:00:30', '2023-01-01 10:01:00'),
    ]
    demand_levels = [100, 80]
    price_levels = [100, 110]

    demand_chunks = [
        {
            'window_start': pd.Timestamp(start),
            'window_end': pd.Timestamp(end),
            'demand_vector': pd.DataFrame({
                'productId': [product_id],
                'demand_score': [level],
            }),
        }
        for (start, end), level in zip(windows, demand_levels)
    ]
    price_chunks = [
        {
            'window_start': pd.Timestamp(start),
            'window_end': pd.Timestamp(end),
            'price_vector': pd.DataFrame({
                'productId': [product_id],
                'price': [level],
            }),
        }
        for (start, end), level in zip(windows, price_levels)
    ]

    # Remember the current setting so it can be restored as found. Falls back
    # to 'point' (the value the test used to reset to) when the key is unset.
    # NOTE(review): assumes pipeline_context.config is dict-like (.get) - confirm.
    previous_method = pipeline_context.config.get('elasticity_method', 'point')
    # configure context for arc method
    pipeline_context.config['elasticity_method'] = 'arc'
    try:
        step = ComputeElasticityStep(pipeline_context)
        result = step.transform((demand_chunks, price_chunks))

        product_elast = result[result['productId'] == product_id]
        assert len(product_elast) == 1
        assert product_elast.iloc[0]['elasticity'] < 0
    finally:
        # reset config on every exit path, not only on success
        pipeline_context.config['elasticity_method'] = previous_method

View File

@@ -0,0 +1,87 @@
import pytest
import pandas as pd
from procesing.pricers import (
StaticPricer,
RandomPricer,
ElasticityBasedPricer
)
def test_static_pricer_fit_and_predict():
    """StaticPricer must return the fitted base prices unchanged."""
    catalogue = pd.DataFrame({
        'product_id': [1, 2, 3],
        'base_price': [100.0, 150.0, 200.0],
    })

    pricer = StaticPricer()
    pricer.fit(catalogue)

    # No state is passed to predict(); the test supplies None.
    predicted_prices = pricer.predict(None)

    expected_prices = catalogue['base_price'].values
    matches = predicted_prices == expected_prices
    assert all(matches), "Predicted prices do not match base prices"
def test_random_pricer_fit_and_predict():
    """RandomPricer must emit one varied, in-bounds price per product."""
    catalogue = pd.DataFrame({
        'product_id': [1, 2, 3],
        'base_price': [100.0, 150.0, 200.0],
    })
    lower_bound, upper_bound = 50.0, 250.0

    # Fixed seed so the pricer's draw is repeatable.
    pricer = RandomPricer(price_min=lower_bound, price_max=upper_bound, seed=42)
    pricer.fit(catalogue)
    predicted_prices = pricer.predict(None)

    # Bounds
    assert predicted_prices.min() >= lower_bound, "Predicted prices are below minimum bound"
    assert predicted_prices.max() <= upper_bound, "Predicted prices are above maximum bound"

    # Loose distribution check: at least two distinct prices.
    distinct_prices = set(predicted_prices)
    assert len(distinct_prices) > 1, "Predicted prices are not varied enough"

    # One price per product
    assert len(predicted_prices) == len(catalogue), "Number of predicted prices does not match number of products"
def test_elasticity_based_pricer_fit_and_predict():
    """ElasticityBasedPricer must raise prices by the expected amount when demand exceeds its mean.

    Expected price per product, as checked below:
    ``base_price * (1 + 0.1 * |elasticity| * (demand - mean_demand) / mean_demand)``,
    where 0.1 is the alpha passed to the pricer.
    """
    historical_data = pd.DataFrame({
        'productId': [1, 2, 3],
        'elasticity': [-1.5, -0.5, -2.0],
        'base_price': [100.0, 150.0, 200.0],
        'mean_demand': [10, 20, 15],
    })

    pricer = ElasticityBasedPricer(alpha=0.1, price_floor=50.0, price_ceil=300.0)
    pricer.fit(historical_data)

    class MockStateSpace:
        """Minimal stand-in exposing only the ``demand`` attribute."""

        def __init__(self, demand):
            self.demand = demand

    # Observed demand sits above the historical mean for every product.
    state_space = MockStateSpace(demand=[15, 25, 20])
    predicted_prices = pricer.predict(state_space)

    # Structural checks: bounds and one price per product.
    assert predicted_prices.min() >= 50.0, "Predicted prices are below minimum bound"
    assert predicted_prices.max() <= 300.0, "Predicted prices are above maximum bound"
    assert len(predicted_prices) == len(historical_data), "Number of predicted prices does not match number of products"

    # Semantic checks: above-mean demand must not lower the price, and the
    # price must match the closed-form expectation.
    for i, row in historical_data.iterrows():
        base_price = row['base_price']
        elasticity = row['elasticity']
        relative_excess = (state_space.demand[i] - row['mean_demand']) / row['mean_demand']
        expected_price = base_price * (1 + 0.1 * abs(elasticity) * relative_excess)
        actual_price = predicted_prices[i]
        assert actual_price >= base_price, f"Predicted price for product {row['productId']} did not increase as expected"
        assert abs(actual_price - expected_price) < 1e-5, f"Predicted price for product {row['productId']} does not match expected calculation within 1e-5 tolerance"