llm-eval-forensics/ai_eval.py
#!/usr/bin/env python3
"""
AI Model Evaluation Automation Script
Runs comprehensive test suite against OpenAI-compatible API endpoints
"""

import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import requests
import yaml


class AIModelTester:
    def __init__(self, endpoint: str, api_key: str, model_name: str, output_dir: str = "results"):
        """
        Initialize the AI Model Tester

        Args:
            endpoint: OpenAI-compatible API endpoint URL
            api_key: API key for authentication
            model_name: Name/identifier of the model being tested
            output_dir: Directory to save results
        """
        self.endpoint = endpoint.rstrip('/')
        self.api_key = api_key
        self.model_name = model_name
        self.output_dir = Path(output_dir)
        # parents=True so nested output directories (e.g. results/runs) also work
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # One timestamp per run: save_results() reuses it, so incremental saves
        # overwrite a single per-run file instead of creating a new file per test
        self.run_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Results storage
        self.results = {
            "metadata": {
                "model_name": model_name,
                "endpoint": endpoint,
                "test_start": datetime.now().isoformat(),
                "test_end": None,
                "total_tests": 0,
                "completed_tests": 0
            },
            "test_results": []
        }

        # Current test session info
        self.current_test_id = None
        self.conversation_history = []

    def load_test_suite(self, yaml_file: str) -> Dict:
        """Load test suite from YAML file"""
        try:
            with open(yaml_file, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            print(f"Error: Test suite file not found: {yaml_file}")
            print(f"Please ensure {yaml_file} is in the current directory.")
            sys.exit(1)
        except yaml.YAMLError as e:
            print(f"Error: Invalid YAML format in {yaml_file}")
            print(f"Details: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"Error loading test suite: {e}")
            sys.exit(1)
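
    # A minimal sketch of the YAML layout this loader expects, inferred from
    # the fields read in run_test_suite(), run_single_turn_test() and
    # run_multi_turn_test(); the ids, names, prompts and difficulty values
    # below are illustrative placeholders only:
    #
    #   test_categories:
    #     - category: "IT Forensics - File Systems"
    #       tests:
    #         - id: example-001
    #           name: "Example single-turn test"
    #           type: single_turn
    #           expected_difficulty: medium
    #           prompt: "..."
    #           evaluation_criteria:
    #             - "First criterion the answer should satisfy"
    #         - id: example-002
    #           name: "Example multi-turn test"
    #           type: multi_turn
    #           expected_difficulty: hard
    #           turns:
    #             - turn: 1
    #               prompt: "..."
    #               evaluation_criteria: ["..."]
    #             - turn: 2
    #               prompt: "..."
    #               evaluation_criteria: ["..."]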

    def call_api(self, messages: List[Dict], temperature: float = 0.7, max_tokens: int = 2000) -> Optional[Dict]:
        """
        Call the OpenAI-compatible API

        Args:
            messages: List of message dicts with 'role' and 'content'
            temperature: Sampling temperature
            max_tokens: Maximum tokens in response

        Returns:
            API response dict or None if error
        """
        headers = {
            "Content-Type": "application/json"
        }
        # Only add Authorization header if API key is provided
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        payload = {
            "model": self.model_name,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }

        try:
            response = requests.post(
                f"{self.endpoint}/v1/chat/completions",
                headers=headers,
                json=payload,
                timeout=120
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"\n❌ API Error: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response: {e.response.text}")
            return None
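
    # Only this slice of the OpenAI-style chat-completions response is used by
    # the test runners below (anything else the server returns is ignored);
    # the content value is a placeholder:
    #
    #   {
    #     "choices": [
    #       {"message": {"role": "assistant", "content": "<model reply>"}}
    #     ]
    #   }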

    def display_test_info(self, test: Dict, category: str):
        """Display test information to user"""
        print("\n" + "="*80)
        print(f"📋 CATEGORY: {category}")
        print(f"🆔 Test ID: {test['id']}")
        print(f"📝 Test Name: {test['name']}")
        print(f"🎯 Type: {test['type']}")
        print(f"⚡ Difficulty: {test.get('expected_difficulty', 'N/A')}")
        print("="*80)

    def display_prompt(self, prompt: str, turn: Optional[int] = None):
        """Display the prompt being sent"""
        if turn is not None:
            print(f"\n🔄 TURN {turn}:")
        else:
            print("\n💬 PROMPT:")
        print("-"*80)
        print(prompt)
        print("-"*80)

    def display_response(self, response_text: str):
        """Display the model's response"""
        print("\n🤖 MODEL RESPONSE:")
        print("-"*80)
        print(response_text)
        print("-"*80)

    def display_evaluation_criteria(self, criteria: List[str]):
        """Display evaluation criteria for the test"""
        print("\n✅ EVALUATION CRITERIA:")
        for i, criterion in enumerate(criteria, 1):
            print(f" {i}. {criterion}")

    def get_user_score(self) -> Dict:
        """Prompt user for evaluation score"""
        print("\n" + "="*80)
        print("📊 EVALUATION SCORING RUBRIC:")
        print(" 0-1: FAIL - Major errors, fails to meet basic requirements")
        print(" 2-3: PASS - Meets requirements with minor issues")
        print(" 4-5: EXCEPTIONAL - Exceeds requirements, demonstrates deep understanding")
        print("="*80)

        while True:
            try:
                score_input = input("\n👉 Enter score (0-5) or 'skip' to skip this test: ").strip().lower()
                if score_input == 'skip':
                    return {"score": None, "notes": "Skipped by user"}

                score = int(score_input)
                if 0 <= score <= 5:
                    notes = input("📝 Notes (optional, press Enter to skip): ").strip()
                    return {"score": score, "notes": notes if notes else ""}
                else:
                    print("❌ Score must be between 0 and 5")
            except ValueError:
                print("❌ Invalid input. Please enter a number between 0 and 5, or 'skip'")
            except KeyboardInterrupt:
                print("\n\n⚠️ Test interrupted by user")
                return {"score": None, "notes": "Interrupted"}

    def run_single_turn_test(self, test: Dict, category: str) -> Dict:
        """Run a single-turn test"""
        self.display_test_info(test, category)
        self.display_prompt(test['prompt'])

        # Prepare messages
        messages = [{"role": "user", "content": test['prompt']}]

        # Call API
        response = self.call_api(messages)
        if response is None:
            return {
                "test_id": test['id'],
                "test_name": test['name'],
                "category": category,
                "type": "single_turn",
                "status": "api_error",
                "score": None,
                "notes": "API call failed"
            }

        # Extract response text
        response_text = response['choices'][0]['message']['content']
        self.display_response(response_text)

        # Display evaluation criteria
        self.display_evaluation_criteria(test.get('evaluation_criteria', []))

        # Get user evaluation
        evaluation = self.get_user_score()

        return {
            "test_id": test['id'],
            "test_name": test['name'],
            "category": category,
            "type": "single_turn",
            "difficulty": test.get('expected_difficulty', 'unknown'),
            "prompt": test['prompt'],
            "response": response_text,
            "evaluation_criteria": test.get('evaluation_criteria', []),
            "score": evaluation['score'],
            "notes": evaluation['notes'],
            "status": "completed" if evaluation['score'] is not None else "skipped",
            "timestamp": datetime.now().isoformat()
        }

    def run_multi_turn_test(self, test: Dict, category: str) -> Dict:
        """Run a multi-turn test"""
        self.display_test_info(test, category)

        # Initialize conversation history
        self.conversation_history = []
        turn_results = []

        for turn_data in test['turns']:
            turn_num = turn_data['turn']
            prompt = turn_data['prompt']
            self.display_prompt(prompt, turn_num)

            # Add to conversation history
            self.conversation_history.append({"role": "user", "content": prompt})

            # Call API with full conversation history
            response = self.call_api(self.conversation_history)
            if response is None:
                turn_results.append({
                    "turn": turn_num,
                    "status": "api_error",
                    "prompt": prompt,
                    "response": None
                })
                break

            # Extract and display response
            response_text = response['choices'][0]['message']['content']
            self.display_response(response_text)

            # Add assistant response to history
            self.conversation_history.append({"role": "assistant", "content": response_text})

            # Display criteria for this turn
            self.display_evaluation_criteria(turn_data.get('evaluation_criteria', []))

            # Get evaluation for this turn
            print(f"\n🎯 Evaluate Turn {turn_num}:")
            evaluation = self.get_user_score()

            turn_results.append({
                "turn": turn_num,
                "prompt": prompt,
                "response": response_text,
                "evaluation_criteria": turn_data.get('evaluation_criteria', []),
                "score": evaluation['score'],
                "notes": evaluation['notes'],
                "status": "completed" if evaluation['score'] is not None else "skipped"
            })

            if evaluation['score'] is None:
                print(f"\n⚠️ Turn {turn_num} skipped, stopping multi-turn test")
                break

        # Calculate overall score for the multi-turn test (mean of scored turns).
        # Use .get() because a turn that hit an API error has no 'score' key.
        valid_scores = [t['score'] for t in turn_results if t.get('score') is not None]
        overall_score = sum(valid_scores) / len(valid_scores) if valid_scores else None

        return {
            "test_id": test['id'],
            "test_name": test['name'],
            "category": category,
            "type": "multi_turn",
            "difficulty": test.get('expected_difficulty', 'unknown'),
            "turns": turn_results,
            "overall_score": overall_score,
            "status": "completed" if overall_score is not None else "incomplete",
            "timestamp": datetime.now().isoformat()
        }

    def run_test_suite(self, test_suite: Dict, filter_category: Optional[str] = None):
        """Run the complete test suite"""
        print("\n" + "="*80)
        print("🚀 STARTING TEST SUITE")
        print(f"📦 Model: {self.model_name}")
        print(f"🔗 Endpoint: {self.endpoint}")
        print("="*80)

        # Count total tests
        total_tests = 0
        for cat_data in test_suite.get('test_categories', []):
            if filter_category and cat_data['category'] != filter_category:
                continue
            total_tests += len(cat_data.get('tests', []))
        self.results['metadata']['total_tests'] = total_tests

        # Run tests by category
        test_count = 0
        for cat_data in test_suite.get('test_categories', []):
            category = cat_data['category']

            # Apply category filter if specified
            if filter_category and category != filter_category:
                continue

            print(f"\n\n{'='*80}")
            print(f"📂 CATEGORY: {category}")
            print(f"{'='*80}")

            for test in cat_data.get('tests', []):
                test_count += 1
                print(f"\n📊 Progress: {test_count}/{total_tests}")

                # Run appropriate test type
                if test.get('type') == 'single_turn':
                    result = self.run_single_turn_test(test, category)
                elif test.get('type') == 'multi_turn':
                    result = self.run_multi_turn_test(test, category)
                else:
                    print(f"⚠️ Unknown test type: {test.get('type')}")
                    continue

                self.results['test_results'].append(result)
                self.results['metadata']['completed_tests'] += 1

                # Save after each test (in case of interruption)
                self.save_results()

        # Mark test suite as complete
        self.results['metadata']['test_end'] = datetime.now().isoformat()
        self.save_results()

        print("\n\n" + "="*80)
        print("✅ TEST SUITE COMPLETE")
        print("="*80)
        self.display_summary()

    def save_results(self):
        """Save results to JSON file"""
        # Sanitise the model name for use in filenames (e.g. "qwen3:4b" or "org/model")
        safe_name = self.model_name.replace(':', '_').replace('/', '_')

        # One timestamped file per run; repeated saves overwrite it
        filename = f"{safe_name}_{self.run_timestamp}.json"
        filepath = self.output_dir / filename
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)

        # Also save as "latest" for this model
        latest_file = self.output_dir / f"{safe_name}_latest.json"
        with open(latest_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
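
    # Each run therefore leaves two files in output_dir (filenames assume
    # --model qwen3:4b-q4_K_M; the timestamp is taken at the start of the run):
    #   qwen3_4b-q4_K_M_<YYYYMMDD_HHMMSS>.json  - snapshot of this run
    #   qwen3_4b-q4_K_M_latest.json             - most recent run for the model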

    def display_summary(self):
        """Display test summary"""
        total = self.results['metadata']['total_tests']
        completed = self.results['metadata']['completed_tests']

        # Collect per-test scores: the single-turn score if present, otherwise
        # the multi-turn overall score. An explicit None check is used so that
        # a legitimate score of 0 is not dropped from the statistics.
        scores = []
        for r in self.results['test_results']:
            score = r.get('score')
            if score is None:
                score = r.get('overall_score')
            if score is not None:
                scores.append(score)

        print("\n📊 SUMMARY:")
        print(f" Total Tests: {total}")
        print(f" Completed: {completed}")
        if scores:
            avg_score = sum(scores) / len(scores)
            print(f" Average Score: {avg_score:.2f}/5.00")
            print(f" Pass Rate: {len([s for s in scores if s >= 2]) / len(scores) * 100:.1f}%")
            print(f" Exceptional Rate: {len([s for s in scores if s >= 4]) / len(scores) * 100:.1f}%")
        print(f"\n💾 Results saved to: {self.output_dir}")


def main():
    parser = argparse.ArgumentParser(
        description="AI Model Evaluation Test Suite",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Test a single model
  python ai_eval.py --endpoint http://localhost:11434 --model qwen3:4b-q4_K_M

  # Test with API key
  python ai_eval.py --endpoint https://api.example.com --api-key sk-xxx --model qwen3:8b

  # Test only forensics category
  python ai_eval.py --endpoint http://localhost:11434 --model qwen3:14b --category "IT Forensics - File Systems"

  # Test multiple models (run separately)
  python ai_eval.py --endpoint http://localhost:11434 --model qwen3:4b-q4_K_M
  python ai_eval.py --endpoint http://localhost:11434 --model qwen3:4b-q8_0
  python ai_eval.py --endpoint http://localhost:11434 --model qwen3:4b-fp16
"""
    )
    parser.add_argument(
        '--endpoint',
        required=True,
        help='OpenAI-compatible API endpoint (e.g., http://localhost:11434 for Ollama)'
    )
    parser.add_argument(
        '--api-key',
        default='',
        help='API key for authentication (optional for local endpoints)'
    )
    parser.add_argument(
        '--model',
        required=True,
        help='Model name/identifier (e.g., qwen3:4b-q4_K_M)'
    )
    parser.add_argument(
        '--test-suite',
        default='test_suite.yaml',
        help='Path to test suite YAML file (default: test_suite.yaml)'
    )
    parser.add_argument(
        '--output-dir',
        default='results',
        help='Directory to save results (default: results)'
    )
    parser.add_argument(
        '--category',
        default=None,
        help='Filter tests by category (optional)'
    )
    args = parser.parse_args()

    # Initialize tester
    tester = AIModelTester(
        endpoint=args.endpoint,
        api_key=args.api_key,
        model_name=args.model,
        output_dir=args.output_dir
    )

    # Load test suite
    print(f"📁 Loading test suite from: {args.test_suite}")
    test_suite = tester.load_test_suite(args.test_suite)

    # Run tests
    try:
        tester.run_test_suite(test_suite, filter_category=args.category)
    except KeyboardInterrupt:
        print("\n\n⚠️ Test suite interrupted by user")
        tester.results['metadata']['test_end'] = datetime.now().isoformat()
        tester.save_results()
        print(f"\n💾 Partial results saved to: {tester.output_dir}")
        sys.exit(1)


if __name__ == "__main__":
    main()