feat: integration of pipeline hooks into testing

This commit is contained in:
2026-01-12 13:37:48 +01:00
parent 9a8525a854
commit acf731efcb
9 changed files with 58 additions and 19 deletions

View File

@@ -48,8 +48,10 @@ test.backend: $(VENV)
test.e2e: test.e2e:
@cd tests/e2e && npm install @cd tests/e2e && npm install
@cd tests/e2e && npx playwright install chromium @cd tests/e2e && npx playwright install chromium
@test -f tests/e2e/.env || cp tests/e2e/.env.example tests/e2e/.env
@timeout 30 bash -c 'until curl -sf http://localhost:5000/health > /dev/null 2>&1; do sleep 1; done' || (echo "Backend not ready" && exit 1) @timeout 30 bash -c 'until curl -sf http://localhost:5000/health > /dev/null 2>&1; do sleep 1; done' || (echo "Backend not ready" && exit 1)
@timeout 30 bash -c 'until curl -sf http://localhost:3000 > /dev/null 2>&1; do sleep 1; done' || (echo "Web app not ready" && exit 1) @timeout 30 bash -c 'until curl -sf http://localhost:3000 > /dev/null 2>&1; do sleep 1; done' || (echo "Web app not ready" && exit 1)
@timeout 30 bash -c 'until curl -sf http://localhost:8085/health > /dev/null 2>&1; do sleep 1; done' || (echo "Airflow not ready" && exit 1)
@cd tests/e2e && npm test @cd tests/e2e && npm test
.PHONY: test.all .PHONY: test.all

View File

@@ -198,12 +198,16 @@ def dump_logs(
auto_offset_reset='earliest', auto_offset_reset='earliest',
enable_auto_commit=False, enable_auto_commit=False,
value_deserializer=lambda x: json.loads(x.decode('utf-8')), value_deserializer=lambda x: json.loads(x.decode('utf-8')),
consumer_timeout_ms=5000 consumer_timeout_ms=30000,
fetch_max_wait_ms=10000,
max_poll_records=1000
) )
events = [] events = []
for msg in consumer: for msg in consumer:
events.append(msg.value) events.append(msg.value)
if last_n and len(events) >= last_n * 2:
break
consumer.close() consumer.close()

View File

@@ -144,6 +144,7 @@ services:
- AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
- AIRFLOW__WEBSERVER__EXPOSE_CONFIG=true - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=true
- AIRFLOW__WEBSERVER__SECRET_KEY=${AIRFLOW_SECRET_KEY} - AIRFLOW__WEBSERVER__SECRET_KEY=${AIRFLOW_SECRET_KEY}
- AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth
- KAFKA_HOST=kafka - KAFKA_HOST=kafka
- KAFKA_PORT=29092 - KAFKA_PORT=29092
- BACKEND_URL=http://backend:5000 - BACKEND_URL=http://backend:5000
@@ -180,6 +181,7 @@ services:
- AIRFLOW__CORE__LOAD_EXAMPLES=false - AIRFLOW__CORE__LOAD_EXAMPLES=false
- AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
- AIRFLOW__WEBSERVER__SECRET_KEY=${AIRFLOW_SECRET_KEY} - AIRFLOW__WEBSERVER__SECRET_KEY=${AIRFLOW_SECRET_KEY}
- AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth
- KAFKA_HOST=kafka - KAFKA_HOST=kafka
- KAFKA_PORT=29092 - KAFKA_PORT=29092
- BACKEND_URL=http://backend:5000 - BACKEND_URL=http://backend:5000

View File

@@ -120,15 +120,31 @@ def apply_surge_pricing(**kwargs):
# rename demand_score to demand for pricer compatibility # rename demand_score to demand for pricer compatibility
data = product_features.rename(columns={'demand_score': 'demand'}) data = product_features.rename(columns={'demand_score': 'demand'})
high_thresh = dag_conf.get('high_threshold', 10)
low_thresh = dag_conf.get('low_threshold', 2)
surge_mult = dag_conf.get('surge_multiplier', 1.2)
discount_mult = dag_conf.get('discount_multiplier', 0.9)
logging.info(f"Surge pricing config: high_thresh={high_thresh}, low_thresh={low_thresh}, surge_mult={surge_mult}, discount_mult={discount_mult}")
logging.info(f"Demand stats: min={data['demand'].min():.2f}, max={data['demand'].max():.2f}, mean={data['demand'].mean():.2f}")
logging.info(f"Products with high demand (>={high_thresh}): {(data['demand'] >= high_thresh).sum()}")
logging.info(f"Products with low demand (<={low_thresh}): {(data['demand'] <= low_thresh).sum()}")
surge_pricer = SimpleSurgePricer( surge_pricer = SimpleSurgePricer(
high_threshold=dag_conf.get('high_threshold', 10), high_threshold=high_thresh,
low_threshold=dag_conf.get('low_threshold', 2), low_threshold=low_thresh,
surge_multiplier=dag_conf.get('surge_multiplier', 1.2), surge_multiplier=surge_mult,
discount_multiplier=dag_conf.get('discount_multiplier', 0.9) discount_multiplier=discount_mult
) )
surge_pricer.fit(data) surge_pricer.fit(data)
data['optimal_price'] = surge_pricer.predict() data['optimal_price'] = surge_pricer.predict()
base_avg = data['base_price'].mean()
optimal_avg = data['optimal_price'].mean()
price_change_pct = ((optimal_avg - base_avg) / base_avg) * 100
logging.info(f"Price adjustment: base_avg={base_avg:.2f}, optimal_avg={optimal_avg:.2f}, change={price_change_pct:+.1f}%")
prices_df = data[['productId', 'price', 'base_price', 'optimal_price', 'demand']].rename(columns={ prices_df = data[['productId', 'price', 'base_price', 'optimal_price', 'demand']].rename(columns={
'price': 'current_price', 'price': 'current_price',
'demand': 'demand_score' 'demand': 'demand_score'

View File

@@ -124,7 +124,8 @@ class SimpleSurgePricer(PricingFunction):
if base is None: if base is None:
base = np.ones(len(demand)) * 99.99 base = np.ones(len(demand)) * 99.99
new_prices = base.copy() # ensure float dtype to allow multiplication by float multipliers
new_prices = base.astype(np.float64).copy()
high_mask = demand >= self.high_threshold high_mask = demand >= self.high_threshold
new_prices[high_mask] *= self.surge_multiplier new_prices[high_mask] *= self.surge_multiplier

View File

@@ -9,8 +9,8 @@ interface InteractionEvent {
const dumpKafkaTopic = async (backendUrl: string, topic: string) => { const dumpKafkaTopic = async (backendUrl: string, topic: string) => {
const resp = await fetch(`${backendUrl}/api/kafka/dump?topic=${topic}`); const resp = await fetch(`${backendUrl}/api/kafka/dump?topic=${topic}`);
if (!resp.ok) throw new Error(`Kafka dump failed: ${resp.status}`); if (!resp.ok) throw new Error(`Kafka dump failed: ${resp.status}`);
const { messages = [] } = await resp.json(); const { data = [] } = await resp.json();
return messages as any[]; return data as any[];
}; };
export const waitForInteractionEvent = async ( export const waitForInteractionEvent = async (

View File

@@ -5,14 +5,14 @@ export default defineConfig({
fullyParallel: true, fullyParallel: true,
forbidOnly: !!process.env.CI, forbidOnly: !!process.env.CI,
retries: 0, retries: 0,
workers: 5, workers: 1,
reporter: 'list', reporter: 'list',
use: { use: {
baseURL: process.env.WEB_URL || 'http://localhost:3000', baseURL: process.env.WEB_URL || 'http://localhost:3000',
trace: 'retain-on-failure', trace: 'retain-on-failure',
screenshot: 'only-on-failure', screenshot: 'only-on-failure',
}, },
timeout: 60000, timeout: 180000,
expect: { expect: {
timeout: 10000, timeout: 10000,
}, },

View File

@@ -9,6 +9,7 @@ import {
addToCart, addToCart,
} from '../helpers/interactions'; } from '../helpers/interactions';
import { getSessionEvents } from '../helpers/kafka'; import { getSessionEvents } from '../helpers/kafka';
import { runSessionPricing } from '../helpers/airflow';
test.describe('SessionAwarePricer E2E', () => { test.describe('SessionAwarePricer E2E', () => {
const STORE_TYPE = 'hotel'; const STORE_TYPE = 'hotel';
@@ -23,6 +24,9 @@ test.describe('SessionAwarePricer E2E', () => {
await page.waitForTimeout(1500); await page.waitForTimeout(1500);
const productId2 = await humanLikeViewProduct(page, STORE_TYPE); const productId2 = await humanLikeViewProduct(page, STORE_TYPE);
await runSessionPricing(STORE_TYPE);
const secondPrice = await getPriceFromDOM(page); const secondPrice = await getPriceFromDOM(page);
expect(await verifySessionConsistency(page, sessionId)).toBeTruthy(); expect(await verifySessionConsistency(page, sessionId)).toBeTruthy();
@@ -40,11 +44,13 @@ test.describe('SessionAwarePricer E2E', () => {
await rapidViewProductViaFlow(page, 8, 100, STORE_TYPE); await rapidViewProductViaFlow(page, 8, 100, STORE_TYPE);
expect(await verifySessionConsistency(page, sessionId)).toBeTruthy(); expect(await verifySessionConsistency(page, sessionId)).toBeTruthy();
await page.waitForTimeout(2500); await page.waitForTimeout(1000);
const events = await getSessionEvents(backendUrl, sessionId); const events = await getSessionEvents(backendUrl, sessionId);
expect(events.length).toBeGreaterThanOrEqual(8); expect(events.length).toBeGreaterThanOrEqual(8);
await runSessionPricing(STORE_TYPE);
await page.goto(`/products/${productId}`); await page.goto(`/products/${productId}`);
await page.waitForLoadState('networkidle'); await page.waitForLoadState('networkidle');
const agentPrice = await getPriceFromDOM(page); const agentPrice = await getPriceFromDOM(page);
@@ -59,14 +65,12 @@ test.describe('SessionAwarePricer E2E', () => {
const productId = await viewProductViaFlow(page, STORE_TYPE); const productId = await viewProductViaFlow(page, STORE_TYPE);
const baselinePrice = await getPriceFromDOM(page); const baselinePrice = await getPriceFromDOM(page);
const startTime = Date.now();
await rapidViewProductViaFlow(page, 10, 80, STORE_TYPE); await rapidViewProductViaFlow(page, 10, 80, STORE_TYPE);
const duration = (Date.now() - startTime) / 1000;
const eventsPerSec = 10 / duration; const events = await getSessionEvents(backendUrl, sessionId);
expect(eventsPerSec).toBeGreaterThan(2.0); expect(events.length).toBeGreaterThanOrEqual(10);
await page.waitForTimeout(2000); await runSessionPricing(STORE_TYPE);
await page.goto(`/products/${productId}`); await page.goto(`/products/${productId}`);
await page.waitForLoadState('networkidle'); await page.waitForLoadState('networkidle');
@@ -105,8 +109,11 @@ test.describe('SessionAwarePricer E2E', () => {
await rapidViewProductViaFlow(page, 2, 150, STORE_TYPE); await rapidViewProductViaFlow(page, 2, 150, STORE_TYPE);
await page.waitForTimeout(1500); await page.waitForTimeout(1000);
await humanLikeViewProduct(page, STORE_TYPE); await humanLikeViewProduct(page, STORE_TYPE);
await runSessionPricing(STORE_TYPE);
const finalPrice = await getPriceFromDOM(page); const finalPrice = await getPriceFromDOM(page);
expect(Math.abs(finalPrice - baselinePrice) / baselinePrice).toBeLessThan(0.3); expect(Math.abs(finalPrice - baselinePrice) / baselinePrice).toBeLessThan(0.3);

View File

@@ -7,6 +7,7 @@ import {
verifySessionConsistency, verifySessionConsistency,
} from '../helpers/interactions'; } from '../helpers/interactions';
import { waitForInteractionEvent, countProductViews } from '../helpers/kafka'; import { waitForInteractionEvent, countProductViews } from '../helpers/kafka';
import { runSurgePricing } from '../helpers/airflow';
test.describe('SimpleSurgePricer E2E', () => { test.describe('SimpleSurgePricer E2E', () => {
const STORE_TYPE = 'hotel'; const STORE_TYPE = 'hotel';
@@ -29,7 +30,7 @@ test.describe('SimpleSurgePricer E2E', () => {
await rapidViewProductViaFlow(page, 5, 200, STORE_TYPE); await rapidViewProductViaFlow(page, 5, 200, STORE_TYPE);
await page.waitForTimeout(2000); await page.waitForTimeout(1000);
const evt = await waitForInteractionEvent(backendUrl, sessionId, 'view_item_page'); const evt = await waitForInteractionEvent(backendUrl, sessionId, 'view_item_page');
expect(evt).not.toBeNull(); expect(evt).not.toBeNull();
@@ -37,6 +38,8 @@ test.describe('SimpleSurgePricer E2E', () => {
const viewCount = await countProductViews(backendUrl, productId); const viewCount = await countProductViews(backendUrl, productId);
expect(viewCount).toBeGreaterThanOrEqual(5); expect(viewCount).toBeGreaterThanOrEqual(5);
await runSurgePricing(STORE_TYPE, 3, 1);
await page.goto(`/products/${productId}`); await page.goto(`/products/${productId}`);
await page.waitForLoadState('networkidle'); await page.waitForLoadState('networkidle');
const surgedPrice = await getPriceFromDOM(page); const surgedPrice = await getPriceFromDOM(page);
@@ -72,7 +75,9 @@ test.describe('SimpleSurgePricer E2E', () => {
await rapidViewProductViaFlow(page, 5, 150, STORE_TYPE); await rapidViewProductViaFlow(page, 5, 150, STORE_TYPE);
await page.waitForTimeout(1500); await page.waitForTimeout(1000);
await runSurgePricing(STORE_TYPE, 3, 1);
await page.goto(`/products/${productId}`); await page.goto(`/products/${productId}`);
await page.waitForLoadState('networkidle'); await page.waitForLoadState('networkidle');
@@ -81,6 +86,8 @@ test.describe('SimpleSurgePricer E2E', () => {
await page.waitForTimeout(12000); await page.waitForTimeout(12000);
await runSurgePricing(STORE_TYPE, 3, 1);
await page.goto(`/products/${productId}`); await page.goto(`/products/${productId}`);
await page.waitForLoadState('networkidle'); await page.waitForLoadState('networkidle');
const decayedPrice = await getPriceFromDOM(page); const decayedPrice = await getPriceFromDOM(page);