llm-fusion-mcp/test_openai_compatibility.py
Ryan Malloy 80f1ecbf7d
🚀 Phase 2 Complete: Universal MCP Tool Orchestrator
Revolutionary architecture that bridges remote LLMs with the entire MCP ecosystem!

## 🌟 Key Features Added:
- Real MCP protocol implementation (STDIO + HTTP servers)
- Hybrid LLM provider system (OpenAI-compatible + Native APIs)
- Unified YAML configuration with environment variable substitution (sketched after this list)
- Advanced error handling with circuit breakers and provider fallback
- FastAPI HTTP bridge for remote LLM access
- Comprehensive tool & resource discovery system
- Complete test suite with 4 validation levels
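
For the environment variable substitution mentioned above, here is a rough sketch of one way to expand `${VAR}` placeholders in a YAML file before parsing. It assumes PyYAML is available; the function name and regex are illustrative and are not the project's actual `config.py` code:

```python
# Illustrative sketch -- not the actual config.py implementation.
import os
import re
import yaml

_ENV_PATTERN = re.compile(r"\$\{(\w+)\}")

def load_config(path: str) -> dict:
    """Read a YAML file and replace ${VAR} placeholders with environment values."""
    with open(path) as f:
        raw = f.read()
    expanded = _ENV_PATTERN.sub(lambda m: os.environ.get(m.group(1), ""), raw)
    return yaml.safe_load(expanded)

# e.g. an entry such as `api_key: ${OPENAI_API_KEY}` resolves at load time.
```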

## 🔧 Architecture Components:
- `src/llm_fusion_mcp/orchestrator.py` - Main orchestrator with hybrid providers
- `src/llm_fusion_mcp/mcp_client.py` - Full MCP protocol implementation
- `src/llm_fusion_mcp/config.py` - Configuration management system
- `src/llm_fusion_mcp/error_handling.py` - Circuit breaker & retry logic (see the sketch after this list)
- `config/orchestrator.yaml` - Unified system configuration
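
For readers unfamiliar with the circuit-breaker pattern referenced above, a minimal sketch follows. The class name, thresholds, and state handling are assumptions for illustration only, not the contents of `error_handling.py`:

```python
# Illustrative sketch -- not the actual error_handling.py implementation.
import time

class CircuitBreaker:
    """Skip a failing provider after repeated errors, then retry after a cooldown."""

    def __init__(self, max_failures: int = 3, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        """Return True if a call may proceed (closed or half-open circuit)."""
        if self.opened_at is None:
            return True
        return time.monotonic() - self.opened_at >= self.reset_after

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.monotonic()
```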

## 🧪 Testing Infrastructure:
- Complete system integration tests (4/4 passed)
- MCP protocol validation tests
- Provider compatibility analysis
- Performance benchmarking suite

🎉 Remote LLMs can now reach the entire MCP ecosystem
through a single unified HTTP API!

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-06 10:01:37 -06:00


#!/usr/bin/env python3
"""
OpenAI API Compatibility Testing Script
Tests all configured LLM providers for OpenAI API compatibility to determine
the feasibility of a unified client architecture for the MCP tool orchestrator.
"""
import asyncio
import json
import os
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from openai import OpenAI
import httpx
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
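# Provider API keys are read from the environment (a local .env file is supported):
#   OPENAI_API_KEY, GOOGLE_API_KEY, ANTHROPIC_API_KEY, XAI_API_KEY
# A provider whose key is missing is reported as unsupported and its remaining tests are skipped.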
@dataclass
class CompatibilityResult:
provider: str
feature: str
supported: bool
response_time: Optional[float] = None
error: Optional[str] = None
details: Optional[Dict] = None
class OpenAICompatibilityTester:
def __init__(self):
self.results: List[CompatibilityResult] = []
self.providers_config = {
'openai': {
'base_url': 'https://api.openai.com/v1',
'api_key': os.getenv('OPENAI_API_KEY'),
'model': 'gpt-4o-mini'
},
'gemini': {
'base_url': 'https://generativelanguage.googleapis.com/v1beta/openai/',
'api_key': os.getenv('GOOGLE_API_KEY'),
'model': 'gemini-2.5-flash'
},
'anthropic': {
'base_url': 'https://api.anthropic.com/v1', # Test direct first
'api_key': os.getenv('ANTHROPIC_API_KEY'),
'model': 'claude-3-5-sonnet-20241022'
},
'anthropic_openai': {
'base_url': 'https://api.anthropic.com/v1/openai', # Test OpenAI compatibility
'api_key': os.getenv('ANTHROPIC_API_KEY'),
'model': 'claude-3-5-sonnet-20241022'
},
'grok': {
'base_url': 'https://api.x.ai/v1',
'api_key': os.getenv('XAI_API_KEY'),
'model': 'grok-3'
}
}
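# All providers are exercised through the official OpenAI Python SDK; only the
# base_url, api_key, and model differ per entry. Whether each endpoint actually
# honors the OpenAI interface is what the tests below measure.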
def create_client(self, provider: str) -> Optional[OpenAI]:
"""Create OpenAI client for provider"""
config = self.providers_config.get(provider)
if not config or not config['api_key']:
print(f"{provider}: Missing API key")
return None
try:
return OpenAI(
api_key=config['api_key'],
base_url=config['base_url']
)
except Exception as e:
print(f"{provider}: Failed to create client - {e}")
return None
async def test_basic_chat(self, provider: str) -> CompatibilityResult:
"""Test basic chat completions endpoint"""
client = self.create_client(provider)
if not client:
return CompatibilityResult(
provider=provider,
feature="basic_chat",
supported=False,
error="Client creation failed"
)
start_time = time.time()
try:
response = client.chat.completions.create(
model=self.providers_config[provider]['model'],
messages=[
{"role": "user", "content": "Say 'Hello, World!' and nothing else."}
],
max_tokens=20
)
response_time = time.time() - start_time
# Check if response has expected structure
if hasattr(response, 'choices') and len(response.choices) > 0:
content = response.choices[0].message.content
return CompatibilityResult(
provider=provider,
feature="basic_chat",
supported=True,
response_time=response_time,
details={"response": content, "model": response.model}
)
else:
return CompatibilityResult(
provider=provider,
feature="basic_chat",
supported=False,
error="Unexpected response structure"
)
except Exception as e:
response_time = time.time() - start_time
return CompatibilityResult(
provider=provider,
feature="basic_chat",
supported=False,
response_time=response_time,
error=str(e)
)
async def test_streaming(self, provider: str) -> CompatibilityResult:
"""Test streaming chat completions"""
client = self.create_client(provider)
if not client:
return CompatibilityResult(
provider=provider,
feature="streaming",
supported=False,
error="Client creation failed"
)
start_time = time.time()
try:
stream = client.chat.completions.create(
model=self.providers_config[provider]['model'],
messages=[
{"role": "user", "content": "Count from 1 to 3"}
],
stream=True,
max_tokens=50
)
chunks_received = 0
content_pieces = []
for chunk in stream:
chunks_received += 1
if hasattr(chunk, 'choices') and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
content_pieces.append(delta.content)
if chunks_received > 10: # Prevent infinite loops
break
response_time = time.time() - start_time
if chunks_received > 0:
return CompatibilityResult(
provider=provider,
feature="streaming",
supported=True,
response_time=response_time,
details={
"chunks_received": chunks_received,
"content": "".join(content_pieces)
}
)
else:
return CompatibilityResult(
provider=provider,
feature="streaming",
supported=False,
error="No streaming chunks received"
)
except Exception as e:
response_time = time.time() - start_time
return CompatibilityResult(
provider=provider,
feature="streaming",
supported=False,
response_time=response_time,
error=str(e)
)
async def test_function_calling(self, provider: str) -> CompatibilityResult:
"""Test function calling capability"""
client = self.create_client(provider)
if not client:
return CompatibilityResult(
provider=provider,
feature="function_calling",
supported=False,
error="Client creation failed"
)
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather information for a city",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "The city name"
}
},
"required": ["city"]
}
}
}
]
start_time = time.time()
try:
response = client.chat.completions.create(
model=self.providers_config[provider]['model'],
messages=[
{"role": "user", "content": "What's the weather in San Francisco?"}
],
tools=tools,
max_tokens=100
)
response_time = time.time() - start_time
# Check if function was called
if (hasattr(response, 'choices') and len(response.choices) > 0 and
hasattr(response.choices[0].message, 'tool_calls') and
response.choices[0].message.tool_calls):
tool_calls = response.choices[0].message.tool_calls
return CompatibilityResult(
provider=provider,
feature="function_calling",
supported=True,
response_time=response_time,
details={
"tool_calls": [
{
"name": call.function.name,
"arguments": call.function.arguments
} for call in tool_calls
]
}
)
else:
return CompatibilityResult(
provider=provider,
feature="function_calling",
supported=False,
error="No function calls in response"
)
except Exception as e:
response_time = time.time() - start_time
return CompatibilityResult(
provider=provider,
feature="function_calling",
supported=False,
response_time=response_time,
error=str(e)
)
async def test_embeddings(self, provider: str) -> CompatibilityResult:
"""Test embeddings endpoint"""
client = self.create_client(provider)
if not client:
return CompatibilityResult(
provider=provider,
feature="embeddings",
supported=False,
error="Client creation failed"
)
start_time = time.time()
try:
# Try common embedding models
embedding_models = {
'openai': 'text-embedding-3-small',
'gemini': 'gemini-embedding-001',
'anthropic': 'text-embedding-3-small', # Might not exist
'anthropic_openai': 'text-embedding-3-small',
'grok': 'text-embedding-3-small' # Unknown
}
model = embedding_models.get(provider, 'text-embedding-3-small')
response = client.embeddings.create(
model=model,
input="Test embedding text"
)
response_time = time.time() - start_time
if hasattr(response, 'data') and len(response.data) > 0:
embedding = response.data[0].embedding
return CompatibilityResult(
provider=provider,
feature="embeddings",
supported=True,
response_time=response_time,
details={
"dimensions": len(embedding),
"model": getattr(response, 'model', 'unknown')
}
)
else:
return CompatibilityResult(
provider=provider,
feature="embeddings",
supported=False,
error="No embedding data in response"
)
except Exception as e:
response_time = time.time() - start_time
return CompatibilityResult(
provider=provider,
feature="embeddings",
supported=False,
response_time=response_time,
error=str(e)
)
async def test_provider_compatibility(self, provider: str):
"""Test all features for a specific provider"""
print(f"\n🧪 Testing {provider}...")
# Test basic chat
result = await self.test_basic_chat(provider)
self.results.append(result)
self.print_result(result)
# Only continue if basic chat works
if not result.supported:
print(f"{provider}: Basic chat failed, skipping other tests")
return
# Test streaming
result = await self.test_streaming(provider)
self.results.append(result)
self.print_result(result)
# Test function calling
result = await self.test_function_calling(provider)
self.results.append(result)
self.print_result(result)
# Test embeddings
result = await self.test_embeddings(provider)
self.results.append(result)
self.print_result(result)
def print_result(self, result: CompatibilityResult):
"""Print formatted test result"""
status = "" if result.supported else ""
timing = f" ({result.response_time:.2f}s)" if result.response_time else ""
error = f" - {result.error}" if result.error else ""
print(f" {status} {result.feature}{timing}{error}")
if result.details:
for key, value in result.details.items():
if isinstance(value, str) and len(value) > 100:
value = value[:100] + "..."
print(f" {key}: {value}")
def generate_report(self):
"""Generate comprehensive compatibility report"""
print("\n" + "="*60)
print("📊 OpenAI API Compatibility Report")
print("="*60)
# Group results by provider
providers = {}
for result in self.results:
if result.provider not in providers:
providers[result.provider] = {}
providers[result.provider][result.feature] = result
# Print summary matrix
features = ["basic_chat", "streaming", "function_calling", "embeddings"]
print(f"\n{'Provider':<15} {'Chat':<6} {'Stream':<8} {'Functions':<11} {'Embeddings':<11}")
print("-" * 60)
for provider, results in providers.items():
row = f"{provider:<15}"
for feature in features:
result = results.get(feature)
if result:
status = "" if result.supported else ""
timing = f"({result.response_time:.1f}s)" if result.response_time else ""
cell = f"{status:<6}" if not timing else f"{status}{timing}"
row += f" {cell:<10}"
else:
row += f" {'⏸️':<10}"
print(row)
# Detailed findings
print(f"\n📋 Detailed Findings:")
print("-" * 30)
for provider, results in providers.items():
print(f"\n🔍 {provider.upper()}:")
supported_features = []
failed_features = []
for feature, result in results.items():
if result.supported:
supported_features.append(feature)
else:
failed_features.append(f"{feature} ({result.error})")
if supported_features:
print(f" ✅ Supported: {', '.join(supported_features)}")
if failed_features:
print(f" ❌ Failed: {', '.join(failed_features)}")
# Architecture recommendations
print(f"\n💡 Architecture Recommendations:")
print("-" * 35)
fully_compatible = []
partially_compatible = []
not_compatible = []
for provider, results in providers.items():
basic_chat = results.get('basic_chat', CompatibilityResult(provider, 'basic_chat', False))
if basic_chat.supported:
supported_count = sum(1 for r in results.values() if r.supported)
total_count = len(results)
if supported_count == total_count:
fully_compatible.append(provider)
elif supported_count >= 2: # Chat + at least one other feature
partially_compatible.append(provider)
else:
not_compatible.append(provider)
else:
not_compatible.append(provider)
if fully_compatible:
print(f" 🎯 Fully Compatible (OpenAI-first): {', '.join(fully_compatible)}")
if partially_compatible:
print(f" ⚡ Partially Compatible (Hybrid): {', '.join(partially_compatible)}")
if not_compatible:
print(f" 🔧 Needs Native Implementation: {', '.join(not_compatible)}")
# Final recommendation
if len(fully_compatible) >= 3:
print(f"\n✅ RECOMMENDATION: Use OpenAI-first architecture")
print(f" Most providers support OpenAI interface well")
elif len(fully_compatible) + len(partially_compatible) >= 3:
print(f"\n⚡ RECOMMENDATION: Use hybrid architecture")
print(f" Mix of OpenAI interface and native clients")
else:
print(f"\n🔧 RECOMMENDATION: Use provider-specific implementations")
print(f" Limited OpenAI compatibility, native APIs preferred")
async def main():
"""Run compatibility tests for all providers"""
tester = OpenAICompatibilityTester()
print("🚀 Starting OpenAI API Compatibility Tests")
print("Testing providers: OpenAI, Gemini, Anthropic, Grok")
# Test each provider
for provider in tester.providers_config.keys():
try:
await tester.test_provider_compatibility(provider)
except Exception as e:
print(f"{provider}: Unexpected error during testing - {e}")
# Generate comprehensive report
tester.generate_report()
# Save results to file
results_data = [
{
'provider': r.provider,
'feature': r.feature,
'supported': r.supported,
'response_time': r.response_time,
'error': r.error,
'details': r.details
}
for r in tester.results
]
with open('openai_compatibility_results.json', 'w') as f:
json.dump(results_data, f, indent=2, default=str)
print(f"\n💾 Results saved to openai_compatibility_results.json")
if __name__ == "__main__":
asyncio.run(main())