This commit is contained in:
2026-01-12 20:59:09 +01:00
parent 62a4008c29
commit e89cb263d4
2 changed files with 23 additions and 16 deletions

View File

@@ -1,3 +1,4 @@
from pandas.core.algorithms import factorize_array
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
@@ -208,3 +209,12 @@ def create_surge_pricing_dag(store_mode: str) -> DAG:
# instantiate DAGs for Airflow to discover
dag_airline = create_surge_pricing_dag('airline')
dag_hotel = create_surge_pricing_dag('hotel')
# TODO: Refactor this factory from a surge pricing factory to a general pricing factory
# We will do this by passing a pricing strategy class to the factory, since the generic pipeline is:
# take all interaction data, group by sessionId and assign a new price vector to each session
# in the grouping we get a subset of the interactions per sessionId and we can map that to some Features
# we define a custom _get_features(interactions .) methodin the strategy class
# we then run only the inference which is the .predict(trajectory) per-session which will give us a new price vector
# this we then publish for each sessionId group
# this might include no deleting most of the pricers we have defined and starting with a super simple surge-pricing algorithm that is no-fit only predict. This we can then test end-to-end and observe changes to prices according to a desired strategy - we have to define this one as a very short term strategy because we run sessions that take only a few minutes.

View File

@@ -7,15 +7,6 @@ import pandas as pd
class PricingFunction(ABC):
"""
Abstract base for pricing functions.
Defines mapping: f(Q_t, P_t, S_t, H_t) -> P_{t+1}
Where:
Q_t ∈ R^n: demand vector at time t
P_t ∈ R^n: price vector at time t
S_t: session features (behavioral signals, interactions)
H_t = {Q_{t-k}, P_{t-k}, S_{t-k}}: historical state trajectory
Objective:
maximize E[R_T] = E[Σ P_t^T · Q_t]
subject to:
@@ -28,10 +19,10 @@ class PricingFunction(ABC):
def fit(self, *kwargs):
"""
Offline training on historical data.
This is where we can think about some maximization of expected revenue
over historical trajectories to learn parameters of the pricing function.
(This however we cover move in the RL side of things)
Args:
historical_data: DataFrame with elasticity, prices, demand signals
**kwargs: additional training parameters
"""
pass
@@ -39,12 +30,18 @@ class PricingFunction(ABC):
def predict(self, *kwargs) -> np.ndarray:
"""
Generate optimal prices given current state.
This is an abstract method that transitions from τ -> P*
which is the mapping from the trajectory to optimal prices under
some subset of session grouping (so, per sessionId)
"""
pass
Args:
state_space: StateSpace object containing Q_t, P_t, S_t, H_t
@abstractmethod
def _get_features(self, *kwargs) -> np.ndarray:
"""
Extract features from trajectory for pricing decision.
Returns:
P_{t+1}: price vector in R^n
np.ndarray of shape (n_products, n_features)
"""
pass