mirror of
https://github.com/velocitatem/PHANTOM.git
synced 2026-05-31 08:33:36 +00:00
feat: integration of pipeline hooks into testing
This commit is contained in:
2
Makefile
2
Makefile
@@ -49,8 +49,10 @@ test.backend: $(VENV)
|
||||
test.e2e:
|
||||
@cd tests/e2e && npm install
|
||||
@cd tests/e2e && npx playwright install chromium
|
||||
@test -f tests/e2e/.env || cp tests/e2e/.env.example tests/e2e/.env
|
||||
@timeout 30 bash -c 'until curl -sf http://localhost:5000/health > /dev/null 2>&1; do sleep 1; done' || (echo "Backend not ready" && exit 1)
|
||||
@timeout 30 bash -c 'until curl -sf http://localhost:3000 > /dev/null 2>&1; do sleep 1; done' || (echo "Web app not ready" && exit 1)
|
||||
@timeout 30 bash -c 'until curl -sf http://localhost:8085/health > /dev/null 2>&1; do sleep 1; done' || (echo "Airflow not ready" && exit 1)
|
||||
@cd tests/e2e && npm test
|
||||
|
||||
.PHONY: test.all
|
||||
|
||||
@@ -198,12 +198,16 @@ def dump_logs(
|
||||
auto_offset_reset='earliest',
|
||||
enable_auto_commit=False,
|
||||
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
|
||||
consumer_timeout_ms=5000
|
||||
consumer_timeout_ms=30000,
|
||||
fetch_max_wait_ms=10000,
|
||||
max_poll_records=1000
|
||||
)
|
||||
|
||||
events = []
|
||||
for msg in consumer:
|
||||
events.append(msg.value)
|
||||
if last_n and len(events) >= last_n * 2:
|
||||
break
|
||||
|
||||
consumer.close()
|
||||
|
||||
|
||||
@@ -144,6 +144,7 @@ services:
|
||||
- AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
|
||||
- AIRFLOW__WEBSERVER__EXPOSE_CONFIG=true
|
||||
- AIRFLOW__WEBSERVER__SECRET_KEY=${AIRFLOW_SECRET_KEY}
|
||||
- AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth
|
||||
- KAFKA_HOST=kafka
|
||||
- KAFKA_PORT=29092
|
||||
- BACKEND_URL=http://backend:5000
|
||||
@@ -180,6 +181,7 @@ services:
|
||||
- AIRFLOW__CORE__LOAD_EXAMPLES=false
|
||||
- AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
|
||||
- AIRFLOW__WEBSERVER__SECRET_KEY=${AIRFLOW_SECRET_KEY}
|
||||
- AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth
|
||||
- KAFKA_HOST=kafka
|
||||
- KAFKA_PORT=29092
|
||||
- BACKEND_URL=http://backend:5000
|
||||
|
||||
@@ -120,15 +120,31 @@ def apply_surge_pricing(**kwargs):
|
||||
# rename demand_score to demand for pricer compatibility
|
||||
data = product_features.rename(columns={'demand_score': 'demand'})
|
||||
|
||||
high_thresh = dag_conf.get('high_threshold', 10)
|
||||
low_thresh = dag_conf.get('low_threshold', 2)
|
||||
surge_mult = dag_conf.get('surge_multiplier', 1.2)
|
||||
discount_mult = dag_conf.get('discount_multiplier', 0.9)
|
||||
|
||||
logging.info(f"Surge pricing config: high_thresh={high_thresh}, low_thresh={low_thresh}, surge_mult={surge_mult}, discount_mult={discount_mult}")
|
||||
logging.info(f"Demand stats: min={data['demand'].min():.2f}, max={data['demand'].max():.2f}, mean={data['demand'].mean():.2f}")
|
||||
logging.info(f"Products with high demand (>={high_thresh}): {(data['demand'] >= high_thresh).sum()}")
|
||||
logging.info(f"Products with low demand (<={low_thresh}): {(data['demand'] <= low_thresh).sum()}")
|
||||
|
||||
surge_pricer = SimpleSurgePricer(
|
||||
high_threshold=dag_conf.get('high_threshold', 10),
|
||||
low_threshold=dag_conf.get('low_threshold', 2),
|
||||
surge_multiplier=dag_conf.get('surge_multiplier', 1.2),
|
||||
discount_multiplier=dag_conf.get('discount_multiplier', 0.9)
|
||||
high_threshold=high_thresh,
|
||||
low_threshold=low_thresh,
|
||||
surge_multiplier=surge_mult,
|
||||
discount_multiplier=discount_mult
|
||||
)
|
||||
surge_pricer.fit(data)
|
||||
data['optimal_price'] = surge_pricer.predict()
|
||||
|
||||
base_avg = data['base_price'].mean()
|
||||
optimal_avg = data['optimal_price'].mean()
|
||||
price_change_pct = ((optimal_avg - base_avg) / base_avg) * 100
|
||||
|
||||
logging.info(f"Price adjustment: base_avg={base_avg:.2f}, optimal_avg={optimal_avg:.2f}, change={price_change_pct:+.1f}%")
|
||||
|
||||
prices_df = data[['productId', 'price', 'base_price', 'optimal_price', 'demand']].rename(columns={
|
||||
'price': 'current_price',
|
||||
'demand': 'demand_score'
|
||||
|
||||
@@ -124,7 +124,8 @@ class SimpleSurgePricer(PricingFunction):
|
||||
if base is None:
|
||||
base = np.ones(len(demand)) * 99.99
|
||||
|
||||
new_prices = base.copy()
|
||||
# ensure float dtype to allow multiplication by float multipliers
|
||||
new_prices = base.astype(np.float64).copy()
|
||||
high_mask = demand >= self.high_threshold
|
||||
new_prices[high_mask] *= self.surge_multiplier
|
||||
|
||||
|
||||
@@ -9,8 +9,8 @@ interface InteractionEvent {
|
||||
const dumpKafkaTopic = async (backendUrl: string, topic: string) => {
|
||||
const resp = await fetch(`${backendUrl}/api/kafka/dump?topic=${topic}`);
|
||||
if (!resp.ok) throw new Error(`Kafka dump failed: ${resp.status}`);
|
||||
const { messages = [] } = await resp.json();
|
||||
return messages as any[];
|
||||
const { data = [] } = await resp.json();
|
||||
return data as any[];
|
||||
};
|
||||
|
||||
export const waitForInteractionEvent = async (
|
||||
|
||||
@@ -5,14 +5,14 @@ export default defineConfig({
|
||||
fullyParallel: true,
|
||||
forbidOnly: !!process.env.CI,
|
||||
retries: 0,
|
||||
workers: 5,
|
||||
workers: 1,
|
||||
reporter: 'list',
|
||||
use: {
|
||||
baseURL: process.env.WEB_URL || 'http://localhost:3000',
|
||||
trace: 'retain-on-failure',
|
||||
screenshot: 'only-on-failure',
|
||||
},
|
||||
timeout: 60000,
|
||||
timeout: 180000,
|
||||
expect: {
|
||||
timeout: 10000,
|
||||
},
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
addToCart,
|
||||
} from '../helpers/interactions';
|
||||
import { getSessionEvents } from '../helpers/kafka';
|
||||
import { runSessionPricing } from '../helpers/airflow';
|
||||
|
||||
test.describe('SessionAwarePricer E2E', () => {
|
||||
const STORE_TYPE = 'hotel';
|
||||
@@ -23,6 +24,9 @@ test.describe('SessionAwarePricer E2E', () => {
|
||||
await page.waitForTimeout(1500);
|
||||
|
||||
const productId2 = await humanLikeViewProduct(page, STORE_TYPE);
|
||||
|
||||
await runSessionPricing(STORE_TYPE);
|
||||
|
||||
const secondPrice = await getPriceFromDOM(page);
|
||||
expect(await verifySessionConsistency(page, sessionId)).toBeTruthy();
|
||||
|
||||
@@ -40,11 +44,13 @@ test.describe('SessionAwarePricer E2E', () => {
|
||||
await rapidViewProductViaFlow(page, 8, 100, STORE_TYPE);
|
||||
expect(await verifySessionConsistency(page, sessionId)).toBeTruthy();
|
||||
|
||||
await page.waitForTimeout(2500);
|
||||
await page.waitForTimeout(1000);
|
||||
|
||||
const events = await getSessionEvents(backendUrl, sessionId);
|
||||
expect(events.length).toBeGreaterThanOrEqual(8);
|
||||
|
||||
await runSessionPricing(STORE_TYPE);
|
||||
|
||||
await page.goto(`/products/${productId}`);
|
||||
await page.waitForLoadState('networkidle');
|
||||
const agentPrice = await getPriceFromDOM(page);
|
||||
@@ -59,14 +65,12 @@ test.describe('SessionAwarePricer E2E', () => {
|
||||
const productId = await viewProductViaFlow(page, STORE_TYPE);
|
||||
const baselinePrice = await getPriceFromDOM(page);
|
||||
|
||||
const startTime = Date.now();
|
||||
await rapidViewProductViaFlow(page, 10, 80, STORE_TYPE);
|
||||
const duration = (Date.now() - startTime) / 1000;
|
||||
|
||||
const eventsPerSec = 10 / duration;
|
||||
expect(eventsPerSec).toBeGreaterThan(2.0);
|
||||
const events = await getSessionEvents(backendUrl, sessionId);
|
||||
expect(events.length).toBeGreaterThanOrEqual(10);
|
||||
|
||||
await page.waitForTimeout(2000);
|
||||
await runSessionPricing(STORE_TYPE);
|
||||
|
||||
await page.goto(`/products/${productId}`);
|
||||
await page.waitForLoadState('networkidle');
|
||||
@@ -105,8 +109,11 @@ test.describe('SessionAwarePricer E2E', () => {
|
||||
|
||||
await rapidViewProductViaFlow(page, 2, 150, STORE_TYPE);
|
||||
|
||||
await page.waitForTimeout(1500);
|
||||
await page.waitForTimeout(1000);
|
||||
await humanLikeViewProduct(page, STORE_TYPE);
|
||||
|
||||
await runSessionPricing(STORE_TYPE);
|
||||
|
||||
const finalPrice = await getPriceFromDOM(page);
|
||||
|
||||
expect(Math.abs(finalPrice - baselinePrice) / baselinePrice).toBeLessThan(0.3);
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
verifySessionConsistency,
|
||||
} from '../helpers/interactions';
|
||||
import { waitForInteractionEvent, countProductViews } from '../helpers/kafka';
|
||||
import { runSurgePricing } from '../helpers/airflow';
|
||||
|
||||
test.describe('SimpleSurgePricer E2E', () => {
|
||||
const STORE_TYPE = 'hotel';
|
||||
@@ -29,7 +30,7 @@ test.describe('SimpleSurgePricer E2E', () => {
|
||||
|
||||
await rapidViewProductViaFlow(page, 5, 200, STORE_TYPE);
|
||||
|
||||
await page.waitForTimeout(2000);
|
||||
await page.waitForTimeout(1000);
|
||||
|
||||
const evt = await waitForInteractionEvent(backendUrl, sessionId, 'view_item_page');
|
||||
expect(evt).not.toBeNull();
|
||||
@@ -37,6 +38,8 @@ test.describe('SimpleSurgePricer E2E', () => {
|
||||
const viewCount = await countProductViews(backendUrl, productId);
|
||||
expect(viewCount).toBeGreaterThanOrEqual(5);
|
||||
|
||||
await runSurgePricing(STORE_TYPE, 3, 1);
|
||||
|
||||
await page.goto(`/products/${productId}`);
|
||||
await page.waitForLoadState('networkidle');
|
||||
const surgedPrice = await getPriceFromDOM(page);
|
||||
@@ -72,7 +75,9 @@ test.describe('SimpleSurgePricer E2E', () => {
|
||||
|
||||
await rapidViewProductViaFlow(page, 5, 150, STORE_TYPE);
|
||||
|
||||
await page.waitForTimeout(1500);
|
||||
await page.waitForTimeout(1000);
|
||||
|
||||
await runSurgePricing(STORE_TYPE, 3, 1);
|
||||
|
||||
await page.goto(`/products/${productId}`);
|
||||
await page.waitForLoadState('networkidle');
|
||||
@@ -81,6 +86,8 @@ test.describe('SimpleSurgePricer E2E', () => {
|
||||
|
||||
await page.waitForTimeout(12000);
|
||||
|
||||
await runSurgePricing(STORE_TYPE, 3, 1);
|
||||
|
||||
await page.goto(`/products/${productId}`);
|
||||
await page.waitForLoadState('networkidle');
|
||||
const decayedPrice = await getPriceFromDOM(page);
|
||||
|
||||
Reference in New Issue
Block a user