BREAKING CHANGES: - Package renamed from mcp-arduino-server to mcp-arduino - Command changed to 'mcp-arduino' (was 'mcp-arduino-server') - Repository moved to git.supported.systems/MCP/mcp-arduino NEW FEATURES: ✨ Smart client capability detection and dual-mode sampling support ✨ Intelligent WireViz templates with component-specific circuits (LED, motor, sensor, button, display) ✨ Client debug tools for MCP capability inspection ✨ Enhanced error handling with progressive enhancement patterns IMPROVEMENTS: 🧹 Major repository cleanup - removed 14+ experimental files and tests 📝 Consolidated and reorganized documentation 🐛 Fixed import issues and applied comprehensive linting with ruff 📦 Updated author information to Ryan Malloy (ryan@supported.systems) 🔧 Fixed package version references in startup code TECHNICAL DETAILS: - Added dual-mode WireViz: AI generation for sampling clients, smart templates for others - Implemented client capability detection via MCP handshake inspection - Created progressive enhancement pattern for universal MCP client compatibility - Organized test files into proper structure (tests/examples/) - Applied comprehensive code formatting and lint fixes The server now provides excellent functionality for ALL MCP clients regardless of their sampling capabilities, while preserving advanced features for clients that support them. Version: 2025.09.27.1
98 lines
3.2 KiB
Python
98 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Demonstration of circular buffer behavior with serial monitor
|
|
Shows wraparound, cursor invalidation, and recovery
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
|
|
# Set small buffer size for demonstration
|
|
os.environ['ARDUINO_SERIAL_BUFFER_SIZE'] = '20' # Very small for demo
|
|
|
|
# Add project to path
|
|
sys.path.insert(0, '/home/rpm/claude/mcp-arduino-server/src')
|
|
|
|
from mcp_arduino_server.components.circular_buffer import CircularSerialBuffer, SerialDataType
|
|
|
|
|
|
async def demo():
|
|
"""Demonstrate circular buffer behavior"""
|
|
|
|
# Create buffer with size 20
|
|
buffer = CircularSerialBuffer(max_size=20)
|
|
print("🔄 Circular Buffer Demo (size=20)\n")
|
|
|
|
# Add 10 entries
|
|
print("➤ Adding 10 entries...")
|
|
for i in range(10):
|
|
buffer.add_entry(
|
|
port="/dev/ttyUSB0",
|
|
data=f"Message {i}",
|
|
data_type=SerialDataType.RECEIVED
|
|
)
|
|
|
|
stats = buffer.get_statistics()
|
|
print(f" Buffer: {stats['buffer_size']}/{stats['max_size']} entries")
|
|
print(f" Total added: {stats['total_entries']}")
|
|
print(f" Dropped: {stats['entries_dropped']}")
|
|
|
|
# Create cursor at oldest data
|
|
cursor1 = buffer.create_cursor(start_from="oldest")
|
|
print("\n✓ Created cursor1 at oldest data")
|
|
|
|
# Read first 5 entries
|
|
result = buffer.read_from_cursor(cursor1, limit=5)
|
|
print(f" Read {result['count']} entries:")
|
|
for entry in result['entries']:
|
|
print(f" [{entry['index']}] {entry['data']}")
|
|
|
|
# Add 15 more entries (will cause wraparound)
|
|
print("\n➤ Adding 15 more entries (buffer will wrap)...")
|
|
for i in range(10, 25):
|
|
buffer.add_entry(
|
|
port="/dev/ttyUSB0",
|
|
data=f"Message {i}",
|
|
data_type=SerialDataType.RECEIVED
|
|
)
|
|
|
|
stats = buffer.get_statistics()
|
|
print(f" Buffer: {stats['buffer_size']}/{stats['max_size']} entries")
|
|
print(f" Total added: {stats['total_entries']}")
|
|
print(f" Dropped: {stats['entries_dropped']} ⚠️")
|
|
print(f" Oldest index: {stats['oldest_index']}")
|
|
print(f" Newest index: {stats['newest_index']}")
|
|
|
|
# Check cursor status
|
|
cursor_info = buffer.get_cursor_info(cursor1)
|
|
print("\n🔍 Cursor1 status after wraparound:")
|
|
print(f" Valid: {cursor_info['is_valid']}")
|
|
print(f" Position: {cursor_info['position']}")
|
|
|
|
# Try to read from invalid cursor
|
|
print("\n➤ Reading from cursor1 (should auto-recover)...")
|
|
result = buffer.read_from_cursor(cursor1, limit=5, auto_recover=True)
|
|
if result['success']:
|
|
print(f" ✓ Auto-recovered! Read {result['count']} entries:")
|
|
for entry in result['entries']:
|
|
print(f" [{entry['index']}] {entry['data']}")
|
|
if 'warning' in result:
|
|
print(f" ⚠️ {result['warning']}")
|
|
|
|
# Create new cursor and demonstrate concurrent reading
|
|
cursor2 = buffer.create_cursor(start_from="newest")
|
|
print("\n✓ Created cursor2 at newest data")
|
|
|
|
print("\n📊 Final Statistics:")
|
|
stats = buffer.get_statistics()
|
|
for key, value in stats.items():
|
|
print(f" {key}: {value}")
|
|
|
|
# Cleanup
|
|
buffer.cleanup_invalid_cursors()
|
|
print("\n🧹 Cleaned up invalid cursors")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(demo())
|