From a351af1dbee76d606802e407bdbf96e59997dfff Mon Sep 17 00:00:00 2001
From: Daniel Rosel
Date: Fri, 5 Dec 2025 11:54:58 +0100
Subject: [PATCH] adding loader with historical data loading

---
 experiments/procesing/pipelines.py | 41 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 40 insertions(+), 1 deletion(-)

diff --git a/experiments/procesing/pipelines.py b/experiments/procesing/pipelines.py
index c2e01e1..5c0d704 100644
--- a/experiments/procesing/pipelines.py
+++ b/experiments/procesing/pipelines.py
@@ -118,9 +118,48 @@ if __name__ == '__main__':
         def __init__(self, backend_url: str):
             SupabaseProvider.__init__(self)
             BackendAPIProvider.__init__(self, backend_url=backend_url)
+
+
+    class HistoricalProvider(SupabaseProvider, BackendAPIProvider):
+        """Provider whose ``fetch_kafka_topic`` reads locally stored JSON dumps.
+
+        The dump directory and file names are class attributes so that a
+        subclass can point at a different collection run.
+        """
+
+        # Directory holding the message dumps of one collection run.
+        DATA_DIR = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/858c61ab-0a7f-4595-ae49-33f4365517b9/"
+        INTERACTIONS_FILE = "messages(2).json"
+        PRICES_FILE = "messages(3).json"
+        # Both spellings of the interactions topic map to the same dump.
+        INTERACTION_TOPICS = ("interactions", "user-interactions")
+
+        def fetch_kafka_topic(self, topic: str) -> pd.DataFrame:
+            """Load the dumped messages for ``topic`` as a DataFrame.
+
+            Args:
+                topic: Topic name. The interactions topics read the
+                    interactions dump; any other name reads the price dump.
+
+            Returns:
+                One row per dumped message, built from the ``payload``
+                entry of each message's ``value`` field.
+            """
+            import os  # local import: this block must not rely on file-level imports
+
+            if topic in self.INTERACTION_TOPICS:
+                dump_file = self.INTERACTIONS_FILE
+            else:
+                # Every other topic (e.g. "price_logs") reads the price dump.
+                dump_file = self.PRICES_FILE
+            messages = pd.read_json(os.path.join(self.DATA_DIR, dump_file))
+            payloads = [message['payload'] for message in messages['value']]
+            return pd.DataFrame(payloads)
+
+
     # example run
     context = PipelineContext(
-        provider=Provider(backend_url="http://localhost:5000"),
+        provider=HistoricalProvider(),
         store_mode='hotel',
         # 15 min not month
         window_size='15min',