mcp-arduino/tests/examples/test_circular_buffer_demo.py
Ryan Malloy eb524b8c1d Major project refactor: Rename to mcp-arduino with smart client capabilities
BREAKING CHANGES:
- Package renamed from mcp-arduino-server to mcp-arduino
- Command changed to 'mcp-arduino' (was 'mcp-arduino-server')
- Repository moved to git.supported.systems/MCP/mcp-arduino

NEW FEATURES:
- Smart client capability detection and dual-mode sampling support
- Intelligent WireViz templates with component-specific circuits (LED, motor, sensor, button, display)
- Client debug tools for MCP capability inspection
- Enhanced error handling with progressive enhancement patterns

IMPROVEMENTS:
🧹 Major repository cleanup - removed 14+ experimental files and tests
📝 Consolidated and reorganized documentation
🐛 Fixed import issues and applied comprehensive linting with ruff
📦 Updated author information to Ryan Malloy (ryan@supported.systems)
🔧 Fixed package version references in startup code

TECHNICAL DETAILS:
- Added dual-mode WireViz: AI generation for sampling clients, smart templates for others
- Implemented client capability detection via MCP handshake inspection
- Created progressive enhancement pattern for universal MCP client compatibility
- Organized test files into proper structure (tests/examples/)
- Applied comprehensive code formatting and lint fixes

The server now provides excellent functionality for ALL MCP clients regardless
of their sampling capabilities, while preserving advanced features for clients
that support them.
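
The dual-mode behaviour described above can be pictured with a minimal sketch. Everything in it is an assumption made for illustration: `ClientCapabilities`, `TEMPLATES`, `pick_template`, and `generate_wiring` are hypothetical names, the template strings are placeholders rather than real WireViz YAML, and the AI path is represented by a plain callable instead of an actual MCP sampling request. It is not the server's implementation.

```python
from dataclasses import dataclass


@dataclass
class ClientCapabilities:
    """Hypothetical summary of what a client advertised during the handshake."""
    sampling: bool = False


# Hypothetical template table keyed by component keyword (placeholder text only).
TEMPLATES = {
    "led": "<LED circuit template>",
    "motor": "<motor circuit template>",
    "button": "<button circuit template>",
}
GENERIC_TEMPLATE = "<generic circuit template>"


def pick_template(description):
    """Choose a template by looking for a component keyword in the description."""
    text = description.lower()
    for keyword, template in TEMPLATES.items():
        if keyword in text:
            return template
    return GENERIC_TEMPLATE


def generate_wiring(description, caps, ai_generate=None):
    """Return (mode, diagram_text), preferring AI generation when available."""
    if caps.sampling and ai_generate is not None:
        try:
            return "ai", ai_generate(description)
        except Exception:
            pass  # any AI failure falls through to the template path
    return "template", pick_template(description)


# A client without sampling still gets a useful result.
basic = generate_wiring("blink an LED", ClientCapabilities(sampling=False))

# A sampling-capable client takes the AI path (stubbed here with a lambda).
smart = generate_wiring(
    "blink an LED",
    ClientCapabilities(sampling=True),
    ai_generate=lambda d: f"<AI diagram for: {d}>",
)
```

The point of the pattern is that the template path is always reachable, so a client that lacks sampling (or an AI call that fails) degrades to a simpler answer instead of an error.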

Version: 2025.09.27.1
2025-09-27 20:16:43 -06:00


#!/usr/bin/env python3
"""
Demonstration of circular buffer behavior with serial monitor
Shows wraparound, cursor invalidation, and recovery
"""

import asyncio
import os
import sys

# Set small buffer size for demonstration
os.environ['ARDUINO_SERIAL_BUFFER_SIZE'] = '20'  # Very small for demo

# Add project to path
sys.path.insert(0, '/home/rpm/claude/mcp-arduino-server/src')

from mcp_arduino_server.components.circular_buffer import CircularSerialBuffer, SerialDataType


async def demo():
    """Demonstrate circular buffer behavior"""

    # Create buffer with size 20
    buffer = CircularSerialBuffer(max_size=20)
    print("🔄 Circular Buffer Demo (size=20)\n")

    # Add 10 entries
    print("➤ Adding 10 entries...")
    for i in range(10):
        buffer.add_entry(
            port="/dev/ttyUSB0",
            data=f"Message {i}",
            data_type=SerialDataType.RECEIVED
        )

    stats = buffer.get_statistics()
    print(f" Buffer: {stats['buffer_size']}/{stats['max_size']} entries")
    print(f" Total added: {stats['total_entries']}")
    print(f" Dropped: {stats['entries_dropped']}")

    # Create cursor at oldest data
    cursor1 = buffer.create_cursor(start_from="oldest")
    print("\n✓ Created cursor1 at oldest data")

    # Read first 5 entries
    result = buffer.read_from_cursor(cursor1, limit=5)
    print(f" Read {result['count']} entries:")
    for entry in result['entries']:
        print(f" [{entry['index']}] {entry['data']}")

    # Add 15 more entries (will cause wraparound)
    print("\n➤ Adding 15 more entries (buffer will wrap)...")
    for i in range(10, 25):
        buffer.add_entry(
            port="/dev/ttyUSB0",
            data=f"Message {i}",
            data_type=SerialDataType.RECEIVED
        )

    stats = buffer.get_statistics()
    print(f" Buffer: {stats['buffer_size']}/{stats['max_size']} entries")
    print(f" Total added: {stats['total_entries']}")
    print(f" Dropped: {stats['entries_dropped']} ⚠️")
    print(f" Oldest index: {stats['oldest_index']}")
    print(f" Newest index: {stats['newest_index']}")

    # Check cursor status
    cursor_info = buffer.get_cursor_info(cursor1)
    print("\n🔍 Cursor1 status after wraparound:")
    print(f" Valid: {cursor_info['is_valid']}")
    print(f" Position: {cursor_info['position']}")

    # Try to read from invalid cursor
    print("\n➤ Reading from cursor1 (should auto-recover)...")
    result = buffer.read_from_cursor(cursor1, limit=5, auto_recover=True)
    if result['success']:
        print(f" ✓ Auto-recovered! Read {result['count']} entries:")
        for entry in result['entries']:
            print(f" [{entry['index']}] {entry['data']}")
        if 'warning' in result:
            print(f" ⚠️ {result['warning']}")

    # Create new cursor and demonstrate concurrent reading
    cursor2 = buffer.create_cursor(start_from="newest")
    print("\n✓ Created cursor2 at newest data")

    print("\n📊 Final Statistics:")
    stats = buffer.get_statistics()
    for key, value in stats.items():
        print(f" {key}: {value}")

    # Cleanup
    buffer.cleanup_invalid_cursors()
    print("\n🧹 Cleaned up invalid cursors")


if __name__ == "__main__":
    asyncio.run(demo())
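
The demo above depends on the project's `CircularSerialBuffer`, so it cannot run outside the repository. As a rough, self-contained illustration of the same ideas (wraparound, a stale cursor, and recovery), here is a minimal sketch built on `collections.deque`. The class `TinyRingBuffer` and its methods are hypothetical stand-ins written for this example; they do not mirror the real class's API or its exact rules for when a cursor counts as invalid.

```python
from collections import deque


class TinyRingBuffer:
    """Hypothetical stand-in for a cursor-aware circular buffer."""

    def __init__(self, max_size):
        self.entries = deque(maxlen=max_size)  # deque discards the oldest item when full
        self.next_index = 0                    # global index given to the next entry
        self.dropped = 0                       # count of entries overwritten so far

    def add(self, data):
        if len(self.entries) == self.entries.maxlen:
            self.dropped += 1
        self.entries.append((self.next_index, data))
        self.next_index += 1

    def oldest_index(self):
        return self.entries[0][0] if self.entries else self.next_index

    def read(self, position, limit, auto_recover=True):
        """Return (batch, new_position, recovered) for a cursor at `position`."""
        recovered = False
        if position < self.oldest_index():
            # The data this cursor pointed at has been overwritten.
            if not auto_recover:
                raise LookupError("cursor points at overwritten data")
            position = self.oldest_index()
            recovered = True
        batch = [(i, d) for i, d in self.entries if i >= position][:limit]
        new_position = batch[-1][0] + 1 if batch else position
        return batch, new_position, recovered


buf = TinyRingBuffer(max_size=20)
for i in range(10):
    buf.add(f"Message {i}")

stale = 0  # a cursor created at the oldest entry and never advanced

for i in range(10, 25):
    buf.add(f"Message {i}")  # 25 entries total, so the 5 oldest are overwritten

# Reading from the stale cursor jumps it forward to the oldest surviving entry.
batch, stale, recovered = buf.read(stale, limit=5)
for index, data in batch:
    print(f"[{index}] {data}")
```

Tracking a monotonically increasing global index, rather than a slot offset, is what lets this sketch tell a stale cursor from a valid one with a single comparison against the oldest surviving index.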