Files
PHANTOM/experiments/agents/run.py

118 lines
3.7 KiB
Python

from supabase import create_client, Client
import os
import random
import asyncio
import json
from dotenv import load_dotenv
from experiments.agents.agent import get_agent, AgentTypes
from lib.kafka_client import get_interactions
load_dotenv()
RESULTS="/home/velocitatem/Documents/Projects/PHANTOM/experiments/agents/collected_data/"
client = create_client(
os.getenv("NEXT_PUBLIC_SUPABASE_URL"),
os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")
)
def pick_random_task():
mode = 'hotel'
tasks = client.table("tasks").select("*").execute().data
if mode == 'hotel':
# drop all that have 'flight' in the description
tasks = [task for task in tasks if 'flight' not in task['task_description'].lower()]
return random.choice(tasks) if tasks else None
def clear_kafka_data():
"""Delete and recreate Kafka topics to clear all data"""
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import UnknownTopicOrPartitionError
import time
kafka_host = os.getenv('KAFKA_HOST', 'localhost')
kafka_port = os.getenv('KAFKA_PORT', '9092')
broker = f'{kafka_host}:{kafka_port}'
admin = KafkaAdminClient(bootstrap_servers=broker)
topics = ['user-interactions', 'price-logs']
try:
admin.delete_topics(topics, timeout_ms=5000)
print(f"Deleted topics: {topics}")
time.sleep(2)
except UnknownTopicOrPartitionError:
print("Topics don't exist, skipping delete")
except Exception as e:
print(f"Error deleting topics: {e}")
new_topics = [
NewTopic(name='user-interactions', num_partitions=3, replication_factor=1),
NewTopic(name='price-logs', num_partitions=3, replication_factor=1)
]
try:
admin.create_topics(new_topics=new_topics, validate_only=False)
print(f"Recreated topics: {topics}")
except Exception as e:
print(f"Error creating topics: {e}")
finally:
admin.close()
def create_new_experiment(task_id):
import uuid
subject_name = f"agent_{str(uuid.uuid4())[:8]}"
experiment = {
"subject_name": subject_name,
"xp_human_only": False,
"xp_market_mode": "hotel",
"xp_task_id": task_id,
}
response = client.table("experiments").insert(experiment).execute()
return response.data[0] if response.data else None
if __name__ == "__main__":
clear_kafka_data()
task = pick_random_task()
if not task:
print("No tasks available")
exit(1)
experiment = create_new_experiment(task['id'])
exp_id = experiment['id']
exp_dir = f"{RESULTS}{exp_id}"
os.makedirs(exp_dir, exist_ok=True)
# construct experiment URL with uuid param
base_url = os.getenv('NEXT_PUBLIC_API_BASE', 'http://localhost:3000')
agent_url = f"{base_url}/start-task?uuid={exp_id}"
print(f"Created experiment {exp_id} for task {task['id']}")
print(f"Agent will interact with: {agent_url}")
# instantiate and run agent
agent = get_agent(
AgentTypes.GENERIC_BROWSER_USE_AGENT,
goal=task['task_description'],
url=agent_url,
timeout=300,
headless=True
)
result = asyncio.run(agent.act())
print(f"Agent result: {result}")
# export interaction and price data from kafka
interactions = get_interactions(topic='user-interactions', timeout_ms=3000)
prices = get_interactions(topic='price-logs', timeout_ms=3000)
with open(f"{exp_dir}/int.json", 'w') as f:
json.dump(interactions, f, indent=2)
with open(f"{exp_dir}/price.json", 'w') as f:
json.dump(prices, f, indent=2)
print(f"Experiment {exp_id} completed.")
print(f"Exported {len(interactions)} interactions and {len(prices)} price logs to {exp_dir}")