mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 16:43:36 +00:00
introducing airflow to run pipeline
This commit is contained in:
5
.gitignore
vendored
5
.gitignore
vendored
@@ -6,3 +6,8 @@
|
||||
**/session_*.svg
|
||||
**/*graph.svg
|
||||
paper/src/bib/auto
|
||||
|
||||
# Airflow logs - exclude DAG run logs
|
||||
experiments/airflow/logs/*
|
||||
experiments/airflow/logs/scheduler/
|
||||
experiments/airflow/logs/dag_processor_manager/
|
||||
|
||||
@@ -71,6 +71,113 @@ services:
|
||||
- "${REDPANDA_CONSOLE_PORT:-8080}:8080"
|
||||
restart: unless-stopped
|
||||
|
||||
postgres:
|
||||
container_name: "PHANTOM-postgres"
|
||||
image: postgres:13
|
||||
environment:
|
||||
- POSTGRES_USER=airflow
|
||||
- POSTGRES_PASSWORD=airflow
|
||||
- POSTGRES_DB=airflow
|
||||
ports:
|
||||
- "5433:5432"
|
||||
volumes:
|
||||
- postgres_data:/var/lib/postgresql/data
|
||||
restart: unless-stopped
|
||||
|
||||
airflow-init:
|
||||
container_name: "PHANTOM-airflow-init"
|
||||
build:
|
||||
context: .
|
||||
dockerfile: docker/Airflow.dockerfile
|
||||
depends_on:
|
||||
- postgres
|
||||
environment:
|
||||
- AIRFLOW__CORE__EXECUTOR=SequentialExecutor
|
||||
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
|
||||
- AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY:-fb4E5zWb8hh7WKN7tXUkWP0r5nTcN1nKZGh1h0N3x6Q=}
|
||||
- AIRFLOW__CORE__LOAD_EXAMPLES=false
|
||||
- _AIRFLOW_DB_MIGRATE=true
|
||||
- _AIRFLOW_WWW_USER_CREATE=true
|
||||
- _AIRFLOW_WWW_USER_USERNAME=admin
|
||||
- _AIRFLOW_WWW_USER_PASSWORD=admin
|
||||
volumes:
|
||||
- ./experiments/airflow/dags:/opt/airflow/dags
|
||||
- ./experiments/airflow/logs:/opt/airflow/logs
|
||||
- ./experiments/airflow/plugins:/opt/airflow/plugins
|
||||
- ./experiments/procesing:/opt/airflow/procesing
|
||||
command: version
|
||||
restart: "no"
|
||||
|
||||
airflow-webserver:
|
||||
container_name: "PHANTOM-airflow-webserver"
|
||||
build:
|
||||
context: .
|
||||
dockerfile: docker/Airflow.dockerfile
|
||||
depends_on:
|
||||
- postgres
|
||||
- airflow-init
|
||||
environment:
|
||||
- AIRFLOW__CORE__EXECUTOR=SequentialExecutor
|
||||
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
|
||||
- AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY:-fb4E5zWb8hh7WKN7tXUkWP0r5nTcN1nKZGh1h0N3x6Q=}
|
||||
- AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
|
||||
- AIRFLOW__CORE__LOAD_EXAMPLES=false
|
||||
- AIRFLOW__WEBSERVER__EXPOSE_CONFIG=true
|
||||
- KAFKA_HOST=kafka
|
||||
- KAFKA_PORT=29092
|
||||
- BACKEND_URL=http://backend:5000
|
||||
- NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL}
|
||||
- NEXT_PUBLIC_SUPABASE_ANON_KEY=${NEXT_PUBLIC_SUPABASE_ANON_KEY}
|
||||
ports:
|
||||
- "${AIRFLOW_WEBSERVER_PORT:-8085}:8080"
|
||||
volumes:
|
||||
- ./experiments/airflow/dags:/opt/airflow/dags:ro
|
||||
- ./experiments/airflow/logs:/opt/airflow/logs
|
||||
- ./experiments/airflow/plugins:/opt/airflow/plugins:ro
|
||||
- ./experiments/procesing:/opt/airflow/procesing:ro
|
||||
command: webserver
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 30s
|
||||
|
||||
airflow-scheduler:
|
||||
container_name: "PHANTOM-airflow-scheduler"
|
||||
build:
|
||||
context: .
|
||||
dockerfile: docker/Airflow.dockerfile
|
||||
depends_on:
|
||||
airflow-webserver:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
- AIRFLOW__CORE__EXECUTOR=SequentialExecutor
|
||||
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
|
||||
- AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY:-fb4E5zWb8hh7WKN7tXUkWP0r5nTcN1nKZGh1h0N3x6Q=}
|
||||
- AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
|
||||
- AIRFLOW__CORE__LOAD_EXAMPLES=false
|
||||
- KAFKA_HOST=kafka
|
||||
- KAFKA_PORT=29092
|
||||
- BACKEND_URL=http://backend:5000
|
||||
- NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL}
|
||||
- NEXT_PUBLIC_SUPABASE_ANON_KEY=${NEXT_PUBLIC_SUPABASE_ANON_KEY}
|
||||
volumes:
|
||||
- ./experiments/airflow/dags:/opt/airflow/dags:ro
|
||||
- ./experiments/airflow/logs:/opt/airflow/logs
|
||||
- ./experiments/airflow/plugins:/opt/airflow/plugins:ro
|
||||
- ./experiments/procesing:/opt/airflow/procesing:ro
|
||||
command: scheduler
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 30s
|
||||
|
||||
volumes:
|
||||
phantom_kafka_data:
|
||||
phantom_redis_data:
|
||||
postgres_data:
|
||||
|
||||
23
docker/Airflow.dockerfile
Normal file
23
docker/Airflow.dockerfile
Normal file
@@ -0,0 +1,23 @@
|
||||
FROM apache/airflow:2.7.3-python3.11
|
||||
|
||||
USER root
|
||||
|
||||
# install system deps if needed
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
build-essential \
|
||||
&& apt-get clean \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
USER airflow
|
||||
|
||||
# copy requirements for pipeline dependencies
|
||||
COPY requirements.txt /tmp/requirements.txt
|
||||
RUN pip install --no-cache-dir -r /tmp/requirements.txt
|
||||
|
||||
# install postgres driver and providers
|
||||
RUN pip install --no-cache-dir \
|
||||
psycopg2-binary \
|
||||
apache-airflow-providers-postgres
|
||||
|
||||
# set airflow home
|
||||
ENV AIRFLOW_HOME=/opt/airflow
|
||||
252
experiments/airflow/dags/elasticity_pricing_dag.py
Normal file
252
experiments/airflow/dags/elasticity_pricing_dag.py
Normal file
@@ -0,0 +1,252 @@
|
||||
from airflow import DAG
|
||||
from airflow.operators.python import PythonOperator
|
||||
from airflow.utils.dates import days_ago
|
||||
from datetime import timedelta
|
||||
import pandas as pd
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
|
||||
# add procesing module to path (mounted at /opt/airflow/procesing in container)
|
||||
sys.path.insert(0, '/opt/airflow/procesing')
|
||||
|
||||
from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter
|
||||
from demand import DemandEstimator, ChunkInteractionsIntoSteps
|
||||
from elasticity import TemporalElasticityEstimator, aggregate_price_logs
|
||||
|
||||
default_args = {
|
||||
'owner': 'phantom-research',
|
||||
'depends_on_past': False,
|
||||
'email_on_failure': False,
|
||||
'email_on_retry': False,
|
||||
'retries': 2,
|
||||
'retry_delay': timedelta(minutes=5),
|
||||
}
|
||||
|
||||
# callable functions for tasks (stateless, idempotent)
|
||||
def fetch_interactions(**context):
|
||||
"""Extract interaction data from Kafka and augment"""
|
||||
fetcher = KafkaDataFetcher(topic='user-interactions')
|
||||
data = fetcher.fit_transform(None)
|
||||
|
||||
if data.empty:
|
||||
logging.warning("No interaction data fetched")
|
||||
return None
|
||||
|
||||
data = ExperimentJoiner().fit_transform(data)
|
||||
data = EventTitleAugmenter().fit_transform(data)
|
||||
|
||||
# push to XCom for downstream tasks
|
||||
context['task_instance'].xcom_push(key='interaction_data', value=data.to_json())
|
||||
logging.info(f"Fetched {len(data)} interaction records")
|
||||
return len(data)
|
||||
|
||||
def fetch_price_logs(**context):
|
||||
"""Extract price logs from Kafka"""
|
||||
fetcher = KafkaDataFetcher(topic='price-logs')
|
||||
data = fetcher.fit_transform(None)
|
||||
|
||||
if data.empty:
|
||||
logging.warning("No price data fetched")
|
||||
return None
|
||||
|
||||
context['task_instance'].xcom_push(key='price_data', value=data.to_json())
|
||||
logging.info(f"Fetched {len(data)} price records")
|
||||
return len(data)
|
||||
|
||||
def compute_demand_chunks(**context):
|
||||
"""Chunk interactions and compute demand per window"""
|
||||
ti = context['task_instance']
|
||||
window_size = context['dag_run'].conf.get('window_size', '30s')
|
||||
store_mode = context['dag_run'].conf.get('store_mode', 'hotel')
|
||||
|
||||
# pull from XCom
|
||||
interaction_json = ti.xcom_pull(task_ids='fetch_interactions', key='interaction_data')
|
||||
if not interaction_json:
|
||||
logging.error("No interaction data available")
|
||||
return None
|
||||
|
||||
interactions_df = pd.read_json(interaction_json)
|
||||
|
||||
# chunk into windows
|
||||
chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True)
|
||||
chunks = chunker.transform(interactions_df)
|
||||
|
||||
if not chunks:
|
||||
logging.warning("No chunks generated")
|
||||
return None
|
||||
|
||||
# compute demand per chunk
|
||||
estimator = DemandEstimator(store_mode=store_mode)
|
||||
demand_chunks = [
|
||||
{
|
||||
'window_start': c['window_start'].isoformat(),
|
||||
'window_end': c['window_end'].isoformat(),
|
||||
'demand_vector': estimator.transform(c['data']).to_json()
|
||||
}
|
||||
for c in chunks
|
||||
]
|
||||
|
||||
ti.xcom_push(key='demand_chunks', value=demand_chunks)
|
||||
logging.info(f"Generated {len(demand_chunks)} demand chunks @ {window_size}")
|
||||
return len(demand_chunks)
|
||||
|
||||
def aggregate_prices(**context):
|
||||
"""Aggregate price logs into aligned windows"""
|
||||
ti = context['task_instance']
|
||||
window_size = context['dag_run'].conf.get('window_size', '30s')
|
||||
store_mode = context['dag_run'].conf.get('store_mode', 'hotel')
|
||||
|
||||
price_json = ti.xcom_pull(task_ids='fetch_price_logs', key='price_data')
|
||||
if not price_json:
|
||||
logging.error("No price data available")
|
||||
return None
|
||||
|
||||
price_df = pd.read_json(price_json)
|
||||
price_chunks = aggregate_price_logs(price_df, window_size=window_size, store_mode=store_mode)
|
||||
|
||||
# serialize for XCom
|
||||
serialized = [
|
||||
{
|
||||
'window_start': c['window_start'].isoformat(),
|
||||
'window_end': c['window_end'].isoformat(),
|
||||
'price_vector': c['price_vector'].to_json()
|
||||
}
|
||||
for c in price_chunks
|
||||
]
|
||||
|
||||
ti.xcom_push(key='price_chunks', value=serialized)
|
||||
logging.info(f"Aggregated {len(price_chunks)} price chunks")
|
||||
return len(price_chunks)
|
||||
|
||||
def compute_elasticity(**context):
|
||||
"""Compute price elasticity from demand and price chunks"""
|
||||
ti = context['task_instance']
|
||||
store_mode = context['dag_run'].conf.get('store_mode', 'hotel')
|
||||
method = context['dag_run'].conf.get('elasticity_method', 'point')
|
||||
min_obs = context['dag_run'].conf.get('min_observations', 2)
|
||||
|
||||
# pull chunks from XCom
|
||||
demand_chunks_raw = ti.xcom_pull(task_ids='compute_demand', key='demand_chunks')
|
||||
price_chunks_raw = ti.xcom_pull(task_ids='aggregate_prices', key='price_chunks')
|
||||
|
||||
if not demand_chunks_raw or not price_chunks_raw:
|
||||
logging.error("Missing demand or price chunks")
|
||||
return None
|
||||
|
||||
# deserialize
|
||||
demand_chunks = [
|
||||
{
|
||||
'window_start': pd.Timestamp(c['window_start']),
|
||||
'window_end': pd.Timestamp(c['window_end']),
|
||||
'demand_vector': pd.read_json(c['demand_vector'])
|
||||
}
|
||||
for c in demand_chunks_raw
|
||||
]
|
||||
|
||||
price_chunks = [
|
||||
{
|
||||
'window_start': pd.Timestamp(c['window_start']),
|
||||
'window_end': pd.Timestamp(c['window_end']),
|
||||
'price_vector': pd.read_json(c['price_vector'])
|
||||
}
|
||||
for c in price_chunks_raw
|
||||
]
|
||||
|
||||
# compute elasticity
|
||||
estimator = TemporalElasticityEstimator(method=method, min_observations=min_obs)
|
||||
elasticity_df = estimator.transform(demand_chunks, price_chunks, store_mode=store_mode)
|
||||
|
||||
if elasticity_df is None or elasticity_df.empty:
|
||||
logging.warning("No elasticity results computed")
|
||||
return None
|
||||
|
||||
# store results (could push to DB, S3, or XCom)
|
||||
ti.xcom_push(key='elasticity_results', value=elasticity_df.to_json())
|
||||
logging.info(f"Computed elasticity for {len(elasticity_df)} products")
|
||||
|
||||
# summary stats
|
||||
return {
|
||||
'n_products': len(elasticity_df),
|
||||
'mean_elasticity': float(elasticity_df['elasticity'].mean()),
|
||||
'median_elasticity': float(elasticity_df['elasticity'].median())
|
||||
}
|
||||
|
||||
def publish_results(**context):
|
||||
"""Publish elasticity results to model registry or backend"""
|
||||
ti = context['task_instance']
|
||||
elasticity_json = ti.xcom_pull(task_ids='compute_elasticity', key='elasticity_results')
|
||||
|
||||
if not elasticity_json:
|
||||
logging.error("No elasticity results to publish")
|
||||
return None
|
||||
|
||||
elasticity_df = pd.read_json(elasticity_json)
|
||||
|
||||
# TODO: implement actual publishing logic
|
||||
# - push to model registry
|
||||
# - update pricing provider service
|
||||
# - store in database for audit trail
|
||||
|
||||
logging.info(f"Published elasticity for {len(elasticity_df)} products")
|
||||
return True
|
||||
|
||||
|
||||
# DAG definition
|
||||
with DAG(
|
||||
'elasticity_pricing_pipeline',
|
||||
default_args=default_args,
|
||||
description='E2E pipeline: interactions → demand → elasticity → pricing',
|
||||
schedule_interval='*/5 * * * *', # every 5 minutes for real-time pricing
|
||||
start_date=days_ago(1),
|
||||
catchup=False,
|
||||
max_active_runs=1,
|
||||
tags=['pricing', 'elasticity', 'research'],
|
||||
) as dag:
|
||||
|
||||
# parallel data fetching
|
||||
fetch_interactions_task = PythonOperator(
|
||||
task_id='fetch_interactions',
|
||||
python_callable=fetch_interactions,
|
||||
provide_context=True,
|
||||
)
|
||||
|
||||
fetch_price_logs_task = PythonOperator(
|
||||
task_id='fetch_price_logs',
|
||||
python_callable=fetch_price_logs,
|
||||
provide_context=True,
|
||||
)
|
||||
|
||||
# demand computation (depends on interactions)
|
||||
compute_demand_task = PythonOperator(
|
||||
task_id='compute_demand',
|
||||
python_callable=compute_demand_chunks,
|
||||
provide_context=True,
|
||||
)
|
||||
|
||||
# price aggregation (depends on price logs)
|
||||
aggregate_prices_task = PythonOperator(
|
||||
task_id='aggregate_prices',
|
||||
python_callable=aggregate_prices,
|
||||
provide_context=True,
|
||||
)
|
||||
|
||||
# elasticity computation (depends on both demand and prices)
|
||||
compute_elasticity_task = PythonOperator(
|
||||
task_id='compute_elasticity',
|
||||
python_callable=compute_elasticity,
|
||||
provide_context=True,
|
||||
)
|
||||
|
||||
# publish results (depends on elasticity)
|
||||
publish_results_task = PythonOperator(
|
||||
task_id='publish_results',
|
||||
python_callable=publish_results,
|
||||
provide_context=True,
|
||||
)
|
||||
|
||||
# dependency graph
|
||||
fetch_interactions_task >> compute_demand_task
|
||||
fetch_price_logs_task >> aggregate_prices_task
|
||||
[compute_demand_task, aggregate_prices_task] >> compute_elasticity_task
|
||||
compute_elasticity_task >> publish_results_task
|
||||
Reference in New Issue
Block a user