1
0
forked from MCP/llm-fusion-mcp
llm-fusion-mcp/test_streaming.py
Ryan Malloy c335ba0e1e Initial commit: LLM Fusion MCP Server
- Unified access to 4 major LLM providers (Gemini, OpenAI, Anthropic, Grok)
- Real-time streaming support across all providers
- Multimodal capabilities (text, images, audio)
- Intelligent document processing with smart chunking
- Production-ready with health monitoring and error handling
- Full OpenAI ecosystem integration (Assistants, DALL-E, Whisper)
- Vector embeddings and semantic similarity
- Session-based API key management
- Built with FastMCP and modern Python tooling

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-05 05:47:51 -06:00

37 lines
1.1 KiB
Python

#!/usr/bin/env python3
"""Test the streaming functionality."""
import sys
import os
# Put the relative 'src' directory at the front of the import search path
# before importing the project package. A relative entry is resolved against
# the current working directory, so this presumably expects the script to be
# launched from the repository root -- TODO confirm.
sys.path.insert(0, 'src')
from llm_fusion_mcp.server import generate_text_streaming
def test_streaming():
    """Stream a fixed poem prompt and echo the result to stdout.

    Every item yielded by ``generate_text_streaming`` is read with ``.get``:

    * a falsy ``"success"`` prints the item's ``"error"`` and stops reading;
    * a truthy ``"finished"`` prints a closing banner and the length of
      ``"full_text"``;
    * anything else prints ``"chunk"`` immediately, without a newline.

    Any exception raised while streaming is caught and reported as a
    printed message rather than propagated.
    """
    separator = "=" * 50
    print("Testing streaming text generation...")
    print(separator)

    try:
        for piece in generate_text_streaming("Write a short poem about coding"):
            # Failure item: report it and abandon the rest of the stream.
            if not piece.get("success"):
                print(f"Error: {piece.get('error')}")
                break

            # Final item: print the summary, then keep iterating so the
            # generator is allowed to run to its natural end.
            if piece.get("finished"):
                print("\n" + separator)
                print("Streaming completed!")
                print(f"Full text length: {len(piece.get('full_text', ''))}")
                continue

            # Intermediate item: flush so the text shows up as it arrives.
            print(piece.get("chunk", ""), end="", flush=True)
    except Exception as exc:
        print(f"Test failed: {exc}")
if __name__ == "__main__":
    # Only run the streaming check when GOOGLE_API_KEY is set to a non-empty
    # value; otherwise tell the user and exit with status 1.
    api_key = os.environ.get("GOOGLE_API_KEY")
    if api_key:
        test_streaming()
    else:
        print("Please set GOOGLE_API_KEY environment variable")
        sys.exit(1)