chore: updating dag with upload to registry

This commit is contained in:
2025-11-27 23:25:24 +01:00
parent de7a386fc7
commit 219370cd95

View File

@@ -56,6 +56,7 @@ def fetch_price_logs(**context):
def compute_demand_chunks(**context): def compute_demand_chunks(**context):
"""Chunk interactions and compute demand per window""" """Chunk interactions and compute demand per window"""
import io
ti = context['task_instance'] ti = context['task_instance']
window_size = context['dag_run'].conf.get('window_size', '30s') window_size = context['dag_run'].conf.get('window_size', '30s')
store_mode = context['dag_run'].conf.get('store_mode', 'hotel') store_mode = context['dag_run'].conf.get('store_mode', 'hotel')
@@ -66,7 +67,7 @@ def compute_demand_chunks(**context):
logging.error("No interaction data available") logging.error("No interaction data available")
return None return None
interactions_df = pd.read_json(interaction_json) interactions_df = pd.read_json(io.StringIO(interaction_json))
# chunk into windows # chunk into windows
chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True) chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True)
@@ -93,6 +94,7 @@ def compute_demand_chunks(**context):
def aggregate_prices(**context): def aggregate_prices(**context):
"""Aggregate price logs into aligned windows""" """Aggregate price logs into aligned windows"""
import io
ti = context['task_instance'] ti = context['task_instance']
window_size = context['dag_run'].conf.get('window_size', '30s') window_size = context['dag_run'].conf.get('window_size', '30s')
store_mode = context['dag_run'].conf.get('store_mode', 'hotel') store_mode = context['dag_run'].conf.get('store_mode', 'hotel')
@@ -102,7 +104,7 @@ def aggregate_prices(**context):
logging.error("No price data available") logging.error("No price data available")
return None return None
price_df = pd.read_json(price_json) price_df = pd.read_json(io.StringIO(price_json))
price_chunks = aggregate_price_logs(price_df, window_size=window_size, store_mode=store_mode) price_chunks = aggregate_price_logs(price_df, window_size=window_size, store_mode=store_mode)
# serialize for XCom # serialize for XCom
@@ -121,6 +123,7 @@ def aggregate_prices(**context):
def compute_elasticity(**context): def compute_elasticity(**context):
"""Compute price elasticity from demand and price chunks""" """Compute price elasticity from demand and price chunks"""
import io
ti = context['task_instance'] ti = context['task_instance']
store_mode = context['dag_run'].conf.get('store_mode', 'hotel') store_mode = context['dag_run'].conf.get('store_mode', 'hotel')
method = context['dag_run'].conf.get('elasticity_method', 'point') method = context['dag_run'].conf.get('elasticity_method', 'point')
@@ -139,7 +142,7 @@ def compute_elasticity(**context):
{ {
'window_start': pd.Timestamp(c['window_start']), 'window_start': pd.Timestamp(c['window_start']),
'window_end': pd.Timestamp(c['window_end']), 'window_end': pd.Timestamp(c['window_end']),
'demand_vector': pd.read_json(c['demand_vector']) 'demand_vector': pd.read_json(io.StringIO(c['demand_vector']))
} }
for c in demand_chunks_raw for c in demand_chunks_raw
] ]
@@ -148,7 +151,7 @@ def compute_elasticity(**context):
{ {
'window_start': pd.Timestamp(c['window_start']), 'window_start': pd.Timestamp(c['window_start']),
'window_end': pd.Timestamp(c['window_end']), 'window_end': pd.Timestamp(c['window_end']),
'price_vector': pd.read_json(c['price_vector']) 'price_vector': pd.read_json(io.StringIO(c['price_vector']))
} }
for c in price_chunks_raw for c in price_chunks_raw
] ]
@@ -173,7 +176,8 @@ def compute_elasticity(**context):
} }
def publish_results(**context): def publish_results(**context):
"""Publish elasticity results to model registry or backend""" """Publish elasticity results to model registry and train pricing model"""
import io
ti = context['task_instance'] ti = context['task_instance']
elasticity_json = ti.xcom_pull(task_ids='compute_elasticity', key='elasticity_results') elasticity_json = ti.xcom_pull(task_ids='compute_elasticity', key='elasticity_results')
@@ -181,15 +185,58 @@ def publish_results(**context):
logging.error("No elasticity results to publish") logging.error("No elasticity results to publish")
return None return None
elasticity_df = pd.read_json(elasticity_json) elasticity_df = pd.read_json(io.StringIO(elasticity_json))
# TODO: implement actual publishing logic # import registry and pricing modules
# - push to model registry import sys
# - update pricing provider service sys.path.insert(0, '/opt/airflow/procesing')
# - store in database for audit trail sys.path.insert(0, '/opt/airflow')
logging.info(f"Published elasticity for {len(elasticity_df)} products") from lib.model_registry import ModelRegistry
return True from procesing.pricing import ElasticityBasedPricingFunction
# initialize registry
registry = ModelRegistry()
# publish elasticity data
metadata = {
'timestamp': pd.Timestamp.now().isoformat(),
'window_size': context['dag_run'].conf.get('window_size', '30s'),
'store_mode': context['dag_run'].conf.get('store_mode', 'hotel'),
'dag_run_id': context['dag_run'].run_id
}
registry.publish_elasticity(
elasticity_df,
model_name='latest',
metadata=metadata
)
# train and publish pricing model
pricing_model = ElasticityBasedPricingFunction(
cost_floor=0.5,
max_markup=2.5,
min_markup=1.0,
inelastic_markup=1.2
)
pricing_model.fit(elasticity_df)
registry.publish_pricing_model(
pricing_model,
model_name='latest',
metadata={
**metadata,
'model_type': 'ElasticityBasedPricingFunction'
}
)
logging.info(f"Published elasticity + pricing model for {len(elasticity_df)} products to registry")
return {
'n_products': len(elasticity_df),
'registry_status': 'success',
'elasticity_mean': float(elasticity_df['elasticity'].mean())
}
# DAG definition # DAG definition