From ad9423bf59d3546be3f2f50c49eef02e929cc88d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Alves=20R=C3=B6sel?= <60182044+velocitatem@users.noreply.github.com> Date: Sat, 29 Nov 2025 17:50:16 +0100 Subject: [PATCH] Airflow addition (#28) * introducing airflow to run pipeline * chore: updating dag with upload to registry * introducing complete provider (non refactored and noisy) * chore: removing old shit * generic pricing baselines * feature: super simple model registry (to be updated maybe third party OS software) * chore: refactoring the providers docker config and requirements * chore: refactored and broke down components (braking * exporting all * local pipeline excution working * fix: fixing import structures from nonrelativistic * chore: enables cross comm pickling with fully e2e pipeline compilation * docs: what the pipeline is like now * pipelines local running and pipeline high level definition * cleaning old pipeline and vectorization * leaked but fixing, not so important * test: started with pipeline step testing * chore: cleaning up provider of prices * test: extra tests wit hsemantic meaning checks * migrating pricers * feature: introducing pricing predictors (pricers) * chore: e2e is done with new pipeline * extra session feature extraction * feature: experiemntal sessin pricer and metrics(vibe) * chore: redefined and connected pricers (#29) --- .gitignore | 5 + Makefile | 4 + backend/provider/app.py | 182 +++++++++ backend/provider/requirements.txt | 15 + docker-compose.yml | 147 ++++++++ docker/Airflow.dockerfile | 23 ++ docker/Provider.dockerfile | 24 ++ experiments/README.md | 8 + .../airflow/dags/elasticity_pricing_dag.py | 346 +++++++++++++++++ experiments/procesing/__init__.py | 64 +++- experiments/procesing/context.py | 34 ++ experiments/procesing/demand.py | 119 ------ experiments/procesing/elasticity.py | 31 +- experiments/procesing/extract.py | 207 ---------- experiments/procesing/mapping.py | 158 -------- 
experiments/procesing/metrics.py | 245 ++++++++++++ experiments/procesing/pipeline.py | 90 ----- experiments/procesing/pipelines.py | 138 +++++++ experiments/procesing/pricers/__init__.py | 13 + experiments/procesing/pricers/base.py | 70 ++++ experiments/procesing/pricers/elasticity.py | 59 +++ .../procesing/pricers/session_aware.py | 172 +++++++++ experiments/procesing/pricers/simple.py | 48 +++ experiments/procesing/pricing.py | 127 ++++++- experiments/procesing/providers/__init__.py | 5 + experiments/procesing/providers/backend.py | 19 + experiments/procesing/providers/base.py | 21 ++ experiments/procesing/providers/supabase.py | 35 ++ experiments/procesing/steps/__init__.py | 27 ++ experiments/procesing/steps/augment.py | 53 +++ experiments/procesing/steps/base.py | 31 ++ experiments/procesing/steps/chunk.py | 34 ++ experiments/procesing/steps/demand.py | 61 +++ experiments/procesing/steps/elasticity.py | 253 +++++++++++++ experiments/procesing/steps/fetch.py | 46 +++ experiments/procesing/steps/join.py | 34 ++ experiments/procesing/steps/pricing.py | 149 ++++++++ experiments/procesing/steps/session.py | 114 ++++++ experiments/procesing/tests/__init__.py | 0 experiments/procesing/tests/conftest.py | 271 ++++++++++++++ experiments/procesing/tests/test_augement.py | 45 +++ experiments/procesing/tests/test_demand.py | 49 +++ .../procesing/tests/test_elasticity.py | 353 ++++++++++++++++++ experiments/procesing/tests/test_fetch.py | 51 +++ experiments/procesing/tests/test_pricing.py | 87 +++++ experiments/pytest.ini | 8 + lib/model_registry.py | 139 +++++++ pytest.ini | 1 + web/src/app/api/pricing/route.ts | 46 ++- 49 files changed, 3642 insertions(+), 619 deletions(-) create mode 100644 backend/provider/app.py create mode 100644 backend/provider/requirements.txt create mode 100644 docker/Airflow.dockerfile create mode 100644 docker/Provider.dockerfile create mode 100644 experiments/airflow/dags/elasticity_pricing_dag.py create mode 100644 
experiments/procesing/context.py delete mode 100644 experiments/procesing/demand.py delete mode 100644 experiments/procesing/extract.py delete mode 100644 experiments/procesing/mapping.py create mode 100644 experiments/procesing/metrics.py delete mode 100644 experiments/procesing/pipeline.py create mode 100644 experiments/procesing/pipelines.py create mode 100644 experiments/procesing/pricers/__init__.py create mode 100644 experiments/procesing/pricers/base.py create mode 100644 experiments/procesing/pricers/elasticity.py create mode 100644 experiments/procesing/pricers/session_aware.py create mode 100644 experiments/procesing/pricers/simple.py create mode 100755 experiments/procesing/providers/__init__.py create mode 100755 experiments/procesing/providers/backend.py create mode 100755 experiments/procesing/providers/base.py create mode 100755 experiments/procesing/providers/supabase.py create mode 100755 experiments/procesing/steps/__init__.py create mode 100755 experiments/procesing/steps/augment.py create mode 100755 experiments/procesing/steps/base.py create mode 100755 experiments/procesing/steps/chunk.py create mode 100755 experiments/procesing/steps/demand.py create mode 100755 experiments/procesing/steps/elasticity.py create mode 100755 experiments/procesing/steps/fetch.py create mode 100755 experiments/procesing/steps/join.py create mode 100755 experiments/procesing/steps/pricing.py create mode 100644 experiments/procesing/steps/session.py create mode 100644 experiments/procesing/tests/__init__.py create mode 100644 experiments/procesing/tests/conftest.py create mode 100644 experiments/procesing/tests/test_augement.py create mode 100644 experiments/procesing/tests/test_demand.py create mode 100644 experiments/procesing/tests/test_elasticity.py create mode 100644 experiments/procesing/tests/test_fetch.py create mode 100644 experiments/procesing/tests/test_pricing.py create mode 100644 experiments/pytest.ini create mode 100755 lib/model_registry.py diff 
--git a/.gitignore b/.gitignore index 18da4dd..9cdec32 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,8 @@ **/session_*.svg **/*graph.svg paper/src/bib/auto + +# Airflow logs - exclude DAG run logs +experiments/airflow/logs/* +experiments/airflow/logs/scheduler/ +experiments/airflow/logs/dag_processor_manager/ diff --git a/Makefile b/Makefile index d9eaac5..d4fd961 100644 --- a/Makefile +++ b/Makefile @@ -49,4 +49,8 @@ install: $(VENV) test: $(VENV) $(PYTEST) -v +count-lines: + @find . \( -path '*/node_modules' -o -path '*/.venv' -o -path '*/venv' \) -prune -o \ + \( -name "*.ts" -o -name "*.py" \) -type f -print0 | xargs -0 cat | wc -l + .PHONY: all pdf clean watch run.webapp install test diff --git a/backend/provider/app.py b/backend/provider/app.py new file mode 100644 index 0000000..be7d0e7 --- /dev/null +++ b/backend/provider/app.py @@ -0,0 +1,182 @@ +from fastapi import FastAPI, HTTPException, Query +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from typing import Literal, Optional +import uvicorn, os, sys +from supabase import create_client, Client +from dotenv import load_dotenv +import numpy as np +import pandas as pd +load_dotenv() + +# Local imports of registry and pricing function + +sys.path.append(os.path.dirname(os.path.abspath(__file__))+ "/../../experiments/") +from procesing.providers import SupabaseProvider, BackendAPIProvider +from procesing.pricers import ( + StaticPricer, + RandomPricer, + ElasticityBasedPricer +) +from procesing.steps import ( + StateSpace, + PredictPricesStep +) +from procesing import PipelineContext +sys.path.append(os.path.dirname(os.path.abspath(__file__))+ "/../../lib/") +from lib.model_registry import ModelRegistry + +# Config +app = FastAPI(title="PHANTOM Pricing Provider") +app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) + +supabase: Client = create_client(os.getenv("NEXT_PUBLIC_SUPABASE_URL"), 
os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")) +registry = ModelRegistry() + +class PriceResponse(BaseModel): + productId: str + price: float + base_price: float + markup: float + elasticity: Optional[float] = None + model_version: str = 'latest' + +@app.get("/health") +def health() -> dict: + return {"status": "healthy", "redis": registry.health_check()} + +@app.get("/api/{mode}/price/{productId}", response_model=PriceResponse) +def get_price(mode: Literal['hotel', 'airline'], productId: str, sessionId: Optional[str] = Query(None), experimentId: Optional[str] = Query(None)): + product = supabase.table(f'{mode}_products').select("metadata").eq('id', productId).execute().data[0] + if not product: raise HTTPException(404, f"Product {productId} not found") + + metadata = product['metadata'] + base_price = metadata.get('base_price', 100.0) + + class Provider(SupabaseProvider, BackendAPIProvider): + def __init__(self, backend_url: str): + SupabaseProvider.__init__(self) + BackendAPIProvider.__init__(self, backend_url=backend_url) + + context = PipelineContext( + provider=Provider(backend_url=os.getenv("BACKEND_URL")), + store_mode=mode + ) + + pricing_model = registry.get_pricing_model('latest') + elasticity_df = registry.get_elasticity('latest') + + if pricing_model is None or elasticity_df is None: + return PriceResponse( + productId=productId, + price=base_price, + base_price=base_price, + markup=1.0, + elasticity=None + ) + + products = context.products + if products.empty: + raise HTTPException(500, "No products available in catalog") + + # merge elasticity with product base prices + products_with_meta = products.copy() + products_with_meta['base_price'] = products_with_meta['metadata'].apply( + lambda m: m.get('base_price', 100.0) if isinstance(m, dict) else 100.0 + ) + + merged = products_with_meta[['id', 'base_price']].rename( + columns={'id': 'productId'} + ).merge( + elasticity_df[['productId', 'elasticity']], + on='productId', + how='left' + 
).fillna({'elasticity': 0.0}) + + # compute demand: use pricer's mean_demand if available, else default + demand_values = (pricing_model.mean_demand + if hasattr(pricing_model, 'mean_demand') and pricing_model.mean_demand is not None + else np.ones(len(merged)) * 10.0) + + # build state space with session features if sessionId provided + session_features = pd.DataFrame() + if sessionId: + try: + # fetch recent session interactions from backend + from procesing.steps.session import ExtractSessionFeaturesStep + import requests + from datetime import datetime, timedelta + + t_end = datetime.utcnow() + t_start = t_end - timedelta(hours=1) + backend_url = os.getenv("BACKEND_URL") + print(backend_url) + + resp = requests.get( + f"{os.getenv('BACKEND_URL')}/api/kafka/dump", # TODO: THIS IS SHIT, must fix this + params={'topic': 'user-interactions', 't_start': t_start.isoformat(), 't_end': t_end.isoformat()}, + timeout=2 + ) + + if resp.ok: + msgs = resp.json().get('messages', []) + interactions_df = pd.DataFrame(msgs) + + if not interactions_df.empty and 'sessionId' in interactions_df.columns: + session_interactions = interactions_df[interactions_df['sessionId'] == sessionId] + + if not session_interactions.empty: + extractor = ExtractSessionFeaturesStep(context=context) + session_features_df = extractor.transform(session_interactions) + + if not session_features_df.empty: + session_features = session_features_df.drop(columns=['sessionId']) + except Exception as e: + print(f"[session-features-error] {e}") + # continue without session features + + state = StateSpace( + demand=demand_values, + prices=merged['base_price'].values, + session_features=session_features, + product_ids=merged['productId'].values, + elasticity=merged['elasticity'].values, + metadata={'sessionId': sessionId, 'experimentId': experimentId} + ) + + oracle = PredictPricesStep(context=context) + prices_df = oracle.transform((pricing_model, state)) + + product_price_row = prices_df[prices_df['productId'] 
== productId] + if product_price_row.empty: + raise HTTPException(404, f"No pricing available for product {productId}") + + optimal_price = float(product_price_row['predicted_price'].iloc[0]) + + product_elasticity_row = elasticity_df[elasticity_df['productId'] == productId] + product_elasticity = (float(product_elasticity_row['elasticity'].iloc[0]) + if not product_elasticity_row.empty else None) + + return PriceResponse( + productId=productId, + price=optimal_price, + base_price=base_price, + markup=optimal_price/base_price, + elasticity=product_elasticity + ) + +@app.get("/models") +def list_models(): return registry.list_models() + +@app.post("/models/reload") +def reload_models(): + elasticity, pricing_model = registry.get_elasticity('latest'), registry.get_pricing_model('latest') + return { + "elasticity_loaded": bool(elasticity), + "n_products": len(elasticity) if elasticity is not None else 0, + "pricing_model_loaded": bool(pricing_model), + "model_class": pricing_model.__class__.__name__ if pricing_model else None + } + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PROVIDER_PORT", "5001"))) diff --git a/backend/provider/requirements.txt b/backend/provider/requirements.txt new file mode 100644 index 0000000..4169911 --- /dev/null +++ b/backend/provider/requirements.txt @@ -0,0 +1,15 @@ +fastapi +uvicorn[standard] +pydantic +numpy +pandas +scikit-learn +redis +supabase +confluent-kafka>=2.3.0 +kafka-python +graphviz +python-dotenv>=1.0.0 +requests>=2.31.0 +typing-extensions>=4.8.0 +pickle5>=0.0.11; python_version < '3.8' diff --git a/docker-compose.yml b/docker-compose.yml index 01da852..a806b83 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -71,6 +71,153 @@ services: - "${REDPANDA_CONSOLE_PORT:-8080}:8080" restart: unless-stopped + postgres: + container_name: "PHANTOM-postgres" + image: postgres:13 + environment: + - POSTGRES_USER=airflow + - POSTGRES_PASSWORD=airflow + - POSTGRES_DB=airflow + ports: + - 
"5433:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + restart: unless-stopped + + airflow-init: + container_name: "PHANTOM-airflow-init" + build: + context: . + dockerfile: docker/Airflow.dockerfile + depends_on: + - postgres + environment: + - AIRFLOW__CORE__EXECUTOR=SequentialExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow + - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY} + - AIRFLOW__CORE__LOAD_EXAMPLES=false + - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true + - _AIRFLOW_DB_MIGRATE=true + - _AIRFLOW_WWW_USER_CREATE=true + - _AIRFLOW_WWW_USER_USERNAME=admin + - _AIRFLOW_WWW_USER_PASSWORD=admin + - REDIS_HOST=redis + - REDIS_PORT=6379 + volumes: + - ./experiments/airflow/dags:/opt/airflow/dags + - ./experiments/airflow/logs:/opt/airflow/logs + - ./experiments/airflow/plugins:/opt/airflow/plugins + - ./experiments/procesing:/opt/airflow/procesing + - ./lib:/opt/airflow/lib + command: version + restart: "no" + + airflow-webserver: + container_name: "PHANTOM-airflow-webserver" + build: + context: . 
+ dockerfile: docker/Airflow.dockerfile + depends_on: + - postgres + - airflow-init + - redis + environment: + - AIRFLOW__CORE__EXECUTOR=SequentialExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow + - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY} + - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true + - AIRFLOW__CORE__LOAD_EXAMPLES=false + - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true + - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=true + - KAFKA_HOST=kafka + - KAFKA_PORT=29092 + - BACKEND_URL=http://backend:5000 + - NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL} + - NEXT_PUBLIC_SUPABASE_ANON_KEY=${NEXT_PUBLIC_SUPABASE_ANON_KEY} + - REDIS_HOST=redis + - REDIS_PORT=6379 + ports: + - "${AIRFLOW_WEBSERVER_PORT:-8085}:8080" + volumes: + - ./experiments/airflow/dags:/opt/airflow/dags:ro + - ./experiments/airflow/logs:/opt/airflow/logs + - ./experiments/airflow/plugins:/opt/airflow/plugins:ro + - ./experiments/procesing:/opt/airflow/procesing:ro + - ./lib:/opt/airflow/lib:ro + command: webserver + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + + airflow-scheduler: + container_name: "PHANTOM-airflow-scheduler" + build: + context: . 
+ dockerfile: docker/Airflow.dockerfile + depends_on: + airflow-webserver: + condition: service_healthy + redis: + condition: service_started + environment: + - AIRFLOW__CORE__EXECUTOR=SequentialExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow + - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW_FERNET_KEY} + - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true + - AIRFLOW__CORE__LOAD_EXAMPLES=false + - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true + - KAFKA_HOST=kafka + - KAFKA_PORT=29092 + - BACKEND_URL=http://backend:5000 + - NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL} + - NEXT_PUBLIC_SUPABASE_ANON_KEY=${NEXT_PUBLIC_SUPABASE_ANON_KEY} + - REDIS_HOST=redis + - REDIS_PORT=6379 + volumes: + - ./experiments/airflow/dags:/opt/airflow/dags:ro + - ./experiments/airflow/logs:/opt/airflow/logs + - ./experiments/airflow/plugins:/opt/airflow/plugins:ro + - ./experiments/procesing:/opt/airflow/procesing:ro + - ./lib:/opt/airflow/lib:ro + command: scheduler + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + + pricing-provider: + container_name: "PHANTOM-pricing-provider" + build: + context: . 
+ dockerfile: docker/Provider.dockerfile + depends_on: + - redis + - kafka + environment: + - PROVIDER_PORT=5001 + - REDIS_HOST=redis + - REDIS_PORT=6379 + - KAFKA_HOST=kafka + - KAFKA_PORT=29092 + - NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL} + - NEXT_PUBLIC_SUPABASE_ANON_KEY=${NEXT_PUBLIC_SUPABASE_ANON_KEY} + ports: + - "${PROVIDER_PORT:-5001}:5001" + volumes: + - ./lib:/app/lib:ro + - ./experiments/procesing:/app/procesing:ro + - ./backend/provider:/app/provider:ro + command: python -m uvicorn provider.app:app --host 0.0.0.0 --port 5001 + restart: unless-stopped + volumes: phantom_kafka_data: phantom_redis_data: + postgres_data: diff --git a/docker/Airflow.dockerfile b/docker/Airflow.dockerfile new file mode 100644 index 0000000..5b11cc9 --- /dev/null +++ b/docker/Airflow.dockerfile @@ -0,0 +1,23 @@ +FROM apache/airflow:2.7.3-python3.11 + +USER root + +# install system deps if needed +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +USER airflow + +# copy requirements for pipeline dependencies +COPY requirements.txt /tmp/requirements.txt +RUN pip install --no-cache-dir -r /tmp/requirements.txt + +# install postgres driver and providers +RUN pip install --no-cache-dir \ + psycopg2-binary \ + apache-airflow-providers-postgres + +# set airflow home +ENV AIRFLOW_HOME=/opt/airflow diff --git a/docker/Provider.dockerfile b/docker/Provider.dockerfile new file mode 100644 index 0000000..d39da80 --- /dev/null +++ b/docker/Provider.dockerfile @@ -0,0 +1,24 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies including graphviz +RUN apt-get update && apt-get install -y \ + gcc \ + g++ \ + graphviz \ + libgraphviz-dev \ + && rm -rf /var/lib/apt/lists/* + +# Copy and install Python dependencies +COPY backend/provider/requirements.txt /app/ +RUN pip install --no-cache-dir -r requirements.txt + +# Structure will be mounted via volumes: +# /app/lib -> 
lib/ +# /app/procesing -> experiments/procesing/ +# /app/provider -> backend/provider/ + +ENV PYTHONPATH=/app:/app/lib:/app/procesing + +CMD ["python", "-m", "uvicorn", "provider.app:app", "--host", "0.0.0.0", "--port", "5001"] diff --git a/experiments/README.md b/experiments/README.md index e69de29..0e8d104 100644 --- a/experiments/README.md +++ b/experiments/README.md @@ -0,0 +1,8 @@ + +# Products +# Agents +# Pipeline + +Our pipeline technically should follow principles in a style like this: +- Each step should be defined as an inheriting child of a scikit-learn pipeline step. The granularity of the steps is dictated by the following: a step should be a single transformation, augmentation, or computation on its own; no single stage should perform more than one of these. This way we can properly modularize all the components and track them in Airflow. A stage can be defined as an sklearn step, but it must then be translated into a function that takes the context in our Airflow DAG. All parametrization must be done via contexts. 
+ diff --git a/experiments/airflow/dags/elasticity_pricing_dag.py b/experiments/airflow/dags/elasticity_pricing_dag.py new file mode 100644 index 0000000..6dd3f2d --- /dev/null +++ b/experiments/airflow/dags/elasticity_pricing_dag.py @@ -0,0 +1,346 @@ +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.utils.dates import days_ago +from datetime import timedelta +import pandas as pd +import logging +import sys +import pickle +import io + +# add parent dir to path so procesing package can be imported +sys.path.insert(0, '/opt/airflow') + +from procesing.context import PipelineContext +from procesing.providers import SupabaseProvider, BackendAPIProvider +from procesing.steps import ( + FetchInteractionsStep, + FetchPriceLogsStep, + CreatePriceBucketsStep, + AugmentEventNamesStep, + ChunkByTimeWindowStep, + ComputeDemandForChunksStep, + AggregatePriceLogsStep, + ComputeElasticityStep, + BuildStateSpaceStep, + FitPricingFunctionStep, + PredictPricesStep, +) + +default_args = { + 'owner': 'phantom-research', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 2, + 'retry_delay': timedelta(minutes=5), +} + +def get_provider(): + """Factory to create composite provider""" + class CompositeProvider(SupabaseProvider, BackendAPIProvider): + def __init__(self): + SupabaseProvider.__init__(self) + BackendAPIProvider.__init__(self) + return CompositeProvider() + +def get_context(**kwargs): + """Build pipeline context from Airflow config""" + dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} + return PipelineContext( + provider=get_provider(), + store_mode=dag_conf.get('store_mode', 'hotel'), + window_size=dag_conf.get('window_size', '30s'), + n_price_buckets=dag_conf.get('n_price_buckets', 5), + elasticity_method=dag_conf.get('elasticity_method', 'point'), + min_observations=dag_conf.get('min_observations', 2), + ) + +# atomic task functions (each wraps one sklearn step) 
+def fetch_interactions(**kwargs): + """Task: Fetch interaction data from Kafka""" + context = get_context(**kwargs) + step = FetchInteractionsStep(context) + df = step.transform(None) + + kwargs['ti'].xcom_push(key='interactions_raw', value=pickle.dumps(df)) + logging.info(f"Fetched {len(df)} interaction records") + return len(df) + +def fetch_price_logs(**kwargs): + """Task: Fetch price logs from Kafka""" + context = get_context(**kwargs) + step = FetchPriceLogsStep(context) + df = step.transform(None) + + kwargs['ti'].xcom_push(key='price_logs_raw', value=pickle.dumps(df)) + logging.info(f"Fetched {len(df)} price records") + return len(df) + +def create_price_buckets(**kwargs): + """Task: Create price buckets for interactions""" + ti = kwargs['ti'] + df = pickle.loads(ti.xcom_pull(key='interactions_raw')) + + context = get_context(**kwargs) + step = CreatePriceBucketsStep(context) + df = step.transform(df) + + ti.xcom_push(key='interactions_bucketed', value=pickle.dumps(df)) + logging.info(f"Created price buckets for {len(df)} interactions") + return len(df) + +def augment_event_names(**kwargs): + """Task: Augment event names with product and price schema""" + ti = kwargs['ti'] + df = pickle.loads(ti.xcom_pull(key='interactions_bucketed')) + + context = get_context(**kwargs) + step = AugmentEventNamesStep(context) + df = step.transform(df) + + ti.xcom_push(key='interactions_final', value=pickle.dumps(df)) + logging.info(f"Augmented event names for {len(df)} interactions") + return len(df) + +def chunk_interactions(**kwargs): + """Task: Chunk interactions into time windows""" + ti = kwargs['ti'] + df = pickle.loads(ti.xcom_pull(key='interactions_final')) + + context = get_context(**kwargs) + step = ChunkByTimeWindowStep(context) + chunks = step.transform(df) + + ti.xcom_push(key='interaction_chunks', value=pickle.dumps(chunks)) + logging.info(f"Generated {len(chunks)} interaction chunks") + return len(chunks) + +def compute_demand(**kwargs): + """Task: Compute 
demand vectors for all chunks""" + ti = kwargs['ti'] + chunks = pickle.loads(ti.xcom_pull(key='interaction_chunks')) + + context = get_context(**kwargs) + step = ComputeDemandForChunksStep(context) + demand_chunks = step.transform(chunks) + + ti.xcom_push(key='demand_chunks', value=pickle.dumps(demand_chunks)) + logging.info(f"Computed demand for {len(demand_chunks)} chunks") + return len(demand_chunks) + +def aggregate_price_logs(**kwargs): + """Task: Aggregate price logs into time windows """ + ti = kwargs['ti'] + df = pickle.loads(ti.xcom_pull(key='price_logs_raw')) + + context = get_context(**kwargs) + step = AggregatePriceLogsStep(context) + price_chunks = step.transform(df) + + ti.xcom_push(key='price_chunks', value=pickle.dumps(price_chunks)) + logging.info(f"Aggregated {len(price_chunks)} price chunks") + return len(price_chunks) + +def compute_elasticity(**kwargs): + """Task: Compute price elasticity from demand and price chunks""" + ti = kwargs['ti'] + demand_chunks = pickle.loads(ti.xcom_pull(key='demand_chunks')) + price_chunks = pickle.loads(ti.xcom_pull(key='price_chunks')) + + context = get_context(**kwargs) + step = ComputeElasticityStep(context) + elasticity_df = step.transform((demand_chunks, price_chunks)) + + ti.xcom_push(key='elasticity_results', value=pickle.dumps(elasticity_df)) + logging.info(f"Computed elasticity for {len(elasticity_df)} products") + + return { + 'n_products': len(elasticity_df), + 'mean_elasticity': float(elasticity_df['elasticity'].mean()), + 'median_elasticity': float(elasticity_df['elasticity'].median()) + } + +def build_state_space(**kwargs): + """Task: Build state space from elasticity""" + ti = kwargs['ti'] + elasticity_df = pickle.loads(ti.xcom_pull(key='elasticity_results')) + + context = get_context(**kwargs) + step = BuildStateSpaceStep(context) + state_space = step.transform(elasticity_df) + + ti.xcom_push(key='state_space', value=pickle.dumps(state_space)) + logging.info("Built state space for pricing") + 
return True + +def fit_pricing_function(**kwargs): + """Task: Fit pricing function using elasticity""" + ti = kwargs['ti'] + elasticity_df = pickle.loads(ti.xcom_pull(key='elasticity_results')) + + context = get_context(**kwargs) + step = FitPricingFunctionStep(context) + pricer = step.transform(elasticity_df) + + ti.xcom_push(key='pricer', value=pickle.dumps(pricer)) + logging.info("Fitted pricing function") + return True + +def predict_prices(**kwargs): + """Task: Predict optimal prices""" + ti = kwargs['ti'] + pricer = pickle.loads(ti.xcom_pull(key='pricer')) + state_space = pickle.loads(ti.xcom_pull(key='state_space')) + + context = get_context(**kwargs) + step = PredictPricesStep(context) + prices_df = step.transform((pricer, state_space)) + + ti.xcom_push(key='predicted_prices', value=pickle.dumps(prices_df)) + logging.info(f"Predicted prices for {len(prices_df)} products") + return len(prices_df) + +def publish_results(**kwargs): + """Task: Publish elasticity and pricing results to model registry""" + ti = kwargs['ti'] + elasticity_df = pickle.loads(ti.xcom_pull(key='elasticity_results')) + prices_df = pickle.loads(ti.xcom_pull(key='predicted_prices')) + + sys.path.insert(0, '/opt/airflow') + from lib.model_registry import ModelRegistry + + registry = ModelRegistry() + dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {} + + metadata = { + 'timestamp': pd.Timestamp.now().isoformat(), + 'window_size': dag_conf.get('window_size', '30s'), + 'store_mode': dag_conf.get('store_mode', 'hotel'), + 'dag_run_id': kwargs['dag_run'].run_id if kwargs.get('dag_run') else 'manual' + } + + registry.publish_elasticity(elasticity_df, model_name='latest', metadata=metadata) + + # get fitted pricer from XCom + pricer = pickle.loads(ti.xcom_pull(key='pricer')) + registry.publish_pricing_model( + pricer, + model_name='latest', + metadata={**metadata, 'model_type': type(pricer).__name__} + ) + + logging.info(f"Published elasticity + pricing for 
{len(elasticity_df)} products") + + return { + 'n_products': len(elasticity_df), + 'registry_status': 'success', + 'elasticity_mean': float(elasticity_df['elasticity'].mean()) + } + + +# DAG definition +with DAG( + 'elasticity_pricing_pipeline', + default_args=default_args, + description='E2E refactored pipeline: atomic steps with proper separation', + schedule_interval='*/15 * * * *', + start_date=days_ago(1), + catchup=False, + max_active_runs=1, + tags=['pricing', 'elasticity', 'research', 'refactored'], +) as dag: + + # parallel data fetching + t_fetch_interactions = PythonOperator( + task_id='fetch_interactions', + python_callable=fetch_interactions, + provide_context=True, + ) + + t_fetch_price_logs = PythonOperator( + task_id='fetch_price_logs', + python_callable=fetch_price_logs, + provide_context=True, + ) + + # interaction processing branch + t_create_buckets = PythonOperator( + task_id='create_price_buckets', + python_callable=create_price_buckets, + provide_context=True, + ) + + t_augment_events = PythonOperator( + task_id='augment_event_names', + python_callable=augment_event_names, + provide_context=True, + ) + + t_chunk_interactions = PythonOperator( + task_id='chunk_interactions', + python_callable=chunk_interactions, + provide_context=True, + ) + + t_compute_demand = PythonOperator( + task_id='compute_demand', + python_callable=compute_demand, + provide_context=True, + ) + + # price processing branch (VECTORIZED) + t_aggregate_prices = PythonOperator( + task_id='aggregate_price_logs', + python_callable=aggregate_price_logs, + provide_context=True, + ) + + # convergence: compute elasticity + t_compute_elasticity = PythonOperator( + task_id='compute_elasticity', + python_callable=compute_elasticity, + provide_context=True, + ) + + # pricing tasks + t_build_state = PythonOperator( + task_id='build_state_space', + python_callable=build_state_space, + provide_context=True, + ) + + t_fit_pricer = PythonOperator( + task_id='fit_pricing_function', + 
python_callable=fit_pricing_function, + provide_context=True, + ) + + t_predict_prices = PythonOperator( + task_id='predict_prices', + python_callable=predict_prices, + provide_context=True, + ) + + # publish to registry + t_publish = PythonOperator( + task_id='publish_results', + python_callable=publish_results, + provide_context=True, + ) + + # dependency graph (clear atomic flow) + # parallel fetches + [t_fetch_interactions, t_fetch_price_logs] + + # interaction branch: fetch -> bucket -> augment -> chunk -> demand + t_fetch_interactions >> t_create_buckets >> t_augment_events >> t_chunk_interactions >> t_compute_demand + + # price branch: fetch -> aggregate (vectorized) + t_fetch_price_logs >> t_aggregate_prices + + # convergence: both branches -> elasticity + [t_compute_demand, t_aggregate_prices] >> t_compute_elasticity + + # pricing: elasticity -> state + fit -> predict -> publish + t_compute_elasticity >> [t_build_state, t_fit_pricer] >> t_predict_prices >> t_publish diff --git a/experiments/procesing/__init__.py b/experiments/procesing/__init__.py index 48b91bf..2ea5d56 100644 --- a/experiments/procesing/__init__.py +++ b/experiments/procesing/__init__.py @@ -1,19 +1,55 @@ -from .extract import ( - KafkaDataFetcher, - ExperimentJoiner, - EventTitleAugmenter, +from procesing.context import PipelineContext +from procesing.providers import DataProvider, SupabaseProvider, BackendAPIProvider +from procesing.steps import ( + BaseContextStep, + FetchInteractionsStep, + FetchPriceLogsStep, + FetchExperimentsStep, + JoinExperimentsStep, + CreatePriceBucketsStep, + AugmentEventNamesStep, + ChunkByTimeWindowStep, + ComputeDemandStep, + ComputeDemandForChunksStep, + AggregatePriceLogsStep, + ComputeElasticityStep, + StateSpace, + BuildStateSpaceStep, + FitPricingFunctionStep, + PredictPricesStep, +) +from procesing.pipelines import ( + interaction_extraction_pipeline, + price_extraction_pipeline, + elasticity_computation_pipeline, + pricing_pipeline, + full_pipeline, 
) -from .demand import DemandEstimator -from .mapping import SessionTransitionProbMatrixTransformer, render_graph -from .pipeline import etl_pipeline, pricing_pipeline __all__ = [ - 'KafkaDataFetcher', - 'ExperimentJoiner', - 'EventTitleAugmenter', - 'DemandEstimator', - 'SessionTransitionProbMatrixTransformer', - 'render_graph', - 'etl_pipeline', + 'PipelineContext', + 'DataProvider', + 'SupabaseProvider', + 'BackendAPIProvider', + 'BaseContextStep', + 'FetchInteractionsStep', + 'FetchPriceLogsStep', + 'FetchExperimentsStep', + 'JoinExperimentsStep', + 'CreatePriceBucketsStep', + 'AugmentEventNamesStep', + 'ChunkByTimeWindowStep', + 'ComputeDemandStep', + 'ComputeDemandForChunksStep', + 'AggregatePriceLogsStep', + 'ComputeElasticityStep', + 'StateSpace', + 'BuildStateSpaceStep', + 'FitPricingFunctionStep', + 'PredictPricesStep', + 'interaction_extraction_pipeline', + 'price_extraction_pipeline', + 'elasticity_computation_pipeline', 'pricing_pipeline', + 'full_pipeline', ] diff --git a/experiments/procesing/context.py b/experiments/procesing/context.py new file mode 100644 index 0000000..0ff5943 --- /dev/null +++ b/experiments/procesing/context.py @@ -0,0 +1,34 @@ +from typing import Any, Dict +import pandas as pd +from procesing.providers.base import DataProvider + +class PipelineContext: + """ + Context for pipeline execution holding config, provider, and cached data. + Enables dependency injection and eliminates global state. 
+ """ + + def __init__(self, + provider: DataProvider, + store_mode: str, + window_size: str = '30s', + **config): + self.provider = provider + self.store_mode = store_mode + self.window_size = window_size + self.config = config + self._cache: Dict[str, Any] = {} + + def get_cached(self, key: str, default=None): + return self._cache.get(key, default) + + def cache(self, key: str, value): + self._cache[key] = value + return value + + @property + def products(self) -> pd.DataFrame: + """Lazy-load and cache product catalog, single fetch per pipeline run""" + if 'products' not in self._cache: + self._cache['products'] = self.provider.fetch_products(self.store_mode) + return self._cache['products'] diff --git a/experiments/procesing/demand.py b/experiments/procesing/demand.py deleted file mode 100644 index ba3c6c6..0000000 --- a/experiments/procesing/demand.py +++ /dev/null @@ -1,119 +0,0 @@ -from sklearn.base import BaseEstimator, TransformerMixin -import numpy as np -import pandas as pd -from supabase import create_client, Client -from typing import Optional, Literal -import os -import logging -log = logging.getLogger(__name__) - -SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "") -SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "") - -supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) - -class ChunkInteractionsIntoSteps(BaseEstimator, TransformerMixin): - """ - Split interaction data into time windows for temporal analysis. - Returns a list of dataframes, one per time window. 
- """ - def __init__(self, - window_size:str='1h', - ts_col:str='ts', - return_metadata:bool=True): - """ - Args: - window_size: pandas freq string ('1h', '30T', '1D', etc) - ts_col: timestamp column name - return_metadata: if True, return dict with metadata per chunk - """ - self.window_size = window_size - self.ts_col = ts_col - self.return_metadata = return_metadata - - def fit(self, X): - return self - - def transform(self, interactions: pd.DataFrame): - """ - Returns: - if return_metadata=False: list of dataframes, one per window - if return_metadata=True: list of dicts with keys: - - 'data': dataframe for this window - - 'window_start': start timestamp - - 'window_end': end timestamp - - 'window_idx': integer index - """ - if interactions.empty: - return [] - - df = interactions.copy() - - # ensure timestamp is datetime - if not pd.api.types.is_datetime64_any_dtype(df[self.ts_col]): - df[self.ts_col] = pd.to_datetime(df[self.ts_col]) - - # sort by time - df = df.sort_values(self.ts_col) - - # assign window - df['_window'] = df[self.ts_col].dt.floor(self.window_size) - - # group by window - chunks = [] - for idx, (window_start, group) in enumerate(df.groupby('_window')): - chunk_data = group.drop(columns=['_window']) - - if self.return_metadata: - chunks.append({ - 'data': chunk_data, - 'window_start': window_start, - 'window_end': window_start + pd.Timedelta(self.window_size), - 'window_idx': idx - }) - else: - chunks.append(chunk_data) - - return chunks - - -class DemandEstimator(BaseEstimator, TransformerMixin): - def __init__(self, - store_mode:str='hotel', - session_filter:str="", - experiment_filter:str=""): - self.store=store_mode - self.session_filter=session_filter if len(session_filter)>0 else None - self.experiment_filter=experiment_filter if len(experiment_filter)>0 else None - def fit(self, X): - return self - - def transform(self, interactions : pd.DataFrame): - if interactions.empty: - return pd.DataFrame(columns=["productId", "demand_score"]) - 
if self.session_filter: - interactions = interactions[interactions['sessionId'] == self.session_filter] - if self.experiment_filter: - interactions = interactions[interactions['experimentId'] == self.experiment_filter] - - products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute() - products = pd.DataFrame(products.data) - unique_products = products['id'].unique() - log.info(f"Demand estimator found {len(unique_products)} in data") - - # filter out rows without productId - interactions_with_products = interactions.dropna(subset=['productId']) - - if interactions_with_products.empty: - # no interactions with products, return all zeros - return pd.DataFrame({ - 'productId': unique_products, - 'demand_score': 0 - }) - - # TODO: improve demand score calculation rather than just counting interactions (use weights..) - # while maintaining simplicity of a simple cross tab approach - product_demand = pd.crosstab(interactions_with_products['productId'], "no_of_interactions") - product_demand = product_demand.reindex(unique_products, fill_value=0).reset_index() - product_demand.columns = ['productId', 'demand_score'] - return product_demand diff --git a/experiments/procesing/elasticity.py b/experiments/procesing/elasticity.py index eea3022..736364c 100644 --- a/experiments/procesing/elasticity.py +++ b/experiments/procesing/elasticity.py @@ -130,25 +130,24 @@ class TemporalElasticityEstimator(BaseEstimator, TransformerMixin): def _build_product_timeseries(self, aligned_chunks): """Build time series [price, quantity] per product.""" - series_by_product = {} - + # vectorize chunk merging instead of iterating rows + all_merged = [] for chunk in aligned_chunks: - demand_df = chunk['demand'] - price_df = chunk['prices'] + merged = chunk['demand'].merge(chunk['prices'], on='productId', how='inner') + merged['timestamp'] = chunk['window_start'] + all_merged.append(merged[['productId', 'timestamp', 'price', 
'demand_score']]) - # merge on productId - merged = demand_df.merge(price_df, on='productId', how='inner') + if not all_merged: + return {} - for _, row in merged.iterrows(): - pid = row['productId'] - if pid not in series_by_product: - series_by_product[pid] = [] - - series_by_product[pid].append({ - 'timestamp': chunk['window_start'], - 'price': row['price'], - 'quantity': row['demand_score'] - }) + # concat all chunks and group by productId in one pass + combined = pd.concat(all_merged, ignore_index=True) + series_by_product = { + pid: group[['timestamp', 'price', 'demand_score']].rename( + columns={'demand_score': 'quantity'} + ).to_dict('records') + for pid, group in combined.groupby('productId') + } return series_by_product diff --git a/experiments/procesing/extract.py b/experiments/procesing/extract.py deleted file mode 100644 index 81eb1d3..0000000 --- a/experiments/procesing/extract.py +++ /dev/null @@ -1,207 +0,0 @@ -import pandas as pd -import json -import numpy as np -import os -import requests -from dotenv import load_dotenv -from sklearn.base import BaseEstimator, TransformerMixin -from supabase import create_client, Client -from typing import Tuple, List, Dict -load_dotenv() - -BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000") -SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL") -SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") -N_PRICE_BUCKETS = 5 - -supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) - - -class KafkaDataFetcher(BaseEstimator, TransformerMixin): - def __init__(self, topic: str = "user-interactions"): - self.topic = topic # also can be price-logs - def fit(self, X=None, y=None): - return self - - def transform(self, X=None): - resp = requests.get(f"{BACKEND_URL}/api/kafka/dump?topic={self.topic}") - resp.raise_for_status() - data = resp.json() - - if not data.get('success') or not data.get('data'): - return pd.DataFrame() - - df = pd.DataFrame(data['data']) - if self.topic == 'user-interactions': - if 
'metadata' in df.columns: # explode metadata col json - df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) - df = df.dropna(subset=['eventName']) - # remape dateIndex - df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') - return df - - -class ExperimentJoiner(BaseEstimator, TransformerMixin): - def fit(self, X=None, y=None): - return self - - def transform(self, df): - if df.empty or 'experimentId' not in df.columns: - return df - - unique_exp_ids = df['experimentId'].dropna().unique() - if len(unique_exp_ids) == 0: - return df - - resp = supabase.table('experiments').select( - 'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, task:tasks(task_name, task_description, task_def_of_done)' - ).in_('id', unique_exp_ids.tolist()).execute() - - if not resp.data: - return df - - exp_df = pd.DataFrame(resp.data) - - # flatten task nested object if present - if 'task' in exp_df.columns and exp_df['task'].notnull().any(): - task_normalized = pd.json_normalize(exp_df['task'].dropna()) - task_normalized.index = exp_df[exp_df['task'].notnull()].index - exp_df = exp_df.drop(columns=['task']).join(task_normalized, rsuffix='_task') - - # rename experiment columns for clarity - exp_df = exp_df.rename(columns={ - 'id': 'experimentId', - 'subject_name': 'exp_subject', - 'xp_human_only': 'exp_human_only', - 'xp_market_mode': 'exp_market_mode', - 'xp_task_id': 'exp_task_id' - }) - - df = df.merge(exp_df, on='experimentId', how='left') - return df - - -class EventTitleAugmenter(BaseEstimator, TransformerMixin): - def fit(self, X=None, y=None): - return self - - def transform(self, df): - # from taking standard view_item_page in eventName to view_item_page_{metadata_schema} - # we want metadata schema to create product specific event names - - # only create price buckets if we have enough unique prices - if df["metadata_price"].notnull().sum() > 0: - try: - price_buckets = pd.qcut( - df["metadata_price"], - q=N_PRICE_BUCKETS, - 
labels=[f"PB_{i+1}" for i in range(N_PRICE_BUCKETS)], - duplicates='drop' # handle duplicate bin edges - ) - except ValueError: - # fallback: if still not enough unique values, use cut with fixed ranges or just use raw price - price_buckets = df["metadata_price"].apply(lambda x: f"P_{int(x)}" if pd.notnull(x) else "") - else: - price_buckets = pd.Series([""] * len(df), index=df.index) - - # metadata_schema: _product_id@price_bucket_{i} only if we have product metadata otherswise keep original event name - # TODO: make this adaptive, if we have hover_over_title we append the title, if its view_page we say which page - df["metadata_schema"] = np.where( - df["productId"].notnull() & df["metadata_price"].notnull(), - "_" + df["productId"].astype(str) + "@" + price_buckets.astype(str), - "" - ) - df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str) - return df - - -def chunk_shared_data(interactions_df: pd.DataFrame, - price_logs_df: pd.DataFrame, - window_size: str = '30s', - ts_col: str = 'ts') -> Tuple[List[Dict], List[Dict]]: - """ - Chunk interaction and price data into aligned time windows. 
- - Args: - interactions_df: interaction data with timestamp column - price_logs_df: price log data with timestamp column - window_size: pandas freq string ('30s', '1min', '1h', etc) - ts_col: name of timestamp column - - Returns: - tuple of (interaction_chunks, price_chunks) where each is list of dicts: - { - 'window_start': timestamp, - 'window_end': timestamp, - 'data': dataframe for this window - } - """ - if interactions_df.empty and price_logs_df.empty: - return [], [] - - # convert timestamps to datetime - interactions_df = interactions_df.copy() - price_logs_df = price_logs_df.copy() - - if not interactions_df.empty: - if not pd.api.types.is_datetime64_any_dtype(interactions_df[ts_col]): - interactions_df[ts_col] = pd.to_datetime(interactions_df[ts_col]) - - if not price_logs_df.empty: - if not pd.api.types.is_datetime64_any_dtype(price_logs_df[ts_col]): - price_logs_df[ts_col] = pd.to_datetime(price_logs_df[ts_col]) - - # find global time bounds - times = [] - if not interactions_df.empty: - times.extend([interactions_df[ts_col].min(), interactions_df[ts_col].max()]) - if not price_logs_df.empty: - times.extend([price_logs_df[ts_col].min(), price_logs_df[ts_col].max()]) - - if not times: - return [], [] - - earliest = min(times) - latest = max(times) - - # create shared time windows - windows = pd.date_range(start=earliest, end=latest, freq=window_size) - - if len(windows) < 2: - return [], [] - - # chunk both datasets - interaction_chunks = [] - price_chunks = [] - - for i in range(len(windows) - 1): - window_start = windows[i] - window_end = windows[i + 1] - - # filter interactions in this window - if not interactions_df.empty: - mask = (interactions_df[ts_col] >= window_start) & (interactions_df[ts_col] < window_end) - interaction_chunk = interactions_df[mask] - else: - interaction_chunk = pd.DataFrame() - - interaction_chunks.append({ - 'window_start': window_start, - 'window_end': window_end, - 'data': interaction_chunk - }) - - # filter price logs in 
this window - if not price_logs_df.empty: - mask = (price_logs_df[ts_col] >= window_start) & (price_logs_df[ts_col] < window_end) - price_chunk = price_logs_df[mask] - else: - price_chunk = pd.DataFrame() - - price_chunks.append({ - 'window_start': window_start, - 'window_end': window_end, - 'data': price_chunk - }) - - return interaction_chunks, price_chunks diff --git a/experiments/procesing/mapping.py b/experiments/procesing/mapping.py deleted file mode 100644 index 6c32b91..0000000 --- a/experiments/procesing/mapping.py +++ /dev/null @@ -1,158 +0,0 @@ -import numpy as np -import pandas as pd -from sklearn.base import BaseEstimator, TransformerMixin - -def build_transition_prob_matrix(df: pd.DataFrame): - df = df.dropna(subset=['eventName']) - events = df['eventName'].tolist() - labels = pd.Index(events).unique().tolist() - idx = {e:i for i,e in enumerate(labels)} - M = np.zeros((len(labels), len(labels)), dtype=float) - for a, b in zip(events, events[1:]): - M[idx[a], idx[b]] += 1 - row_sums = M.sum(axis=1, keepdims=True) - with np.errstate(divide='ignore', invalid='ignore'): - P = np.divide(M, row_sums, where=row_sums>0) # row-normalized - return P, labels - -# https://medium.com/data-science/time-series-data-markov-transition-matrices-7060771e362b -from graphviz import Digraph -import numpy as np -import pandas as pd - -def _as_prob_df(matrix, labels=None): - """Return a square DataFrame with index=columns=labels.""" - if isinstance(matrix, pd.DataFrame): - # Ensure square and aligned - assert (matrix.index == matrix.columns).all(), "Index/columns must match." - return matrix - matrix = np.asarray(matrix, dtype=float) - assert matrix.shape[0] == matrix.shape[1], "Matrix must be square." - if labels is None: - raise ValueError("labels are required when matrix is not a DataFrame") - assert len(labels) == matrix.shape[0], "labels length must match matrix size." 
- return pd.DataFrame(matrix, index=list(labels), columns=list(labels)) - -def _df_to_edgelist(P: pd.DataFrame, threshold=0.0, round_digits=2): - """Build weighted edges > threshold.""" - edges = [] - for src in P.index: - for dst in P.columns: - w = float(P.loc[src, dst]) - if w > threshold: - edges.append((str(src), str(dst), f"{w:.{round_digits}f}")) - return edges - -def render_graph(fname, matrix, ls_index=None, threshold=0.0, fmt="svg", view=False): - """ - fname: output file stem (no extension) - matrix: NumPy array or pandas DataFrame of transition PROBABILITIES - ls_index: ordered labels (required if matrix is not a DataFrame) - threshold: hide edges with weight <= threshold - fmt: 'svg'|'png'|'pdf' etc. - view: open after rendering - """ - P = _as_prob_df(matrix, labels=ls_index) - edges = _df_to_edgelist(P, threshold=threshold) - - g = Digraph(format=fmt) - g.attr(rankdir="LR", size="30") - g.attr("node", shape="circle") - - # ensure isolated nodes appear - for node in P.index: - g.node(str(node), width="1", height="1") - - for src, dst, label in edges: - g.edge(src, dst, label=label) - - g.render(fname, view=view, cleanup=True) - return g - - -class TransitionProbMatrixTransformer(BaseEstimator, TransformerMixin): - def __init__(self, threshold=0.0): - self.threshold = threshold - self.P_ = None - self.labels_ = None - - def fit(self, X: pd.DataFrame, y=None): - P, labels = build_transition_prob_matrix(X) - self.P_ = P - self.labels_ = labels - return self - - def transform(self, X: pd.DataFrame = None): - return self.P_, self.labels_ - - def render(self, fname: str, fmt="svg", view=False): - if self.P_ is None or self.labels_ is None: - raise ValueError("Transformer has not been fitted yet.") - return render_graph( - fname, - self.P_, - ls_index=self.labels_, - threshold=self.threshold, - fmt=fmt, - view=view - ) - - -class SessionTransitionProbMatrixTransformer(BaseEstimator, TransformerMixin): - def __init__(self, threshold=0.0, 
session_col='sessionId'): - self.threshold = threshold - self.session_col = session_col - self.session_matrices_ = None - - def fit(self, X: pd.DataFrame, y=None): - if self.session_col not in X.columns: - raise ValueError(f"Column '{self.session_col}' not found in DataFrame") - - session_matrices = {} - for session_id, grp in X.groupby(self.session_col): - if len(grp) > 1: # need at least 2 events for transitions - P, labels = build_transition_prob_matrix(grp) - session_matrices[session_id] = {'matrix': P, 'labels': labels} - - self.session_matrices_ = session_matrices - return self - - def transform(self, X: pd.DataFrame = None): - if self.session_matrices_ is None: - raise ValueError("Transformer has not been fitted yet.") - return pd.Series(self.session_matrices_) - - def render_session(self, session_id: str, fname: str, fmt="svg", view=False): - if self.session_matrices_ is None: - raise ValueError("Transformer has not been fitted yet.") - if session_id not in self.session_matrices_: - raise ValueError(f"Session '{session_id}' not found in fitted data.") - - sess_data = self.session_matrices_[session_id] - return render_graph( - fname, - sess_data['matrix'], - ls_index=sess_data['labels'], - threshold=self.threshold, - fmt=fmt, - view=view - ) -if __name__ == "__main__": - # Example usage - data = { - 'eventName': [ - 'A', 'B', 'A', 'C', 'B', 'A', 'A', 'C', 'B', 'C', - 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A' - ] - } - df = pd.DataFrame(data) - - transformer = TransitionProbMatrixTransformer(threshold=0.1) - transformer.fit(df) - P, labels = transformer.transform(None) - - print("Transition Probability Matrix:") - print(pd.DataFrame(P, index=labels, columns=labels)) - - # Render the graph - transformer.render("transition_graph", fmt="svg", view=False) diff --git a/experiments/procesing/metrics.py b/experiments/procesing/metrics.py new file mode 100644 index 0000000..ce2fe4e --- /dev/null +++ b/experiments/procesing/metrics.py @@ -0,0 +1,245 @@ +""" 
+Revenue and KPI benchmark framework for pricing strategies. + +Computes session-level and aggregate metrics to compare pricing functions: + - Revenue: R_T = Σ P_t^T · Q_t + - Conversion rate + - Average order value (AOV) + - Agent exploitation loss: L_agent = R_oracle - R_observed +""" +from typing import Dict, List, Any, Optional +from dataclasses import dataclass, field, asdict +import pandas as pd +import numpy as np + + +@dataclass +class SessionMetrics: + """KPIs for single session.""" + session_id: str + experiment_id: Optional[str] = None + + # interaction metrics + total_interactions: int = 0 + page_views: int = 0 + item_views: int = 0 + searches: int = 0 + cart_adds: int = 0 + + # revenue metrics + items_purchased: int = 0 + total_revenue: float = 0.0 + avg_item_price: float = 0.0 + conversion_rate: float = 0.0 + + # pricing signals + total_price_shown: float = 0.0 # sum of all prices displayed + avg_markup: float = 0.0 # avg (price / base_price) + + # behavioral features (for agent detection) + interaction_velocity: float = 0.0 # interactions per minute + session_duration_sec: float = 0.0 + unique_products_viewed: int = 0 + + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +@dataclass +class AggregateMetrics: + """Aggregate KPIs across sessions/experiments.""" + experiment_id: Optional[str] = None + n_sessions: int = 0 + + # revenue aggregates + total_revenue: float = 0.0 + avg_revenue_per_session: float = 0.0 + median_revenue_per_session: float = 0.0 + + # conversion aggregates + total_conversions: int = 0 + conversion_rate: float = 0.0 # purchases / sessions + + # pricing aggregates + avg_markup: float = 0.0 + median_markup: float = 0.0 + + # agent exploitation metrics + estimated_agent_sessions: int = 0 # sessions flagged as agent-driven + agent_revenue: float = 0.0 + human_revenue: float = 0.0 + agent_loss: float = 0.0 # L_agent = R_oracle - R_observed (if available) + + def 
to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +class MetricsComputer: + """Compute session and aggregate metrics from interaction/price logs.""" + + @staticmethod + def compute_session_metrics( + session_id: str, + interactions: pd.DataFrame, + price_logs: pd.DataFrame, + purchases: Optional[pd.DataFrame] = None, + experiment_id: Optional[str] = None + ) -> SessionMetrics: + """ + Compute metrics for single session. + + Args: + session_id: session identifier + interactions: user-interactions events for this session + price_logs: price-logs for this session + purchases: purchase events (if available) + experiment_id: experiment identifier + """ + metrics = SessionMetrics(session_id=session_id, experiment_id=experiment_id) + + if interactions.empty: + return metrics + + # interaction counts + event_counts = interactions['eventName'].value_counts().to_dict() + metrics.total_interactions = len(interactions) + metrics.page_views = event_counts.get('page_view', 0) + event_counts.get('view_item_page', 0) + metrics.item_views = event_counts.get('view_item_page', 0) + metrics.searches = event_counts.get('search', 0) + metrics.cart_adds = event_counts.get('add_item_to_cart', 0) + + # unique products viewed + metrics.unique_products_viewed = interactions['productId'].dropna().nunique() + + # session duration + if 'ts' in interactions.columns: + timestamps = pd.to_datetime(interactions['ts']) + metrics.session_duration_sec = (timestamps.max() - timestamps.min()).total_seconds() + if metrics.session_duration_sec > 0: + metrics.interaction_velocity = (metrics.total_interactions / metrics.session_duration_sec) * 60 + + # revenue from purchases + if purchases is not None and not purchases.empty: + metrics.items_purchased = len(purchases) + metrics.total_revenue = purchases['price'].sum() if 'price' in purchases.columns else 0.0 + metrics.avg_item_price = metrics.total_revenue / metrics.items_purchased if metrics.items_purchased > 0 else 0.0 + metrics.conversion_rate = 
1.0 if metrics.items_purchased > 0 else 0.0 + + # pricing metrics + if not price_logs.empty: + metrics.total_price_shown = price_logs['price'].sum() + # compute markup if base_price available in price logs or join with product catalog + if 'base_price' in price_logs.columns: + valid_markup = price_logs[price_logs['base_price'] > 0] + if not valid_markup.empty: + metrics.avg_markup = (valid_markup['price'] / valid_markup['base_price']).mean() + + return metrics + + @staticmethod + def compute_aggregate_metrics( + session_metrics_list: List[SessionMetrics], + experiment_id: Optional[str] = None, + agent_detector_fn: Optional[callable] = None + ) -> AggregateMetrics: + """ + Aggregate metrics across sessions. + + Args: + session_metrics_list: list of SessionMetrics + experiment_id: experiment identifier + agent_detector_fn: optional function to classify session as agent (returns bool) + """ + agg = AggregateMetrics(experiment_id=experiment_id) + agg.n_sessions = len(session_metrics_list) + + if agg.n_sessions == 0: + return agg + + df = pd.DataFrame([m.to_dict() for m in session_metrics_list]) + + # revenue aggregates + agg.total_revenue = df['total_revenue'].sum() + agg.avg_revenue_per_session = df['total_revenue'].mean() + agg.median_revenue_per_session = df['total_revenue'].median() + + # conversion aggregates + agg.total_conversions = (df['items_purchased'] > 0).sum() + agg.conversion_rate = agg.total_conversions / agg.n_sessions + + # pricing aggregates + valid_markups = df[df['avg_markup'] > 0] + if not valid_markups.empty: + agg.avg_markup = valid_markups['avg_markup'].mean() + agg.median_markup = valid_markups['avg_markup'].median() + + # agent detection (if detector provided) + if agent_detector_fn is not None: + agent_flags = [agent_detector_fn(m) for m in session_metrics_list] + agg.estimated_agent_sessions = sum(agent_flags) + + agent_revenue = sum(m.total_revenue for m, is_agent in zip(session_metrics_list, agent_flags) if is_agent) + human_revenue = 
sum(m.total_revenue for m, is_agent in zip(session_metrics_list, agent_flags) if not is_agent) + + agg.agent_revenue = agent_revenue + agg.human_revenue = human_revenue + + return agg + + @staticmethod + def compare_pricing_strategies( + experiments: Dict[str, List[SessionMetrics]], + baseline_experiment_id: Optional[str] = None + ) -> pd.DataFrame: + """ + Compare multiple pricing strategies/experiments. + + Args: + experiments: dict mapping experiment_id -> list of SessionMetrics + baseline_experiment_id: experiment to use as baseline for comparison + + Returns: + DataFrame with comparative metrics + """ + results = [] + baseline_agg = None + + for exp_id, session_metrics in experiments.items(): + agg = MetricsComputer.compute_aggregate_metrics(session_metrics, experiment_id=exp_id) + result = agg.to_dict() + + if exp_id == baseline_experiment_id: + baseline_agg = agg + + results.append(result) + + df = pd.DataFrame(results) + + # add relative metrics if baseline exists + if baseline_agg is not None: + df['revenue_lift_pct'] = ((df['total_revenue'] - baseline_agg.total_revenue) / baseline_agg.total_revenue * 100) + df['conversion_lift_pct'] = ((df['conversion_rate'] - baseline_agg.conversion_rate) / baseline_agg.conversion_rate * 100) + + return df + + +def simple_agent_detector(session_metrics: SessionMetrics, velocity_threshold: float = 5.0) -> bool: + """ + Simple heuristic agent detector based on interaction velocity. 
+ + Args: + session_metrics: SessionMetrics instance + velocity_threshold: interactions per minute threshold (default: 5.0) + + Returns: + True if session likely agent-driven + """ + # agents tend to have higher interaction velocity and lower session duration + if session_metrics.interaction_velocity > velocity_threshold: + return True + # agents often view many products quickly without converting + if session_metrics.unique_products_viewed > 10 and session_metrics.conversion_rate == 0: + return True + return False diff --git a/experiments/procesing/pipeline.py b/experiments/procesing/pipeline.py deleted file mode 100644 index 2efa4ae..0000000 --- a/experiments/procesing/pipeline.py +++ /dev/null @@ -1,90 +0,0 @@ -from sklearn.pipeline import Pipeline -from sklearn.preprocessing import StandardScaler -import pandas as pd -import logging -log = logging.getLogger(__name__) - -from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data -from mapping import SessionTransitionProbMatrixTransformer, render_graph -from demand import DemandEstimator, ChunkInteractionsIntoSteps -from elasticity import TemporalElasticityEstimator, aggregate_price_logs - - - -# elasticity pipeline components (not sklearn compatible, manual orchestration) -def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store_mode='hotel'): - """ - Compute price elasticity from interaction and price data. 
- - Args: - interactions_df: raw interaction data from demand_data_pipeline - price_logs_df: price log data from price_data_pipeline - window_size: time window for chunking - store_mode: 'hotel' or 'airline' - - Returns: - df with [productId, elasticity, std_error, n_obs] - """ - # step 1: chunk interactions into time windows - chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True) - interaction_chunks = chunker.transform(interactions_df) - log.info(f"Chunked interactions into {len(interaction_chunks)} windows of size {window_size}") - - if not interaction_chunks: - return None - - # step 2: compute demand per window - demand_estimator = DemandEstimator(store_mode=store_mode) - demand_chunks = [] - for chunk in interaction_chunks: - demand_vector = demand_estimator.transform(chunk['data']) - demand_chunks.append({ - 'window_start': chunk['window_start'], - 'window_end': chunk['window_end'], - 'demand_vector': demand_vector # each has a full list of all products, even if demand is 0 - }) - # [q_chunk1, q_chunk2, ...] 
- - # step 3: aggregate price logs into windows - price_chunks = aggregate_price_logs(price_logs_df, window_size=window_size) - - # step 4: compute elasticity - elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2) - elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks, store_mode=store_mode) - - return elasticity_df - - -# exposable pipelines -interaction_pipeline = Pipeline([ - ('kafka_fetch', KafkaDataFetcher(topic='user-interactions')), - ('experiment_join', ExperimentJoiner()), - ('event_augment', EventTitleAugmenter()), -]) - -price_data_pipeline = Pipeline([ - ('kafka_fetch', KafkaDataFetcher(topic='price-logs')), -]) - -# interaction_data + price_data -> elasticity (demand) -# elasticity -> pricing - -pricing_pipeline = Pipeline([ - ('demand_estimation', DemandEstimator()), -]) -if __name__ == "__main__": - # fetch both datasets - interaction_data = interaction_pipeline.fit_transform(None) - pricing_data = price_data_pipeline.fit_transform(None) - if interaction_data.empty or pricing_data.empty: - print("Insufficient data for elasticity computation"); exit(0) - # compute elasticity via unified pipeline - window_size = "30s" - elasticity_results = elasticity_pipeline(interaction_data, pricing_data, window_size=window_size) - elasticity_value_array = elasticity_results['elasticity'].values if elasticity_results is not None else np.array([]) - print(elasticity_value_array) - - if elasticity_results is not None and not elasticity_results.empty: - print(elasticity_results.to_string(index=False)) - else: - print("\nInsufficient data for elasticity computation") diff --git a/experiments/procesing/pipelines.py b/experiments/procesing/pipelines.py new file mode 100644 index 0000000..ac95314 --- /dev/null +++ b/experiments/procesing/pipelines.py @@ -0,0 +1,138 @@ +from sklearn.pipeline import Pipeline +import pandas as pd +from procesing.context import PipelineContext +from procesing.providers import 
SupabaseProvider, BackendAPIProvider +from typing import Union +from procesing.steps import ( + FetchInteractionsStep, + FetchPriceLogsStep, + FetchExperimentsStep, + JoinExperimentsStep, + CreatePriceBucketsStep, + AugmentEventNamesStep, + ChunkByTimeWindowStep, + ComputeDemandForChunksStep, + AggregatePriceLogsStep, + ComputeElasticityStep, + BuildStateSpaceStep, + FitPricingFunctionStep, + PredictPricesStep, +) + +def interaction_extraction_pipeline(context: PipelineContext): + """Pipeline for extracting and augmenting interaction data""" + return Pipeline([ + ('fetch', FetchInteractionsStep(context)), + ('create_buckets', CreatePriceBucketsStep(context)), + ('augment_events', AugmentEventNamesStep(context)), + ]) + + +def price_extraction_pipeline(context: PipelineContext): + """Pipeline for extracting price logs""" + return Pipeline([ + ('fetch', FetchPriceLogsStep(context)), + ]) + + +def elasticity_computation_pipeline(context: PipelineContext, + interactions_df: pd.DataFrame, + price_logs_df: pd.DataFrame): + """ + Compute elasticity from interactions and price logs. + Manual orchestration needed for branching logic. + """ + # branch 1: chunk interactions and compute demand + chunk_step = ChunkByTimeWindowStep(context) + interaction_chunks = chunk_step.transform(interactions_df) + + demand_step = ComputeDemandForChunksStep(context) + demand_chunks = demand_step.transform(interaction_chunks) + + # branch 2: aggregate price logs + price_step = AggregatePriceLogsStep(context) + price_chunks = price_step.transform(price_logs_df) + + # convergence: compute elasticity + elasticity_step = ComputeElasticityStep(context) + elasticity_df = elasticity_step.transform((demand_chunks, price_chunks)) + + return elasticity_df + + +def pricing_pipeline(context: PipelineContext, elasticity_df: pd.DataFrame): + """ + Generate optimal prices from elasticity estimates. 
+ """ + # build state space + state_step = BuildStateSpaceStep(context) + state_space = state_step.transform(elasticity_df) + + # fit pricing function + fit_step = FitPricingFunctionStep(context) + pricer = fit_step.transform(elasticity_df) + + # predict prices + predict_step = PredictPricesStep(context) + prices_df = predict_step.transform((pricer, state_space)) + + return prices_df + + +def full_pipeline(context: PipelineContext): + """ + Complete end-to-end pipeline: data extraction -> elasticity -> pricing + Returns: (elasticity_df, prices_df) + """ + # extract interactions + interaction_pipe = interaction_extraction_pipeline(context) + interactions_df = interaction_pipe.fit_transform(None) + + # extract price logs + price_pipe = price_extraction_pipeline(context) + price_logs_df = price_pipe.fit_transform(None) + + if interactions_df.empty or price_logs_df.empty: + return None, None + + # compute elasticity + elasticity_df = elasticity_computation_pipeline( + context, + interactions_df, + price_logs_df + ) + + if elasticity_df is None or elasticity_df.empty: + return elasticity_df, None + + # generate prices + prices_df = pricing_pipeline(context, elasticity_df) + + return elasticity_df, prices_df + + +if __name__ == '__main__': + + class Provider(SupabaseProvider, BackendAPIProvider): + def __init__(self, backend_url: str): + SupabaseProvider.__init__(self) + BackendAPIProvider.__init__(self, backend_url=backend_url) + # example run + context = PipelineContext( + provider=Provider(backend_url="http://localhost:5000"), + store_mode='hotel', + ) + + elasticity_df, prices_df = full_pipeline(context) + + if elasticity_df is not None and not elasticity_df.empty: + print("Elasticity Estimates:") + print(elasticity_df.to_string(index=False)) + else: + print("No elasticity estimates computed.") + + if prices_df is not None and not prices_df.empty: + print("\nPredicted Prices:") + print(prices_df.to_string(index=False)) + else: + print("No prices predicted.") diff 
--git a/experiments/procesing/pricers/__init__.py b/experiments/procesing/pricers/__init__.py new file mode 100644 index 0000000..a812d8f --- /dev/null +++ b/experiments/procesing/pricers/__init__.py @@ -0,0 +1,13 @@ +from procesing.pricers.base import PricingFunction +from procesing.pricers.elasticity import ElasticityBasedPricer +from procesing.pricers.simple import StaticPricer, RandomPricer +from procesing.pricers.session_aware import SessionAwarePricer, ProductSpecificSessionPricer + +__all__ = [ + 'PricingFunction', + 'ElasticityBasedPricer', + 'StaticPricer', + 'RandomPricer', + 'SessionAwarePricer', + 'ProductSpecificSessionPricer' +] diff --git a/experiments/procesing/pricers/base.py b/experiments/procesing/pricers/base.py new file mode 100644 index 0000000..b2ada0c --- /dev/null +++ b/experiments/procesing/pricers/base.py @@ -0,0 +1,70 @@ +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, List +import numpy as np +import pandas as pd + + +class PricingFunction(ABC): + """ + Abstract base for pricing functions. + + Defines mapping: f(Q_t, P_t, S_t, H_t) -> P_{t+1} + + Where: + Q_t ∈ R^n: demand vector at time t + P_t ∈ R^n: price vector at time t + S_t: session features (behavioral signals, interactions) + H_t = {Q_{t-k}, P_{t-k}, S_{t-k}}: historical state trajectory + + Objective: + maximize E[R_T] = E[Σ P_t^T · Q_t] + subject to: + Q_t = g(P_t, S_t) (demand response via elasticity) + P_t ≥ C (cost floor) + minimize L_agent = R_oracle - R_observed + """ + + @abstractmethod + def fit(self, historical_data: pd.DataFrame, **kwargs): + """ + Offline training on historical data. + + Args: + historical_data: DataFrame with elasticity, prices, demand signals + **kwargs: additional training parameters + """ + pass + + @abstractmethod + def predict(self, state_space) -> np.ndarray: + """ + Generate optimal prices given current state. 
+ + Args: + state_space: StateSpace object containing Q_t, P_t, S_t, H_t + + Returns: + P_{t+1}: price vector in R^n + """ + pass + + def update(self, observation: Dict[str, Any]): + """ + Online learning update (optional). + + Args: + observation: dict with {state, action, reward, next_state} + - state: StateSpace before pricing decision + - action: prices shown (P_t) + - reward: revenue/conversion signal + - next_state: StateSpace after user interaction + """ + pass # default: no online learning + + def get_params(self) -> Dict[str, Any]: + """Return pricing function parameters for serialization.""" + return {} + + def set_params(self, params: Dict[str, Any]): + """Load pricing function parameters from dict.""" + pass diff --git a/experiments/procesing/pricers/elasticity.py b/experiments/procesing/pricers/elasticity.py new file mode 100644 index 0000000..b203159 --- /dev/null +++ b/experiments/procesing/pricers/elasticity.py @@ -0,0 +1,59 @@ +import numpy as np +import pandas as pd +from procesing.pricers.base import PricingFunction + + +class ElasticityBasedPricer(PricingFunction): + """ + Pricing based on demand elasticity estimates. + f(Q, S) = base_price * (1 + alpha * elasticity * demand_deviation) + """ + + def __init__(self, alpha: float = 0.1, price_floor: float = 0.0, price_ceil: float = np.inf): + self.alpha = alpha + self.price_floor = price_floor + self.price_ceil = price_ceil + self.elasticity = None + self.base_prices = None + self.mean_demand = None + + def fit(self, historical_data: pd.DataFrame): + """ + Calibrate from historical elasticity estimates. 
+ Expects: [productId, elasticity, base_price, mean_demand] + """ + if 'elasticity' not in historical_data.columns: + raise ValueError("historical_data must contain 'elasticity' column") + + self.elasticity = historical_data['elasticity'].values + self.base_prices = (historical_data['base_price'].values + if 'base_price' in historical_data.columns + else np.ones(len(historical_data)) * 100) + self.mean_demand = (historical_data['mean_demand'].values + if 'mean_demand' in historical_data.columns + else np.ones(len(historical_data)) * 10) + return self + + def predict(self, state_space) -> np.ndarray: + """ + Adjust prices based on demand deviation and elasticity. + Higher demand -> increase price (but less for elastic goods) + """ + if self.elasticity is None: + raise ValueError("Must call fit() before predict()") + + demand = np.asarray(state_space.demand) + if len(demand) != len(self.elasticity): + raise ValueError(f"Demand vector size {len(demand)} != elasticity size {len(self.elasticity)}") + + # compute demand deviation from mean + demand_dev = (demand - self.mean_demand) / (self.mean_demand + 1e-6) + + # adjust price: if demand high and elastic, don't increase much + # if demand high and inelastic, increase more + price_multiplier = 1 + self.alpha * np.abs(self.elasticity) * demand_dev + prices = self.base_prices * price_multiplier + + # enforce bounds + prices = np.clip(prices, self.price_floor, self.price_ceil) + return prices diff --git a/experiments/procesing/pricers/session_aware.py b/experiments/procesing/pricers/session_aware.py new file mode 100644 index 0000000..40343a7 --- /dev/null +++ b/experiments/procesing/pricers/session_aware.py @@ -0,0 +1,172 @@ +""" +Session-aware pricing functions that leverage behavioral features S_t. +These pricers aim to minimize L_agent = R_oracle - R_observed. 
+""" +import numpy as np +import pandas as pd +from procesing.pricers.base import PricingFunction +from procesing.pricers.elasticity import ElasticityBasedPricer + + +class SessionAwarePricer(PricingFunction): + """ + Extends elasticity-based pricing with session behavioral signals. + + f(Q, P, S) = base_price * elasticity_factor * session_factor + + Where session_factor adjusts for: + - interaction_velocity (agent detection proxy) + - product_view_depth (interest signal) + - cart_to_view_ratio (conversion intent) + + Strategy: charge higher prices to suspected agents (high velocity) + to recover oracle revenue from reconnaissance sessions. + """ + + def __init__(self, + alpha: float = 0.1, + beta_velocity: float = 0.05, + beta_attention: float = 0.03, + agent_velocity_threshold: float = 5.0, + agent_markup: float = 1.2, + price_floor: float = 0.0, + price_ceil: float = np.inf): + """ + Args: + alpha: elasticity sensitivity + beta_velocity: interaction velocity weight + beta_attention: product attention weight + agent_velocity_threshold: velocity above which to apply agent markup + agent_markup: price multiplier for suspected agent sessions + price_floor, price_ceil: price bounds + """ + self.alpha = alpha + self.beta_velocity = beta_velocity + self.beta_attention = beta_attention + self.agent_velocity_threshold = agent_velocity_threshold + self.agent_markup = agent_markup + self.price_floor = price_floor + self.price_ceil = price_ceil + + # fitted parameters + self.elasticity = None + self.base_prices = None + self.mean_demand = None + + def fit(self, historical_data: pd.DataFrame, **kwargs): + """Calibrate from historical elasticity data.""" + if 'elasticity' not in historical_data.columns: + raise ValueError("historical_data must contain 'elasticity'") + + self.elasticity = historical_data['elasticity'].values + self.base_prices = (historical_data['base_price'].values + if 'base_price' in historical_data.columns + else np.ones(len(historical_data)) * 100) + 
self.mean_demand = (historical_data['mean_demand'].values + if 'mean_demand' in historical_data.columns + else np.ones(len(historical_data)) * 10) + return self + + def predict(self, state_space) -> np.ndarray: + """Generate prices with session awareness.""" + if self.elasticity is None: + raise ValueError("Must call fit() before predict()") + + demand = np.asarray(state_space.demand) + n_products = len(demand) + + # base elasticity-driven pricing + demand_dev = (demand - self.mean_demand) / (self.mean_demand + 1e-6) + elasticity_factor = 1 + self.alpha * np.abs(self.elasticity) * demand_dev + + # session-aware adjustments + session_factor = np.ones(n_products) + + if not state_space.session_features.empty: + sf = state_space.session_features.iloc[0] # single session features + + # agent detection via velocity + velocity = sf.get('interaction_velocity', 0.0) + if velocity > self.agent_velocity_threshold: + # suspected agent: apply markup to recover oracle revenue + session_factor *= self.agent_markup + + # attention signal: higher view depth -> user interested -> can charge more + view_depth = sf.get('product_view_depth', 0) + if view_depth > 0: + attention_boost = 1 + self.beta_attention * np.log1p(view_depth) + session_factor *= attention_boost + + # cart presence: if user has items in cart, slightly increase prices + cart_to_view = sf.get('cart_to_view_ratio', 0.0) + if cart_to_view > 0.1: + session_factor *= (1 + 0.02) # small boost for conversion intent + + prices = self.base_prices * elasticity_factor * session_factor + prices = np.clip(prices, self.price_floor, self.price_ceil) + + return prices + + +class ProductSpecificSessionPricer(PricingFunction): + """ + Session-aware pricer with product-specific demand signals. + + Uses S_t to extract per-product interaction counts and adjusts pricing + for products the user has already viewed/hovered. 
+ + Strategy: products viewed multiple times = high interest -> price up + """ + + def __init__(self, + alpha: float = 0.1, + view_boost: float = 0.02, + max_view_boost: float = 0.15, + price_floor: float = 0.0, + price_ceil: float = np.inf): + self.alpha = alpha + self.view_boost = view_boost + self.max_view_boost = max_view_boost + self.price_floor = price_floor + self.price_ceil = price_ceil + + self.elasticity = None + self.base_prices = None + self.mean_demand = None + self.product_ids = None + + def fit(self, historical_data: pd.DataFrame, **kwargs): + if 'elasticity' not in historical_data.columns or 'productId' not in historical_data.columns: + raise ValueError("historical_data must contain 'elasticity' and 'productId'") + + self.elasticity = historical_data['elasticity'].values + self.base_prices = (historical_data['base_price'].values + if 'base_price' in historical_data.columns + else np.ones(len(historical_data)) * 100) + self.mean_demand = (historical_data['mean_demand'].values + if 'mean_demand' in historical_data.columns + else np.ones(len(historical_data)) * 10) + self.product_ids = historical_data['productId'].values + return self + + def predict(self, state_space) -> np.ndarray: + if self.elasticity is None: + raise ValueError("Must call fit() before predict()") + + demand = np.asarray(state_space.demand) + n_products = len(demand) + + # base pricing + demand_dev = (demand - self.mean_demand) / (self.mean_demand + 1e-6) + base_prices = self.base_prices * (1 + self.alpha * np.abs(self.elasticity) * demand_dev) + + # product-specific session adjustments + if not state_space.session_features.empty and state_space.product_ids is not None: + # extract product interaction counts from session metadata + # (this would require session features to include per-product signals) + # for now, use uniform boost as placeholder + # TODO: extend session feature extraction to include product-specific counts + pass + + prices = np.clip(base_prices, self.price_floor, 
self.price_ceil) + return prices diff --git a/experiments/procesing/pricers/simple.py b/experiments/procesing/pricers/simple.py new file mode 100644 index 0000000..e98b9be --- /dev/null +++ b/experiments/procesing/pricers/simple.py @@ -0,0 +1,48 @@ +import numpy as np +import pandas as pd +from procesing.pricers.base import PricingFunction + + +class StaticPricer(PricingFunction): + """Static pricing: always return fixed base prices""" + + def __init__(self, base_prices: np.ndarray = None): + self.base_prices = base_prices + + def fit(self, historical_data: pd.DataFrame): + """Extract base prices from historical data""" + if 'base_price' in historical_data.columns: + self.base_prices = historical_data['base_price'].values + elif 'price' in historical_data.columns: + self.base_prices = historical_data['price'].values + else: + raise ValueError("historical_data must contain 'base_price' or 'price' column") + return self + + def predict(self, state_space) -> np.ndarray: + """Return static base prices regardless of state""" + if self.base_prices is None: + raise ValueError("Must call fit() or provide base_prices in constructor") + return self.base_prices.copy() + + +class RandomPricer(PricingFunction): + """Random pricing within bounds (for baseline comparison)""" + + def __init__(self, price_min: float = 50.0, price_max: float = 500.0, seed: int = None): + self.price_min = price_min + self.price_max = price_max + self.seed = seed + self.n_products = None + self.rng = np.random.default_rng(seed) + + def fit(self, historical_data: pd.DataFrame): + """Learn number of products""" + self.n_products = len(historical_data) + return self + + def predict(self, state_space) -> np.ndarray: + """Generate random prices""" + if self.n_products is None: + self.n_products = len(state_space.demand) + return self.rng.uniform(self.price_min, self.price_max, size=self.n_products) diff --git a/experiments/procesing/pricing.py b/experiments/procesing/pricing.py index afdcfdd..4789ebc 
100644 --- a/experiments/procesing/pricing.py +++ b/experiments/procesing/pricing.py @@ -35,8 +35,9 @@ from sklearn.base import BaseEstimator, TransformerMixin import numpy as np import pandas as pd import os +from dotenv import load_dotenv +load_dotenv() from supabase import create_client, Client -from pipeline import interaction_pipeline, price_data_pipeline, elasticity_pipeline SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "") SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY", "") @@ -79,18 +80,136 @@ class PricingFunction(BaseEstimator, TransformerMixin, ABC): class SimpleLinearPricingFunction(PricingFunction): def __init__(self, price_sensitivity: float = -0.1): super().__init__() - self.price_sensitivity = price_sensitivity # simple coefficient + self.price_sensitivity = price_sensitivity def fit(self, historical_data): return self def transform(self, state_space: StateSpace) -> np.ndarray: - # Simple linear adjustment: P_{t+1} = P_t + sensitivity * Q_t - new_prices = state_space.prices + self.price_sensitivity * state_space.demand # this is not great + new_prices = state_space.prices + self.price_sensitivity * state_space.demand return np.maximum(new_prices, 0) + +class ElasticityBasedPricingFunction(PricingFunction): + """ + Revenue-maximizing pricing using elasticity estimates. + + For each product, optimal price P* maximizes R = P * Q(P) + where Q(P) follows power law: Q(P) = Q_0 * (P/P_0)^ε + + Taking derivative dR/dP = 0 gives optimal markup: + P* = P_0 * (1 + 1/ε) if ε < -1 (elastic) + + For inelastic demand (|ε| < 1), we apply bounded markup. 
+ """ + + def __init__(self, + cost_floor: float = 0.5, + max_markup: float = 2.0, + min_markup: float = 1.0, + inelastic_markup: float = 1.3): + super().__init__() + self.cost_floor = cost_floor # prices as fraction of base + self.max_markup = max_markup # max price = base * max_markup + self.min_markup = min_markup # min price = base * min_markup + self.inelastic_markup = inelastic_markup # default for |ε| < 1 + self.elasticity_map = {} # productId -> elasticity + + def fit(self, elasticity_df: pd.DataFrame): + """ + Args: + elasticity_df: df with [productId, elasticity, std_error, n_obs] + """ + if elasticity_df is not None and not elasticity_df.empty: + self.elasticity_map = dict(zip( + elasticity_df['productId'], + elasticity_df['elasticity'] + )) + return self + + def transform(self, state_space: StateSpace, product_ids: np.ndarray = None) -> np.ndarray: + """ + Args: + state_space: current state (prices = base prices) + product_ids: array of productIds aligned with state_space.prices + + Returns: + optimized prices P_{t+1} + """ + base_prices = state_space.prices + + if product_ids is None: + # fallback: use positional index as productId (not ideal) + product_ids = np.arange(len(base_prices)) + + new_prices = np.zeros_like(base_prices) + + for i, (base_p, pid) in enumerate(zip(base_prices, product_ids)): + elasticity = self.elasticity_map.get(pid, 0.0) + + if elasticity < -1: # elastic demand + # optimal markup: (1 + 1/ε) + markup = 1 + (1 / elasticity) + optimal_p = base_p * markup + elif elasticity > -1 and elasticity < 0: # inelastic + # conservative markup + optimal_p = base_p * self.inelastic_markup + else: # ε ≥ 0 (demand increases with price, or no data) + # no elasticity data or anomalous, keep base price + optimal_p = base_p + + # apply bounds + optimal_p = np.clip( + optimal_p, + base_p * self.min_markup, + base_p * self.max_markup + ) + optimal_p = max(optimal_p, self.cost_floor) + + new_prices[i] = optimal_p + + return new_prices + + +class 
ContextualElasticityPricing(PricingFunction): + """ + Revenue optimization with contextual adjustments based on session features. + + Combines elasticity-based pricing with surge/demand-based multipliers. + """ + + def __init__(self, + base_pricer: ElasticityBasedPricingFunction = None, + demand_sensitivity: float = 0.1, + surge_threshold: float = 0.7): + super().__init__() + self.base_pricer = base_pricer or ElasticityBasedPricingFunction() + self.demand_sensitivity = demand_sensitivity + self.surge_threshold = surge_threshold + + def fit(self, elasticity_df: pd.DataFrame): + self.base_pricer.fit(elasticity_df) + return self + + def transform(self, state_space: StateSpace, product_ids: np.ndarray = None) -> np.ndarray: + # get base optimal prices from elasticity + base_optimal = self.base_pricer.transform(state_space, product_ids) + + # compute surge multiplier from demand + if len(state_space.demand) > 0: + demand_normalized = state_space.demand / (state_space.demand.max() + 1e-8) + surge_multiplier = 1 + self.demand_sensitivity * np.maximum( + demand_normalized - self.surge_threshold, 0 + ) + else: + surge_multiplier = np.ones_like(base_optimal) + + return base_optimal * surge_multiplier + # Example usage: if __name__ == "__main__": + from pipeline import interaction_pipeline, price_data_pipeline, elasticity_pipeline + store_mode = 'hotel' interaction_data = interaction_pipeline.fit_transform(None) price_data = price_data_pipeline.fit_transform(None) diff --git a/experiments/procesing/providers/__init__.py b/experiments/procesing/providers/__init__.py new file mode 100755 index 0000000..bbb5e6e --- /dev/null +++ b/experiments/procesing/providers/__init__.py @@ -0,0 +1,5 @@ +from procesing.providers.base import DataProvider +from procesing.providers.supabase import SupabaseProvider +from procesing.providers.backend import BackendAPIProvider + +__all__ = ['DataProvider', 'SupabaseProvider', 'BackendAPIProvider'] diff --git 
a/experiments/procesing/providers/backend.py b/experiments/procesing/providers/backend.py new file mode 100755 index 0000000..9767136 --- /dev/null +++ b/experiments/procesing/providers/backend.py @@ -0,0 +1,19 @@ +import os +import pandas as pd +import requests +from typing import List +from procesing.providers.base import DataProvider + +class BackendAPIProvider(DataProvider): + """Concrete backend API implementation""" + def __init__(self, backend_url: str = None): + self.backend_url = backend_url or os.getenv("BACKEND_URL", "http://localhost:5000") + def fetch_kafka_topic(self, topic: str) -> pd.DataFrame: + resp = requests.get(f"{self.backend_url}/api/kafka/dump?topic={topic}") + resp.raise_for_status() + data = resp.json() + + if not data.get('success') or not data.get('data'): + return pd.DataFrame() + + return pd.DataFrame(data['data']) diff --git a/experiments/procesing/providers/base.py b/experiments/procesing/providers/base.py new file mode 100755 index 0000000..baee561 --- /dev/null +++ b/experiments/procesing/providers/base.py @@ -0,0 +1,21 @@ +from abc import ABC, abstractmethod +from typing import List +import pandas as pd + +class DataProvider(ABC): + """Abstract interface for data access, enables DI and testing""" + + @abstractmethod + def fetch_products(self, store_mode: str) -> pd.DataFrame: + """Fetch product catalog for given store mode""" + pass + + @abstractmethod + def fetch_experiments(self, experiment_ids: List[str]) -> pd.DataFrame: + """Fetch experiment metadata for given IDs""" + pass + + @abstractmethod + def fetch_kafka_topic(self, topic: str) -> pd.DataFrame: + """Fetch data from Kafka topic via backend API""" + pass diff --git a/experiments/procesing/providers/supabase.py b/experiments/procesing/providers/supabase.py new file mode 100755 index 0000000..8da01f9 --- /dev/null +++ b/experiments/procesing/providers/supabase.py @@ -0,0 +1,35 @@ +import os +import pandas as pd +import requests +from typing import List +from supabase 
import create_client, Client +from procesing.providers.base import DataProvider +from dotenv import load_dotenv + +class SupabaseProvider(DataProvider): + """Concrete Supabase + backend API implementation""" + + def __init__(self, + supabase_url: str = None, + supabase_key: str = None,): + load_dotenv() + self.supabase_url = supabase_url or os.getenv("NEXT_PUBLIC_SUPABASE_URL") + self.supabase_key = supabase_key or os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") + self.supabase: Client = create_client(self.supabase_url, self.supabase_key) + + def fetch_products(self, store_mode: str) -> pd.DataFrame: + resp = self.supabase.table(f'{store_mode}_products').select( + "id, room_type, date_index, metadata, availability" + ).execute() + return pd.DataFrame(resp.data) if resp.data else pd.DataFrame() + + def fetch_experiments(self, experiment_ids: List[str]) -> pd.DataFrame: + if not experiment_ids: + return pd.DataFrame() + + resp = self.supabase.table('experiments').select( + 'id, subject_name, xp_human_only, xp_market_mode, xp_task_id, ' + 'task:tasks(task_name, task_description, task_def_of_done)' + ).in_('id', experiment_ids).execute() + + return pd.DataFrame(resp.data) if resp.data else pd.DataFrame() diff --git a/experiments/procesing/steps/__init__.py b/experiments/procesing/steps/__init__.py new file mode 100755 index 0000000..6fa8779 --- /dev/null +++ b/experiments/procesing/steps/__init__.py @@ -0,0 +1,27 @@ +from procesing.steps.base import BaseContextStep +from procesing.steps.fetch import FetchInteractionsStep, FetchPriceLogsStep, FetchExperimentsStep +from procesing.steps.join import JoinExperimentsStep +from procesing.steps.augment import CreatePriceBucketsStep, AugmentEventNamesStep +from procesing.steps.chunk import ChunkByTimeWindowStep +from procesing.steps.demand import ComputeDemandStep, ComputeDemandForChunksStep +from procesing.steps.elasticity import AggregatePriceLogsStep, ComputeElasticityStep +from procesing.steps.pricing import StateSpace, 
BuildStateSpaceStep, FitPricingFunctionStep, PredictPricesStep + +__all__ = [ + 'BaseContextStep', + 'FetchInteractionsStep', + 'FetchPriceLogsStep', + 'FetchExperimentsStep', + 'JoinExperimentsStep', + 'CreatePriceBucketsStep', + 'AugmentEventNamesStep', + 'ChunkByTimeWindowStep', + 'ComputeDemandStep', + 'ComputeDemandForChunksStep', + 'AggregatePriceLogsStep', + 'ComputeElasticityStep', + 'StateSpace', + 'BuildStateSpaceStep', + 'FitPricingFunctionStep', + 'PredictPricesStep', +] diff --git a/experiments/procesing/steps/augment.py b/experiments/procesing/steps/augment.py new file mode 100755 index 0000000..a8b6506 --- /dev/null +++ b/experiments/procesing/steps/augment.py @@ -0,0 +1,53 @@ +import numpy as np +import pandas as pd +from procesing.steps.base import BaseContextStep + +class CreatePriceBucketsStep(BaseContextStep): + """Create price bucket labels from price data""" + + def transform(self, df: pd.DataFrame): + if df.empty or 'metadata_price' not in df.columns: + df['price_bucket'] = "" + return df + + n_buckets = self.context.config.get('n_price_buckets', 5) + + if df['metadata_price'].notnull().sum() > 0: + try: + price_buckets = pd.qcut( + df['metadata_price'], + q=n_buckets, + labels=[f"PB_{i+1}" for i in range(n_buckets)], + duplicates='drop' + ) + except ValueError: + # fallback for insufficient unique values + price_buckets = df['metadata_price'].apply( + lambda x: f"P_{int(x)}" if pd.notnull(x) else "" + ) + else: + price_buckets = pd.Series([""] * len(df), index=df.index) + + df['price_bucket'] = price_buckets + return df + + +class AugmentEventNamesStep(BaseContextStep): + """Augment event names with product and price bucket schema""" + + def transform(self, df: pd.DataFrame): + if df.empty: + return df + + # Create schema: _productId@price_bucket + has_product = df.get('productId', pd.Series()).notnull() + has_bucket = df.get('price_bucket', pd.Series()).notnull() + + df['metadata_schema'] = np.where( + has_product & has_bucket, + "_" + 
df['productId'].astype(str) + "@" + df['price_bucket'].astype(str), + "" + ) + + df['eventName'] = df['eventName'] + df['metadata_schema'] + return df diff --git a/experiments/procesing/steps/base.py b/experiments/procesing/steps/base.py new file mode 100755 index 0000000..054b777 --- /dev/null +++ b/experiments/procesing/steps/base.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod +from sklearn.base import BaseEstimator, TransformerMixin +from procesing.context import PipelineContext + +class BaseContextStep(BaseEstimator, TransformerMixin, ABC): + """ + Base for all pipeline steps. + Each step is stateless, context-driven, and performs ONE transformation. + """ + + def __init__(self, context: PipelineContext): + self.context = context + + def fit(self, X=None, y=None): + """Most steps don't need training""" + return self + + @abstractmethod + def transform(self, X): + """Transform input using context. Must be implemented by subclass.""" + pass + + def get_params(self, deep=True): + """sklearn compatibility""" + return {'context': self.context} + + def set_params(self, **params): + """sklearn compatibility""" + if 'context' in params: + self.context = params['context'] + return self diff --git a/experiments/procesing/steps/chunk.py b/experiments/procesing/steps/chunk.py new file mode 100755 index 0000000..6877599 --- /dev/null +++ b/experiments/procesing/steps/chunk.py @@ -0,0 +1,34 @@ +import pandas as pd +from procesing.steps.base import BaseContextStep + +class ChunkByTimeWindowStep(BaseContextStep): + """ + Chunk dataframe into time windows. + Returns list of dicts with window metadata. 
+ """ + + def transform(self, df: pd.DataFrame): + if df.empty: + return [] + + df = df.copy() + ts_col = self.context.config.get('ts_col', 'ts') + window_size = self.context.window_size + + # ensure datetime + if not pd.api.types.is_datetime64_any_dtype(df[ts_col]): + df[ts_col] = pd.to_datetime(df[ts_col]) + + df = df.sort_values(ts_col) + df['_window'] = df[ts_col].dt.floor(window_size) + + chunks = [] + for idx, (window_start, group) in enumerate(df.groupby('_window')): + chunks.append({ + 'window_start': window_start, + 'window_end': window_start + pd.Timedelta(window_size), + 'window_idx': idx, + 'data': group.drop(columns=['_window']) + }) + + return chunks diff --git a/experiments/procesing/steps/demand.py b/experiments/procesing/steps/demand.py new file mode 100755 index 0000000..2f84985 --- /dev/null +++ b/experiments/procesing/steps/demand.py @@ -0,0 +1,61 @@ +import pandas as pd +from procesing.steps.base import BaseContextStep + +class ComputeDemandStep(BaseContextStep): + """ + Compute demand vector for a single time window or dataframe. 
+ Input: single chunk dict OR raw dataframe + Output: demand dataframe with [productId, demand_score] + """ + + def transform(self, chunk): + # handle both chunk dict and raw dataframe + if isinstance(chunk, dict): + interactions = chunk['data'] + window_meta = {k: v for k, v in chunk.items() if k != 'data'} + else: + interactions = chunk + window_meta = {} + + products = self.context.products + unique_products = products['id'].unique() + + # apply filters if configured + session_filter = self.context.config.get('session_filter') + experiment_filter = self.context.config.get('experiment_filter') + + if session_filter and 'sessionId' in interactions.columns: + interactions = interactions[interactions['sessionId'] == session_filter] + if experiment_filter and 'experimentId' in interactions.columns: + interactions = interactions[interactions['experimentId'] == experiment_filter] + + interactions_with_products = interactions.dropna(subset=['productId']) + + if interactions_with_products.empty: + demand_df = pd.DataFrame({ + 'productId': unique_products, + 'demand_score': 0 + }) + else: + # crosstab for simple demand count + demand_df = pd.crosstab( + interactions_with_products['productId'], + 'count' + ).reindex(unique_products, fill_value=0).reset_index() + demand_df.columns = ['productId', 'demand_score'] + + # attach window metadata if present + if window_meta: + return {**window_meta, 'demand_vector': demand_df} + return demand_df + + +class ComputeDemandForChunksStep(BaseContextStep): + """Apply ComputeDemandStep to list of chunks""" + + def transform(self, chunks: list): + if not chunks: + return [] + + demand_step = ComputeDemandStep(self.context) + return [demand_step.transform(chunk) for chunk in chunks] diff --git a/experiments/procesing/steps/elasticity.py b/experiments/procesing/steps/elasticity.py new file mode 100755 index 0000000..d65bc43 --- /dev/null +++ b/experiments/procesing/steps/elasticity.py @@ -0,0 +1,253 @@ +import numpy as np +import pandas as 
class AggregatePriceLogsStep(BaseContextStep):
    """
    Aggregate raw price logs into fixed time windows.

    Input: price_logs_df with a timestamp column (config 'ts_col', default 'ts'),
           'productId' and 'price'
    Output: list of dicts, one per window, each with
            'window_start', 'window_end' (start + context.window_size) and
            'price_vector', a dataframe with [productId, price].
            Returns [] for an empty input.

    The per-product windowing is vectorized (groupby + resample); the
    back-fill of catalog products missing from a window is a Python loop.
    """

    def transform(self, price_logs_df: pd.DataFrame):
        if price_logs_df.empty:
            return []

        # work on a copy so the caller's dataframe is never mutated
        df = price_logs_df.copy()
        ts_col = self.context.config.get('ts_col', 'ts')
        window_size = self.context.window_size

        # ensure datetime
        if not pd.api.types.is_datetime64_any_dtype(df[ts_col]):
            df[ts_col] = pd.to_datetime(df[ts_col])

        # chronological order matters: the back-fill below takes .iloc[-1] as "latest"
        df = df.sort_values([ts_col, 'productId'])
        products = self.context.products
        unique_products = products['id'].unique()

        # VECTORIZED: group by product, resample by time window, compute mean
        df_indexed = df.set_index(ts_col)

        windowed = (
            df_indexed
            .groupby('productId')['price']
            .resample(window_size)
            .mean()
            .reset_index()
        )

        # forward fill missing windows (carry last known price)
        # ffill is done per product, so one product's price never leaks into another
        windowed = windowed.sort_values([ts_col, 'productId'])
        windowed['price'] = windowed.groupby('productId')['price'].ffill()
        windowed = windowed.dropna(subset=['price'])

        # group into chunks by window
        chunks = []
        for window_start, group in windowed.groupby(ts_col):
            price_vector = group[['productId', 'price']].copy()

            # fill missing products with last known price before this window
            # NOTE(review): this uses the last RAW log price strictly before
            # window_start, not a window mean, and rescans df_indexed once per
            # missing product per window. Products with no earlier log are
            # simply left out of this window's price_vector.
            missing_products = set(unique_products) - set(price_vector['productId'])
            if missing_products:
                for pid in missing_products:
                    last_price = df_indexed[
                        (df_indexed['productId'] == pid) &
                        (df_indexed.index < window_start)
                    ]['price']

                    if not last_price.empty:
                        price_vector = pd.concat([
                            price_vector,
                            pd.DataFrame({'productId': [pid], 'price': [last_price.iloc[-1]]})
                        ], ignore_index=True)

            if not price_vector.empty:
                chunks.append({
                    'window_start': window_start,
                    'window_end': window_start + pd.Timedelta(window_size),
                    'price_vector': price_vector
                })

        return chunks
for dc in demand_chunks: + ws = dc['window_start'] + if ws in price_lookup: + aligned.append({ + 'window_start': ws, + 'window_end': dc['window_end'], + 'demand': dc['demand_vector'], + 'prices': price_lookup[ws]['price_vector'] + }) + + return aligned + + def _build_timeseries(self, aligned: List[Dict]): + """Build time series [timestamp, price, quantity] per product""" + series_by_product = {} + + for chunk in aligned: + merged = chunk['demand'].merge(chunk['prices'], on='productId', how='inner') + + for _, row in merged.iterrows(): + pid = row['productId'] + if pid not in series_by_product: + series_by_product[pid] = [] + + series_by_product[pid].append({ + 'timestamp': chunk['window_start'], + 'price': row['price'], + 'quantity': row['demand_score'] + }) + + return series_by_product + + def _compute_elasticity(self, series: List[Dict], method: str): + """Compute point or arc elasticity""" + prices = np.array([s['price'] for s in series]) + quantities = np.array([s['quantity'] for s in series]) + + # filter out zero/negative values + valid = (prices > 0) & (quantities > 0) + if valid.sum() < 2: + return {'value': 0.0, 'std_error': 0.0} + + prices = prices[valid] + quantities = quantities[valid] + + if method == 'point': + return self._point_elasticity(prices, quantities) + elif method == 'arc': + return self._arc_elasticity(prices, quantities) + else: + raise ValueError(f"Unknown elasticity method: {method}") + + def _point_elasticity(self, prices: np.ndarray, quantities: np.ndarray): + """Point elasticity via log-log regression: log(Q) = a + b*log(P), elasticity = b""" + if len(prices) < 2: + return {'value': 0.0, 'std_error': 0.0} + + log_p = np.log(prices) + log_q = np.log(quantities) + + if log_p.std() == 0: + return {'value': 0.0, 'std_error': 0.0} + + cov = np.cov(log_p, log_q)[0, 1] + var = np.var(log_p) + b = cov / var + + # std error estimate + if len(prices) > 2: + residuals = log_q - (log_q.mean() + b * (log_p - log_p.mean())) + mse = (residuals ** 
2).sum() / (len(prices) - 2) + se_b = np.sqrt(mse / (len(prices) * var)) + else: + se_b = 0.0 + + return {'value': b, 'std_error': se_b} + + def _arc_elasticity(self, prices: np.ndarray, quantities: np.ndarray): + """Arc elasticity: average period-over-period elasticity""" + elasticities = [] + + for i in range(1, len(prices)): + p1, p2 = prices[i-1], prices[i] + q1, q2 = quantities[i-1], quantities[i] + + p_avg = (p1 + p2) / 2 + q_avg = (q1 + q2) / 2 + + if p_avg == 0 or q_avg == 0: + continue + + delta_p = p2 - p1 + delta_q = q2 - q1 + + if delta_p == 0: + continue + + e = (delta_q / q_avg) / (delta_p / p_avg) + elasticities.append(e) + + if not elasticities: + return {'value': 0.0, 'std_error': 0.0} + + return { + 'value': np.mean(elasticities), + 'std_error': np.std(elasticities) / np.sqrt(len(elasticities)) + } diff --git a/experiments/procesing/steps/fetch.py b/experiments/procesing/steps/fetch.py new file mode 100755 index 0000000..cde2b55 --- /dev/null +++ b/experiments/procesing/steps/fetch.py @@ -0,0 +1,46 @@ +import pandas as pd +from procesing.steps.base import BaseContextStep + +class FetchInteractionsStep(BaseContextStep): + """Fetch raw interaction data from Kafka topic""" + + def transform(self, X=None): + df = self.context.provider.fetch_kafka_topic('user-interactions') + + if df.empty: + return df + + # Explode metadata JSON column + if 'metadata' in df.columns: + df = df.join( + pd.json_normalize(df.pop('metadata'), sep='.').add_prefix('metadata_') + ) + + df = df.dropna(subset=['eventName']) + + # Remap dateIndex if present + if 'metadata_dateIndex' in df.columns: + df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') + + return df + + +class FetchPriceLogsStep(BaseContextStep): + """Fetch price log data from Kafka topic""" + + def transform(self, X=None): + return self.context.provider.fetch_kafka_topic('price-logs') + + +class FetchExperimentsStep(BaseContextStep): + """Fetch experiment metadata for given interaction data""" + + def 
class JoinExperimentsStep(BaseContextStep):
    """Left-join experiment metadata onto interactions by experimentId"""

    def transform(self, data: tuple):
        """
        Args:
            data: (interactions_df, experiments_df)
        Returns:
            interactions_df unchanged when there are no experiments, otherwise
            interactions_df left-merged with the renamed experiment columns
        """
        interactions_df, experiments_df = data

        # nothing to join against
        if experiments_df.empty:
            return interactions_df

        # Expand the nested 'task' field into flat columns, if any row has one
        if 'task' in experiments_df.columns:
            has_task = experiments_df['task'].notnull()
            if has_task.any():
                flattened = pd.json_normalize(experiments_df['task'].dropna())
                # realign the flattened rows with the experiments they came from
                flattened.index = experiments_df[has_task].index
                without_task = experiments_df.drop('task', axis=1)
                experiments_df = without_task.join(flattened, rsuffix='_task')

        # Experiment columns get an exp_ prefix; 'id' becomes the join key
        column_names = {
            'id': 'experimentId',
            'subject_name': 'exp_subject',
            'xp_human_only': 'exp_human_only',
            'xp_market_mode': 'exp_market_mode',
            'xp_task_id': 'exp_task_id'
        }
        renamed = experiments_df.rename(columns=column_names)

        return interactions_df.merge(renamed, on='experimentId', how='left')
@dataclass
class StateSpace:
    """
    State representation for pricing functions.

    Components:
        Q_t: demand ∈ R^n (current demand signal per product)
        P_t: prices ∈ R^n (current/base prices)
        S_t: session_features (behavioral signals, interaction data)
        H_t: history = {Q_{t-k}, P_{t-k}, S_{t-k}} for k in [1, history_length]

    Additionally stores:
        - product_ids: product identifiers (n,)
        - elasticity: price elasticity per product (n,)
        - metadata: arbitrary context (experiment_id, timestamp, etc.)

    Raises:
        AssertionError: if prices, elasticity or product_ids do not have the
            same length as demand.
    """
    demand: np.ndarray  # Q_t ∈ R^n
    prices: np.ndarray  # P_t ∈ R^n
    session_features: pd.DataFrame = field(default_factory=pd.DataFrame)  # S_t

    # augmented state components
    product_ids: Optional[np.ndarray] = None
    elasticity: Optional[np.ndarray] = None

    # historical trajectory H_t = {(Q_{t-k}, P_{t-k}, S_{t-k})}, oldest first
    history: List[Dict[str, Any]] = field(default_factory=list)

    # metadata for context
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Validate dimensions."""
        n = len(self.demand)
        # Raised explicitly rather than via `assert` so the checks survive
        # `python -O`; AssertionError is kept so existing callers still match.
        if len(self.prices) != n:
            raise AssertionError("demand and prices must have same dimension")
        optional_vectors = (
            (self.elasticity, "elasticity must match dimension"),
            (self.product_ids, "product_ids must match dimension"),
        )
        for values, message in optional_vectors:
            if values is not None and len(values) != n:
                raise AssertionError(message)

    @property
    def n_products(self) -> int:
        """Number of products in state space."""
        return len(self.demand)

    def add_history(self, q: np.ndarray, p: np.ndarray, s: pd.DataFrame, max_length: int = 10):
        """
        Append a historical state to trajectory H_t, keeping at most max_length entries.

        Oldest entries are dropped first. All excess entries are dropped, not
        just one, so a smaller max_length than on earlier calls is honoured.
        """
        self.history.append({'demand': q, 'prices': p, 'session_features': s})
        overflow = len(self.history) - max(max_length, 0)
        if overflow > 0:
            del self.history[:overflow]

    def get_history_window(self, k: int = 5) -> List[Dict[str, Any]]:
        """
        Retrieve the last k historical states (fewer if history is shorter).

        Returns a new list; k <= 0 yields an empty list (history[-0:] would
        otherwise return the whole history).
        """
        if k <= 0:
            return []
        return self.history[-k:]
class PredictPricesStep(BaseContextStep):
    """
    Predict optimal prices using fitted pricing function.
    Input: (pricer, state_space)
    Output: prices_df [productId, predicted_price]
    """

    def transform(self, data: tuple):
        pricer, state_space = data

        predicted_prices = pricer.predict(state_space)

        # Label predictions with the ids stored on the state they were
        # computed from, so rows cannot be mis-paired if the state's order
        # or length differs from the catalog. Fall back to the catalog only
        # when the state carries no ids.
        product_ids = getattr(state_space, 'product_ids', None)
        if product_ids is None:
            product_ids = self.context.products['id'].values

        return pd.DataFrame({
            'productId': product_ids,
            'predicted_price': predicted_prices
        })
+ + Features computed: + - total_interactions: count of all events + - page_views, item_views, searches, cart_adds: event type counts + - hovers: hover event counts + - unique_products_viewed: distinct product IDs + - interaction_velocity: events per minute + - session_duration_sec: time span of session + - avg_time_between_events: mean inter-event time + - product_view_depth: max views for single product (attention signal) + """ + + def transform(self, interactions_df: pd.DataFrame) -> pd.DataFrame: + if interactions_df.empty: + return pd.DataFrame() + + # ensure timestamp column + if 'ts' in interactions_df.columns: + interactions_df = interactions_df.copy() + interactions_df['ts'] = pd.to_datetime(interactions_df['ts']) + + # group by session and compute features + session_features = [] + for session_id, session_df in interactions_df.groupby('sessionId'): + features = self._extract_features_for_session(session_id, session_df) + session_features.append(features) + + return pd.DataFrame(session_features) + + def _extract_features_for_session(self, session_id: str, session_df: pd.DataFrame) -> Dict[str, Any]: + """Compute features for single session.""" + features = {'sessionId': session_id} + + # basic counts + features['total_interactions'] = len(session_df) + + event_counts = session_df['eventName'].value_counts().to_dict() + features['page_views'] = event_counts.get('page_view', 0) + event_counts.get('view_item_page', 0) + features['item_views'] = event_counts.get('view_item_page', 0) + features['searches'] = event_counts.get('search', 0) + features['cart_adds'] = event_counts.get('add_item_to_cart', 0) + + # hover events + hover_events = ['hover_over_title', 'hover_over_paragraph', 'hover_over_link', 'hover_over_button'] + features['hovers'] = sum(event_counts.get(ev, 0) for ev in hover_events) + + # product-level signals + product_ids = session_df['productId'].dropna() + features['unique_products_viewed'] = product_ids.nunique() + + if len(product_ids) > 0: + 
product_view_counts = Counter(product_ids) + features['product_view_depth'] = max(product_view_counts.values()) + else: + features['product_view_depth'] = 0 + + # temporal features + if 'ts' in session_df.columns: + timestamps = session_df['ts'].sort_values() + features['session_duration_sec'] = (timestamps.max() - timestamps.min()).total_seconds() + + if features['session_duration_sec'] > 0: + features['interaction_velocity'] = (features['total_interactions'] / features['session_duration_sec']) * 60 + else: + features['interaction_velocity'] = 0.0 + + # inter-event timing + if len(timestamps) > 1: + time_diffs = timestamps.diff().dropna().dt.total_seconds() + features['avg_time_between_events'] = time_diffs.mean() + features['std_time_between_events'] = time_diffs.std() + else: + features['avg_time_between_events'] = 0.0 + features['std_time_between_events'] = 0.0 + else: + features['session_duration_sec'] = 0.0 + features['interaction_velocity'] = 0.0 + features['avg_time_between_events'] = 0.0 + features['std_time_between_events'] = 0.0 + + # cart/conversion signals + features['cart_to_view_ratio'] = features['cart_adds'] / features['item_views'] if features['item_views'] > 0 else 0.0 + + return features + + +class FilterSessionInteractionsStep(BaseContextStep): + """ + Filter interactions DataFrame to specific session. 
class MockProvider(DataProvider):
    """In-memory DataProvider for tests; serves copies of the frames it was given"""

    def __init__(self, products_df=None, experiments_df=None, kafka_data=None):
        # None means "no data": empty frames, and an empty topic -> frame mapping
        self._products = pd.DataFrame() if products_df is None else products_df
        self._experiments = pd.DataFrame() if experiments_df is None else experiments_df
        self._kafka_data = {} if kafka_data is None else kafka_data

    def fetch_products(self, store_mode: str) -> pd.DataFrame:
        # store_mode is accepted for interface compatibility and ignored
        return self._products.copy()

    def fetch_experiments(self, experiment_ids: List[str]) -> pd.DataFrame:
        experiments = self._experiments
        if experiments.empty:
            return pd.DataFrame()
        requested = experiments['id'].isin(experiment_ids)
        return experiments[requested].copy()

    def fetch_kafka_topic(self, topic: str) -> pd.DataFrame:
        # unknown topics yield an empty frame
        frame = self._kafka_data.get(topic, pd.DataFrame())
        return frame.copy()
+ }) + + +@pytest.fixture +def mock_interactions_raw_kafka(): + """Raw Kafka message structure for interactions, matches production format""" + return [ + { + 'partitionID': 0, 'offset': 203, 'timestamp': 1764102082676, + 'value': { + 'payload': { + 'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472', + 'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35', + 'eventName': 'learn_more_about_item', + 'page': '/hotel/products/d018efc1-25e9-4284-b276-80386e048b25', + 'productId': 'd018efc1-25e9-4284-b276-80386e048b25', + 'metadata': {'type': 'hotel', 'dateIndex': 1, 'roomType': 'Junior Suite'}, + 'storeMode': 'hotel', + 'ts': '2025-11-25T20:21:22.674Z' + } + } + }, + { + 'partitionID': 0, 'offset': 204, 'timestamp': 1764102086982, + 'value': { + 'payload': { + 'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472', + 'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35', + 'eventName': 'page_view', + 'page': '/hotel/products', + 'productId': None, + 'metadata': {'referrer': ''}, + 'storeMode': 'hotel', + 'ts': '2025-11-25T20:21:26.947Z' + } + } + }, + { + 'partitionID': 0, 'offset': 205, 'timestamp': 1764102091825, + 'value': { + 'payload': { + 'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472', + 'experimentId': '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35', + 'eventName': 'hover_over_title', + 'page': '/hotel/products', + 'productId': '51266ddb-5b07-47b7-89ee-5b5cae94bb11', + 'metadata': {'elementText': 'Superior Room', 'dateIndex': 1, 'dwellTime': 1200}, + 'storeMode': 'hotel', + 'ts': '2025-11-25T20:21:31.823Z' + } + } + }, + { + 'partitionID': 0, 'offset': 206, 'timestamp': 1764102094193, + 'value': { + 'payload': { + 'sessionId': 'd423ce8a-77aa-4c9a-94d4-d1adddcc3472', + 'experimentId': 'bbbbcccc-dddd-eeee-ffff-000011112222', + 'eventName': 'hover_over_paragraph', + 'page': '/hotel/products', + 'productId': '51266ddb-5b07-47b7-89ee-5b5cae94bb11', + 'metadata': {'elementText': 'price', 'dateIndex': 1, 'dwellTime': 1307}, + 'storeMode': 'hotel', + 'ts': 
@pytest.fixture
def mock_interactions(mock_interactions_raw_kafka):
    """Interaction payloads as a DataFrame, with 'ts' also parsed into a 'timestamp' column"""
    # unwrap each raw message down to its payload dict
    payloads = [message['value']['payload'] for message in mock_interactions_raw_kafka]
    frame = pd.DataFrame(payloads)
    return frame.assign(timestamp=pd.to_datetime(frame['ts']))
@pytest.fixture
def mock_price_logs(mock_price_logs_raw_kafka):
    """Price-log payloads as a DataFrame, with 'ts' also parsed into a 'timestamp' column"""
    # unwrap each raw message down to its payload dict
    payloads = [message['value']['payload'] for message in mock_price_logs_raw_kafka]
    frame = pd.DataFrame(payloads)
    return frame.assign(timestamp=pd.to_datetime(frame['ts']))
def test_bucketing(pipeline_context):
    """Prices are split into the configured number of roughly equal categorical buckets"""
    step = CreatePriceBucketsStep(context=pipeline_context)

    # 100 distinct prices
    prices = pd.DataFrame({
        'metadata_price': random.sample(range(10, 1000), 100)
    })
    bucketed = step.transform(prices)
    assert 'price_bucket' in bucketed.columns

    buckets = bucketed['price_bucket']
    assert isinstance(buckets.dtype, pd.CategoricalDtype)
    assert buckets.nunique() == 3  # as per context config

    # every bucket is used and sizes are roughly equal for 100 samples
    sizes = buckets.value_counts()
    assert (sizes > 0).all()
    assert sizes.max() - sizes.min() <= 10

    # an empty frame still gets the column
    empty_result = step.transform(pd.DataFrame())
    assert 'price_bucket' in empty_result.columns
    assert empty_result.empty
def test_compute_demand(pipeline_context):
    """Evenly spread interactions give every catalog product a score near n/3"""
    step = ComputeDemandStep(context=pipeline_context)

    product_ids = [
        'd018efc1-25e9-4284-b276-80386e048b25',
        '51266ddb-5b07-47b7-89ee-5b5cae94bb11',
        '2cd7f756-fc65-4ba0-ab01-74521c1fff43'
    ]
    event_names = ['view', 'click', 'purchase']
    n_events = 100

    # Deterministic round-robin (34/33/33). The previous unseeded
    # random.choices made the threshold assertion fail intermittently.
    df = pd.DataFrame({
        'ts': pd.date_range(start='2023-01-01', periods=n_events, freq='h'),
        'productId': [product_ids[i % 3] for i in range(n_events)],
        'eventName': [event_names[(i // 3) % 3] for i in range(n_events)]
    })
    result = step.transform(df)
    assert isinstance(result, pd.DataFrame)
    assert not result.empty
    assert set(result['productId']) == set(pipeline_context.products['id'])
    assert all(result['demand_score'] > 100/3 - 10)


def test_compute_demand_skewed(pipeline_context):
    """Demand scores preserve the ordering of a 70/20/10 interaction split"""
    step = ComputeDemandStep(context=pipeline_context)

    popular = 'd018efc1-25e9-4284-b276-80386e048b25'
    middle = '51266ddb-5b07-47b7-89ee-5b5cae94bb11'
    rare = '2cd7f756-fc65-4ba0-ab01-74521c1fff43'
    event_names = ['view', 'click', 'purchase']

    # Exact 70/20/10 split instead of weighted random draws, so the strict
    # ordering below cannot fail by chance.
    df = pd.DataFrame({
        'ts': pd.date_range(start='2023-01-01', periods=100, freq='h'),
        'productId': [popular] * 70 + [middle] * 20 + [rare] * 10,
        'eventName': [event_names[i % 3] for i in range(100)]
    })
    result = step.transform(df)
    assert isinstance(result, pd.DataFrame)
    assert not result.empty
    assert set(result['productId']) == set(pipeline_context.products['id'])
    # test for skewness
    scores = result.set_index('productId')['demand_score'].to_dict()
    assert scores[popular] > scores[middle] > scores[rare]
def test_compute_elasticity_with_known_relationship(pipeline_context):
    """Test elasticity computation with known price-demand relationship"""
    step = ComputeElasticityStep(pipeline_context)

    # simulate elastic demand: each price step up is met by a larger
    # proportional drop in demand
    product_id = 'd018efc1-25e9-4284-b276-80386e048b25'
    base_price = 100
    base_demand = 50
    price_factors = [1.0, 1.10, 1.20]   # +10%, +20%
    demand_factors = [1.0, 0.85, 0.70]  # -15%, -30%

    demand_chunks = []
    price_chunks = []
    for i, (price_factor, demand_factor) in enumerate(zip(price_factors, demand_factors)):
        # consecutive 30s windows starting at 10:00:00
        window_start = pd.Timestamp('2023-01-01 10:00:00') + pd.Timedelta(seconds=30 * i)
        window_end = window_start + pd.Timedelta('30s')

        demand_chunks.append({
            'window_start': window_start,
            'window_end': window_end,
            'demand_vector': pd.DataFrame({
                'productId': [product_id],
                'demand_score': [base_demand * demand_factor]
            })
        })
        price_chunks.append({
            'window_start': window_start,
            'window_end': window_end,
            'price_vector': pd.DataFrame({
                'productId': [product_id],
                'price': [base_price * price_factor]
            })
        })

    result = step.transform((demand_chunks, price_chunks))
    assert isinstance(result, pd.DataFrame)
    assert not result.empty
    assert 'productId' in result.columns
    assert 'elasticity' in result.columns
    assert 'n_obs' in result.columns

    # check elasticity is negative (normal good)
    product_elast = result[result['productId'] == product_id]
    assert len(product_elast) == 1
    assert product_elast.iloc[0]['elasticity'] < 0
    # should be elastic (< -1); this was previously stated but never asserted
    assert product_elast.iloc[0]['elasticity'] < -1
    assert product_elast.iloc[0]['n_obs'] == 3
['51266ddb-5b07-47b7-89ee-5b5cae94bb11'], + 'price': [base_price * 1.20] # 20% increase + }) + } + ] + + result = step.transform((demand_chunks, price_chunks)) + product_elast = result[result['productId'] == '51266ddb-5b07-47b7-89ee-5b5cae94bb11'] + assert len(product_elast) == 1 + # inelastic: elasticity between 0 and -1 + assert -1 < product_elast.iloc[0]['elasticity'] < 0 + + +def test_compute_elasticity_multiple_products(pipeline_context): + """Test elasticity computation across multiple products simultaneously""" + step = ComputeElasticityStep(pipeline_context) + + products = [ + 'd018efc1-25e9-4284-b276-80386e048b25', + '51266ddb-5b07-47b7-89ee-5b5cae94bb11', + '2cd7f756-fc65-4ba0-ab01-74521c1fff43' + ] + + # create 5 time windows with all 3 products + demand_chunks = [] + price_chunks = [] + + for i in range(5): + ts = pd.Timestamp('2023-01-01 10:00:00') + pd.Timedelta(f'{i*30}s') + + demand_chunks.append({ + 'window_start': ts, + 'window_end': ts + pd.Timedelta('30s'), + 'demand_vector': pd.DataFrame({ + 'productId': products, + 'demand_score': [ + 50 * (0.9 ** i), # elastic: decreases as price rises + 40 * (0.98 ** i), # inelastic: barely changes + 30 * (0.85 ** i) # very elastic + ] + }) + }) + + price_chunks.append({ + 'window_start': ts, + 'window_end': ts + pd.Timedelta('30s'), + 'price_vector': pd.DataFrame({ + 'productId': products, + 'price': [ + 100 * (1.05 ** i), + 150 * (1.10 ** i), + 120 * (1.08 ** i) + ] + }) + }) + + result = step.transform((demand_chunks, price_chunks)) + assert isinstance(result, pd.DataFrame) + assert len(result) == 3 # all products should have elasticity + assert set(result['productId']) == set(products) + assert all(result['n_obs'] == 5) + assert all(result['elasticity'] < 0) # all normal goods + + +def test_compute_elasticity_insufficient_data(pipeline_context): + """Test behavior with insufficient observations""" + step = ComputeElasticityStep(pipeline_context) + + # only 1 observation + demand_chunks = [{ + 
'window_start': pd.Timestamp('2023-01-01 10:00:00'), + 'window_end': pd.Timestamp('2023-01-01 10:00:30'), + 'demand_vector': pd.DataFrame({ + 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], + 'demand_score': [50] + }) + }] + + price_chunks = [{ + 'window_start': pd.Timestamp('2023-01-01 10:00:00'), + 'window_end': pd.Timestamp('2023-01-01 10:00:30'), + 'price_vector': pd.DataFrame({ + 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], + 'price': [100] + }) + }] + + result = step.transform((demand_chunks, price_chunks)) + # should still return result but with low n_obs + product_elast = result[result['productId'] == 'd018efc1-25e9-4284-b276-80386e048b25'] + assert len(product_elast) == 1 + assert product_elast.iloc[0]['n_obs'] == 1 + assert product_elast.iloc[0]['elasticity'] == 0.0 # not enough data + + +def test_compute_elasticity_misaligned_chunks(pipeline_context): + """Test with non-overlapping demand and price windows""" + step = ComputeElasticityStep(pipeline_context) + + demand_chunks = [{ + 'window_start': pd.Timestamp('2023-01-01 10:00:00'), + 'window_end': pd.Timestamp('2023-01-01 10:00:30'), + 'demand_vector': pd.DataFrame({ + 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], + 'demand_score': [50] + }) + }] + + price_chunks = [{ + 'window_start': pd.Timestamp('2023-01-01 11:00:00'), # different time + 'window_end': pd.Timestamp('2023-01-01 11:00:30'), + 'price_vector': pd.DataFrame({ + 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], + 'price': [100] + }) + }] + + result = step.transform((demand_chunks, price_chunks)) + # should handle gracefully with no aligned data + assert isinstance(result, pd.DataFrame) + assert all(result['n_obs'] == 0) + + +def test_elasticity_arc_method(pipeline_context): + """Test arc elasticity computation method""" + # configure context for arc method + pipeline_context.config['elasticity_method'] = 'arc' + step = ComputeElasticityStep(pipeline_context) + + demand_chunks = [ + { + 'window_start': 
pd.Timestamp('2023-01-01 10:00:00'), + 'window_end': pd.Timestamp('2023-01-01 10:00:30'), + 'demand_vector': pd.DataFrame({ + 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], + 'demand_score': [100] + }) + }, + { + 'window_start': pd.Timestamp('2023-01-01 10:00:30'), + 'window_end': pd.Timestamp('2023-01-01 10:01:00'), + 'demand_vector': pd.DataFrame({ + 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], + 'demand_score': [80] + }) + } + ] + + price_chunks = [ + { + 'window_start': pd.Timestamp('2023-01-01 10:00:00'), + 'window_end': pd.Timestamp('2023-01-01 10:00:30'), + 'price_vector': pd.DataFrame({ + 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], + 'price': [100] + }) + }, + { + 'window_start': pd.Timestamp('2023-01-01 10:00:30'), + 'window_end': pd.Timestamp('2023-01-01 10:01:00'), + 'price_vector': pd.DataFrame({ + 'productId': ['d018efc1-25e9-4284-b276-80386e048b25'], + 'price': [110] + }) + } + ] + + result = step.transform((demand_chunks, price_chunks)) + product_elast = result[result['productId'] == 'd018efc1-25e9-4284-b276-80386e048b25'] + assert len(product_elast) == 1 + assert product_elast.iloc[0]['elasticity'] < 0 + # reset config + pipeline_context.config['elasticity_method'] = 'point' diff --git a/experiments/procesing/tests/test_fetch.py b/experiments/procesing/tests/test_fetch.py new file mode 100644 index 0000000..9602b16 --- /dev/null +++ b/experiments/procesing/tests/test_fetch.py @@ -0,0 +1,51 @@ +import pytest +import pandas as pd +from procesing.steps import ( + FetchInteractionsStep, + FetchPriceLogsStep, + FetchExperimentsStep, +) + + +def test_fetch_interactions_data(pipeline_context): + step = FetchInteractionsStep(pipeline_context) + data = step.transform(None) + assert data is not None + assert isinstance(data, pd.DataFrame) + expected_cols = [ + "eventName", + "dateIndex", + "experimentId", + "storeMode", + "metadata_elementText" + ] + for expected in expected_cols: + assert expected in data.columns + +def 
test_fetch_price_logs(pipeline_context): + step = FetchPriceLogsStep(pipeline_context) + data = step.transform(None) + assert data is not None + assert isinstance(data, pd.DataFrame) + expected_cols = [ + "price", + "productId" + ] + for expected in expected_cols: + assert expected in data.columns + prices = data['price'].to_list() + assert min(prices) >= 0 + assert max(prices) <= 9999 + + +def test_experiments_fetching(pipeline_context): + interactions = FetchInteractionsStep(pipeline_context).transform(None) + assert interactions is not None + experiments = FetchExperimentsStep(pipeline_context) + experiment_data = experiments.transform(interactions) + assert experiment_data is not None + assert isinstance(experiment_data, pd.DataFrame) + assert not experiment_data.empty + assert 'id' in experiment_data.columns + assert len(experiment_data) == 2 + assert '53aefd07-f66a-4d7f-ba8b-7ea1fc562d35' in experiment_data['id'].values diff --git a/experiments/procesing/tests/test_pricing.py b/experiments/procesing/tests/test_pricing.py new file mode 100644 index 0000000..4fef8f0 --- /dev/null +++ b/experiments/procesing/tests/test_pricing.py @@ -0,0 +1,87 @@ +import pytest +import pandas as pd + +from procesing.pricers import ( + StaticPricer, + RandomPricer, + ElasticityBasedPricer +) + + +def test_static_pricer_fit_and_predict(): + # Sample historical data + historical_data = pd.DataFrame({ + 'product_id': [1, 2, 3], + 'base_price': [100.0, 150.0, 200.0] + }) + + # Initialize and fit StaticPricer + pricer = StaticPricer() + pricer.fit(historical_data) + + # Predict prices + predicted_prices = pricer.predict(None) + + # Assert that predicted prices match base prices + expected_prices = historical_data['base_price'].values + assert all(predicted_prices == expected_prices), "Predicted prices do not match base prices" + + +def test_random_pricer_fit_and_predict(): + # Sample historical data + historical_data = pd.DataFrame({ + 'product_id': [1, 2, 3], + 'base_price': [100.0, 
150.0, 200.0] + }) + + # Initialize and fit RandomPricer + pricer = RandomPricer(price_min=50.0, price_max=250.0, seed=42) + pricer.fit(historical_data) + + # Predict prices + predicted_prices = pricer.predict(None) + + # Assert that predicted prices are within bounds + assert predicted_prices.min() >= 50.0, "Predicted prices are below minimum bound" + assert predicted_prices.max() <= 250.0, "Predicted prices are above maximum bound" + # distribution check (not so strict) + assert len(set(predicted_prices)) > 1, "Predicted prices are not varied enough" + assert len(predicted_prices) == len(historical_data), "Number of predicted prices does not match number of products" + +def test_elasticity_based_pricer_fit_and_predict(): + # Sample historical data + historical_data = pd.DataFrame({ + 'productId': [1, 2, 3], + 'elasticity': [-1.5, -0.5, -2.0], + 'base_price': [100.0, 150.0, 200.0], + 'mean_demand': [10, 20, 15] + }) + + # Initialize and fit ElasticityBasedPricer + pricer = ElasticityBasedPricer(alpha=0.1, price_floor=50.0, price_ceil=300.0) + pricer.fit(historical_data) + + # Create a mock state space with demand deviations + class MockStateSpace: + def __init__(self, demand): + self.demand = demand + + # Simulate demand higher than mean for all products + state_space = MockStateSpace(demand=[15, 25, 20]) + + # Predict prices + predicted_prices = pricer.predict(state_space) + + # Assert that predicted prices are within bounds + assert predicted_prices.min() >= 50.0, "Predicted prices are below minimum bound" + assert predicted_prices.max() <= 300.0, "Predicted prices are above maximum bound" + assert len(predicted_prices) == len(historical_data), "Number of predicted prices does not match number of products" + + # now we gotta check semantic validity + # since demand is higher than mean, prices should generally increase + for i, row in historical_data.iterrows(): + base_price = row['base_price'] + elasticity = row['elasticity'] + expected_increase = base_price * 
class ModelRegistry:
    """
    Lightweight model registry using Redis for storing pricing models and elasticity data.

    Three key families are written, all keyed by a caller-chosen ``model_name``:

    * ``elasticity:<name>``  - elasticity table serialized as JSON records
    * ``model:data:<name>``  - pickled pricing-function object
    * ``model:meta:<name>``  - JSON metadata describing either of the above

    NOTE(review): both publishers write their metadata to the same
    ``model:meta:<name>`` key and both default to ``'latest'``, so publishing
    an elasticity snapshot and a pricing model under the same name overwrites
    one metadata record with the other. Use distinct names until the key
    layout is split (changing it here would break existing readers).
    """

    # logging.getLogger is keyed by name, so this is the module's own logger.
    _log = logging.getLogger(__name__)

    def __init__(self, redis_host: Optional[str] = None, redis_port: Optional[int] = None):
        """
        Create the Redis client used by every other method.

        Args:
            redis_host: Redis host; falls back to ``$REDIS_HOST`` then ``'localhost'``.
            redis_port: Redis port; falls back to ``$REDIS_PORT`` then ``6378``.

        Raises:
            ValueError: if ``$REDIS_PORT`` is set to a non-integer string.
        """
        host = redis_host or os.getenv('REDIS_HOST', 'localhost')
        # NOTE(review): the fallback is 6378, not Redis' standard 6379 -
        # presumably it matches the project's port mapping; confirm.
        port = redis_port or int(os.getenv('REDIS_PORT', '6378'))

        # decode_responses stays False because pickled models are raw bytes;
        # text payloads are decoded explicitly on read.
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=0,
            decode_responses=False
        )
        self.metadata_prefix = "model:meta:"
        self.data_prefix = "model:data:"
        self.elasticity_prefix = "elasticity:"

    @staticmethod
    def _as_text(value):
        """Return ``value`` as ``str``, decoding it as UTF-8 when it is ``bytes``."""
        return value.decode('utf-8') if isinstance(value, bytes) else value

    def publish_elasticity(self,
                           elasticity_df: pd.DataFrame,
                           model_name: str = 'latest',
                           metadata: Optional[Dict[str, Any]] = None):
        """
        Store elasticity estimates in registry.

        Args:
            elasticity_df: frame to publish. Only the ``elasticity`` column is
                read here (for the summary); the pipeline is expected to
                supply [productId, elasticity, std_error, n_obs].
            model_name: identifier for this elasticity snapshot
            metadata: additional info (timestamp, window_size, etc). It is
                copied, never modified; the computed fields ``n_products``,
                ``mean_elasticity`` and ``model_type`` win on key clashes.

        Raises:
            KeyError: if ``elasticity_df`` has no ``elasticity`` column.
            TypeError: if ``metadata`` contains values JSON cannot encode.
        """
        n_products = len(elasticity_df)
        mean_elasticity = elasticity_df['elasticity'].mean()

        # Build a fresh dict so the caller's ``metadata`` is left untouched.
        meta = {
            **(metadata or {}),
            'n_products': n_products,
            # An empty / all-NaN column has no mean; store null rather than
            # the non-standard JSON token NaN.
            'mean_elasticity': None if pd.isna(mean_elasticity) else float(mean_elasticity),
            'model_type': 'elasticity_snapshot',
        }

        # Serialize everything before the first write so an encoding error
        # cannot leave a data key without its metadata.
        data_json = elasticity_df.to_json(orient='records')
        meta_json = json.dumps(meta)

        self.redis_client.set(f"{self.elasticity_prefix}{model_name}", data_json)
        self.redis_client.set(f"{self.metadata_prefix}{model_name}", meta_json)

        self._log.info("Published elasticity model '%s' with %d products",
                       model_name, n_products)

    def get_elasticity(self, model_name: str = 'latest') -> Optional[pd.DataFrame]:
        """
        Retrieve elasticity estimates from registry.

        Returns:
            The stored frame, or ``None`` when nothing is stored under
            ``model_name``.
        """
        # Local import: the module's import block does not provide StringIO.
        from io import StringIO

        raw = self.redis_client.get(f"{self.elasticity_prefix}{model_name}")
        if raw is None:
            return None

        # pandas deprecated passing a literal JSON string to read_json; a
        # file-like wrapper works on old and new versions alike.
        return pd.read_json(StringIO(self._as_text(raw)), orient='records')

    def publish_pricing_model(self,
                              pricing_function,
                              model_name: str = 'latest',
                              metadata: Optional[Dict[str, Any]] = None):
        """
        Store a fitted pricing function object.

        Args:
            pricing_function: fitted PricingFunction instance (must be picklable)
            model_name: identifier
            metadata: additional info. It is copied, never modified; the
                computed fields ``model_class`` and ``model_type`` win on
                key clashes.

        Raises:
            pickle.PicklingError / TypeError: if the object cannot be pickled
                or ``metadata`` cannot be JSON-encoded.
        """
        model_class = pricing_function.__class__.__name__
        meta = {
            **(metadata or {}),
            'model_class': model_class,
            'model_type': 'pricing_function',
        }

        # Serialize both payloads before writing (see publish_elasticity).
        model_bytes = pickle.dumps(pricing_function)
        meta_json = json.dumps(meta)

        self.redis_client.set(f"{self.data_prefix}{model_name}", model_bytes)
        self.redis_client.set(f"{self.metadata_prefix}{model_name}", meta_json)

        self._log.info("Published pricing model '%s' (%s)", model_name, model_class)

    def get_pricing_model(self, model_name: str = 'latest'):
        """
        Retrieve a pricing function from registry.

        Returns:
            The unpickled object, or ``None`` when nothing is stored under
            ``model_name``.
        """
        model_bytes = self.redis_client.get(f"{self.data_prefix}{model_name}")
        if model_bytes is None:
            return None

        # NOTE(security): unpickling executes arbitrary code. This is only
        # safe while every writer to this Redis instance is trusted.
        return pickle.loads(model_bytes)

    def list_models(self) -> Dict[str, Any]:
        """
        List all registered models with metadata.

        Returns:
            Mapping of model name to its decoded metadata dict. Keys whose
            value is missing or empty by the time it is read are skipped.
        """
        prefix = self.metadata_prefix
        models = {}

        for key in self.redis_client.scan_iter(f"{prefix}*"):
            meta_json = self.redis_client.get(key)
            if not meta_json:
                continue
            # Slice off the leading prefix only; str.replace would also strip
            # the same text from the middle of a model name.
            model_name = self._as_text(key)[len(prefix):]
            models[model_name] = json.loads(self._as_text(meta_json))

        return models

    def health_check(self) -> bool:
        """
        Check if Redis connection is alive.

        Returns:
            ``True`` when ``PING`` succeeds, ``False`` on any ordinary error.
            KeyboardInterrupt / SystemExit are deliberately not swallowed.
        """
        try:
            self.redis_client.ping()
        except Exception:
            self._log.debug("Redis health check failed", exc_info=True)
            return False
        return True
GET(req: NextRequest) { ); } - // stub: call external pricing provider (random for now) - const basePrice = 100 + Math.random() * 900; // 100-1000 range - const price = Math.round(basePrice * 100) / 100; const timestamp = new Date().toISOString(); + let price: number; + let basePrice: number | undefined; + let markup: number | undefined; + let elasticity: number | undefined; + + // call real pricing provider + const providerUrl = process.env.PRICING_PROVIDER_URL || 'http://localhost:5001'; + try { + const queryParams = new URLSearchParams(); + if (sessionId) queryParams.append('sessionId', sessionId); + if (experimentId) queryParams.append('experimentId', experimentId); + + const providerResponse = await fetch( + `${providerUrl}/api/${storeMode}/price/${productId}?${queryParams.toString()}`, + { headers: { 'Accept': 'application/json' }, cache: 'no-store' } + ); + + if (!providerResponse.ok) { + throw new Error(`Provider returned ${providerResponse.status}`); + } + + const providerData = await providerResponse.json(); + price = providerData.price; + basePrice = providerData.base_price; + markup = providerData.markup; + elasticity = providerData.elasticity; + + } catch (err) { + console.error('[pricing-provider-error]', err); + // fallback to random pricing if provider unavailable + const randomBase = 100 + Math.random() * 900; + price = Math.round(randomBase * 100) / 100; + } // log price to kafka for elasticity computation if (sessionId) { @@ -43,19 +73,13 @@ export async function GET(req: NextRequest) { }); } catch (err) { console.error('[price-log-error]', err); - // don't fail the pricing request if logging fails } } - // log in dev if (process.env.NODE_ENV === 'development') { console.log('[pricing-api]', { - productId, - sessionId, - experimentId, - storeMode, - price, - timestamp, + productId, sessionId, experimentId, storeMode, + price, basePrice, markup, elasticity, timestamp, }); }