From 219370cd9573f1c821553f6c8465a9cdeac3e6ed Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Thu, 27 Nov 2025 23:25:24 +0100 Subject: [PATCH] chore: updating dag with upload to registry --- .../airflow/dags/elasticity_pricing_dag.py | 71 +++++++++++++++---- 1 file changed, 59 insertions(+), 12 deletions(-) diff --git a/experiments/airflow/dags/elasticity_pricing_dag.py b/experiments/airflow/dags/elasticity_pricing_dag.py index 9e24fde..464e728 100644 --- a/experiments/airflow/dags/elasticity_pricing_dag.py +++ b/experiments/airflow/dags/elasticity_pricing_dag.py @@ -56,6 +56,7 @@ def fetch_price_logs(**context): def compute_demand_chunks(**context): """Chunk interactions and compute demand per window""" + import io ti = context['task_instance'] window_size = context['dag_run'].conf.get('window_size', '30s') store_mode = context['dag_run'].conf.get('store_mode', 'hotel') @@ -66,7 +67,7 @@ def compute_demand_chunks(**context): logging.error("No interaction data available") return None - interactions_df = pd.read_json(interaction_json) + interactions_df = pd.read_json(io.StringIO(interaction_json)) # chunk into windows chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True) @@ -93,6 +94,7 @@ def compute_demand_chunks(**context): def aggregate_prices(**context): """Aggregate price logs into aligned windows""" + import io ti = context['task_instance'] window_size = context['dag_run'].conf.get('window_size', '30s') store_mode = context['dag_run'].conf.get('store_mode', 'hotel') @@ -102,7 +104,7 @@ def aggregate_prices(**context): logging.error("No price data available") return None - price_df = pd.read_json(price_json) + price_df = pd.read_json(io.StringIO(price_json)) price_chunks = aggregate_price_logs(price_df, window_size=window_size, store_mode=store_mode) # serialize for XCom @@ -121,6 +123,7 @@ def aggregate_prices(**context): def compute_elasticity(**context): """Compute price elasticity from demand and price chunks""" + import io ti = context['task_instance'] store_mode = context['dag_run'].conf.get('store_mode', 'hotel') method = context['dag_run'].conf.get('elasticity_method', 'point') @@ -139,7 +142,7 @@ def compute_elasticity(**context): { 'window_start': pd.Timestamp(c['window_start']), 'window_end': pd.Timestamp(c['window_end']), - 'demand_vector': pd.read_json(c['demand_vector']) + 'demand_vector': pd.read_json(io.StringIO(c['demand_vector'])) } for c in demand_chunks_raw ] @@ -148,7 +151,7 @@ def compute_elasticity(**context): { 'window_start': pd.Timestamp(c['window_start']), 'window_end': pd.Timestamp(c['window_end']), - 'price_vector': pd.read_json(c['price_vector']) + 'price_vector': pd.read_json(io.StringIO(c['price_vector'])) } for c in price_chunks_raw ] @@ -173,7 +176,8 @@ def compute_elasticity(**context): } def publish_results(**context): - """Publish elasticity results to model registry or backend""" + """Publish elasticity results to model registry and train pricing model""" + import io ti = context['task_instance'] elasticity_json = ti.xcom_pull(task_ids='compute_elasticity', key='elasticity_results') @@ -181,15 +185,58 @@ def publish_results(**context): logging.error("No elasticity results to publish") return None - elasticity_df = pd.read_json(elasticity_json) + elasticity_df = pd.read_json(io.StringIO(elasticity_json)) - # TODO: implement actual publishing logic - # - push to model registry - # - update pricing provider service - # - store in database for audit trail + # import registry and pricing modules + import sys + sys.path.insert(0, '/opt/airflow/procesing') + sys.path.insert(0, '/opt/airflow') - logging.info(f"Published elasticity for {len(elasticity_df)} products") - return True + from lib.model_registry import ModelRegistry + from procesing.pricing import ElasticityBasedPricingFunction + + # initialize registry + registry = ModelRegistry() + + # publish elasticity data + metadata = { + 'timestamp': pd.Timestamp.now().isoformat(), + 'window_size': context['dag_run'].conf.get('window_size', '30s'), + 'store_mode': context['dag_run'].conf.get('store_mode', 'hotel'), + 'dag_run_id': context['dag_run'].run_id + } + + registry.publish_elasticity( + elasticity_df, + model_name='latest', + metadata=metadata + ) + + # train and publish pricing model + pricing_model = ElasticityBasedPricingFunction( + cost_floor=0.5, + max_markup=2.5, + min_markup=1.0, + inelastic_markup=1.2 + ) + pricing_model.fit(elasticity_df) + + registry.publish_pricing_model( + pricing_model, + model_name='latest', + metadata={ + **metadata, + 'model_type': 'ElasticityBasedPricingFunction' + } + ) + + logging.info(f"Published elasticity + pricing model for {len(elasticity_df)} products to registry") + + return { + 'n_products': len(elasticity_df), + 'registry_status': 'success', + 'elasticity_mean': float(elasticity_df['elasticity'].mean()) + } # DAG definition