mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
chore: update provider and pricing snitch with agnostic system
This commit is contained in:
@@ -131,7 +131,7 @@ if __name__ == '__main__':
|
|||||||
# example run
|
# example run
|
||||||
context = PipelineContext(
|
context = PipelineContext(
|
||||||
provider=HistoricalProvider(),
|
provider=HistoricalProvider(),
|
||||||
store_mode='hotel',
|
store_mode='airline',
|
||||||
)
|
)
|
||||||
|
|
||||||
product_features, prices = full_pipeline(context)
|
product_features, prices = full_pipeline(context)
|
||||||
|
|||||||
@@ -18,10 +18,17 @@ class SupabaseProvider(DataProvider):
|
|||||||
self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
|
self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
|
||||||
|
|
||||||
def fetch_products(self, store_mode: str) -> pd.DataFrame:
|
def fetch_products(self, store_mode: str) -> pd.DataFrame:
|
||||||
resp = self.supabase.table(f'{store_mode}_products').select(
|
# hotel uses room_type, airline uses flight_type; select all and normalize
|
||||||
"id, room_type, date_index, metadata, availability"
|
resp = self.supabase.table(f'{store_mode}_products').select("*").execute()
|
||||||
).execute()
|
if not resp.data:
|
||||||
return pd.DataFrame(resp.data) if resp.data else pd.DataFrame()
|
return pd.DataFrame()
|
||||||
|
df = pd.DataFrame(resp.data)
|
||||||
|
# normalize type column: hotel has room_type, airline has flight_type
|
||||||
|
if 'room_type' in df.columns:
|
||||||
|
df['product_type'] = df['room_type']
|
||||||
|
elif 'flight_type' in df.columns:
|
||||||
|
df['product_type'] = df['flight_type']
|
||||||
|
return df
|
||||||
|
|
||||||
def fetch_experiments(self, experiment_ids: List[str]) -> pd.DataFrame:
|
def fetch_experiments(self, experiment_ids: List[str]) -> pd.DataFrame:
|
||||||
if not experiment_ids:
|
if not experiment_ids:
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import pandas as pd
|
|||||||
from procesing.steps.base import BaseContextStep
|
from procesing.steps.base import BaseContextStep
|
||||||
|
|
||||||
class FetchInteractionsStep(BaseContextStep):
|
class FetchInteractionsStep(BaseContextStep):
|
||||||
"""Fetch raw interaction data from Kafka topic with optional time filtering"""
|
"""Fetch raw interaction data from Kafka topic with optional time and store_mode filtering"""
|
||||||
|
|
||||||
def __init__(self, context, lookback: str = None):
|
def __init__(self, context, lookback: str = None):
|
||||||
super().__init__(context)
|
super().__init__(context)
|
||||||
@@ -24,6 +24,10 @@ class FetchInteractionsStep(BaseContextStep):
|
|||||||
# drop all where page has /admin/
|
# drop all where page has /admin/
|
||||||
df = df[~df['page'].str.contains('/admin/', na=False)]
|
df = df[~df['page'].str.contains('/admin/', na=False)]
|
||||||
|
|
||||||
|
# filter by store_mode from context
|
||||||
|
if 'storeMode' in df.columns:
|
||||||
|
df = df[df['storeMode'] == self.context.store_mode]
|
||||||
|
|
||||||
# Remap dateIndex if present
|
# Remap dateIndex if present
|
||||||
if 'metadata_dateIndex' in df.columns:
|
if 'metadata_dateIndex' in df.columns:
|
||||||
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
|
df['dateIndex'] = df['metadata_dateIndex'].astype('Int64')
|
||||||
@@ -38,7 +42,7 @@ class FetchInteractionsStep(BaseContextStep):
|
|||||||
|
|
||||||
|
|
||||||
class FetchPriceLogsStep(BaseContextStep):
|
class FetchPriceLogsStep(BaseContextStep):
|
||||||
"""Fetch price log data from Kafka topic with optional time filtering"""
|
"""Fetch price log data from Kafka topic with optional time and store_mode filtering"""
|
||||||
|
|
||||||
def __init__(self, context, lookback: str = None):
|
def __init__(self, context, lookback: str = None):
|
||||||
super().__init__(context)
|
super().__init__(context)
|
||||||
@@ -50,6 +54,10 @@ class FetchPriceLogsStep(BaseContextStep):
|
|||||||
if df.empty:
|
if df.empty:
|
||||||
return df
|
return df
|
||||||
|
|
||||||
|
# filter by store_mode from context
|
||||||
|
if 'storeMode' in df.columns:
|
||||||
|
df = df[df['storeMode'] == self.context.store_mode]
|
||||||
|
|
||||||
# Apply time filtering if lookback specified
|
# Apply time filtering if lookback specified
|
||||||
if self.lookback and 'ts' in df.columns:
|
if self.lookback and 'ts' in df.columns:
|
||||||
df['ts'] = pd.to_datetime(df['ts'])
|
df['ts'] = pd.to_datetime(df['ts'])
|
||||||
|
|||||||
Reference in New Issue
Block a user