From 0d214a469f64194dd9b1bb4247e157b6579908f6 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Mon, 12 Jan 2026 20:59:09 +0100 Subject: [PATCH] planning --- .../airflow/dags/surge_pricing_factory.py | 10 +++++++ experiments/procesing/pricers/base.py | 29 +++++++++---------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/experiments/airflow/dags/surge_pricing_factory.py b/experiments/airflow/dags/surge_pricing_factory.py index a886d5b..b61e65c 100644 --- a/experiments/airflow/dags/surge_pricing_factory.py +++ b/experiments/airflow/dags/surge_pricing_factory.py @@ -1,3 +1,4 @@ +from pandas.core.algorithms import factorize_array from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago @@ -208,3 +209,12 @@ def create_surge_pricing_dag(store_mode: str) -> DAG: # instantiate DAGs for Airflow to discover dag_airline = create_surge_pricing_dag('airline') dag_hotel = create_surge_pricing_dag('hotel') + +# TODO: Refactor this factory from a surge pricing factory to a general pricing factory +# We will do this by passing a pricing strategy class to the factory, since the generic pipeline is: +# take all interaction data, group by sessionId and assign a new price vector to each session +# in the grouping we get a subset of the interactions per sessionId and we can map that to some Features +# we define a custom _get_features(interactions .) methodin the strategy class +# we then run only the inference which is the .predict(trajectory) per-session which will give us a new price vector +# this we then publish for each sessionId group +# this might include no deleting most of the pricers we have defined and starting with a super simple surge-pricing algorithm that is no-fit only predict. This we can then test end-to-end and observe changes to prices according to a desired strategy - we have to define this one as a very short term strategy because we run sessions that take only a few minutes. diff --git a/experiments/procesing/pricers/base.py b/experiments/procesing/pricers/base.py index 6569556..ecaabed 100644 --- a/experiments/procesing/pricers/base.py +++ b/experiments/procesing/pricers/base.py @@ -7,15 +7,6 @@ import pandas as pd class PricingFunction(ABC): """ Abstract base for pricing functions. - - Defines mapping: f(Q_t, P_t, S_t, H_t) -> P_{t+1} - - Where: - Q_t ∈ R^n: demand vector at time t - P_t ∈ R^n: price vector at time t - S_t: session features (behavioral signals, interactions) - H_t = {Q_{t-k}, P_{t-k}, S_{t-k}}: historical state trajectory - Objective: maximize E[R_T] = E[Σ P_t^T · Q_t] subject to: @@ -28,10 +19,10 @@ class PricingFunction(ABC): def fit(self, *kwargs): """ Offline training on historical data. + This is where we can think about some maximization of expected revenue + over historical trajectories to learn parameters of the pricing function. + (This however we cover move in the RL side of things) - Args: - historical_data: DataFrame with elasticity, prices, demand signals - **kwargs: additional training parameters """ pass @@ -39,12 +30,18 @@ class PricingFunction(ABC): def predict(self, *kwargs) -> np.ndarray: """ Generate optimal prices given current state. + This is an abstract method that transitions from τ -> P* + which is the mapping from the trajectory to optimal prices under + some subset of session grouping (so, per sessionId) + """ + pass - Args: - state_space: StateSpace object containing Q_t, P_t, S_t, H_t - + @abstractmethod + def _get_features(self, *kwargs) -> np.ndarray: + """ + Extract features from trajectory for pricing decision. Returns: - P_{t+1}: price vector in R^n + np.ndarray of shape (n_products, n_features) """ pass