mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
test: started with pipeline step testing
This commit is contained in:
0
experiments/procesing/tests/__init__.py
Normal file
0
experiments/procesing/tests/__init__.py
Normal file
271
experiments/procesing/tests/conftest.py
Normal file
271
experiments/procesing/tests/conftest.py
Normal file
@@ -0,0 +1,271 @@
|
||||
import pytest
|
||||
import pandas as pd
|
||||
from typing import List
|
||||
from procesing.providers.base import DataProvider
|
||||
from procesing.context import PipelineContext
|
||||
|
||||
|
||||
class MockProvider(DataProvider):
|
||||
"""Mock provider for testing, holds in-memory fixtures"""
|
||||
|
||||
def __init__(self, products_df=None, experiments_df=None, kafka_data=None):
|
||||
self._products = products_df if products_df is not None else pd.DataFrame()
|
||||
self._experiments = experiments_df if experiments_df is not None else pd.DataFrame()
|
||||
self._kafka_data = kafka_data if kafka_data is not None else {}
|
||||
|
||||
def fetch_products(self, store_mode: str) -> pd.DataFrame:
|
||||
return self._products.copy()
|
||||
|
||||
def fetch_experiments(self, experiment_ids: List[str]) -> pd.DataFrame:
|
||||
if self._experiments.empty:
|
||||
return pd.DataFrame()
|
||||
return self._experiments[
|
||||
self._experiments['id'].isin(experiment_ids)
|
||||
].copy()
|
||||
|
||||
def fetch_kafka_topic(self, topic: str) -> pd.DataFrame:
|
||||
return self._kafka_data.get(topic, pd.DataFrame()).copy()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_products():
|
||||
"""Standard product catalog fixture with realistic IDs from test data"""
|
||||
return pd.DataFrame({
|
||||
'id': [
|
||||
'd018efc1-25e9-4284-b276-80386e048b25',
|
||||
'51266ddb-5b07-47b7-89ee-5b5cae94bb11',
|
||||
'2cd7f756-fc65-4ba0-ab01-74521c1fff43'
|
||||
],
|
||||
'name': ['Junior Suite', 'Superior Room', 'Deluxe Room'],
|
||||
'base_price': [200.0, 150.0, 180.0]
|
||||
})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_interactions_raw_kafka():
|
||||
"""Raw Kafka message structure for interactions, matches production format"""
|
||||
return [
|
||||
{
|
||||
'partitionID': 0, 'offset': 203, 'timestamp': 1764102082676,
|
||||
'value': {
|
||||
'payload': {
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35',
|
||||
'eventName': 'learn_more_about_item',
|
||||
'page': '/hotel/products/d018efc1-25e9-4284-b276-80386e048b25',
|
||||
'productId': 'd018efc1-25e9-4284-b276-80386e048b25',
|
||||
'metadata': {'type': 'hotel', 'dateIndex': 1, 'roomType': 'Junior Suite'},
|
||||
'storeMode': 'hotel',
|
||||
'ts': '2025-11-25T20:21:22.674Z'
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
'partitionID': 0, 'offset': 204, 'timestamp': 1764102086982,
|
||||
'value': {
|
||||
'payload': {
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35',
|
||||
'eventName': 'page_view',
|
||||
'page': '/hotel/products',
|
||||
'productId': None,
|
||||
'metadata': {'referrer': ''},
|
||||
'storeMode': 'hotel',
|
||||
'ts': '2025-11-25T20:21:26.947Z'
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
'partitionID': 0, 'offset': 205, 'timestamp': 1764102091825,
|
||||
'value': {
|
||||
'payload': {
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35',
|
||||
'eventName': 'hover_over_title',
|
||||
'page': '/hotel/products',
|
||||
'productId': '51266ddb-5b07-47b7-89ee-5b5cae94bb11',
|
||||
'metadata': {'elementText': 'Superior Room', 'dateIndex': 1, 'dwellTime': 1200},
|
||||
'storeMode': 'hotel',
|
||||
'ts': '2025-11-25T20:21:31.823Z'
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
'partitionID': 0, 'offset': 206, 'timestamp': 1764102094193,
|
||||
'value': {
|
||||
'payload': {
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': 'bbbbcccc-dddd-eeee-ffff-000011112222',
|
||||
'eventName': 'hover_over_paragraph',
|
||||
'page': '/hotel/products',
|
||||
'productId': '51266ddb-5b07-47b7-89ee-5b5cae94bb11',
|
||||
'metadata': {'elementText': 'price', 'dateIndex': 1, 'dwellTime': 1307},
|
||||
'storeMode': 'hotel',
|
||||
'ts': '2025-11-25T20:21:34.191Z'
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
'partitionID': 0, 'offset': 207, 'timestamp': 1764102101970,
|
||||
'value': {
|
||||
'payload': {
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': 'bbbbcccc-dddd-eeee-ffff-000011112222',
|
||||
'eventName': 'hover_over_paragraph',
|
||||
'page': '/hotel/products',
|
||||
'productId': 'd018efc1-25e9-4284-b276-80386e048b25',
|
||||
'metadata': {'elementText': 'price', 'dateIndex': 1, 'dwellTime': 1201},
|
||||
'storeMode': 'hotel',
|
||||
'ts': '2025-11-25T20:21:41.967Z'
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_interactions(mock_interactions_raw_kafka):
|
||||
"""Processed interaction DataFrame (what provider.fetch_kafka_topic returns)"""
|
||||
records = [msg['value']['payload'] for msg in mock_interactions_raw_kafka]
|
||||
df = pd.DataFrame(records)
|
||||
df['timestamp'] = pd.to_datetime(df['ts'])
|
||||
return df
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_price_logs_raw_kafka():
|
||||
"""Raw Kafka message structure for price logs, matches production format"""
|
||||
return [
|
||||
{
|
||||
'partitionID': 0, 'offset': 32, 'timestamp': 1764104757969,
|
||||
'value': {
|
||||
'payload': {
|
||||
'productId': '2cd7f756-fc65-4ba0-ab01-74521c1fff43',
|
||||
'price': 162.47,
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35',
|
||||
'storeMode': 'shop',
|
||||
'ts': '2025-11-25T21:05:57.967Z'
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
'partitionID': 0, 'offset': 33, 'timestamp': 1764104757995,
|
||||
'value': {
|
||||
'payload': {
|
||||
'productId': '2ddabbfc-4127-48fc-86dc-ebc4c677efa2',
|
||||
'price': 743.49,
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35',
|
||||
'storeMode': 'shop',
|
||||
'ts': '2025-11-25T21:05:57.993Z'
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
'partitionID': 0, 'offset': 34, 'timestamp': 1764104758011,
|
||||
'value': {
|
||||
'payload': {
|
||||
'productId': '2cd7f756-fc65-4ba0-ab01-74521c1fff43',
|
||||
'price': 163.87,
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35',
|
||||
'storeMode': 'shop',
|
||||
'ts': '2025-11-25T21:05:58.009Z'
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
'partitionID': 0, 'offset': 35, 'timestamp': 1764104758050,
|
||||
'value': {
|
||||
'payload': {
|
||||
'productId': '2ddabbfc-4127-48fc-86dc-ebc4c677efa2',
|
||||
'price': 397.46,
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35',
|
||||
'storeMode': 'shop',
|
||||
'ts': '2025-11-25T21:05:58.049Z'
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
'partitionID': 0, 'offset': 36, 'timestamp': 1764104768865,
|
||||
'value': {
|
||||
'payload': {
|
||||
'productId': '2cd7f756-fc65-4ba0-ab01-74521c1fff43',
|
||||
'price': 401.66,
|
||||
'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472',
|
||||
'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35',
|
||||
'storeMode': 'shop',
|
||||
'ts': '2025-11-25T21:06:08.864Z'
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_price_logs(mock_price_logs_raw_kafka):
|
||||
"""Processed price logs DataFrame (what provider.fetch_kafka_topic returns)"""
|
||||
# extract payloads and flatten
|
||||
records = [msg['value']['payload'] for msg in mock_price_logs_raw_kafka]
|
||||
df = pd.DataFrame(records)
|
||||
df['timestamp'] = pd.to_datetime(df['ts'])
|
||||
return df
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_experiments():
|
||||
"""Standard experiment metadata fixture matching Supabase schema"""
|
||||
return pd.DataFrame({
|
||||
'id': ['53aefd07-f66a-4d7f-ba8b-7ea1fc562d35', 'bbbbcccc-dddd-eeee-ffff-000011112222'],
|
||||
'created_at': pd.to_datetime(['2025-11-25T20:00:00Z', '2025-11-26T10:00:00Z']),
|
||||
'subject_name': ['Session A', 'Session B'],
|
||||
'xp_human_only': [True, False],
|
||||
'xp_market_mode': ['hotel', 'shop'],
|
||||
'xp_task_id': [None, None]
|
||||
})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_provider(mock_products, mock_experiments, mock_interactions, mock_price_logs):
|
||||
"""Fully configured mock provider"""
|
||||
return MockProvider(
|
||||
products_df=mock_products,
|
||||
experiments_df=mock_experiments,
|
||||
kafka_data={
|
||||
'user-interactions': mock_interactions,
|
||||
'price-logs': mock_price_logs
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pipeline_context(mock_provider):
|
||||
"""Standard pipeline context for testing"""
|
||||
return PipelineContext(
|
||||
provider=mock_provider,
|
||||
store_mode='hotel',
|
||||
window_size='30s',
|
||||
n_price_buckets=3
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_provider():
|
||||
"""Provider with no data, for edge case testing"""
|
||||
return MockProvider(
|
||||
products_df=pd.DataFrame(columns=['id', 'name', 'base_price']),
|
||||
experiments_df=pd.DataFrame(columns=['id', 'created_at', 'subject_name', 'xp_human_only', 'xp_market_mode', 'xp_task_id']),
|
||||
kafka_data={'user-interactions': pd.DataFrame(), 'price-logs': pd.DataFrame()}
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_context(empty_provider):
|
||||
"""Context with empty provider"""
|
||||
return PipelineContext(
|
||||
provider=empty_provider,
|
||||
store_mode='hotel',
|
||||
window_size='30s'
|
||||
)
|
||||
45
experiments/procesing/tests/test_augement.py
Normal file
45
experiments/procesing/tests/test_augement.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import pytest
|
||||
import random
|
||||
import pandas as pd
|
||||
from procesing.steps import (
|
||||
CreatePriceBucketsStep,
|
||||
AugmentEventNamesStep
|
||||
)
|
||||
|
||||
def test_bucketing(pipeline_context):
|
||||
step = CreatePriceBucketsStep(context=pipeline_context)
|
||||
|
||||
# Test with normal price data
|
||||
df = pd.DataFrame({
|
||||
'metadata_price': random.sample(range(10, 1000), 100)
|
||||
})
|
||||
result = step.transform(df)
|
||||
assert 'price_bucket' in result.columns
|
||||
# test if is categorical
|
||||
assert isinstance(result['price_bucket'].dtype, pd.CategoricalDtype)
|
||||
assert result['price_bucket'].nunique() == 3 # as per context config
|
||||
# distribution check
|
||||
counts = result['price_bucket'].value_counts()
|
||||
assert all(counts > 0)
|
||||
assert counts.max() - counts.min() <= 10 # roughly equal distribution for 100 samples
|
||||
# Test with empty DataFrame
|
||||
df = pd.DataFrame()
|
||||
result = step.transform(df)
|
||||
assert 'price_bucket' in result.columns
|
||||
assert result.empty
|
||||
|
||||
|
||||
def test_augment_names(pipeline_context):
|
||||
df = pd.DataFrame({
|
||||
'eventName': ['click', 'view', 'purchase'],
|
||||
'productId': ['prod_1', 'prod_2', None],
|
||||
'price_bucket': ['PB_1', None, 'PB_3']
|
||||
})
|
||||
step = AugmentEventNamesStep(context=pipeline_context)
|
||||
result = step.transform(df)
|
||||
expected_event_names = [
|
||||
'click_prod_1@PB_1',
|
||||
'view',
|
||||
'purchase'
|
||||
]
|
||||
assert result['eventName'].tolist() == expected_event_names
|
||||
51
experiments/procesing/tests/test_fetch.py
Normal file
51
experiments/procesing/tests/test_fetch.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import pytest
|
||||
import pandas as pd
|
||||
from procesing.steps import (
|
||||
FetchInteractionsStep,
|
||||
FetchPriceLogsStep,
|
||||
FetchExperimentsStep,
|
||||
)
|
||||
|
||||
|
||||
def test_fetch_interactions_data(pipeline_context):
|
||||
step = FetchInteractionsStep(pipeline_context)
|
||||
data = step.transform(None)
|
||||
assert data is not None
|
||||
assert isinstance(data, pd.DataFrame)
|
||||
expected_cols = [
|
||||
"eventName",
|
||||
"dateIndex",
|
||||
"experimentId",
|
||||
"storeMode",
|
||||
"metadata_elementText"
|
||||
]
|
||||
for expected in expected_cols:
|
||||
assert expected in data.columns
|
||||
|
||||
def test_fetch_price_logs(pipeline_context):
|
||||
step = FetchPriceLogsStep(pipeline_context)
|
||||
data = step.transform(None)
|
||||
assert data is not None
|
||||
assert isinstance(data, pd.DataFrame)
|
||||
expected_cols = [
|
||||
"price",
|
||||
"productId"
|
||||
]
|
||||
for expected in expected_cols:
|
||||
assert expected in data.columns
|
||||
prices = data['price'].to_list()
|
||||
assert min(prices) >= 0
|
||||
assert max(prices) <= 9999
|
||||
|
||||
|
||||
def test_experiments_fetching(pipeline_context):
|
||||
interactions = FetchInteractionsStep(pipeline_context).transform(None)
|
||||
assert interactions is not None
|
||||
experiments = FetchExperimentsStep(pipeline_context)
|
||||
experiment_data = experiments.transform(interactions)
|
||||
assert experiment_data is not None
|
||||
assert isinstance(experiment_data, pd.DataFrame)
|
||||
assert not experiment_data.empty
|
||||
assert 'id' in experiment_data.columns
|
||||
assert len(experiment_data) == 2
|
||||
assert '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35' in experiment_data['id'].values
|
||||
8
experiments/pytest.ini
Normal file
8
experiments/pytest.ini
Normal file
@@ -0,0 +1,8 @@
|
||||
[pytest]
|
||||
pythonpath = .
|
||||
testpaths = procesing/tests agents
|
||||
python_files = test*.py
|
||||
python_classes = Test*
|
||||
python_functions = test_*
|
||||
asyncio_mode = auto
|
||||
asyncio_default_fixture_loop_scope = function
|
||||
@@ -1,4 +1,5 @@
|
||||
[pytest]
|
||||
pythonpath = experiments
|
||||
testpaths = experiments
|
||||
python_files = test*.py
|
||||
python_classes = Test*
|
||||
|
||||
Reference in New Issue
Block a user