From eb30b042716f5a5ac5bc4b2cd259982b5087943d Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Fri, 28 Nov 2025 13:52:41 +0100 Subject: [PATCH] local pipeline execution working --- .../airflow/dags/elasticity_pricing_dag.py | 430 ++++++++++-------- experiments/procesing/pricing.py | 2 + experiments/procesing/steps/pricing.py | 2 - 3 files changed, 245 insertions(+), 189 deletions(-) diff --git a/experiments/airflow/dags/elasticity_pricing_dag.py b/experiments/airflow/dags/elasticity_pricing_dag.py index 18e263e..1076969 100644 --- a/experiments/airflow/dags/elasticity_pricing_dag.py +++ b/experiments/airflow/dags/elasticity_pricing_dag.py @@ -5,14 +5,27 @@ from datetime import timedelta import pandas as pd import logging import sys -import os +import pickle +import io # add procesing module to path (mounted at /opt/airflow/procesing in container) sys.path.insert(0, '/opt/airflow/procesing') -from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter -from demand import DemandEstimator, ChunkInteractionsIntoSteps -from elasticity import TemporalElasticityEstimator, aggregate_price_logs +from context import PipelineContext +from providers import SupabaseProvider, BackendAPIProvider +from steps import ( + FetchInteractionsStep, + FetchPriceLogsStep, + CreatePriceBucketsStep, + AugmentEventNamesStep, + ChunkByTimeWindowStep, + ComputeDemandForChunksStep, + AggregatePriceLogsStep, + ComputeElasticityStep, + BuildStateSpaceStep, + FitPricingFunctionStep, + PredictPricesStep, +) default_args = { 'owner': 'phantom-research', @@ -23,214 +36,210 @@ default_args = { 'retry_delay': timedelta(minutes=5), } -# callable functions for tasks (stateless, idempotent) -def fetch_interactions(**context): - """Extract interaction data from Kafka and augment""" - fetcher = KafkaDataFetcher(topic='user-interactions') - data = fetcher.fit_transform(None) +def get_provider(): + """Factory to create composite provider""" + class CompositeProvider(SupabaseProvider, 
BackendAPIProvider): + def __init__(self): + SupabaseProvider.__init__(self) + BackendAPIProvider.__init__(self) + return CompositeProvider() - if data.empty: - logging.warning("No interaction data fetched") - return None +def get_context(**kwargs): + """Build pipeline context from Airflow config""" + dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} + return PipelineContext( + provider=get_provider(), + store_mode=dag_conf.get('store_mode', 'hotel'), + window_size=dag_conf.get('window_size', '30s'), + n_price_buckets=dag_conf.get('n_price_buckets', 5), + elasticity_method=dag_conf.get('elasticity_method', 'point'), + min_observations=dag_conf.get('min_observations', 2), + ) - data = ExperimentJoiner().fit_transform(data) - data = EventTitleAugmenter().fit_transform(data) +# atomic task functions (each wraps one sklearn step) +def fetch_interactions(**kwargs): + """Task: Fetch interaction data from Kafka""" + context = get_context(**kwargs) + step = FetchInteractionsStep(context) + df = step.transform(None) - # push to XCom for downstream tasks - context['task_instance'].xcom_push(key='interaction_data', value=data.to_json()) - logging.info(f"Fetched {len(data)} interaction records") - return len(data) + kwargs['ti'].xcom_push(key='interactions_raw', value=df.to_json()) + logging.info(f"Fetched {len(df)} interaction records") + return len(df) -def fetch_price_logs(**context): - """Extract price logs from Kafka""" - fetcher = KafkaDataFetcher(topic='price-logs') - data = fetcher.fit_transform(None) +def fetch_price_logs(**kwargs): + """Task: Fetch price logs from Kafka""" + context = get_context(**kwargs) + step = FetchPriceLogsStep(context) + df = step.transform(None) - if data.empty: - logging.warning("No price data fetched") - return None + kwargs['ti'].xcom_push(key='price_logs_raw', value=df.to_json()) + logging.info(f"Fetched {len(df)} price records") + return len(df) - context['task_instance'].xcom_push(key='price_data', 
value=data.to_json()) - logging.info(f"Fetched {len(data)} price records") - return len(data) +def create_price_buckets(**kwargs): + """Task: Create price buckets for interactions""" + ti = kwargs['ti'] + interactions_json = ti.xcom_pull(key='interactions_raw') + df = pd.read_json(io.StringIO(interactions_json)) -def compute_demand_chunks(**context): - """Chunk interactions and compute demand per window""" - import io - ti = context['task_instance'] - window_size = context['dag_run'].conf.get('window_size', '30s') - store_mode = context['dag_run'].conf.get('store_mode', 'hotel') + context = get_context(**kwargs) + step = CreatePriceBucketsStep(context) + df = step.transform(df) - # pull from XCom - interaction_json = ti.xcom_pull(task_ids='fetch_interactions', key='interaction_data') - if not interaction_json: - logging.error("No interaction data available") - return None + ti.xcom_push(key='interactions_bucketed', value=df.to_json()) + logging.info(f"Created price buckets for {len(df)} interactions") + return len(df) - interactions_df = pd.read_json(io.StringIO(interaction_json)) +def augment_event_names(**kwargs): + """Task: Augment event names with product and price schema""" + ti = kwargs['ti'] + interactions_json = ti.xcom_pull(key='interactions_bucketed') + df = pd.read_json(io.StringIO(interactions_json)) - # chunk into windows - chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True) - chunks = chunker.transform(interactions_df) + context = get_context(**kwargs) + step = AugmentEventNamesStep(context) + df = step.transform(df) - if not chunks: - logging.warning("No chunks generated") - return None + ti.xcom_push(key='interactions_final', value=df.to_json()) + logging.info(f"Augmented event names for {len(df)} interactions") + return len(df) - # compute demand per chunk - estimator = DemandEstimator(store_mode=store_mode) - demand_chunks = [ - { - 'window_start': c['window_start'].isoformat(), - 'window_end': 
c['window_end'].isoformat(), - 'demand_vector': estimator.transform(c['data']).to_json() - } - for c in chunks - ] +def chunk_interactions(**kwargs): + """Task: Chunk interactions into time windows""" + ti = kwargs['ti'] + interactions_json = ti.xcom_pull(key='interactions_final') + df = pd.read_json(io.StringIO(interactions_json)) - ti.xcom_push(key='demand_chunks', value=demand_chunks) - logging.info(f"Generated {len(demand_chunks)} demand chunks @ {window_size}") + context = get_context(**kwargs) + step = ChunkByTimeWindowStep(context) + chunks = step.transform(df) + + ti.xcom_push(key='interaction_chunks', value=pickle.dumps(chunks)) + logging.info(f"Generated {len(chunks)} interaction chunks") + return len(chunks) + +def compute_demand(**kwargs): + """Task: Compute demand vectors for all chunks""" + ti = kwargs['ti'] + chunks = pickle.loads(ti.xcom_pull(key='interaction_chunks')) + + context = get_context(**kwargs) + step = ComputeDemandForChunksStep(context) + demand_chunks = step.transform(chunks) + + ti.xcom_push(key='demand_chunks', value=pickle.dumps(demand_chunks)) + logging.info(f"Computed demand for {len(demand_chunks)} chunks") return len(demand_chunks) -def aggregate_prices(**context): - """Aggregate price logs into aligned windows""" - import io - ti = context['task_instance'] - window_size = context['dag_run'].conf.get('window_size', '30s') - store_mode = context['dag_run'].conf.get('store_mode', 'hotel') +def aggregate_price_logs(**kwargs): + """Task: Aggregate price logs into time windows (VECTORIZED)""" + ti = kwargs['ti'] + price_logs_json = ti.xcom_pull(key='price_logs_raw') + df = pd.read_json(io.StringIO(price_logs_json)) - price_json = ti.xcom_pull(task_ids='fetch_price_logs', key='price_data') - if not price_json: - logging.error("No price data available") - return None + context = get_context(**kwargs) + step = AggregatePriceLogsStep(context) + price_chunks = step.transform(df) - price_df = pd.read_json(io.StringIO(price_json)) - 
price_chunks = aggregate_price_logs(price_df, window_size=window_size, store_mode=store_mode) - - # serialize for XCom - serialized = [ - { - 'window_start': c['window_start'].isoformat(), - 'window_end': c['window_end'].isoformat(), - 'price_vector': c['price_vector'].to_json() - } - for c in price_chunks - ] - - ti.xcom_push(key='price_chunks', value=serialized) - logging.info(f"Aggregated {len(price_chunks)} price chunks") + ti.xcom_push(key='price_chunks', value=pickle.dumps(price_chunks)) + logging.info(f"Aggregated {len(price_chunks)} price chunks (vectorized)") return len(price_chunks) -def compute_elasticity(**context): - """Compute price elasticity from demand and price chunks""" - import io - ti = context['task_instance'] - store_mode = context['dag_run'].conf.get('store_mode', 'hotel') - method = context['dag_run'].conf.get('elasticity_method', 'point') - min_obs = context['dag_run'].conf.get('min_observations', 2) +def compute_elasticity(**kwargs): + """Task: Compute price elasticity from demand and price chunks""" + ti = kwargs['ti'] + demand_chunks = pickle.loads(ti.xcom_pull(key='demand_chunks')) + price_chunks = pickle.loads(ti.xcom_pull(key='price_chunks')) - # pull chunks from XCom - demand_chunks_raw = ti.xcom_pull(task_ids='compute_demand', key='demand_chunks') - price_chunks_raw = ti.xcom_pull(task_ids='aggregate_prices', key='price_chunks') + context = get_context(**kwargs) + step = ComputeElasticityStep(context) + elasticity_df = step.transform((demand_chunks, price_chunks)) - if not demand_chunks_raw or not price_chunks_raw: - logging.error("Missing demand or price chunks") - return None - - # deserialize - demand_chunks = [ - { - 'window_start': pd.Timestamp(c['window_start']), - 'window_end': pd.Timestamp(c['window_end']), - 'demand_vector': pd.read_json(io.StringIO(c['demand_vector'])) - } - for c in demand_chunks_raw - ] - - price_chunks = [ - { - 'window_start': pd.Timestamp(c['window_start']), - 'window_end': 
pd.Timestamp(c['window_end']), - 'price_vector': pd.read_json(io.StringIO(c['price_vector'])) - } - for c in price_chunks_raw - ] - - # compute elasticity - estimator = TemporalElasticityEstimator(method=method, min_observations=min_obs) - elasticity_df = estimator.transform(demand_chunks, price_chunks, store_mode=store_mode) - - if elasticity_df is None or elasticity_df.empty: - logging.warning("No elasticity results computed") - return None - - # store results (could push to DB, S3, or XCom) ti.xcom_push(key='elasticity_results', value=elasticity_df.to_json()) logging.info(f"Computed elasticity for {len(elasticity_df)} products") - # summary stats return { 'n_products': len(elasticity_df), 'mean_elasticity': float(elasticity_df['elasticity'].mean()), 'median_elasticity': float(elasticity_df['elasticity'].median()) } -def publish_results(**context): - """Publish elasticity results to model registry and train pricing model""" - import io - ti = context['task_instance'] - elasticity_json = ti.xcom_pull(task_ids='compute_elasticity', key='elasticity_results') - - if not elasticity_json: - logging.error("No elasticity results to publish") - return None - +def build_state_space(**kwargs): + """Task: Build state space from elasticity""" + ti = kwargs['ti'] + elasticity_json = ti.xcom_pull(key='elasticity_results') elasticity_df = pd.read_json(io.StringIO(elasticity_json)) - # import registry and pricing modules - import sys - sys.path.insert(0, '/opt/airflow/procesing') # this is pretty janky + context = get_context(**kwargs) + step = BuildStateSpaceStep(context) + state_space = step.transform(elasticity_df) + + ti.xcom_push(key='state_space', value=pickle.dumps(state_space)) + logging.info("Built state space for pricing") + return True + +def fit_pricing_function(**kwargs): + """Task: Fit pricing function using elasticity""" + ti = kwargs['ti'] + elasticity_json = ti.xcom_pull(key='elasticity_results') + elasticity_df = pd.read_json(io.StringIO(elasticity_json)) + + 
context = get_context(**kwargs) + step = FitPricingFunctionStep(context) + pricer = step.transform(elasticity_df) + + ti.xcom_push(key='pricer', value=pickle.dumps(pricer)) + logging.info("Fitted pricing function") + return True + +def predict_prices(**kwargs): + """Task: Predict optimal prices""" + ti = kwargs['ti'] + pricer = pickle.loads(ti.xcom_pull(key='pricer')) + state_space = pickle.loads(ti.xcom_pull(key='state_space')) + + context = get_context(**kwargs) + step = PredictPricesStep(context) + prices_df = step.transform((pricer, state_space)) + + ti.xcom_push(key='predicted_prices', value=prices_df.to_json()) + logging.info(f"Predicted prices for {len(prices_df)} products") + return len(prices_df) + +def publish_results(**kwargs): + """Task: Publish elasticity and pricing results to model registry""" + ti = kwargs['ti'] + elasticity_json = ti.xcom_pull(key='elasticity_results') + prices_json = ti.xcom_pull(key='predicted_prices') + + elasticity_df = pd.read_json(io.StringIO(elasticity_json)) + prices_df = pd.read_json(io.StringIO(prices_json)) + sys.path.insert(0, '/opt/airflow') - from lib.model_registry import ModelRegistry - from procesing.pricing import ElasticityBasedPricingFunction - # initialize registry registry = ModelRegistry() + dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} - # publish elasticity data metadata = { 'timestamp': pd.Timestamp.now().isoformat(), - 'window_size': context['dag_run'].conf.get('window_size', '30s'), - 'store_mode': context['dag_run'].conf.get('store_mode', 'hotel'), - 'dag_run_id': context['dag_run'].run_id + 'window_size': dag_conf.get('window_size', '30s'), + 'store_mode': dag_conf.get('store_mode', 'hotel'), + 'dag_run_id': kwargs['dag_run'].run_id if kwargs.get('dag_run') else 'manual' } - registry.publish_elasticity( - elasticity_df, - model_name='latest', - metadata=metadata - ) - - # train and publish pricing model - pricing_model = ElasticityBasedPricingFunction( - cost_floor=0.5, - 
max_markup=2.5, - min_markup=1.0, - inelastic_markup=1.2 - ) - pricing_model.fit(elasticity_df) + registry.publish_elasticity(elasticity_df, model_name='latest', metadata=metadata) + # get fitted pricer from XCom + pricer = pickle.loads(ti.xcom_pull(key='pricer')) registry.publish_pricing_model( - pricing_model, + pricer, model_name='latest', - metadata={ - **metadata, - 'model_type': 'ElasticityBasedPricingFunction' - } + metadata={**metadata, 'model_type': type(pricer).__name__} ) - logging.info(f"Published elasticity + pricing model for {len(elasticity_df)} products to registry") + logging.info(f"Published elasticity + pricing for {len(elasticity_df)} products") return { 'n_products': len(elasticity_df), @@ -243,57 +252,104 @@ def publish_results(**context): with DAG( 'elasticity_pricing_pipeline', default_args=default_args, - description='E2E pipeline: interactions -> demand -> elasticity -> pricing', - schedule_interval='*/15 * * * *', # every 5 minutes for real-time pricing + description='E2E refactored pipeline: atomic steps with proper separation', + schedule_interval='*/15 * * * *', start_date=days_ago(1), catchup=False, max_active_runs=1, - tags=['pricing', 'elasticity', 'research'], + tags=['pricing', 'elasticity', 'research', 'refactored'], ) as dag: # parallel data fetching - fetch_interactions_task = PythonOperator( + t_fetch_interactions = PythonOperator( task_id='fetch_interactions', python_callable=fetch_interactions, provide_context=True, ) - fetch_price_logs_task = PythonOperator( + t_fetch_price_logs = PythonOperator( task_id='fetch_price_logs', python_callable=fetch_price_logs, provide_context=True, ) - # demand computation (depends on interactions) - compute_demand_task = PythonOperator( + # interaction processing branch + t_create_buckets = PythonOperator( + task_id='create_price_buckets', + python_callable=create_price_buckets, + provide_context=True, + ) + + t_augment_events = PythonOperator( + task_id='augment_event_names', + 
python_callable=augment_event_names, + provide_context=True, + ) + + t_chunk_interactions = PythonOperator( + task_id='chunk_interactions', + python_callable=chunk_interactions, + provide_context=True, + ) + + t_compute_demand = PythonOperator( task_id='compute_demand', - python_callable=compute_demand_chunks, + python_callable=compute_demand, provide_context=True, ) - # price aggregation (depends on price logs) - aggregate_prices_task = PythonOperator( - task_id='aggregate_prices', - python_callable=aggregate_prices, + # price processing branch (VECTORIZED) + t_aggregate_prices = PythonOperator( + task_id='aggregate_price_logs', + python_callable=aggregate_price_logs, provide_context=True, ) - # elasticity computation (depends on both demand and prices) - compute_elasticity_task = PythonOperator( + # convergence: compute elasticity + t_compute_elasticity = PythonOperator( task_id='compute_elasticity', python_callable=compute_elasticity, provide_context=True, ) - # publish results (depends on elasticity) - publish_results_task = PythonOperator( + # pricing tasks + t_build_state = PythonOperator( + task_id='build_state_space', + python_callable=build_state_space, + provide_context=True, + ) + + t_fit_pricer = PythonOperator( + task_id='fit_pricing_function', + python_callable=fit_pricing_function, + provide_context=True, + ) + + t_predict_prices = PythonOperator( + task_id='predict_prices', + python_callable=predict_prices, + provide_context=True, + ) + + # publish to registry + t_publish = PythonOperator( task_id='publish_results', python_callable=publish_results, provide_context=True, ) - # dependency graph - fetch_interactions_task >> compute_demand_task - fetch_price_logs_task >> aggregate_prices_task - [compute_demand_task, aggregate_prices_task] >> compute_elasticity_task - compute_elasticity_task >> publish_results_task + # dependency graph (clear atomic flow) + # parallel fetches + [t_fetch_interactions, t_fetch_price_logs] + + # interaction branch: fetch -> 
bucket -> augment -> chunk -> demand + t_fetch_interactions >> t_create_buckets >> t_augment_events >> t_chunk_interactions >> t_compute_demand + + # price branch: fetch -> aggregate (vectorized) + t_fetch_price_logs >> t_aggregate_prices + + # convergence: both branches -> elasticity + [t_compute_demand, t_aggregate_prices] >> t_compute_elasticity + + # pricing: elasticity -> state + fit -> predict -> publish + t_compute_elasticity >> [t_build_state, t_fit_pricer] >> t_predict_prices >> t_publish diff --git a/experiments/procesing/pricing.py b/experiments/procesing/pricing.py index 76ba1a8..4789ebc 100644 --- a/experiments/procesing/pricing.py +++ b/experiments/procesing/pricing.py @@ -35,6 +35,8 @@ from sklearn.base import BaseEstimator, TransformerMixin import numpy as np import pandas as pd import os +from dotenv import load_dotenv +load_dotenv() from supabase import create_client, Client SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "") diff --git a/experiments/procesing/steps/pricing.py b/experiments/procesing/steps/pricing.py index aaa0798..1d7c695 100755 --- a/experiments/procesing/steps/pricing.py +++ b/experiments/procesing/steps/pricing.py @@ -57,8 +57,6 @@ class FitPricingFunctionStep(BaseContextStep): """ def transform(self, elasticity_df: pd.DataFrame): - from pricing import ElasticityBasedPricingFunction - pricing_class = self.context.config.get('pricing_function_class', ElasticityBasedPricingFunction) pricing_params = self.context.config.get('pricing_function_params', {})