From b28f3206a7e39a6b3d5d07d33db610fb00a93efa Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Fri, 12 Dec 2025 12:08:59 +0100 Subject: [PATCH] chore: refactoring and fixing path joining --- experiments/procesing/pipelines.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/experiments/procesing/pipelines.py b/experiments/procesing/pipelines.py index 4a98f5b..cc29f20 100644 --- a/experiments/procesing/pipelines.py +++ b/experiments/procesing/pipelines.py @@ -143,16 +143,26 @@ if __name__ == '__main__': class ExperimentsProvider(SupabaseProvider, BackendAPIProvider): def fetch_kafka_topic(self, topic: str) -> pd.DataFrame: - path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/" - subdirs = os.listdir(path) - full_df = pd.DataFrame() + base_path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/" # os.path.join(os.path.dirname(__file__), "collected_data") + if not os.path.isdir(base_path): + return pd.DataFrame() + files = {"user-interactions": "int.json", "price-logs": "price.json"} - for d in subdirs: - path += d + "/" - data = pd.read_json(path + files.get(topic, files["user-interactions"])) - data = pd.DataFrame([r['payload'] for r in data['value'].to_list()]) - full_df = pd.concat([full_df, data], ignore_index=True) - return full_df + file_to_read = files.get(topic, files["user-interactions"]) + frames = [] + + for d in os.listdir(base_path): + full_path = os.path.join(base_path, d, file_to_read) + if not os.path.isfile(full_path): + continue + try: + data = pd.read_json(full_path) + payloads = pd.DataFrame([r['payload'] for r in data['value'].to_list()]) + frames.append(payloads) + except Exception as e: + print(f"Warning: Could not process {full_path}: {e}") + + return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame() # demo: run ML training pipeline context = PipelineContext(provider=ExperimentsProvider(), store_mode='hotel')