Some checks are pending
🚀 LLM Fusion MCP - CI/CD Pipeline / 📢 Deployment Notification (push) Blocked by required conditions
🚀 LLM Fusion MCP - CI/CD Pipeline / 🔍 Code Quality & Testing (3.10) (push) Waiting to run
🚀 LLM Fusion MCP - CI/CD Pipeline / 🔍 Code Quality & Testing (3.11) (push) Waiting to run
🚀 LLM Fusion MCP - CI/CD Pipeline / 🔍 Code Quality & Testing (3.12) (push) Waiting to run
🚀 LLM Fusion MCP - CI/CD Pipeline / 🛡️ Security Scanning (push) Blocked by required conditions
🚀 LLM Fusion MCP - CI/CD Pipeline / 🐳 Docker Build & Push (push) Blocked by required conditions
🚀 LLM Fusion MCP - CI/CD Pipeline / 🎉 Create Release (push) Blocked by required conditions
Revolutionary architecture that bridges remote LLMs with the entire MCP ecosystem! ## 🌟 Key Features Added: - Real MCP protocol implementation (STDIO + HTTP servers) - Hybrid LLM provider system (OpenAI-compatible + Native APIs) - Unified YAML configuration with environment variable substitution - Advanced error handling with circuit breakers and provider fallback - FastAPI HTTP bridge for remote LLM access - Comprehensive tool & resource discovery system - Complete test suite with 4 validation levels ## 🔧 Architecture Components: - `src/llm_fusion_mcp/orchestrator.py` - Main orchestrator with hybrid providers - `src/llm_fusion_mcp/mcp_client.py` - Full MCP protocol implementation - `src/llm_fusion_mcp/config.py` - Configuration management system - `src/llm_fusion_mcp/error_handling.py` - Circuit breaker & retry logic - `config/orchestrator.yaml` - Unified system configuration ## 🧪 Testing Infrastructure: - Complete system integration tests (4/4 passed) - MCP protocol validation tests - Provider compatibility analysis - Performance benchmarking suite 🎉 This creates the FIRST system enabling remote LLMs to access the entire MCP ecosystem through a unified HTTP API! 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
512 lines
18 KiB
Python
512 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
OpenAI API Compatibility Testing Script
|
|
|
|
Tests all LLM providers for OpenAI API compatibility to determine
|
|
feasibility of unified client architecture for MCP tool orchestrator.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import time
|
|
from typing import Dict, List, Any, Optional
|
|
from dataclasses import dataclass
|
|
from openai import OpenAI
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
@dataclass
|
|
class CompatibilityResult:
|
|
provider: str
|
|
feature: str
|
|
supported: bool
|
|
response_time: Optional[float] = None
|
|
error: Optional[str] = None
|
|
details: Optional[Dict] = None
|
|
|
|
class OpenAICompatibilityTester:
|
|
def __init__(self):
|
|
self.results: List[CompatibilityResult] = []
|
|
self.providers_config = {
|
|
'openai': {
|
|
'base_url': 'https://api.openai.com/v1',
|
|
'api_key': os.getenv('OPENAI_API_KEY'),
|
|
'model': 'gpt-4o-mini'
|
|
},
|
|
'gemini': {
|
|
'base_url': 'https://generativelanguage.googleapis.com/v1beta/openai/',
|
|
'api_key': os.getenv('GOOGLE_API_KEY'),
|
|
'model': 'gemini-2.5-flash'
|
|
},
|
|
'anthropic': {
|
|
'base_url': 'https://api.anthropic.com/v1', # Test direct first
|
|
'api_key': os.getenv('ANTHROPIC_API_KEY'),
|
|
'model': 'claude-3.5-sonnet-20241022'
|
|
},
|
|
'anthropic_openai': {
|
|
'base_url': 'https://api.anthropic.com/v1/openai', # Test OpenAI compatibility
|
|
'api_key': os.getenv('ANTHROPIC_API_KEY'),
|
|
'model': 'claude-3.5-sonnet-20241022'
|
|
},
|
|
'grok': {
|
|
'base_url': 'https://api.x.ai/v1',
|
|
'api_key': os.getenv('XAI_API_KEY'),
|
|
'model': 'grok-3'
|
|
}
|
|
}
|
|
|
|
def create_client(self, provider: str) -> Optional[OpenAI]:
|
|
"""Create OpenAI client for provider"""
|
|
config = self.providers_config.get(provider)
|
|
if not config or not config['api_key']:
|
|
print(f"❌ {provider}: Missing API key")
|
|
return None
|
|
|
|
try:
|
|
return OpenAI(
|
|
api_key=config['api_key'],
|
|
base_url=config['base_url']
|
|
)
|
|
except Exception as e:
|
|
print(f"❌ {provider}: Failed to create client - {e}")
|
|
return None
|
|
|
|
async def test_basic_chat(self, provider: str) -> CompatibilityResult:
|
|
"""Test basic chat completions endpoint"""
|
|
client = self.create_client(provider)
|
|
if not client:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="basic_chat",
|
|
supported=False,
|
|
error="Client creation failed"
|
|
)
|
|
|
|
start_time = time.time()
|
|
try:
|
|
response = client.chat.completions.create(
|
|
model=self.providers_config[provider]['model'],
|
|
messages=[
|
|
{"role": "user", "content": "Say 'Hello, World!' and nothing else."}
|
|
],
|
|
max_tokens=20
|
|
)
|
|
|
|
response_time = time.time() - start_time
|
|
|
|
# Check if response has expected structure
|
|
if hasattr(response, 'choices') and len(response.choices) > 0:
|
|
content = response.choices[0].message.content
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="basic_chat",
|
|
supported=True,
|
|
response_time=response_time,
|
|
details={"response": content, "model": response.model}
|
|
)
|
|
else:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="basic_chat",
|
|
supported=False,
|
|
error="Unexpected response structure"
|
|
)
|
|
|
|
except Exception as e:
|
|
response_time = time.time() - start_time
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="basic_chat",
|
|
supported=False,
|
|
response_time=response_time,
|
|
error=str(e)
|
|
)
|
|
|
|
async def test_streaming(self, provider: str) -> CompatibilityResult:
|
|
"""Test streaming chat completions"""
|
|
client = self.create_client(provider)
|
|
if not client:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="streaming",
|
|
supported=False,
|
|
error="Client creation failed"
|
|
)
|
|
|
|
start_time = time.time()
|
|
try:
|
|
stream = client.chat.completions.create(
|
|
model=self.providers_config[provider]['model'],
|
|
messages=[
|
|
{"role": "user", "content": "Count from 1 to 3"}
|
|
],
|
|
stream=True,
|
|
max_tokens=50
|
|
)
|
|
|
|
chunks_received = 0
|
|
content_pieces = []
|
|
|
|
for chunk in stream:
|
|
chunks_received += 1
|
|
if hasattr(chunk, 'choices') and len(chunk.choices) > 0:
|
|
delta = chunk.choices[0].delta
|
|
if hasattr(delta, 'content') and delta.content:
|
|
content_pieces.append(delta.content)
|
|
|
|
if chunks_received > 10: # Prevent infinite loops
|
|
break
|
|
|
|
response_time = time.time() - start_time
|
|
|
|
if chunks_received > 0:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="streaming",
|
|
supported=True,
|
|
response_time=response_time,
|
|
details={
|
|
"chunks_received": chunks_received,
|
|
"content": "".join(content_pieces)
|
|
}
|
|
)
|
|
else:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="streaming",
|
|
supported=False,
|
|
error="No streaming chunks received"
|
|
)
|
|
|
|
except Exception as e:
|
|
response_time = time.time() - start_time
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="streaming",
|
|
supported=False,
|
|
response_time=response_time,
|
|
error=str(e)
|
|
)
|
|
|
|
async def test_function_calling(self, provider: str) -> CompatibilityResult:
|
|
"""Test function calling capability"""
|
|
client = self.create_client(provider)
|
|
if not client:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="function_calling",
|
|
supported=False,
|
|
error="Client creation failed"
|
|
)
|
|
|
|
tools = [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "get_weather",
|
|
"description": "Get weather information for a city",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"city": {
|
|
"type": "string",
|
|
"description": "The city name"
|
|
}
|
|
},
|
|
"required": ["city"]
|
|
}
|
|
}
|
|
}
|
|
]
|
|
|
|
start_time = time.time()
|
|
try:
|
|
response = client.chat.completions.create(
|
|
model=self.providers_config[provider]['model'],
|
|
messages=[
|
|
{"role": "user", "content": "What's the weather in San Francisco?"}
|
|
],
|
|
tools=tools,
|
|
max_tokens=100
|
|
)
|
|
|
|
response_time = time.time() - start_time
|
|
|
|
# Check if function was called
|
|
if (hasattr(response, 'choices') and len(response.choices) > 0 and
|
|
hasattr(response.choices[0].message, 'tool_calls') and
|
|
response.choices[0].message.tool_calls):
|
|
|
|
tool_calls = response.choices[0].message.tool_calls
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="function_calling",
|
|
supported=True,
|
|
response_time=response_time,
|
|
details={
|
|
"tool_calls": [
|
|
{
|
|
"name": call.function.name,
|
|
"arguments": call.function.arguments
|
|
} for call in tool_calls
|
|
]
|
|
}
|
|
)
|
|
else:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="function_calling",
|
|
supported=False,
|
|
error="No function calls in response"
|
|
)
|
|
|
|
except Exception as e:
|
|
response_time = time.time() - start_time
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="function_calling",
|
|
supported=False,
|
|
response_time=response_time,
|
|
error=str(e)
|
|
)
|
|
|
|
async def test_embeddings(self, provider: str) -> CompatibilityResult:
|
|
"""Test embeddings endpoint"""
|
|
client = self.create_client(provider)
|
|
if not client:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="embeddings",
|
|
supported=False,
|
|
error="Client creation failed"
|
|
)
|
|
|
|
start_time = time.time()
|
|
try:
|
|
# Try common embedding models
|
|
embedding_models = {
|
|
'openai': 'text-embedding-3-small',
|
|
'gemini': 'gemini-embedding-001',
|
|
'anthropic': 'text-embedding-3-small', # Might not exist
|
|
'anthropic_openai': 'text-embedding-3-small',
|
|
'grok': 'text-embedding-3-small' # Unknown
|
|
}
|
|
|
|
model = embedding_models.get(provider, 'text-embedding-3-small')
|
|
|
|
response = client.embeddings.create(
|
|
model=model,
|
|
input="Test embedding text"
|
|
)
|
|
|
|
response_time = time.time() - start_time
|
|
|
|
if hasattr(response, 'data') and len(response.data) > 0:
|
|
embedding = response.data[0].embedding
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="embeddings",
|
|
supported=True,
|
|
response_time=response_time,
|
|
details={
|
|
"dimensions": len(embedding),
|
|
"model": getattr(response, 'model', 'unknown')
|
|
}
|
|
)
|
|
else:
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="embeddings",
|
|
supported=False,
|
|
error="No embedding data in response"
|
|
)
|
|
|
|
except Exception as e:
|
|
response_time = time.time() - start_time
|
|
return CompatibilityResult(
|
|
provider=provider,
|
|
feature="embeddings",
|
|
supported=False,
|
|
response_time=response_time,
|
|
error=str(e)
|
|
)
|
|
|
|
async def test_provider_compatibility(self, provider: str):
|
|
"""Test all features for a specific provider"""
|
|
print(f"\n🧪 Testing {provider}...")
|
|
|
|
# Test basic chat
|
|
result = await self.test_basic_chat(provider)
|
|
self.results.append(result)
|
|
self.print_result(result)
|
|
|
|
# Only continue if basic chat works
|
|
if not result.supported:
|
|
print(f"❌ {provider}: Basic chat failed, skipping other tests")
|
|
return
|
|
|
|
# Test streaming
|
|
result = await self.test_streaming(provider)
|
|
self.results.append(result)
|
|
self.print_result(result)
|
|
|
|
# Test function calling
|
|
result = await self.test_function_calling(provider)
|
|
self.results.append(result)
|
|
self.print_result(result)
|
|
|
|
# Test embeddings
|
|
result = await self.test_embeddings(provider)
|
|
self.results.append(result)
|
|
self.print_result(result)
|
|
|
|
def print_result(self, result: CompatibilityResult):
|
|
"""Print formatted test result"""
|
|
status = "✅" if result.supported else "❌"
|
|
timing = f" ({result.response_time:.2f}s)" if result.response_time else ""
|
|
error = f" - {result.error}" if result.error else ""
|
|
|
|
print(f" {status} {result.feature}{timing}{error}")
|
|
|
|
if result.details:
|
|
for key, value in result.details.items():
|
|
if isinstance(value, str) and len(value) > 100:
|
|
value = value[:100] + "..."
|
|
print(f" {key}: {value}")
|
|
|
|
def generate_report(self):
|
|
"""Generate comprehensive compatibility report"""
|
|
print("\n" + "="*60)
|
|
print("📊 OpenAI API Compatibility Report")
|
|
print("="*60)
|
|
|
|
# Group results by provider
|
|
providers = {}
|
|
for result in self.results:
|
|
if result.provider not in providers:
|
|
providers[result.provider] = {}
|
|
providers[result.provider][result.feature] = result
|
|
|
|
# Print summary matrix
|
|
features = ["basic_chat", "streaming", "function_calling", "embeddings"]
|
|
|
|
print(f"\n{'Provider':<15} {'Chat':<6} {'Stream':<8} {'Functions':<11} {'Embeddings':<11}")
|
|
print("-" * 60)
|
|
|
|
for provider, results in providers.items():
|
|
row = f"{provider:<15}"
|
|
for feature in features:
|
|
result = results.get(feature)
|
|
if result:
|
|
status = "✅" if result.supported else "❌"
|
|
timing = f"({result.response_time:.1f}s)" if result.response_time else ""
|
|
cell = f"{status:<6}" if not timing else f"{status}{timing}"
|
|
row += f" {cell:<10}"
|
|
else:
|
|
row += f" {'⏸️':<10}"
|
|
print(row)
|
|
|
|
# Detailed findings
|
|
print(f"\n📋 Detailed Findings:")
|
|
print("-" * 30)
|
|
|
|
for provider, results in providers.items():
|
|
print(f"\n🔍 {provider.upper()}:")
|
|
|
|
supported_features = []
|
|
failed_features = []
|
|
|
|
for feature, result in results.items():
|
|
if result.supported:
|
|
supported_features.append(feature)
|
|
else:
|
|
failed_features.append(f"{feature} ({result.error})")
|
|
|
|
if supported_features:
|
|
print(f" ✅ Supported: {', '.join(supported_features)}")
|
|
if failed_features:
|
|
print(f" ❌ Failed: {', '.join(failed_features)}")
|
|
|
|
# Architecture recommendations
|
|
print(f"\n💡 Architecture Recommendations:")
|
|
print("-" * 35)
|
|
|
|
fully_compatible = []
|
|
partially_compatible = []
|
|
not_compatible = []
|
|
|
|
for provider, results in providers.items():
|
|
basic_chat = results.get('basic_chat', CompatibilityResult(provider, 'basic_chat', False))
|
|
|
|
if basic_chat.supported:
|
|
supported_count = sum(1 for r in results.values() if r.supported)
|
|
total_count = len(results)
|
|
|
|
if supported_count == total_count:
|
|
fully_compatible.append(provider)
|
|
elif supported_count >= 2: # Chat + at least one other feature
|
|
partially_compatible.append(provider)
|
|
else:
|
|
not_compatible.append(provider)
|
|
else:
|
|
not_compatible.append(provider)
|
|
|
|
if fully_compatible:
|
|
print(f" 🎯 Fully Compatible (OpenAI-first): {', '.join(fully_compatible)}")
|
|
if partially_compatible:
|
|
print(f" ⚡ Partially Compatible (Hybrid): {', '.join(partially_compatible)}")
|
|
if not_compatible:
|
|
print(f" 🔧 Needs Native Implementation: {', '.join(not_compatible)}")
|
|
|
|
# Final recommendation
|
|
if len(fully_compatible) >= 3:
|
|
print(f"\n✅ RECOMMENDATION: Use OpenAI-first architecture")
|
|
print(f" Most providers support OpenAI interface well")
|
|
elif len(fully_compatible) + len(partially_compatible) >= 3:
|
|
print(f"\n⚡ RECOMMENDATION: Use hybrid architecture")
|
|
print(f" Mix of OpenAI interface and native clients")
|
|
else:
|
|
print(f"\n🔧 RECOMMENDATION: Use provider-specific implementations")
|
|
print(f" Limited OpenAI compatibility, native APIs preferred")
|
|
|
|
async def main():
|
|
"""Run compatibility tests for all providers"""
|
|
tester = OpenAICompatibilityTester()
|
|
|
|
print("🚀 Starting OpenAI API Compatibility Tests")
|
|
print("Testing providers: OpenAI, Gemini, Anthropic, Grok")
|
|
|
|
# Test each provider
|
|
for provider in tester.providers_config.keys():
|
|
try:
|
|
await tester.test_provider_compatibility(provider)
|
|
except Exception as e:
|
|
print(f"❌ {provider}: Unexpected error during testing - {e}")
|
|
|
|
# Generate comprehensive report
|
|
tester.generate_report()
|
|
|
|
# Save results to file
|
|
results_data = [
|
|
{
|
|
'provider': r.provider,
|
|
'feature': r.feature,
|
|
'supported': r.supported,
|
|
'response_time': r.response_time,
|
|
'error': r.error,
|
|
'details': r.details
|
|
}
|
|
for r in tester.results
|
|
]
|
|
|
|
with open('openai_compatibility_results.json', 'w') as f:
|
|
json.dump(results_data, f, indent=2, default=str)
|
|
|
|
print(f"\n💾 Results saved to openai_compatibility_results.json")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |