From c639d99be2f6b0c3697776a77e05899e0a132d21 Mon Sep 17 00:00:00 2001 From: Daniel Rosel Date: Tue, 25 Nov 2025 22:27:38 +0100 Subject: [PATCH] first implementation of elasticity demand computation --- backend/server/app.py | 46 ++++- experiments/agents/agent.py | 9 +- experiments/procesing/demand.py | 81 +++++++- experiments/procesing/elasticity.py | 285 ++++++++++++++++++++++++++++ experiments/procesing/extract.py | 109 ++++++++++- experiments/procesing/pipeline.py | 76 +++++++- experiments/procesing/pricing.py | 0 web/src/app/api/pricing/route.ts | 48 +++-- 8 files changed, 616 insertions(+), 38 deletions(-) create mode 100644 experiments/procesing/elasticity.py create mode 100644 experiments/procesing/pricing.py diff --git a/backend/server/app.py b/backend/server/app.py index 2a6e44f..482eee1 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -64,6 +64,14 @@ class EventPayload(BaseModel): userAgent: Optional[str] = None ts: Optional[str] = None +class PriceLogPayload(BaseModel): + productId: str + price: float + sessionId: str + experimentId: Optional[str] = None + storeMode: str + ts: Optional[str] = None + app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -87,7 +95,8 @@ async def startup_event(): ) topics = [ - NewTopic(name='user-interactions', num_partitions=3, replication_factor=1) + NewTopic(name='user-interactions', num_partitions=3, replication_factor=1), + NewTopic(name='price-logs', num_partitions=3, replication_factor=1) ] admin.create_topics(new_topics=topics, validate_only=False) @@ -139,26 +148,52 @@ async def ingest_logs(event: EventPayload): print(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) +@app.post("/api/kafka/price-log") +async def ingest_price_log(price_log: PriceLogPayload): + try: + if not price_log.ts: + price_log.ts = datetime.utcnow().isoformat() + 'Z' + + producer = get_producer() + future = producer.send( + 'price-logs', + key=price_log.productId, + value=price_log.model_dump() + ) + future.add_errback(lambda e: print(f"[KAFKA_PRICE_LOG_ERROR] {e}")) + + return {"success": True} + except Exception as e: + import traceback + print(f"[PRICE_LOG_ERROR] {e}") + print(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + @app.get("/api/kafka/dump") def dump_logs( + topic: str = 'user-interactions', last_n: Optional[int] = None, t_start: Optional[str] = None, t_end: Optional[str] = None ): - """dump all messages from user-interactions topic + """dump all messages from specified kafka topic params: + topic: kafka topic to dump (default: user-interactions) last_n: return only last n messages (default: all) - t_start: filter by start timestamp iso format (future use) - t_end: filter by end timestamp iso format (future use) + t_start: filter by start timestamp iso format + t_end: filter by end timestamp iso format """ + if topic not in ['user-interactions', 'price-logs']: + raise HTTPException(status_code=400, detail="Invalid topic") + host = os.getenv('KAFKA_HOST', 'localhost') port = os.getenv('KAFKA_PORT', '9092') broker = f'{host}:{port}' try: consumer = KafkaConsumer( - 'user-interactions', + topic, bootstrap_servers=[broker], auto_offset_reset='earliest', enable_auto_commit=False, @@ -174,7 +209,6 @@ def dump_logs( # apply filters if t_start or t_end: - # filter by timestamp range if provided filtered = [] for e in events: ts = e.get('ts') diff --git a/experiments/agents/agent.py b/experiments/agents/agent.py index c31e6b2..57f3b06 100644 --- a/experiments/agents/agent.py +++ b/experiments/agents/agent.py @@ -1,4 +1,4 @@ -from .base import Agent as BaseAgent +from base import Agent as BaseAgent from browser_use import Browser, Agent, ChatOpenAI from enum import Enum @@ -38,7 +38,10 @@ def get_agent(agent_type: AgentTypes, **kwargs) -> Agent: if __name__ == "__main__": import asyncio - JTBD= "Name all the products on this site and try to find out more about each product by clicking into them (they might not open)" - agent = get_agent(AgentTypes.GENERIC_BROWSER_USE_AGENT, goal=JTBD, url="http://localhost:3000/products", timeout=300) + JTBD= "Find me the cheapest room in Madrid for 2 people in the next two days, review each hotel room in detail and then add it to cart." + agent = get_agent(AgentTypes.GENERIC_BROWSER_USE_AGENT, + goal=JTBD, + url="http://localhost:3000/start-task?uuid=d10f5ab3-a7b7-4e97-8d94-ab06f1537c0a", + timeout=300) R=asyncio.run(agent.act()) print(R) diff --git a/experiments/procesing/demand.py b/experiments/procesing/demand.py index d1924ec..4652146 100644 --- a/experiments/procesing/demand.py +++ b/experiments/procesing/demand.py @@ -2,7 +2,7 @@ from sklearn.base import BaseEstimator, TransformerMixin import numpy as np import pandas as pd from supabase import create_client, Client -import pandas as pd +from typing import Optional, Literal import os SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL") @@ -10,6 +10,71 @@ SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) +class ChunkInteractionsIntoSteps(BaseEstimator, TransformerMixin): + """ + Split interaction data into time windows for temporal analysis. + Returns a list of dataframes, one per time window. + """ + def __init__(self, + window_size:str='1h', + ts_col:str='ts', + return_metadata:bool=True): + """ + Args: + window_size: pandas freq string ('1h', '30T', '1D', etc) + ts_col: timestamp column name + return_metadata: if True, return dict with metadata per chunk + """ + self.window_size = window_size + self.ts_col = ts_col + self.return_metadata = return_metadata + + def fit(self, X): + return self + + def transform(self, interactions: pd.DataFrame): + """ + Returns: + if return_metadata=False: list of dataframes, one per window + if return_metadata=True: list of dicts with keys: + - 'data': dataframe for this window + - 'window_start': start timestamp + - 'window_end': end timestamp + - 'window_idx': integer index + """ + if interactions.empty: + return [] + + df = interactions.copy() + + # ensure timestamp is datetime + if not pd.api.types.is_datetime64_any_dtype(df[self.ts_col]): + df[self.ts_col] = pd.to_datetime(df[self.ts_col]) + + # sort by time + df = df.sort_values(self.ts_col) + + # assign window + df['_window'] = df[self.ts_col].dt.floor(self.window_size) + + # group by window + chunks = [] + for idx, (window_start, group) in enumerate(df.groupby('_window')): + chunk_data = group.drop(columns=['_window']) + + if self.return_metadata: + chunks.append({ + 'data': chunk_data, + 'window_start': window_start, + 'window_end': window_start + pd.Timedelta(self.window_size), + 'window_idx': idx + }) + else: + chunks.append(chunk_data) + + return chunks + + class DemandEstimator(BaseEstimator, TransformerMixin): def __init__(self, store_mode:str='hotel', @@ -28,12 +93,24 @@ class DemandEstimator(BaseEstimator, TransformerMixin): interactions = interactions[interactions['sessionId'] == self.session_filter] if self.experiment_filter: interactions = interactions[interactions['experimentId'] == self.experiment_filter] + products=supabase.table(f'{self.store}_products').select("id, room_type, date_index, metadata, availability").execute() products = pd.DataFrame(products.data) unique_products = products['id'].unique() + + # filter out rows without productId + interactions_with_products = interactions.dropna(subset=['productId']) + + if interactions_with_products.empty: + # no interactions with products, return all zeros + return pd.DataFrame({ + 'productId': unique_products, + 'demand_score': 0 + }) + # TODO: improve demand score calculation rather than just counting interactions (use weights..) # while maintaining simplicity of a simple cross tab approach - product_demand = pd.crosstab(interactions['productId'], "no_of_interactions") + product_demand = pd.crosstab(interactions_with_products['productId'], "no_of_interactions") product_demand = product_demand.reindex(unique_products, fill_value=0).reset_index() product_demand.columns = ['productId', 'demand_score'] return product_demand diff --git a/experiments/procesing/elasticity.py b/experiments/procesing/elasticity.py new file mode 100644 index 0000000..7143e26 --- /dev/null +++ b/experiments/procesing/elasticity.py @@ -0,0 +1,285 @@ +import numpy as np +import pandas as pd +from typing import List, Dict, Optional +from sklearn.base import BaseEstimator, TransformerMixin + +class TemporalElasticityEstimator(BaseEstimator, TransformerMixin): + """ + Compute price elasticity from time-series demand and price data. + + Elasticity = (% change in quantity) / (% change in price) + + Works with chunked time-window data from ChunkInteractionsIntoSteps. + """ + + def __init__(self, + method:str='point', + min_observations:int=2, + smooth_window:Optional[int]=None): + """ + Args: + method: 'point' (point elasticity) or 'arc' (arc elasticity) + min_observations: min data points needed per product + smooth_window: if set, apply rolling avg smoothing to time series + """ + self.method = method + self.min_observations = min_observations + self.smooth_window = smooth_window + + def fit(self, X): + return self + + def transform(self, + demand_chunks: List[Dict], + price_chunks: List[Dict]) -> pd.DataFrame: + """ + Args: + demand_chunks: list from ChunkInteractionsIntoSteps + DemandEstimator + each item: {'window_start', 'window_end', 'demand_vector'} + price_chunks: list of dicts with {'window_start', 'window_end', 'price_vector'} + + Returns: + df with [productId, elasticity, std_error, n_observations] + """ + aligned = self._align_chunks(demand_chunks, price_chunks) + if not aligned: + return pd.DataFrame(columns=['productId', 'elasticity', 'std_error', 'n_obs']) + + # build time series per product + product_series = self._build_product_timeseries(aligned) + + # compute elasticity per product + elasticities = [] + for pid, series in product_series.items(): + if len(series) < self.min_observations: + continue + + # apply smoothing if requested + if self.smooth_window and len(series) >= self.smooth_window: + series = self._smooth_series(series, self.smooth_window) + + elast = self._compute_elasticity(series) + if elast is not None: + elasticities.append({ + 'productId': pid, + 'elasticity': elast['value'], + 'std_error': elast.get('std_error', np.nan), + 'n_obs': len(series) + }) + + return pd.DataFrame(elasticities) + + def _align_chunks(self, demand_chunks, price_chunks): + """Align demand and price data by matching time windows.""" + aligned = [] + + # create lookup for price chunks by window_start + price_lookup = {chunk['window_start']: chunk for chunk in price_chunks} + + for demand_chunk in demand_chunks: + window_start = demand_chunk['window_start'] + if window_start in price_lookup: + aligned.append({ + 'window_start': window_start, + 'window_end': demand_chunk['window_end'], + 'demand': demand_chunk['demand_vector'], + 'prices': price_lookup[window_start]['price_vector'] + }) + + return aligned + + def _build_product_timeseries(self, aligned_chunks): + """Build time series [price, quantity] per product.""" + series_by_product = {} + + for chunk in aligned_chunks: + demand_df = chunk['demand'] + price_df = chunk['prices'] + + # merge on productId + merged = demand_df.merge(price_df, on='productId', how='inner') + + for _, row in merged.iterrows(): + pid = row['productId'] + if pid not in series_by_product: + series_by_product[pid] = [] + + series_by_product[pid].append({ + 'timestamp': chunk['window_start'], + 'price': row['price'], + 'quantity': row['demand_score'] + }) + + return series_by_product + + def _smooth_series(self, series, window): + """Apply rolling average smoothing.""" + df = pd.DataFrame(series) + df['price_smooth'] = df['price'].rolling(window=window, center=True).mean() + df['quantity_smooth'] = df['quantity'].rolling(window=window, center=True).mean() + df = df.dropna() + + return [{'timestamp': row['timestamp'], + 'price': row['price_smooth'], + 'quantity': row['quantity_smooth']} + for _, row in df.iterrows()] + + def _compute_elasticity(self, series): + """Compute elasticity from time series.""" + if len(series) < 2: + return None + + prices = np.array([s['price'] for s in series]) + quantities = np.array([s['quantity'] for s in series]) + + # filter out zero/negative values + valid = (prices > 0) & (quantities > 0) + if valid.sum() < 2: + return None + + prices = prices[valid] + quantities = quantities[valid] + + if self.method == 'point': + return self._point_elasticity(prices, quantities) + elif self.method == 'arc': + return self._arc_elasticity(prices, quantities) + else: + raise ValueError(f"Unknown method: {self.method}") + + def _point_elasticity(self, prices, quantities): + """ + Point elasticity using log-log regression. + log(Q) = a + b*log(P), elasticity = b + """ + if len(prices) < 2: + return None + + log_p = np.log(prices) + log_q = np.log(quantities) + + # simple linear regression + if log_p.std() == 0: + return None + + cov = np.cov(log_p, log_q)[0, 1] + var = np.var(log_p) + b = cov / var + + # std error estimate + residuals = log_q - (log_q.mean() + b * (log_p - log_p.mean())) + mse = (residuals ** 2).sum() / (len(prices) - 2) + se_b = np.sqrt(mse / (len(prices) * var)) + + return {'value': b, 'std_error': se_b} + + def _arc_elasticity(self, prices, quantities): + """ + Arc elasticity: average of period-over-period elasticities. + E_t = (ΔQ/Q_avg) / (ΔP/P_avg) + """ + elasticities = [] + + for i in range(1, len(prices)): + p1, p2 = prices[i-1], prices[i] + q1, q2 = quantities[i-1], quantities[i] + + p_avg = (p1 + p2) / 2 + q_avg = (q1 + q2) / 2 + + if p_avg == 0 or q_avg == 0: + continue + + delta_p = p2 - p1 + delta_q = q2 - q1 + + if delta_p == 0: + continue + + e = (delta_q / q_avg) / (delta_p / p_avg) + elasticities.append(e) + + if not elasticities: + return None + + return { + 'value': np.mean(elasticities), + 'std_error': np.std(elasticities) / np.sqrt(len(elasticities)) + } + + +def aggregate_price_logs(price_logs: pd.DataFrame, + window_size: str = '1H', + ts_col: str = 'ts') -> List[Dict]: + """ + Recover price vectors treating prices as persistent state changes. + + Prices are set-operations that persist until next change. For each window: + - If price logs exist: average all changes within window + - If no logs: carry forward last price before window end + + Args: + price_logs: df with [productId, price, ts, ...] + window_size: time window size matching ChunkInteractionsIntoSteps + ts_col: timestamp column name + + Returns: + list of dicts with {'window_start', 'window_end', 'price_vector'} + where price_vector is df with [productId, price] + """ + if price_logs.empty: + return [] + + df = price_logs.copy() + + if not pd.api.types.is_datetime64_any_dtype(df[ts_col]): + df[ts_col] = pd.to_datetime(df[ts_col]) + + df = df.sort_values([ts_col, 'productId']) + + # generate windows across data range + min_time, max_time = df[ts_col].min(), df[ts_col].max() + windows = pd.date_range( + start=min_time.floor(window_size), + end=max_time, + freq=window_size + ) + + chunks = [] + + for window_start in windows: + window_end = window_start + pd.Timedelta(window_size) + price_vector = [] + + # all products with price history by window_end + historical_products = df[df[ts_col] < window_end]['productId'].unique() + + for pid in historical_products: + product_data = df[df['productId'] == pid] + + # logs within window + in_window = product_data[ + (product_data[ts_col] >= window_start) & + (product_data[ts_col] < window_end) + ] + + if not in_window.empty: + # average changes within window + price = in_window['price'].mean() + else: + # carry forward: last price before window end + before_window = product_data[product_data[ts_col] < window_end] + if before_window.empty: + continue + price = before_window['price'].iloc[-1] + + price_vector.append({'productId': pid, 'price': price}) + + if price_vector: + chunks.append({ + 'window_start': window_start, + 'window_end': window_end, + 'price_vector': pd.DataFrame(price_vector) + }) + + return chunks diff --git a/experiments/procesing/extract.py b/experiments/procesing/extract.py index bd77204..81eb1d3 100644 --- a/experiments/procesing/extract.py +++ b/experiments/procesing/extract.py @@ -6,6 +6,7 @@ import requests from dotenv import load_dotenv from sklearn.base import BaseEstimator, TransformerMixin from supabase import create_client, Client +from typing import Tuple, List, Dict load_dotenv() BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:5000") @@ -17,11 +18,13 @@ supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) class KafkaDataFetcher(BaseEstimator, TransformerMixin): + def __init__(self, topic: str = "user-interactions"): + self.topic = topic # also can be price-logs def fit(self, X=None, y=None): return self def transform(self, X=None): - resp = requests.get(f"{BACKEND_URL}/api/kafka/dump") + resp = requests.get(f"{BACKEND_URL}/api/kafka/dump?topic={self.topic}") resp.raise_for_status() data = resp.json() @@ -29,12 +32,12 @@ class KafkaDataFetcher(BaseEstimator, TransformerMixin): return pd.DataFrame() df = pd.DataFrame(data['data']) - # explode metadata col json - if 'metadata' in df.columns: - df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) - df = df.dropna(subset=['eventName']) - # remape dateIndex - df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') + if self.topic == 'user-interactions': + if 'metadata' in df.columns: # explode metadata col json + df = df.join(pd.json_normalize(df.pop("metadata"), sep=".").add_prefix("metadata_")) + df = df.dropna(subset=['eventName']) + # remape dateIndex + df['dateIndex'] = df['metadata_dateIndex'].astype('Int64') return df @@ -110,3 +113,95 @@ class EventTitleAugmenter(BaseEstimator, TransformerMixin): ) df["eventName"] = df["eventName"] + df["metadata_schema"].astype(str) return df + + +def chunk_shared_data(interactions_df: pd.DataFrame, + price_logs_df: pd.DataFrame, + window_size: str = '30s', + ts_col: str = 'ts') -> Tuple[List[Dict], List[Dict]]: + """ + Chunk interaction and price data into aligned time windows. + + Args: + interactions_df: interaction data with timestamp column + price_logs_df: price log data with timestamp column + window_size: pandas freq string ('30s', '1min', '1h', etc) + ts_col: name of timestamp column + + Returns: + tuple of (interaction_chunks, price_chunks) where each is list of dicts: + { + 'window_start': timestamp, + 'window_end': timestamp, + 'data': dataframe for this window + } + """ + if interactions_df.empty and price_logs_df.empty: + return [], [] + + # convert timestamps to datetime + interactions_df = interactions_df.copy() + price_logs_df = price_logs_df.copy() + + if not interactions_df.empty: + if not pd.api.types.is_datetime64_any_dtype(interactions_df[ts_col]): + interactions_df[ts_col] = pd.to_datetime(interactions_df[ts_col]) + + if not price_logs_df.empty: + if not pd.api.types.is_datetime64_any_dtype(price_logs_df[ts_col]): + price_logs_df[ts_col] = pd.to_datetime(price_logs_df[ts_col]) + + # find global time bounds + times = [] + if not interactions_df.empty: + times.extend([interactions_df[ts_col].min(), interactions_df[ts_col].max()]) + if not price_logs_df.empty: + times.extend([price_logs_df[ts_col].min(), price_logs_df[ts_col].max()]) + + if not times: + return [], [] + + earliest = min(times) + latest = max(times) + + # create shared time windows + windows = pd.date_range(start=earliest, end=latest, freq=window_size) + + if len(windows) < 2: + return [], [] + + # chunk both datasets + interaction_chunks = [] + price_chunks = [] + + for i in range(len(windows) - 1): + window_start = windows[i] + window_end = windows[i + 1] + + # filter interactions in this window + if not interactions_df.empty: + mask = (interactions_df[ts_col] >= window_start) & (interactions_df[ts_col] < window_end) + interaction_chunk = interactions_df[mask] + else: + interaction_chunk = pd.DataFrame() + + interaction_chunks.append({ + 'window_start': window_start, + 'window_end': window_end, + 'data': interaction_chunk + }) + + # filter price logs in this window + if not price_logs_df.empty: + mask = (price_logs_df[ts_col] >= window_start) & (price_logs_df[ts_col] < window_end) + price_chunk = price_logs_df[mask] + else: + price_chunk = pd.DataFrame() + + price_chunks.append({ + 'window_start': window_start, + 'window_end': window_end, + 'data': price_chunk + }) + + return interaction_chunks, price_chunks diff --git a/experiments/procesing/pipeline.py b/experiments/procesing/pipeline.py index 2d93c05..ab5840a 100644 --- a/experiments/procesing/pipeline.py +++ b/experiments/procesing/pipeline.py @@ -1,22 +1,82 @@ from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler +import pandas as pd -from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter +from extract import KafkaDataFetcher, ExperimentJoiner, EventTitleAugmenter, chunk_shared_data from mapping import SessionTransitionProbMatrixTransformer, render_graph -from demand import DemandEstimator +from demand import DemandEstimator, ChunkInteractionsIntoSteps +from elasticity import TemporalElasticityEstimator, aggregate_price_logs + + + +# elasticity pipeline components (not sklearn compatible, manual orchestration) +def elasticity_pipeline(interactions_df, price_logs_df, window_size='30s', store_mode='hotel'): + """ + Compute price elasticity from interaction and price data. + + Args: + interactions_df: raw interaction data from demand_data_pipeline + price_logs_df: price log data from price_data_pipeline + window_size: time window for chunking + store_mode: 'hotel' or 'airline' + + Returns: + df with [productId, elasticity, std_error, n_obs] + """ + # step 1: chunk interactions into time windows + chunker = ChunkInteractionsIntoSteps(window_size=window_size, return_metadata=True) + interaction_chunks = chunker.transform(interactions_df) + print(len(interaction_chunks)) + + if not interaction_chunks: + return None + + # step 2: compute demand per window + demand_estimator = DemandEstimator(store_mode=store_mode) + demand_chunks = [] + for chunk in interaction_chunks: + demand_vector = demand_estimator.transform(chunk['data']) + demand_chunks.append({ + 'window_start': chunk['window_start'], + 'window_end': chunk['window_end'], + 'demand_vector': demand_vector + }) + + # step 3: aggregate price logs into windows + price_chunks = aggregate_price_logs(price_logs_df, window_size=window_size) + + # step 4: compute elasticity + elasticity_estimator = TemporalElasticityEstimator(method='point', min_observations=2) + elasticity_df = elasticity_estimator.transform(demand_chunks, price_chunks) + + return elasticity_df # exposable pipelines -etl_pipeline = Pipeline([ - ('kafka_fetch', KafkaDataFetcher()), +interaction_pipeline = Pipeline([ + ('kafka_fetch', KafkaDataFetcher(topic='user-interactions')), ('experiment_join', ExperimentJoiner()), ('event_augment', EventTitleAugmenter()), ]) + +price_data_pipeline = Pipeline([ + ('kafka_fetch', KafkaDataFetcher(topic='price-logs')), +]) + pricing_pipeline = Pipeline([ ('demand_estimation', DemandEstimator()), ]) - if __name__ == "__main__": - processed_data = etl_pipeline.fit_transform(None) - pricing = pricing_pipeline.fit_transform(processed_data) - print(pricing) + # fetch both datasets + interaction_data = interaction_pipeline.fit_transform(None) + pricing_data = price_data_pipeline.fit_transform(None) + if interaction_data.empty or pricing_data.empty: + print("Insufficient data for elasticity computation"); exit(0) + # compute elasticity via unified pipeline + window_size = "30s" + elasticity_results = elasticity_pipeline(interaction_data, pricing_data, window_size=window_size) + + if elasticity_results is not None and not elasticity_results.empty: + print(elasticity_results.to_string(index=False)) + else: + print("\nInsufficient data for elasticity computation") diff --git a/experiments/procesing/pricing.py b/experiments/procesing/pricing.py new file mode 100644 index 0000000..e69de29 diff --git a/web/src/app/api/pricing/route.ts b/web/src/app/api/pricing/route.ts index 414d311..a809f99 100644 --- a/web/src/app/api/pricing/route.ts +++ b/web/src/app/api/pricing/route.ts @@ -13,17 +13,6 @@ export async function GET(req: NextRequest) { const experimentId = searchParams.get('experimentId'); const storeMode = process.env.NEXT_PUBLIC_STORE_MODE || 'shop'; - // log in dev - if (process.env.NODE_ENV === 'development') { - console.log('[pricing-api]', { - productId, - sessionId, - experimentId, - storeMode, - timestamp: new Date().toISOString(), - }); - } - if (!productId) { return NextResponse.json( { error: 'productId is required' }, @@ -34,11 +23,46 @@ export async function GET(req: NextRequest) { // stub: call external pricing provider (random for now) const basePrice = 100 + Math.random() * 900; // 100-1000 range const price = Math.round(basePrice * 100) / 100; + const timestamp = new Date().toISOString(); + + // log price to kafka for elasticity computation + if (sessionId) { + const backendUrl = process.env.BACKEND_URL || 'http://localhost:5000'; + try { + await fetch(`${backendUrl}/api/kafka/price-log`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + productId, + price, + sessionId, + experimentId: experimentId || undefined, + storeMode, + ts: timestamp, + }), + }); + } catch (err) { + console.error('[price-log-error]', err); + // don't fail the pricing request if logging fails + } + } + + // log in dev + if (process.env.NODE_ENV === 'development') { + console.log('[pricing-api]', { + productId, + sessionId, + experimentId, + storeMode, + price, + timestamp, + }); + } const response: PricingResponse = { price, currency: 'EUR', - cachedAt: new Date().toISOString(), + cachedAt: timestamp, }; return NextResponse.json(response);