forked from MCP/llm-fusion-mcp
- Unified access to 4 major LLM providers (Gemini, OpenAI, Anthropic, Grok)
- Real-time streaming support across all providers (see the sketch below)
- Multimodal capabilities (text, images, audio)
- Intelligent document processing with smart chunking
- Production-ready with health monitoring and error handling
- Full OpenAI ecosystem integration (Assistants, DALL-E, Whisper)
- Vector embeddings and semantic similarity
- Session-based API key management
- Built with FastMCP and modern Python tooling

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
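As a minimal sketch of the streaming API, the loop below consumes `generate_text_streaming` the same way the test script in this file does; the chunk-dict keys (`success`, `finished`, `chunk`, `full_text`, `error`) are inferred from that test, not from official documentation, so treat them as assumptions:

```python
from llm_fusion_mcp.server import generate_text_streaming

# Iterate over streamed chunks; each chunk is assumed to be a dict.
for chunk in generate_text_streaming("Write a haiku about MCP"):
    if not chunk.get("success"):
        print(f"Error: {chunk.get('error')}")
        break
    if chunk.get("finished"):
        # Final chunk: the accumulated text arrives under "full_text".
        print(f"\nDone: {len(chunk.get('full_text', ''))} chars total")
    else:
        # Intermediate chunk: print incremental text as it arrives.
        print(chunk.get("chunk", ""), end="", flush=True)
```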
37 lines · 1.1 KiB · Python
#!/usr/bin/env python3
"""Test the streaming functionality."""

import sys
import os

sys.path.insert(0, 'src')

from llm_fusion_mcp.server import generate_text_streaming


def test_streaming():
    """Test the streaming text generation."""
    print("Testing streaming text generation...")
    print("=" * 50)

    prompt = "Write a short poem about coding"

    try:
        for chunk in generate_text_streaming(prompt):
            if chunk.get("success"):
                if not chunk.get("finished"):
                    # Intermediate chunk: print the text as it arrives.
                    print(chunk.get("chunk", ""), end="", flush=True)
                else:
                    # Final chunk: report the accumulated text length.
                    print("\n" + "=" * 50)
                    print("Streaming completed!")
                    print(f"Full text length: {len(chunk.get('full_text', ''))}")
            else:
                print(f"Error: {chunk.get('error')}")
                break
    except Exception as e:
        print(f"Test failed: {e}")


if __name__ == "__main__":
    if not os.getenv("GOOGLE_API_KEY"):
        print("Please set GOOGLE_API_KEY environment variable")
        sys.exit(1)

    test_streaming()
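The test assumes each yielded chunk is a dict carrying `success`, `finished`, `chunk`, `full_text`, and `error` keys. A hypothetical stub like the following (not part of the repository) mirrors that contract and can drive `test_streaming`'s loop offline, without a GOOGLE_API_KEY:

```python
def fake_streaming(prompt):
    """Stub with the same chunk contract assumed by test_streaming."""
    pieces = ["Code ", "flows ", "like ", "water."]
    for p in pieces:
        # Intermediate chunks carry incremental text under "chunk".
        yield {"success": True, "finished": False, "chunk": p}
    # The final chunk carries the accumulated text under "full_text".
    yield {"success": True, "finished": True, "full_text": "".join(pieces)}
```

To exercise the real Gemini backend instead, run the script with the GOOGLE_API_KEY environment variable set, as the `__main__` guard above requires.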