diff --git a/.gitignore b/.gitignore index 18da4dd..9cdec32 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,8 @@ **/session_*.svg **/*graph.svg paper/src/bib/auto + +# Airflow logs - exclude DAG run logs +experiments/airflow/logs/* +experiments/airflow/logs/scheduler/ +experiments/airflow/logs/dag_processor_manager/ diff --git a/docker-compose.yml b/docker-compose.yml index 01da852..91384c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -71,6 +71,113 @@ services: - "${REDPANDA_CONSOLE_PORT:-8080}:8080" restart: unless-stopped + postgres: + container_name: "PHANTOM-postgres" + image: postgres:13 + environment: + - POSTGRES_USER=airflow + - POSTGRES_PASSWORD=airflow + - POSTGRES_DB=airflow + ports: + - "5433:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + restart: unless-stopped + + airflow-init: + container_name: "PHANTOM-airflow-init" + build: + context: . + dockerfile: docker/Airflow.dockerfile + depends_on: + - postgres + environment: + - AIRFLOW__CORE__EXECUTOR=SequentialExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow + - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY:-fb4E5zWb8hh7WKN7tXUkWP0r5nTcN1nKZGh1h0N3x6Q=} + - AIRFLOW__CORE__LOAD_EXAMPLES=false + - _AIRFLOW_DB_MIGRATE=true + - _AIRFLOW_WWW_USER_CREATE=true + - _AIRFLOW_WWW_USER_USERNAME=admin + - _AIRFLOW_WWW_USER_PASSWORD=admin + volumes: + - ./experiments/airflow/dags:/opt/airflow/dags + - ./experiments/airflow/logs:/opt/airflow/logs + - ./experiments/airflow/plugins:/opt/airflow/plugins + - ./experiments/procesing:/opt/airflow/procesing + command: version + restart: "no" + + airflow-webserver: + container_name: "PHANTOM-airflow-webserver" + build: + context: . 
+ dockerfile: docker/Airflow.dockerfile + depends_on: + - postgres + - airflow-init + environment: + - AIRFLOW__CORE__EXECUTOR=SequentialExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow + - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY:-fb4E5zWb8hh7WKN7tXUkWP0r5nTcN1nKZGh1h0N3x6Q=} + - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true + - AIRFLOW__CORE__LOAD_EXAMPLES=false + - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=true + - KAFKA_HOST=kafka + - KAFKA_PORT=29092 + - BACKEND_URL=http://backend:5000 + - NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL} + - NEXT_PUBLIC_SUPABASE_ANON_KEY=${NEXT_PUBLIC_SUPABASE_ANON_KEY} + ports: + - "${AIRFLOW_WEBSERVER_PORT:-8085}:8080" + volumes: + - ./experiments/airflow/dags:/opt/airflow/dags:ro + - ./experiments/airflow/logs:/opt/airflow/logs + - ./experiments/airflow/plugins:/opt/airflow/plugins:ro + - ./experiments/procesing:/opt/airflow/procesing:ro + command: webserver + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + + airflow-scheduler: + container_name: "PHANTOM-airflow-scheduler" + build: + context: . 
+ dockerfile: docker/Airflow.dockerfile + depends_on: + airflow-webserver: + condition: service_healthy + environment: + - AIRFLOW__CORE__EXECUTOR=SequentialExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow + - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY:-fb4E5zWb8hh7WKN7tXUkWP0r5nTcN1nKZGh1h0N3x6Q=} + - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true + - AIRFLOW__CORE__LOAD_EXAMPLES=false + - KAFKA_HOST=kafka + - KAFKA_PORT=29092 + - BACKEND_URL=http://backend:5000 + - NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL} + - NEXT_PUBLIC_SUPABASE_ANON_KEY=${NEXT_PUBLIC_SUPABASE_ANON_KEY} + volumes: + - ./experiments/airflow/dags:/opt/airflow/dags:ro + - ./experiments/airflow/logs:/opt/airflow/logs + - ./experiments/airflow/plugins:/opt/airflow/plugins:ro + - ./experiments/procesing:/opt/airflow/procesing:ro + command: scheduler + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + volumes: phantom_kafka_data: phantom_redis_data: + postgres_data: diff --git a/docker/Airflow.dockerfile b/docker/Airflow.dockerfile new file mode 100644 index 0000000..5b11cc9 --- /dev/null +++ b/docker/Airflow.dockerfile @@ -0,0 +1,23 @@ +FROM apache/airflow:2.7.3-python3.11 + +USER root + +# install system deps if needed +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +USER airflow + +# copy requirements for pipeline dependencies +COPY requirements.txt /tmp/requirements.txt +RUN pip install --no-cache-dir -r /tmp/requirements.txt + +# install postgres driver and providers +RUN pip install --no-cache-dir \ + psycopg2-binary \ + apache-airflow-providers-postgres + +# set airflow home +ENV AIRFLOW_HOME=/opt/airflow diff --git a/experiments/airflow/dags/elasticity_pricing_dag.py 
b/experiments/airflow/dags/elasticity_pricing_dag.py
new file mode 100644
index 0000000..9e24fde
--- /dev/null
+++ b/experiments/airflow/dags/elasticity_pricing_dag.py
@@ -0,0 +1,326 @@
+"""Airflow DAG: Kafka interactions and price logs -> demand -> elasticity.
+
+Task graph (tasks exchange JSON payloads through XCom):
+
+    fetch_interactions -> compute_demand ---+
+                                            +--> compute_elasticity -> publish_results
+    fetch_price_logs -> aggregate_prices ---+
+
+Per-run options are read from ``dag_run.conf``: ``window_size`` (default '30s'),
+``store_mode`` (default 'hotel'), ``elasticity_method`` (default 'point') and
+``min_observations`` (default 2).
+"""
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from datetime import datetime, timedelta
+from io import StringIO
+import pandas as pd
+import logging
+import sys
+import os
+
+# add procesing module to path (mounted at /opt/airflow/procesing in container);
+# PHANTOM_PROCESSING_PATH overrides the location when running elsewhere
+PROCESSING_PATH = os.environ.get('PHANTOM_PROCESSING_PATH', '/opt/airflow/procesing')
+sys.path.insert(0, PROCESSING_PATH)
+
+from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter
+from demand import DemandEstimator, ChunkInteractionsIntoSteps
+from elasticity import TemporalElasticityEstimator, aggregate_price_logs
+
+default_args = {
+    'owner': 'phantom-research',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 2,
+    'retry_delay': timedelta(minutes=5),
+}
+
+
+def _run_conf(context):
+    """Return the current run's ``conf`` mapping, or ``{}`` if it is unset."""
+    return context['dag_run'].conf or {}
+
+
+def _read_json(payload):
+    """Parse a JSON string produced by ``to_json()`` with ``pd.read_json``."""
+    # pandas >= 2.1 deprecates passing literal JSON text; a file-like object
+    # works on every version and is never mistaken for a path or URL
+    return pd.read_json(StringIO(payload))
+
+
+# callable functions for tasks (stateless, idempotent)
+def fetch_interactions(**context):
+    """Extract interaction data from Kafka and augment it.
+
+    Reads the 'user-interactions' topic, runs the result through
+    ``ExperimentJoiner`` and ``EventTitleAugmenter`` and pushes it as JSON to
+    XCom under key 'interaction_data'.
+
+    Returns the number of records, or None (nothing pushed) when the fetch
+    comes back empty.
+    """
+    fetcher = KafkaDataFetcher(topic='user-interactions')
+    data = fetcher.fit_transform(None)
+
+    if data.empty:
+        logging.warning("No interaction data fetched")
+        return None
+
+    data = ExperimentJoiner().fit_transform(data)
+    data = EventTitleAugmenter().fit_transform(data)
+
+    # push to XCom for downstream tasks
+    # NOTE(review): XCom lives in the metadata DB; large payloads may need external storage
+    context['task_instance'].xcom_push(key='interaction_data', value=data.to_json())
+    logging.info("Fetched %d interaction records", len(data))
+    return len(data)
+
+
+def fetch_price_logs(**context):
+    """Extract price logs from Kafka.
+
+    Reads the 'price-logs' topic and pushes it as JSON to XCom under key
+    'price_data'.  Returns the number of records, or None (nothing pushed)
+    when the fetch comes back empty.
+    """
+    fetcher = KafkaDataFetcher(topic='price-logs')
+    data = fetcher.fit_transform(None)
+
+    if data.empty:
+        logging.warning("No price data fetched")
+        return None
+
+    context['task_instance'].xcom_push(key='price_data', value=data.to_json())
+    logging.info("Fetched %d price records", len(data))
+    return len(data)
+
+
+def compute_demand_chunks(**context):
+    """Chunk interactions and compute demand per window.
+
+    Pulls 'interaction_data' from the fetch_interactions task, splits it
+    with ``ChunkInteractionsIntoSteps`` and pushes one entry per chunk
+    (ISO window bounds plus the JSON demand vector) under key 'demand_chunks'.
+
+    Returns the number of chunks, or None when there is no upstream data or
+    no chunk was produced.
+    """
+    ti = context['task_instance']
+    conf = _run_conf(context)
+    window_size = conf.get('window_size', '30s')
+    store_mode = conf.get('store_mode', 'hotel')
+
+    # pull from XCom
+    interaction_json = ti.xcom_pull(task_ids='fetch_interactions', key='interaction_data')
+    if not interaction_json:
+        logging.error("No interaction data available")
+        return None
+
+    interactions_df = _read_json(interaction_json)
+
+    # chunk into windows
+    chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True)
+    chunks = chunker.transform(interactions_df)
+
+    if not chunks:
+        logging.warning("No chunks generated")
+        return None
+
+    # compute demand per chunk
+    estimator = DemandEstimator(store_mode=store_mode)
+    demand_chunks = [
+        {
+            'window_start': c['window_start'].isoformat(),
+            'window_end': c['window_end'].isoformat(),
+            'demand_vector': estimator.transform(c['data']).to_json()
+        }
+        for c in chunks
+    ]
+
+    ti.xcom_push(key='demand_chunks', value=demand_chunks)
+    logging.info("Generated %d demand chunks @ %s", len(demand_chunks), window_size)
+    return len(demand_chunks)
+
+
+def aggregate_prices(**context):
+    """Aggregate price logs into aligned windows.
+
+    Pulls 'price_data' from the fetch_price_logs task, aggregates it with
+    ``aggregate_price_logs`` and pushes one entry per chunk (ISO window
+    bounds plus the JSON price vector) under key 'price_chunks'.
+
+    Returns the number of chunks, or None when there is no upstream data.
+    """
+    ti = context['task_instance']
+    conf = _run_conf(context)
+    window_size = conf.get('window_size', '30s')
+    store_mode = conf.get('store_mode', 'hotel')
+
+    price_json = ti.xcom_pull(task_ids='fetch_price_logs', key='price_data')
+    if not price_json:
+        logging.error("No price data available")
+        return None
+
+    price_df = _read_json(price_json)
+    price_chunks = aggregate_price_logs(price_df, window_size=window_size, store_mode=store_mode)
+
+    # serialize for XCom
+    serialized = [
+        {
+            'window_start': c['window_start'].isoformat(),
+            'window_end': c['window_end'].isoformat(),
+            'price_vector': c['price_vector'].to_json()
+        }
+        for c in price_chunks
+    ]
+
+    ti.xcom_push(key='price_chunks', value=serialized)
+    logging.info("Aggregated %d price chunks", len(price_chunks))
+    return len(price_chunks)
+
+
+def compute_elasticity(**context):
+    """Compute price elasticity from demand and price chunks.
+
+    Pulls 'demand_chunks' and 'price_chunks' from XCom, rebuilds the
+    timestamps and vectors, runs ``TemporalElasticityEstimator`` and pushes
+    the result as JSON under key 'elasticity_results'.
+
+    Returns a dict with n_products, mean_elasticity and median_elasticity,
+    or None when an input is missing or nothing could be estimated.
+    """
+    ti = context['task_instance']
+    conf = _run_conf(context)
+    store_mode = conf.get('store_mode', 'hotel')
+    method = conf.get('elasticity_method', 'point')
+    min_obs = conf.get('min_observations', 2)
+
+    # pull chunks from XCom
+    demand_chunks_raw = ti.xcom_pull(task_ids='compute_demand', key='demand_chunks')
+    price_chunks_raw = ti.xcom_pull(task_ids='aggregate_prices', key='price_chunks')
+
+    if not demand_chunks_raw or not price_chunks_raw:
+        logging.error("Missing demand or price chunks")
+        return None
+
+    # deserialize
+    # NOTE(review): read_json builds a DataFrame by default; if the vectors are
+    # Series upstream, confirm the round trip (typ='series') - TODO verify
+    demand_chunks = [
+        {
+            'window_start': pd.Timestamp(c['window_start']),
+            'window_end': pd.Timestamp(c['window_end']),
+            'demand_vector': _read_json(c['demand_vector'])
+        }
+        for c in demand_chunks_raw
+    ]
+
+    price_chunks = [
+        {
+            'window_start': pd.Timestamp(c['window_start']),
+            'window_end': pd.Timestamp(c['window_end']),
+            'price_vector': _read_json(c['price_vector'])
+        }
+        for c in price_chunks_raw
+    ]
+
+    # compute elasticity
+    estimator = TemporalElasticityEstimator(method=method, min_observations=min_obs)
+    elasticity_df = estimator.transform(demand_chunks, price_chunks, store_mode=store_mode)
+
+    if elasticity_df is None or elasticity_df.empty:
+        logging.warning("No elasticity results computed")
+        return None
+
+    # store results (could push to DB, S3, or XCom)
+    ti.xcom_push(key='elasticity_results', value=elasticity_df.to_json())
+    logging.info("Computed elasticity for %d products", len(elasticity_df))
+
+    # summary stats
+    return {
+        'n_products': len(elasticity_df),
+        'mean_elasticity': float(elasticity_df['elasticity'].mean()),
+        'median_elasticity': float(elasticity_df['elasticity'].median())
+    }
+
+
+def publish_results(**context):
+    """Publish elasticity results to model registry or backend.
+
+    Currently a placeholder: it pulls 'elasticity_results' from XCom and only
+    logs the row count.  Returns True, or None when there is nothing to publish.
+    """
+    ti = context['task_instance']
+    elasticity_json = ti.xcom_pull(task_ids='compute_elasticity', key='elasticity_results')
+
+    if not elasticity_json:
+        logging.error("No elasticity results to publish")
+        return None
+
+    elasticity_df = _read_json(elasticity_json)
+
+    # TODO: implement actual publishing logic
+    #   - push to model registry
+    #   - update pricing provider service
+    #   - store in database for audit trail
+
+    logging.info("Published elasticity for %d products", len(elasticity_df))
+    return True
+
+
+# DAG definition
+with DAG(
+    'elasticity_pricing_pipeline',
+    default_args=default_args,
+    description='E2E pipeline: interactions → demand → elasticity → pricing',
+    schedule='*/5 * * * *',  # every 5 minutes for real-time pricing
+    # fixed start date; with catchup=False no past intervals are backfilled
+    start_date=datetime(2023, 1, 1),
+    catchup=False,
+    max_active_runs=1,
+    tags=['pricing', 'elasticity', 'research'],
+) as dag:
+
+    # parallel data fetching
+    fetch_interactions_task = PythonOperator(
+        task_id='fetch_interactions',
+        python_callable=fetch_interactions,
+    )
+
+    fetch_price_logs_task = PythonOperator(
+        task_id='fetch_price_logs',
+        python_callable=fetch_price_logs,
+    )
+
+    # demand computation (depends on interactions)
+    compute_demand_task = PythonOperator(
+        task_id='compute_demand',
+        python_callable=compute_demand_chunks,
+    )
+
+    # price aggregation (depends on price logs)
+    aggregate_prices_task = PythonOperator(
+        task_id='aggregate_prices',
+        python_callable=aggregate_prices,
+    )
+
+    # elasticity computation (depends on both demand and prices)
+    compute_elasticity_task = PythonOperator(
+        task_id='compute_elasticity',
+        python_callable=compute_elasticity,
+    )
+
+    # publish results (depends on elasticity)
+    publish_results_task = PythonOperator(
+        task_id='publish_results',
+        python_callable=publish_results,
+    )
+
+    # dependency graph
+    fetch_interactions_task >> compute_demand_task
+    fetch_price_logs_task >> aggregate_prices_task
+    [compute_demand_task, aggregate_prices_task] >> compute_elasticity_task
+    compute_elasticity_task >> publish_results_task