# Mirror of https://github.com/velocitatem/PHANTOM.git
# (synced 2026-05-31 16:43:36 +00:00; 300 lines, 9.7 KiB, Python)
from airflow import DAG
|
|
from airflow.operators.python import PythonOperator
|
|
from airflow.utils.dates import days_ago
|
|
from datetime import timedelta
|
|
import pandas as pd
|
|
import logging
|
|
import sys
|
|
import os
|
|
|
|
# add procesing module to path (mounted at /opt/airflow/procesing in container)
|
|
sys.path.insert(0, '/opt/airflow/procesing')
|
|
|
|
from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter
|
|
from demand import DemandEstimator, ChunkInteractionsIntoSteps
|
|
from elasticity import TemporalElasticityEstimator, aggregate_price_logs
|
|
|
|
# Per-task defaults handed to the DAG below: attributed to the research team,
# independent of previous runs, no e-mail alerts, and two retries spaced five
# minutes apart.
default_args = dict(
    owner='phantom-research',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
)
|
|
|
|
# callable functions for tasks (stateless, idempotent)
|
|
def fetch_interactions(**context):
    """Pull user interactions from Kafka, enrich them, and hand them to XCom.

    Records from the ``user-interactions`` topic are passed through
    ``ExperimentJoiner`` and then ``EventTitleAugmenter``. The result is
    serialized with ``DataFrame.to_json`` and pushed under the XCom key
    ``interaction_data`` for ``compute_demand_chunks``.

    Returns:
        Number of enriched records, or ``None`` when the fetch yields an empty
        frame (no XCom value is pushed in that case).
    """
    raw = KafkaDataFetcher(topic='user-interactions').fit_transform(None)
    if raw.empty:
        logging.warning("No interaction data fetched")
        return None

    # Each enrichment step is instantiated right before it runs, joiner first.
    enriched = raw
    for step_cls in (ExperimentJoiner, EventTitleAugmenter):
        enriched = step_cls().fit_transform(enriched)

    ti = context['task_instance']
    ti.xcom_push(key='interaction_data', value=enriched.to_json())

    n_records = len(enriched)
    logging.info(f"Fetched {n_records} interaction records")
    return n_records
|
|
|
|
def fetch_price_logs(**context):
    """Pull raw price-log records from Kafka and stash them in XCom.

    Records from the ``price-logs`` topic are serialized with
    ``DataFrame.to_json`` and pushed under the XCom key ``price_data``.
    ``aggregate_prices`` consumes them from there.

    Returns:
        Number of records fetched, or ``None`` when the fetch yields an empty
        frame (no XCom value is pushed in that case).
    """
    prices = KafkaDataFetcher(topic='price-logs').fit_transform(None)

    if not prices.empty:
        context['task_instance'].xcom_push(key='price_data', value=prices.to_json())
        logging.info(f"Fetched {len(prices)} price records")
        return len(prices)

    logging.warning("No price data fetched")
    return None
|
|
|
|
def compute_demand_chunks(**context):
    """Split interactions into time windows and estimate demand for each one.

    DAG-run conf keys read:
        ``window_size`` (default ``'30s'``) and ``store_mode`` (default
        ``'hotel'``).

    The interaction JSON pushed by ``fetch_interactions`` is decoded and
    chunked with ``ChunkInteractionsIntoSteps``. Each window's data is run
    through ``DemandEstimator``. The result is a list of dicts pushed under the
    XCom key ``demand_chunks``:
        * ``window_start`` / ``window_end`` as ISO strings;
        * ``demand_vector`` as ``to_json`` output.

    Returns:
        Number of demand chunks, or ``None`` when no interaction data was
        available or chunking produced no windows.
    """
    import io

    ti = context['task_instance']
    run_conf = context['dag_run'].conf
    window_size = run_conf.get('window_size', '30s')
    store_mode = run_conf.get('store_mode', 'hotel')

    payload = ti.xcom_pull(task_ids='fetch_interactions', key='interaction_data')
    if not payload:
        logging.error("No interaction data available")
        return None

    interactions = pd.read_json(io.StringIO(payload))
    chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True)
    windows = chunker.transform(interactions)
    if not windows:
        logging.warning("No chunks generated")
        return None

    estimator = DemandEstimator(store_mode=store_mode)
    demand_chunks = []
    for window in windows:
        # Timestamps become ISO strings so the list is JSON-serializable for XCom.
        demand_chunks.append({
            'window_start': window['window_start'].isoformat(),
            'window_end': window['window_end'].isoformat(),
            'demand_vector': estimator.transform(window['data']).to_json(),
        })

    ti.xcom_push(key='demand_chunks', value=demand_chunks)
    logging.info(f"Generated {len(demand_chunks)} demand chunks @ {window_size}")
    return len(demand_chunks)
|
|
|
|
def aggregate_prices(**context):
    """Bucket raw price logs into windows and push them to XCom.

    DAG-run conf keys read:
        ``window_size`` (default ``'30s'``, the same key and default that
        ``compute_demand_chunks`` uses) and ``store_mode`` (default
        ``'hotel'``).

    The price JSON pushed by ``fetch_price_logs`` is decoded and handed to
    ``aggregate_price_logs``. The resulting chunks are pushed under the XCom
    key ``price_chunks``:
        * ``window_start`` / ``window_end`` as ISO strings;
        * ``price_vector`` as ``to_json`` output.

    Returns:
        Number of price chunks, or ``None`` when no price data was available.
    """
    import io

    ti = context['task_instance']
    run_conf = context['dag_run'].conf
    window_size = run_conf.get('window_size', '30s')
    store_mode = run_conf.get('store_mode', 'hotel')

    raw_prices = ti.xcom_pull(task_ids='fetch_price_logs', key='price_data')
    if not raw_prices:
        logging.error("No price data available")
        return None

    price_chunks = aggregate_price_logs(
        pd.read_json(io.StringIO(raw_prices)),
        window_size=window_size,
        store_mode=store_mode,
    )

    def _encode(chunk):
        # XCom needs JSON-friendly values: timestamps -> ISO, frames -> JSON.
        return {
            'window_start': chunk['window_start'].isoformat(),
            'window_end': chunk['window_end'].isoformat(),
            'price_vector': chunk['price_vector'].to_json(),
        }

    serialized = list(map(_encode, price_chunks))
    ti.xcom_push(key='price_chunks', value=serialized)
    logging.info(f"Aggregated {len(price_chunks)} price chunks")
    return len(price_chunks)
|
|
|
|
def compute_elasticity(**context):
    """Estimate price elasticity by pairing demand windows with price windows.

    DAG-run conf keys read:
        * ``store_mode`` (default ``'hotel'``);
        * ``elasticity_method`` (default ``'point'``);
        * ``min_observations`` (default ``2``).

    Steps:
        1. Pull ``demand_chunks`` (from ``compute_demand``) and
           ``price_chunks`` (from ``aggregate_prices``).
        2. Turn the ISO strings back into ``pd.Timestamp`` and the JSON
           vectors back into DataFrames.
        3. Run ``TemporalElasticityEstimator``.
        4. Push the result JSON under the XCom key ``elasticity_results``.

    Returns:
        Dict with ``n_products``, ``mean_elasticity`` and
        ``median_elasticity``, or ``None`` when either input is missing or the
        estimator returned nothing.
    """
    import io

    ti = context['task_instance']
    run_conf = context['dag_run'].conf
    store_mode = run_conf.get('store_mode', 'hotel')
    method = run_conf.get('elasticity_method', 'point')
    min_obs = run_conf.get('min_observations', 2)

    demand_raw = ti.xcom_pull(task_ids='compute_demand', key='demand_chunks')
    price_raw = ti.xcom_pull(task_ids='aggregate_prices', key='price_chunks')
    if not (demand_raw and price_raw):
        logging.error("Missing demand or price chunks")
        return None

    def _decode(raw_chunks, vector_key):
        # Reverse of the upstream XCom encoding: ISO strings -> Timestamps,
        # to_json payloads -> DataFrames.
        return [
            {
                'window_start': pd.Timestamp(chunk['window_start']),
                'window_end': pd.Timestamp(chunk['window_end']),
                vector_key: pd.read_json(io.StringIO(chunk[vector_key])),
            }
            for chunk in raw_chunks
        ]

    demand_chunks = _decode(demand_raw, 'demand_vector')
    price_chunks = _decode(price_raw, 'price_vector')

    estimator = TemporalElasticityEstimator(method=method, min_observations=min_obs)
    elasticity_df = estimator.transform(demand_chunks, price_chunks, store_mode=store_mode)
    if elasticity_df is None or elasticity_df.empty:
        logging.warning("No elasticity results computed")
        return None

    # Results currently travel via XCom; a DB or object store would also work.
    ti.xcom_push(key='elasticity_results', value=elasticity_df.to_json())
    logging.info(f"Computed elasticity for {len(elasticity_df)} products")

    elasticity = elasticity_df['elasticity']
    return {
        'n_products': len(elasticity_df),
        'mean_elasticity': float(elasticity.mean()),
        'median_elasticity': float(elasticity.median()),
    }
|
|
|
|
def publish_results(**context):
    """Fit a pricing model on the elasticity results and publish both.

    Pulls the ``elasticity_results`` JSON pushed by ``compute_elasticity`` and
    fits an ``ElasticityBasedPricingFunction`` on it. Only after the fit
    succeeds does it touch the ``ModelRegistry``, publishing under the model
    name ``'latest'``:
        * the elasticity frame;
        * the fitted pricing model.

    Both carry the same run metadata: timestamp, ``window_size``,
    ``store_mode``, ``dag_run_id``. The pricing model's metadata also records
    ``model_type``.

    Returns:
        Dict with ``n_products``, ``registry_status`` and ``elasticity_mean``,
        or ``None`` when there were no elasticity results to publish.
    """
    import io
    import sys

    ti = context['task_instance']
    elasticity_json = ti.xcom_pull(task_ids='compute_elasticity', key='elasticity_results')
    if not elasticity_json:
        logging.error("No elasticity results to publish")
        return None

    elasticity_df = pd.read_json(io.StringIO(elasticity_json))

    # `lib` and the `procesing` package are resolved from /opt/airflow.
    # Only add paths that are missing, so repeated calls in one interpreter
    # don't keep prepending duplicates to sys.path.
    for path in ('/opt/airflow/procesing', '/opt/airflow'):
        if path not in sys.path:
            sys.path.insert(0, path)

    from lib.model_registry import ModelRegistry
    from procesing.pricing import ElasticityBasedPricingFunction

    # Fit before publishing anything. If fit() raises, the registry is left
    # untouched instead of holding fresh elasticity next to a stale pricing
    # model. Fit on a copy in case fit() mutates its input (not visible from
    # here), so the published elasticity frame is exactly what was pulled.
    pricing_model = ElasticityBasedPricingFunction(
        cost_floor=0.5,
        max_markup=2.5,
        min_markup=1.0,
        inelastic_markup=1.2,
    )
    pricing_model.fit(elasticity_df.copy())

    dag_run = context['dag_run']
    run_conf = dag_run.conf or {}
    metadata = {
        'timestamp': pd.Timestamp.now().isoformat(),
        'window_size': run_conf.get('window_size', '30s'),
        'store_mode': run_conf.get('store_mode', 'hotel'),
        'dag_run_id': dag_run.run_id,
    }

    registry = ModelRegistry()
    registry.publish_elasticity(
        elasticity_df,
        model_name='latest',
        metadata=metadata,
    )
    registry.publish_pricing_model(
        pricing_model,
        model_name='latest',
        metadata={
            **metadata,
            'model_type': 'ElasticityBasedPricingFunction',
        },
    )

    logging.info(f"Published elasticity + pricing model for {len(elasticity_df)} products to registry")

    return {
        'n_products': len(elasticity_df),
        'registry_status': 'success',
        'elasticity_mean': float(elasticity_df['elasticity'].mean()),
    }
|
|
|
|
|
|
# DAG definition.
# Airflow 2 always injects the task context into PythonOperator callables
# that accept **kwargs, and it removed the `provide_context` argument.
# Passing it now goes through BaseOperator's invalid-argument check, so it is
# omitted below.
with DAG(
    'elasticity_pricing_pipeline',
    default_args=default_args,
    description='E2E pipeline: interactions → demand → elasticity → pricing',
    schedule_interval='*/5 * * * *',  # every 5 minutes for real-time pricing
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    tags=['pricing', 'elasticity', 'research'],
) as dag:

    # Extraction: the two Kafka fetches are independent and can run in parallel.
    fetch_interactions_task = PythonOperator(
        task_id='fetch_interactions',
        python_callable=fetch_interactions,
    )

    fetch_price_logs_task = PythonOperator(
        task_id='fetch_price_logs',
        python_callable=fetch_price_logs,
    )

    # Demand computation (consumes fetch_interactions' XCom).
    compute_demand_task = PythonOperator(
        task_id='compute_demand',
        python_callable=compute_demand_chunks,
    )

    # Price aggregation (consumes fetch_price_logs' XCom).
    aggregate_prices_task = PythonOperator(
        task_id='aggregate_prices',
        python_callable=aggregate_prices,
    )

    # Elasticity computation (needs both demand and price chunks).
    compute_elasticity_task = PythonOperator(
        task_id='compute_elasticity',
        python_callable=compute_elasticity,
    )

    # Publish results to the model registry (needs elasticity).
    publish_results_task = PythonOperator(
        task_id='publish_results',
        python_callable=publish_results,
    )

    # Dependency graph. The task_ids above must match the `task_ids=` used by
    # each callable's xcom_pull.
    fetch_interactions_task >> compute_demand_task
    fetch_price_logs_task >> aggregate_prices_task
    [compute_demand_task, aggregate_prices_task] >> compute_elasticity_task
    compute_elasticity_task >> publish_results_task