feature: e2e intro pipline surge pricing

This commit is contained in:
2025-12-06 16:30:28 +01:00
parent 503c5e182d
commit e6a5b95875
6 changed files with 41 additions and 110 deletions

View File

@@ -20,6 +20,7 @@ from procesing.steps import (
AggregatePriceLogsStep,
JoinProductFeaturesStep,
)
from procesing.pricers.simple import SimpleSurgePricer
default_args = {
'owner': 'phantom-research',
@@ -75,6 +76,8 @@ def compute_demand(**kwargs):
context = get_context(**kwargs)
step = ComputeDemandStep(context)
demand_df = step.transform(df)
# TODO: clear the xcom
ti.xcom_push(key='demand_data', value=pickle.dumps(demand_df))
logging.info(f"Computed demand for {len(demand_df)} products")
@@ -113,46 +116,24 @@ def apply_surge_pricing(**kwargs):
product_features = pickle.loads(ti.xcom_pull(key='product_features'))
dag_conf = kwargs.get('dag_run').conf if kwargs.get('dag_run') else {}
high_threshold = dag_conf.get('high_threshold', 10)
low_threshold = dag_conf.get('low_threshold', 2)
surge_multiplier = dag_conf.get('surge_multiplier', 1.2)
discount_multiplier = dag_conf.get('discount_multiplier', 0.9)
context = get_context(**kwargs)
products = context.products
results = []
# rename demand_score to demand for pricer compatibility
data = product_features.rename(columns={'demand_score': 'demand'})
for pid in product_features['productId'].unique():
prod_data = product_features[product_features['productId'] == pid]
if prod_data.empty:
continue
surge_pricer = SimpleSurgePricer(
high_threshold=dag_conf.get('high_threshold', 10),
low_threshold=dag_conf.get('low_threshold', 2),
surge_multiplier=dag_conf.get('surge_multiplier', 1.2),
discount_multiplier=dag_conf.get('discount_multiplier', 0.9)
)
surge_pricer.fit(data)
data['optimal_price'] = surge_pricer.predict()
demand = prod_data["demand_score"].mean()
current_price = prod_data["price"].mean()
prices_df = data[['productId', 'price', 'base_price', 'optimal_price', 'demand']].rename(columns={
'price': 'current_price',
'demand': 'demand_score'
})
prod_meta = products[products['id'] == pid]
if not prod_meta.empty:
meta = prod_meta.iloc[0]['metadata']
base_price = meta.get('base_price', current_price) if isinstance(meta, dict) else current_price
else:
base_price = current_price
if demand >= high_threshold:
optimal_price = base_price * surge_multiplier
elif demand <= low_threshold:
optimal_price = base_price * discount_multiplier
else:
optimal_price = base_price
results.append({
'productId': pid,
'current_price': current_price,
'base_price': base_price,
'optimal_price': optimal_price,
'demand_score': demand
})
prices_df = pd.DataFrame(results)
ti.xcom_push(key='predicted_prices', value=pickle.dumps(prices_df))
logging.info(f"Applied surge pricing for {len(prices_df)} products")
return len(prices_df)