diff --git a/experiments/agents/run.py b/experiments/agents/run.py new file mode 100644 index 0000000..823c3d9 --- /dev/null +++ b/experiments/agents/run.py @@ -0,0 +1,117 @@ +from supabase import create_client, Client +import os +import random +import asyncio +import json +from dotenv import load_dotenv + +from experiments.agents.agent import get_agent, AgentTypes +from lib.kafka_client import get_interactions + +load_dotenv() + +RESULTS="/home/velocitatem/Documents/Projects/PHANTOM/experiments/agents/collected_data/" + +client = create_client( + os.getenv("NEXT_PUBLIC_SUPABASE_URL"), + os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") +) +def pick_random_task(): + mode = 'hotel' + tasks = client.table("tasks").select("*").execute().data + if mode == 'hotel': + # drop all that have 'flight' in the description + tasks = [task for task in tasks if 'flight' not in task['task_description'].lower()] + return random.choice(tasks) if tasks else None + +def clear_kafka_data(): + """Delete and recreate Kafka topics to clear all data""" + from kafka.admin import KafkaAdminClient, NewTopic + from kafka.errors import UnknownTopicOrPartitionError + import time + + kafka_host = os.getenv('KAFKA_HOST', 'localhost') + kafka_port = os.getenv('KAFKA_PORT', '9092') + broker = f'{kafka_host}:{kafka_port}' + + admin = KafkaAdminClient(bootstrap_servers=broker) + topics = ['user-interactions', 'price-logs'] + + try: + admin.delete_topics(topics, timeout_ms=5000) + print(f"Deleted topics: {topics}") + time.sleep(2) + except UnknownTopicOrPartitionError: + print("Topics don't exist, skipping delete") + except Exception as e: + print(f"Error deleting topics: {e}") + + new_topics = [ + NewTopic(name='user-interactions', num_partitions=3, replication_factor=1), + NewTopic(name='price-logs', num_partitions=3, replication_factor=1) + ] + + try: + admin.create_topics(new_topics=new_topics, validate_only=False) + print(f"Recreated topics: {topics}") + except Exception as e: + print(f"Error creating topics: {e}") + finally: + admin.close() + +def create_new_experiment(task_id): + import uuid + subject_name = f"agent_{str(uuid.uuid4())[:8]}" + experiment = { + "subject_name": subject_name, + "xp_human_only": False, + "xp_market_mode": "hotel", + "xp_task_id": task_id, + } + response = client.table("experiments").insert(experiment).execute() + return response.data[0] if response.data else None + +if __name__ == "__main__": + clear_kafka_data() + + task = pick_random_task() + if not task: + print("No tasks available") + exit(1) + + experiment = create_new_experiment(task['id']) + exp_id = experiment['id'] + exp_dir = f"{RESULTS}{exp_id}" + os.makedirs(exp_dir, exist_ok=True) + + # construct experiment URL with uuid param + base_url = os.getenv('NEXT_PUBLIC_API_BASE', 'http://localhost:3000') + agent_url = f"{base_url}/start-task?uuid={exp_id}" + + print(f"Created experiment {exp_id} for task {task['id']}") + print(f"Agent will interact with: {agent_url}") + + # instantiate and run agent + agent = get_agent( + AgentTypes.GENERIC_BROWSER_USE_AGENT, + goal=task['task_description'], + url=agent_url, + timeout=300, + headless=True + ) + + result = asyncio.run(agent.act()) + print(f"Agent result: {result}") + + # export interaction and price data from kafka + interactions = get_interactions(topic='user-interactions', timeout_ms=3000) + prices = get_interactions(topic='price-logs', timeout_ms=3000) + + with open(f"{exp_dir}/int.json", 'w') as f: + json.dump(interactions, f, indent=2) + + with open(f"{exp_dir}/price.json", 'w') as f: + json.dump(prices, f, indent=2) + + print(f"Experiment {exp_id} completed.") + print(f"Exported {len(interactions)} interactions and {len(prices)} price logs to {exp_dir}")