mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-06-01 00:53:36 +00:00
chore: refactoring and dixing path joining
This commit is contained in:
@@ -143,16 +143,26 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
class ExperimentsProvider(SupabaseProvider, BackendAPIProvider):
|
class ExperimentsProvider(SupabaseProvider, BackendAPIProvider):
|
||||||
def fetch_kafka_topic(self, topic: str) -> pd.DataFrame:
|
def fetch_kafka_topic(self, topic: str) -> pd.DataFrame:
|
||||||
path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/"
|
base_path = "/home/velocitatem/Documents/Projects/PHANTOM/experiments/collected_data/" # os.path.join(os.path.dirname(__file__), "collected_data")
|
||||||
subdirs = os.listdir(path)
|
if not os.path.isdir(base_path):
|
||||||
full_df = pd.DataFrame()
|
return pd.DataFrame()
|
||||||
|
|
||||||
files = {"user-interactions": "int.json", "price-logs": "price.json"}
|
files = {"user-interactions": "int.json", "price-logs": "price.json"}
|
||||||
for d in subdirs:
|
file_to_read = files.get(topic, files["user-interactions"])
|
||||||
path += d + "/"
|
frames = []
|
||||||
data = pd.read_json(path + files.get(topic, files["user-interactions"]))
|
|
||||||
data = pd.DataFrame([r['payload'] for r in data['value'].to_list()])
|
for d in os.listdir(base_path):
|
||||||
full_df = pd.concat([full_df, data], ignore_index=True)
|
full_path = os.path.join(base_path, d, file_to_read)
|
||||||
return full_df
|
if not os.path.isfile(full_path):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
data = pd.read_json(full_path)
|
||||||
|
payloads = pd.DataFrame([r['payload'] for r in data['value'].to_list()])
|
||||||
|
frames.append(payloads)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: Could not process {full_path}: {e}")
|
||||||
|
|
||||||
|
return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
|
||||||
|
|
||||||
# demo: run ML training pipeline
|
# demo: run ML training pipeline
|
||||||
context = PipelineContext(provider=ExperimentsProvider(), store_mode='hotel')
|
context = PipelineContext(provider=ExperimentsProvider(), store_mode='hotel')
|
||||||
|
|||||||
Reference in New Issue
Block a user