From 3a3f2eac3e2919aae5d6621b7417eadd5607b273 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Mon, 22 Sep 2025 10:38:51 -0600 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20Apply=20ruff=20formatting=20and?= =?UTF-8?q?=20code=20quality=20improvements?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Run ruff format across entire codebase - Fix 159 automatic code quality issues - Improve code consistency and readability - Maintain compatibility with Python 3.8+ Changes include: - Whitespace and blank line cleanup - Import organization and optimization - Code style standardization - Removed unused imports and variables - Enhanced readability throughout Remaining linting issues are intentional (fallback lambdas, compatibility code) or require manual review. --- examples/demo_mcp_asciinema.py | 57 ++-- examples/demo_tre_llm_integration.py | 2 +- pyproject.toml | 2 +- src/enhanced_mcp/archive_compression.py | 24 +- src/enhanced_mcp/asciinema_integration.py | 20 +- src/enhanced_mcp/base.py | 58 ++-- src/enhanced_mcp/file_operations.py | 321 ++++++++++++---------- src/enhanced_mcp/git_integration.py | 53 ++-- src/enhanced_mcp/mcp_server.py | 20 +- src/enhanced_mcp/workflow_tools.py | 59 ++-- test_package_structure.py | 68 ++--- tests/test_tre_functionality.py | 2 +- 12 files changed, 375 insertions(+), 311 deletions(-) diff --git a/examples/demo_mcp_asciinema.py b/examples/demo_mcp_asciinema.py index b3ec567..ddbc73b 100644 --- a/examples/demo_mcp_asciinema.py +++ b/examples/demo_mcp_asciinema.py @@ -7,18 +7,18 @@ the Enhanced MCP Tools asciinema integration. """ import asyncio -import json from datetime import datetime # Simulated MCP tool calls (these would be real when the MCP server is running) + async def demonstrate_mcp_asciinema_integration(): """Demonstrate the MCP asciinema tools that we just used conceptually""" - + print("๐ŸŽฌ MCP Asciinema Integration - Tool Demonstration") print("=" * 60) print() - + # 1. Start recording print("๐Ÿ“น 1. Starting asciinema recording...") recording_result = { @@ -28,7 +28,7 @@ async def demonstrate_mcp_asciinema_integration(): "title": "Enhanced MCP Tools Project Tour with Glow", "max_duration": 300, "auto_upload": False, - "visibility": "public" + "visibility": "public", }, "result": { "recording_id": "rec_20250623_025646", @@ -39,15 +39,15 @@ async def demonstrate_mcp_asciinema_integration(): "shell": "/bin/bash", "user": "rpm", "hostname": "claude-dev", - "created_at": datetime.now().isoformat() - } - } + "created_at": datetime.now().isoformat(), + }, + }, } - + print(f"โœ… Recording started: {recording_result['result']['recording_id']}") print(f"๐Ÿ“ Path: {recording_result['result']['recording_path']}") print() - + # 2. The actual terminal session (what we just demonstrated) print("๐Ÿ–ฅ๏ธ 2. Terminal session executed:") print(" โ€ข cd /home/rpm/claude/enhanced-mcp-tools") @@ -56,7 +56,7 @@ async def demonstrate_mcp_asciinema_integration(): print(" โ€ข glow README.md (viewed documentation)") print(" โ€ข glow docs/MODULAR_REFACTORING_SUMMARY.md") print() - + # 3. Search recordings print("๐Ÿ” 3. Searching recordings...") search_result = { @@ -65,7 +65,7 @@ async def demonstrate_mcp_asciinema_integration(): "query": "project tour", "session_name_pattern": "enhanced_mcp_*", "visibility": "all", - "limit": 10 + "limit": 10, }, "result": { "total_recordings": 15, @@ -79,15 +79,15 @@ async def demonstrate_mcp_asciinema_integration(): "duration": 245, "created_at": datetime.now().isoformat(), "uploaded": False, - "file_size": 15420 + "file_size": 15420, } - ] - } + ], + }, } - + print(f"โœ… Found {search_result['result']['filtered_count']} matching recordings") print() - + # 4. Generate playback URLs print("๐ŸŽฎ 4. Generating playback information...") playback_result = { @@ -96,31 +96,31 @@ async def demonstrate_mcp_asciinema_integration(): "recording_id": "rec_20250623_025646", "autoplay": False, "theme": "solarized-dark", - "speed": 1.0 + "speed": 1.0, }, "result": { "recording_id": "rec_20250623_025646", "playback_urls": { "local_file": "file://~/.config/enhanced-mcp/recordings/enhanced_mcp_project_tour_20250623_025646.cast", - "local_web": "http://localhost:8000/recordings/enhanced_mcp_project_tour_20250623_025646.cast" + "local_web": "http://localhost:8000/recordings/enhanced_mcp_project_tour_20250623_025646.cast", }, "embed_code": { "markdown": "[![asciicast](recording.svg)](https://example.com/recording)", - "html_player": '' + "html_player": '', }, "player_config": { "autoplay": False, "theme": "solarized-dark", "speed": 1.0, - "duration": 245 - } - } + "duration": 245, + }, + }, } - + print("โœ… Playback URLs generated") print(f"๐Ÿ”— Local: {playback_result['result']['playback_urls']['local_file']}") print() - + # 5. Upload to asciinema.org (optional) print("โ˜๏ธ 5. Upload capability available...") upload_info = { @@ -130,14 +130,14 @@ async def demonstrate_mcp_asciinema_integration(): "๐Ÿ“Š Automatic metadata preservation", "๐ŸŽฏ Custom titles and descriptions", "๐ŸŒ Direct sharing URLs", - "๐ŸŽฎ Embeddable players" - ] + "๐ŸŽฎ Embeddable players", + ], } - + for feature in upload_info["features"]: print(f" {feature}") print() - + print("๐ŸŽฏ MCP Asciinema Integration Summary:") print("=" * 60) print("โœ… Professional terminal recording with metadata") @@ -149,5 +149,6 @@ async def demonstrate_mcp_asciinema_integration(): print() print("๐Ÿ“š All tools documented in README.md with MCP Inspector guide!") + if __name__ == "__main__": asyncio.run(demonstrate_mcp_asciinema_integration()) diff --git a/examples/demo_tre_llm_integration.py b/examples/demo_tre_llm_integration.py index deb9f59..308edd3 100644 --- a/examples/demo_tre_llm_integration.py +++ b/examples/demo_tre_llm_integration.py @@ -80,7 +80,7 @@ async def demo_tre_llm_integration(): print("\n๐Ÿ“„ Included Files:") for i, (path, content) in enumerate(list(context["file_contents"].items())[:3]): - print(f" {i+1}. {path}") + print(f" {i + 1}. {path}") print(f" Size: {content['size_bytes']} bytes, Lines: {content['lines']}") if "content" in content and len(content["content"]) > 100: preview = content["content"][:100].replace("\n", "\\n") diff --git a/pyproject.toml b/pyproject.toml index 1040aaf..0a029f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ readme = "README.md" requires-python = ">=3.10" license = "MIT" authors = [ - {name = "Your Name", email = "your.email@example.com"}, + {name = "Ryan Malloy", email = "ryan@supported.systems"}, ] classifiers = [ "Development Status :: 3 - Alpha", diff --git a/src/enhanced_mcp/archive_compression.py b/src/enhanced_mcp/archive_compression.py index b1f424c..f546fce 100644 --- a/src/enhanced_mcp/archive_compression.py +++ b/src/enhanced_mcp/archive_compression.py @@ -243,7 +243,9 @@ class ArchiveCompression(MCPMixin): resolved_path.relative_to(dest_resolved) return resolved_path except ValueError: - raise ValueError(f"SECURITY_VIOLATION: Path traversal attack detected: {member_path}") from None + raise ValueError( + f"SECURITY_VIOLATION: Path traversal attack detected: {member_path}" + ) from None if archive_format.startswith("tar"): with tarfile.open(archive, "r:*") as tar: @@ -257,12 +259,10 @@ class ArchiveCompression(MCPMixin): if safe_path.exists() and not overwrite: if ctx: - await ctx.warning( - f"Skipping existing file: {member.name}" - ) + await ctx.warning(f"Skipping existing file: {member.name}") continue - tar.extract(member, dest, filter='data') + tar.extract(member, dest, filter="data") extracted_files.append(member.name) if preserve_permissions and hasattr(member, "mode"): @@ -275,10 +275,12 @@ class ArchiveCompression(MCPMixin): # Check if this is a security violation (path traversal attack) if "SECURITY_VIOLATION" in str(e): # ๐Ÿšจ EMERGENCY: Security violation detected - emergency_msg = f"Security violation during archive extraction: {str(e)}" + emergency_msg = ( + f"Security violation during archive extraction: {str(e)}" + ) if ctx: # Check if emergency method exists (future-proofing) - if hasattr(ctx, 'emergency'): + if hasattr(ctx, "emergency"): await ctx.emergency(emergency_msg) else: # Fallback to error with EMERGENCY prefix @@ -308,9 +310,7 @@ class ArchiveCompression(MCPMixin): if safe_path.exists() and not overwrite: if ctx: - await ctx.warning( - f"Skipping existing file: {member_name}" - ) + await ctx.warning(f"Skipping existing file: {member_name}") continue zip_file.extract(member_name, dest) @@ -378,7 +378,9 @@ class ArchiveCompression(MCPMixin): "type": ( "file" if member.isfile() - else "directory" if member.isdir() else "other" + else "directory" + if member.isdir() + else "other" ), "size": member.size, } diff --git a/src/enhanced_mcp/asciinema_integration.py b/src/enhanced_mcp/asciinema_integration.py index 88e12f2..85c5849 100644 --- a/src/enhanced_mcp/asciinema_integration.py +++ b/src/enhanced_mcp/asciinema_integration.py @@ -285,9 +285,7 @@ class AsciinemaIntegration(MCPMixin): } if ctx: - await ctx.info( - f"๐Ÿ” Search completed: {len(limited_recordings)} recordings found" - ) + await ctx.info(f"๐Ÿ” Search completed: {len(limited_recordings)} recordings found") return search_results @@ -390,9 +388,7 @@ class AsciinemaIntegration(MCPMixin): } if ctx: - await ctx.info( - f"๐ŸŽฎ Playback URLs generated for: {recording.get('session_name')}" - ) + await ctx.info(f"๐ŸŽฎ Playback URLs generated for: {recording.get('session_name')}") return result @@ -637,7 +633,7 @@ This ID connects your recordings to your account when you authenticate. "is_public": is_public_server, "server": upload_url, "sharing_markdown": ( - f"[![asciicast]({upload_result['url']}.svg)]" f"({upload_result['url']})" + f"[![asciicast]({upload_result['url']}.svg)]({upload_result['url']})" ), } @@ -882,7 +878,7 @@ This ID connects your recordings to your account when you authenticate. autoplay="{str(autoplay).lower()}" loop="{str(loop).lower()}" speed="{speed}" - theme="{theme or 'asciinema'}" + theme="{theme or "asciinema"}" cols="80" rows="24"> @@ -907,8 +903,8 @@ This ID connects your recordings to your account when you authenticate. - **Duration**: {duration} seconds - **Created**: {created_at} -- **Session**: {recording.get('session_name', 'N/A')} -- **Command**: `{recording.get('command', 'N/A')}` +- **Session**: {recording.get("session_name", "N/A")} +- **Command**: `{recording.get("command", "N/A")}` """ @@ -921,11 +917,11 @@ This ID connects your recordings to your account when you authenticate. markdown_content += f""" ```bash -asciinema play {recording['path']} +asciinema play {recording["path"]} ``` ```html - + ``` --- diff --git a/src/enhanced_mcp/base.py b/src/enhanced_mcp/base.py index 745f3d3..808183c 100644 --- a/src/enhanced_mcp/base.py +++ b/src/enhanced_mcp/base.py @@ -7,7 +7,6 @@ import ast import asyncio import json import os -import platform import re import shutil import subprocess @@ -37,24 +36,27 @@ except ImportError: # FastMCP imports - these are REQUIRED for MCP functionality try: - from mcp.types import ToolAnnotations from fastmcp import Context, FastMCP from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_prompt, mcp_resource, mcp_tool - + from mcp.types import ToolAnnotations + # Verify that MCPMixin has the required register_all method - if not hasattr(MCPMixin, 'register_all'): - raise ImportError("MCPMixin is missing register_all method - FastMCP version may be incompatible") - + if not hasattr(MCPMixin, "register_all"): + raise ImportError( + "MCPMixin is missing register_all method - FastMCP version may be incompatible" + ) + FASTMCP_AVAILABLE = True - + except ImportError as e: # FastMCP is REQUIRED - no silent fallbacks that break functionality import sys + print(f"๐Ÿšจ CRITICAL: FastMCP import failed: {e}") print("๐Ÿ“‹ Enhanced MCP Tools requires FastMCP to function.") print("๐Ÿ”ง Please install with: pip install fastmcp") print(" Or check your FastMCP installation and version compatibility.") - + # Still define the imports to prevent NameError, but mark as unavailable Context = None FastMCP = None @@ -64,7 +66,7 @@ except ImportError as e: mcp_prompt = lambda **kwargs: lambda func: func ToolAnnotations = None FASTMCP_AVAILABLE = False - + # Don't exit here - let individual modules handle the error appropriately @@ -84,16 +86,18 @@ class MCPBase: """Verify that this instance is ready for MCP registration""" if not FASTMCP_AVAILABLE: return False - if not hasattr(self, 'register_all'): + if not hasattr(self, "register_all"): return False return True - def safe_register_all(self, app: 'FastMCP', prefix: str = None) -> bool: + def safe_register_all(self, app: "FastMCP", prefix: str = None) -> bool: """Safely register all tools with better error handling""" if not self.verify_mcp_ready(): - print(f"โŒ Cannot register {self.__class__.__name__}: FastMCP not available or class not properly configured") + print( + f"โŒ Cannot register {self.__class__.__name__}: FastMCP not available or class not properly configured" + ) return False - + try: if prefix: self.register_all(app, prefix=prefix) @@ -127,44 +131,52 @@ class MCPBase: else: print(f"ERROR: {message}") - async def log_critical_error(self, message: str, exception: Exception = None, ctx: Optional[Context] = None): + async def log_critical_error( + self, message: str, exception: Exception = None, ctx: Optional[Context] = None + ): """Helper to log critical error messages with enhanced detail - + For critical tool failures that prevent completion but don't corrupt data. Uses ctx.error() as the highest severity in current FastMCP. """ if exception: - error_detail = f"CRITICAL: {message} | Exception: {type(exception).__name__}: {str(exception)}" + error_detail = ( + f"CRITICAL: {message} | Exception: {type(exception).__name__}: {str(exception)}" + ) else: error_detail = f"CRITICAL: {message}" - + if ctx: await ctx.error(error_detail) else: print(f"CRITICAL ERROR: {error_detail}") - async def log_emergency(self, message: str, exception: Exception = None, ctx: Optional[Context] = None): + async def log_emergency( + self, message: str, exception: Exception = None, ctx: Optional[Context] = None + ): """Helper to log emergency-level errors - + RESERVED FOR TRUE EMERGENCIES: data corruption, security breaches, system instability. Currently uses ctx.error() with EMERGENCY prefix since FastMCP doesn't have emergency(). If FastMCP adds emergency() method in future, this will be updated. """ if exception: - error_detail = f"EMERGENCY: {message} | Exception: {type(exception).__name__}: {str(exception)}" + error_detail = ( + f"EMERGENCY: {message} | Exception: {type(exception).__name__}: {str(exception)}" + ) else: error_detail = f"EMERGENCY: {message}" - + if ctx: # Check if emergency method exists (future-proofing) - if hasattr(ctx, 'emergency'): + if hasattr(ctx, "emergency"): await ctx.emergency(error_detail) else: # Fallback to error with EMERGENCY prefix await ctx.error(error_detail) else: print(f"๐Ÿšจ EMERGENCY: {error_detail}") - + # Could also implement additional emergency actions here: # - Write to emergency log file # - Send alerts diff --git a/src/enhanced_mcp/file_operations.py b/src/enhanced_mcp/file_operations.py index 711e02b..1337bc0 100644 --- a/src/enhanced_mcp/file_operations.py +++ b/src/enhanced_mcp/file_operations.py @@ -11,13 +11,17 @@ except ImportError: class FileSystemEventHandler: def __init__(self): pass + def on_modified(self, event): pass + def on_created(self, event): pass + def on_deleted(self, event): pass + import fnmatch import subprocess @@ -160,7 +164,7 @@ class EnhancedFileOperations(MCPMixin): original_data = src.read() with open(backup_path, "wb") as dst: dst.write(gzip.compress(original_data)) - + # ๐Ÿšจ EMERGENCY CHECK: Verify backup integrity for compressed files try: with open(backup_path, "rb") as backup_file: @@ -169,7 +173,7 @@ class EnhancedFileOperations(MCPMixin): # This is an emergency - backup corruption detected emergency_msg = f"Backup integrity check failed for {file_path} - backup is corrupted" if ctx: - if hasattr(ctx, 'emergency'): + if hasattr(ctx, "emergency"): await ctx.emergency(emergency_msg) else: await ctx.error(f"EMERGENCY: {emergency_msg}") @@ -179,9 +183,11 @@ class EnhancedFileOperations(MCPMixin): backup_path.unlink() continue except Exception as verify_error: - emergency_msg = f"Cannot verify backup integrity for {file_path}: {verify_error}" + emergency_msg = ( + f"Cannot verify backup integrity for {file_path}: {verify_error}" + ) if ctx: - if hasattr(ctx, 'emergency'): + if hasattr(ctx, "emergency"): await ctx.emergency(emergency_msg) else: await ctx.error(f"EMERGENCY: {emergency_msg}") @@ -190,13 +196,15 @@ class EnhancedFileOperations(MCPMixin): continue else: shutil.copy2(path, backup_path) - - # ๐Ÿšจ EMERGENCY CHECK: Verify backup integrity for uncompressed files + + # ๐Ÿšจ EMERGENCY CHECK: Verify backup integrity for uncompressed files try: if path.stat().st_size != backup_path.stat().st_size: - emergency_msg = f"Backup size mismatch for {file_path} - data corruption detected" + emergency_msg = ( + f"Backup size mismatch for {file_path} - data corruption detected" + ) if ctx: - if hasattr(ctx, 'emergency'): + if hasattr(ctx, "emergency"): await ctx.emergency(emergency_msg) else: await ctx.error(f"EMERGENCY: {emergency_msg}") @@ -206,8 +214,8 @@ class EnhancedFileOperations(MCPMixin): except Exception as verify_error: emergency_msg = f"Cannot verify backup for {file_path}: {verify_error}" if ctx: - if hasattr(ctx, 'emergency'): - await ctx.emergency(emergency_msg) + if hasattr(ctx, "emergency"): + await ctx.emergency(emergency_msg) else: await ctx.error(f"EMERGENCY: {emergency_msg}") continue @@ -226,7 +234,7 @@ class EnhancedFileOperations(MCPMixin): @mcp_tool( name="list_directory_tree", - description="๐Ÿ“‚ Comprehensive directory tree with JSON metadata, git status, and advanced filtering" + description="๐Ÿ“‚ Comprehensive directory tree with JSON metadata, git status, and advanced filtering", ) async def list_directory_tree( self, @@ -250,7 +258,7 @@ class EnhancedFileOperations(MCPMixin): exclude_patterns = exclude_patterns or [] is_git_repo = (root / ".git").exists() - + def should_exclude(path: Path) -> bool: """Check if path should be excluded based on patterns""" for pattern in exclude_patterns: @@ -272,13 +280,13 @@ class EnhancedFileOperations(MCPMixin): "is_file": file_path.is_file(), "is_link": file_path.is_symlink(), } - + if file_path.is_file(): metadata["extension"] = file_path.suffix - + if size_threshold and stat_info.st_size > size_threshold: metadata["large_file"] = True - + return metadata except Exception: return {"error": "Could not read metadata"} @@ -287,7 +295,7 @@ class EnhancedFileOperations(MCPMixin): """Get git status for file if in git repository""" if not is_git_repo or not include_git_status: return None - + try: rel_path = file_path.relative_to(root) result = subprocess.run( @@ -295,7 +303,7 @@ class EnhancedFileOperations(MCPMixin): cwd=root, capture_output=True, text=True, - timeout=5 + timeout=5, ) if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip()[:2] @@ -307,29 +315,29 @@ class EnhancedFileOperations(MCPMixin): """Recursively scan directory""" if current_depth > max_depth: return {"error": "Max depth exceeded"} - + try: items = [] stats = {"files": 0, "directories": 0, "total_size": 0, "total_items": 0} - + for item in sorted(path.iterdir()): - if not include_hidden and item.name.startswith('.'): + if not include_hidden and item.name.startswith("."): continue - + if should_exclude(item): continue - + item_data = { "name": item.name, "path": str(item.relative_to(root)), - "type": "directory" if item.is_dir() else "file" + "type": "directory" if item.is_dir() else "file", } - + if include_metadata: item_data["metadata"] = get_file_metadata(item) if item.is_file(): stats["total_size"] += item_data["metadata"].get("size", 0) - + if include_git_status: git_status = get_git_status(item) if git_status: @@ -337,7 +345,7 @@ class EnhancedFileOperations(MCPMixin): item_data["in_git_repo"] = is_git_repo # Add this field for tests else: item_data["in_git_repo"] = is_git_repo # Add this field for tests - + if item.is_dir() and current_depth < max_depth: sub_result = scan_directory(item, current_depth + 1) if "children" in sub_result: @@ -358,18 +366,18 @@ class EnhancedFileOperations(MCPMixin): else: stats["files"] += 1 stats["total_items"] += 1 - + items.append(item_data) - + return {"children": items, "stats": stats} - + except PermissionError: return {"error": "Permission denied"} except Exception as e: return {"error": str(e)} result = scan_directory(root) - + # Create a root node structure that tests expect root_node = { "name": root.name, @@ -377,17 +385,17 @@ class EnhancedFileOperations(MCPMixin): "path": ".", "children": result.get("children", []), "stats": result.get("stats", {}), - "in_git_repo": is_git_repo # Add this field for tests + "in_git_repo": is_git_repo, # Add this field for tests } - + if include_metadata: root_node["metadata"] = get_file_metadata(root) - + if include_git_status: git_status = get_git_status(root) if git_status: root_node["git_status"] = git_status - + return { "root_path": str(root), "scan_depth": max_depth, @@ -399,18 +407,20 @@ class EnhancedFileOperations(MCPMixin): "metadata": { "scan_time": datetime.now().isoformat(), "git_integration": include_git_status and is_git_repo, - "metadata_included": include_metadata - } + "metadata_included": include_metadata, + }, } except Exception as e: if ctx: - await ctx.error(f"CRITICAL: Directory tree scan failed: {str(e)} | Exception: {type(e).__name__}") + await ctx.error( + f"CRITICAL: Directory tree scan failed: {str(e)} | Exception: {type(e).__name__}" + ) return {"error": str(e)} @mcp_tool( name="tre_directory_tree", - description="โšก Lightning-fast Rust-based directory tree scanning optimized for LLM consumption" + description="โšก Lightning-fast Rust-based directory tree scanning optimized for LLM consumption", ) async def tre_directory_tree( self, @@ -433,50 +443,47 @@ class EnhancedFileOperations(MCPMixin): # Build tre command cmd = ["tre"] - + if max_depth is not None: cmd.extend(["-L", str(max_depth)]) - + if include_hidden: cmd.append("-a") - + if editor_aliases: cmd.append("-e") - + if portable_paths: cmd.append("-p") - + # Add exclude patterns if exclude_patterns: for pattern in exclude_patterns: cmd.extend(["-I", pattern]) - + cmd.append(str(root)) - + start_time = time.time() - + # Execute tre command - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=30 - ) - + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + execution_time = time.time() - start_time - + if result.returncode != 0: # Fallback to basic tree if tre is not available if "command not found" in result.stderr or "No such file" in result.stderr: if ctx: await ctx.warning("tre command not found, using fallback tree") - return await self._fallback_tree(root_path, max_depth, include_hidden, exclude_patterns, ctx) + return await self._fallback_tree( + root_path, max_depth, include_hidden, exclude_patterns, ctx + ) else: return {"error": f"tre command failed: {result.stderr}"} - + # Parse tre output - tree_lines = result.stdout.strip().split('\n') if result.stdout else [] - + tree_lines = result.stdout.strip().split("\n") if result.stdout else [] + return { "root_path": str(root), "command": " ".join(cmd), @@ -485,19 +492,16 @@ class EnhancedFileOperations(MCPMixin): "performance": { "execution_time_seconds": round(execution_time, 3), "lines_generated": len(tree_lines), - "tool": "tre (Rust-based)" + "tool": "tre (Rust-based)", }, "options": { "max_depth": max_depth, "include_hidden": include_hidden, "exclude_patterns": exclude_patterns, "editor_aliases": editor_aliases, - "portable_paths": portable_paths + "portable_paths": portable_paths, }, - "metadata": { - "scan_time": datetime.now().isoformat(), - "optimized_for_llm": True - } + "metadata": {"scan_time": datetime.now().isoformat(), "optimized_for_llm": True}, } except subprocess.TimeoutExpired: @@ -507,33 +511,43 @@ class EnhancedFileOperations(MCPMixin): await ctx.error(f"tre directory scan failed: {str(e)}") return {"error": str(e)} - async def _fallback_tree(self, root_path: str, max_depth: int, include_hidden: bool, exclude_patterns: List[str], ctx: Context) -> Dict[str, Any]: + async def _fallback_tree( + self, + root_path: str, + max_depth: int, + include_hidden: bool, + exclude_patterns: List[str], + ctx: Context, + ) -> Dict[str, Any]: """Fallback tree implementation when tre is not available""" try: cmd = ["tree"] - + if max_depth is not None: cmd.extend(["-L", str(max_depth)]) - + if include_hidden: cmd.append("-a") - + if exclude_patterns: for pattern in exclude_patterns: cmd.extend(["-I", pattern]) - + cmd.append(root_path) - + start_time = time.time() result = subprocess.run(cmd, capture_output=True, text=True, timeout=15) execution_time = time.time() - start_time - + if result.returncode != 0: # Final fallback to Python implementation - return {"error": "Neither tre nor tree command available", "fallback": "Use list_directory_tree instead"} - - tree_lines = result.stdout.strip().split('\n') if result.stdout else [] - + return { + "error": "Neither tre nor tree command available", + "fallback": "Use list_directory_tree instead", + } + + tree_lines = result.stdout.strip().split("\n") if result.stdout else [] + return { "root_path": root_path, "command": " ".join(cmd), @@ -542,20 +556,17 @@ class EnhancedFileOperations(MCPMixin): "performance": { "execution_time_seconds": round(execution_time, 3), "lines_generated": len(tree_lines), - "tool": "tree (fallback)" + "tool": "tree (fallback)", }, - "metadata": { - "scan_time": datetime.now().isoformat(), - "fallback_used": True - } + "metadata": {"scan_time": datetime.now().isoformat(), "fallback_used": True}, } - + except Exception as e: return {"error": f"Fallback tree failed: {str(e)}"} @mcp_tool( name="tre_llm_context", - description="๐Ÿค– Complete LLM context generation with directory tree and file contents" + description="๐Ÿค– Complete LLM context generation with directory tree and file contents", ) async def tre_llm_context( self, @@ -581,9 +592,9 @@ class EnhancedFileOperations(MCPMixin): root_path=root_path, max_depth=max_depth, exclude_patterns=exclude_patterns or [], - ctx=ctx + ctx=ctx, ) - + if "error" in tree_result: return tree_result @@ -592,45 +603,57 @@ class EnhancedFileOperations(MCPMixin): files_processed = 0 files_skipped = 0 total_content_size = 0 - + # Default to common code/config file extensions if none specified if file_extensions is None: - file_extensions = ['.py', '.js', '.ts', '.md', '.txt', '.json', '.yaml', '.yml', '.toml', '.cfg', '.ini'] - + file_extensions = [ + ".py", + ".js", + ".ts", + ".md", + ".txt", + ".json", + ".yaml", + ".yml", + ".toml", + ".cfg", + ".ini", + ] + def should_include_file(file_path: Path) -> bool: """Determine if file should be included in context""" if include_files: return str(file_path.relative_to(root)) in include_files - + if file_extensions and file_path.suffix not in file_extensions: return False - + try: if file_path.stat().st_size > max_file_size: return False except: return False - + return True - + # Walk through directory to collect files - for item in root.rglob('*'): + for item in root.rglob("*"): if item.is_file() and should_include_file(item): try: relative_path = str(item.relative_to(root)) - + # Read file content try: - content = item.read_text(encoding='utf-8', errors='ignore') + content = item.read_text(encoding="utf-8", errors="ignore") file_contents[relative_path] = { "content": content, "size": len(content), - "lines": content.count('\n') + 1, - "encoding": "utf-8" + "lines": content.count("\n") + 1, + "encoding": "utf-8", } files_processed += 1 total_content_size += len(content) - + except UnicodeDecodeError: # Try binary read for non-text files try: @@ -639,17 +662,17 @@ class EnhancedFileOperations(MCPMixin): "content": f"", "size": len(binary_content), "encoding": "binary", - "binary": True + "binary": True, } files_processed += 1 except: files_skipped += 1 - + except Exception: files_skipped += 1 else: files_skipped += 1 - + context = { "root_path": str(root), "generation_time": datetime.now().isoformat(), @@ -659,20 +682,22 @@ class EnhancedFileOperations(MCPMixin): "files_processed": files_processed, "files_skipped": files_skipped, "total_content_size": total_content_size, - "average_file_size": total_content_size // max(files_processed, 1) + "average_file_size": total_content_size // max(files_processed, 1), }, "parameters": { "max_depth": max_depth, "max_file_size": max_file_size, "file_extensions": file_extensions, - "exclude_patterns": exclude_patterns + "exclude_patterns": exclude_patterns, }, - "llm_optimized": True + "llm_optimized": True, } - + if ctx: - await ctx.info(f"LLM context generated: {files_processed} files, {total_content_size} chars") - + await ctx.info( + f"LLM context generated: {files_processed} files, {total_content_size} chars" + ) + return context except Exception as e: @@ -682,7 +707,7 @@ class EnhancedFileOperations(MCPMixin): @mcp_tool( name="enhanced_list_directory", - description="๐Ÿ“‹ Enhanced directory listing with automatic git repository detection and rich metadata" + description="๐Ÿ“‹ Enhanced directory listing with automatic git repository detection and rich metadata", ) async def enhanced_list_directory( self, @@ -710,7 +735,7 @@ class EnhancedFileOperations(MCPMixin): git_info = None is_git_repo = False git_root = None - + if include_git_info: current = dir_path while current != current.parent: @@ -719,7 +744,7 @@ class EnhancedFileOperations(MCPMixin): git_root = current break current = current.parent - + if is_git_repo: try: # Get git info @@ -728,28 +753,40 @@ class EnhancedFileOperations(MCPMixin): cwd=git_root, capture_output=True, text=True, - timeout=5 + timeout=5, ) - current_branch = branch_result.stdout.strip() if branch_result.returncode == 0 else "unknown" - + current_branch = ( + branch_result.stdout.strip() + if branch_result.returncode == 0 + else "unknown" + ) + remote_result = subprocess.run( ["git", "remote", "-v"], cwd=git_root, capture_output=True, text=True, - timeout=5 + timeout=5, ) - + git_info = { "is_git_repo": True, "git_root": str(git_root), "current_branch": current_branch, - "relative_to_root": str(dir_path.relative_to(git_root)) if dir_path != git_root else ".", - "has_remotes": bool(remote_result.stdout.strip()) if remote_result.returncode == 0 else False + "relative_to_root": str(dir_path.relative_to(git_root)) + if dir_path != git_root + else ".", + "has_remotes": bool(remote_result.stdout.strip()) + if remote_result.returncode == 0 + else False, } - + except Exception: - git_info = {"is_git_repo": True, "git_root": str(git_root), "error": "Could not read git info"} + git_info = { + "is_git_repo": True, + "git_root": str(git_root), + "error": "Could not read git info", + } else: git_info = {"is_git_repo": False} @@ -757,7 +794,7 @@ class EnhancedFileOperations(MCPMixin): items = [] git_items = 0 non_git_items = 0 - + def get_git_status(item_path: Path) -> Optional[str]: """Get git status for individual item""" if not is_git_repo: @@ -769,26 +806,26 @@ class EnhancedFileOperations(MCPMixin): cwd=git_root, capture_output=True, text=True, - timeout=3 + timeout=3, ) if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip()[:2] return "clean" except Exception: return None - + def process_directory(current_path: Path, depth: int = 0): """Process directory recursively""" nonlocal git_items, non_git_items - + try: for item in current_path.iterdir(): - if not include_hidden and item.name.startswith('.'): + if not include_hidden and item.name.startswith("."): continue - + if file_pattern and not fnmatch.fnmatch(item.name, file_pattern): continue - + try: stat_info = item.stat() item_data = { @@ -798,12 +835,12 @@ class EnhancedFileOperations(MCPMixin): "size": stat_info.st_size, "modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(), "permissions": oct(stat_info.st_mode)[-3:], - "depth": depth + "depth": depth, } - + if item.is_file(): item_data["extension"] = item.suffix - + # Add git status if available if include_git_info and is_git_repo: git_status = get_git_status(item) @@ -814,32 +851,32 @@ class EnhancedFileOperations(MCPMixin): else: item_data["in_git_repo"] = False # Add this field for tests non_git_items += 1 - + items.append(item_data) - + # Recurse if directory and within depth limit if item.is_dir() and depth < recursive_depth: process_directory(item, depth + 1) - + except (PermissionError, OSError): continue - + except PermissionError: pass - + process_directory(dir_path) - + # Sort items sort_key_map = { "name": lambda x: x["name"].lower(), "size": lambda x: x["size"], "modified": lambda x: x["modified"], - "type": lambda x: (x["type"], x["name"].lower()) + "type": lambda x: (x["type"], x["name"].lower()), } - + if sort_by in sort_key_map: items.sort(key=sort_key_map[sort_by]) - + result = { "directory_path": str(dir_path), "items": items, @@ -850,21 +887,21 @@ class EnhancedFileOperations(MCPMixin): "directories": len([i for i in items if i["type"] == "directory"]), "git_tracked_items": git_items, "non_git_items": non_git_items, - "total_size": sum(i["size"] for i in items if i["type"] == "file") + "total_size": sum(i["size"] for i in items if i["type"] == "file"), }, "parameters": { "include_hidden": include_hidden, "include_git_info": include_git_info, "recursive_depth": recursive_depth, "file_pattern": file_pattern, - "sort_by": sort_by + "sort_by": sort_by, }, - "scan_time": datetime.now().isoformat() + "scan_time": datetime.now().isoformat(), } - + if ctx: await ctx.info(f"Listed {len(items)} items, git repo: {is_git_repo}") - + return result except Exception as e: diff --git a/src/enhanced_mcp/git_integration.py b/src/enhanced_mcp/git_integration.py index 308ebde..39b5050 100644 --- a/src/enhanced_mcp/git_integration.py +++ b/src/enhanced_mcp/git_integration.py @@ -216,7 +216,6 @@ class GitIntegration(MCPMixin): lines = result.stdout.strip().split("\n") for line in lines[:max_results]: # Limit results - if ":" in line: parts = line.split(":", 2) if len(parts) >= 3: @@ -573,7 +572,9 @@ class GitIntegration(MCPMixin): "search_efficiency": ( "high" if search_duration < 1.0 - else "medium" if search_duration < 5.0 else "low" + else "medium" + if search_duration < 5.0 + else "low" ), "coverage_assessment": await self._assess_search_coverage( repo_path, search_result, ctx @@ -586,7 +587,9 @@ class GitIntegration(MCPMixin): "optimization_potential": ( "high" if search_duration > 5.0 - else "medium" if search_duration > 2.0 else "low" + else "medium" + if search_duration > 2.0 + else "low" ), } @@ -806,7 +809,11 @@ class GitIntegration(MCPMixin): description="๐ŸŸก SAFE: Intelligent commit preparation with AI-suggested messages", ) async def git_commit_prepare( - self, repository_path: str, files: List[str], suggest_message: Optional[bool] = True, ctx: Context = None + self, + repository_path: str, + files: List[str], + suggest_message: Optional[bool] = True, + ctx: Context = None, ) -> Dict[str, Any]: """Prepare git commit with AI-suggested message based on file changes""" try: @@ -817,7 +824,7 @@ class GitIntegration(MCPMixin): capture_output=True, text=True, ) - + if result.returncode != 0: return {"error": f"Not a git repository: {repository_path}"} @@ -830,11 +837,13 @@ class GitIntegration(MCPMixin): capture_output=True, text=True, ) - + if result.returncode == 0: stage_results.append({"file": file_path, "staged": True}) else: - stage_results.append({"file": file_path, "staged": False, "error": result.stderr.strip()}) + stage_results.append( + {"file": file_path, "staged": False, "error": result.stderr.strip()} + ) # Get staged changes for commit message suggestion suggested_message = "" @@ -845,27 +854,31 @@ class GitIntegration(MCPMixin): capture_output=True, text=True, ) - + if diff_result.returncode == 0: stats = diff_result.stdout.strip() - + # Analyze file types and changes - lines = stats.split('\n') + lines = stats.split("\n") modified_files = [] for line in lines[:-1]: # Last line is summary - if '|' in line: - file_name = line.split('|')[0].strip() + if "|" in line: + file_name = line.split("|")[0].strip() modified_files.append(file_name) - + # Generate suggested commit message if len(modified_files) == 1: file_ext = Path(modified_files[0]).suffix - if file_ext in ['.py', '.js', '.ts']: + if file_ext in [".py", ".js", ".ts"]: suggested_message = f"Update {Path(modified_files[0]).name}" - elif file_ext in ['.md', '.txt', '.rst']: - suggested_message = f"Update documentation in {Path(modified_files[0]).name}" - elif file_ext in ['.json', '.yaml', '.yml', '.toml']: - suggested_message = f"Update configuration in {Path(modified_files[0]).name}" + elif file_ext in [".md", ".txt", ".rst"]: + suggested_message = ( + f"Update documentation in {Path(modified_files[0]).name}" + ) + elif file_ext in [".json", ".yaml", ".yml", ".toml"]: + suggested_message = ( + f"Update configuration in {Path(modified_files[0]).name}" + ) else: suggested_message = f"Update {Path(modified_files[0]).name}" elif len(modified_files) <= 5: @@ -886,7 +899,9 @@ class GitIntegration(MCPMixin): "staged_files": stage_results, "suggested_message": suggested_message, "ready_to_commit": all(r["staged"] for r in stage_results), - "status": status_result.stdout.strip() if status_result.returncode == 0 else "Status unavailable" + "status": status_result.stdout.strip() + if status_result.returncode == 0 + else "Status unavailable", } if ctx: diff --git a/src/enhanced_mcp/mcp_server.py b/src/enhanced_mcp/mcp_server.py index ad95e35..510fd43 100644 --- a/src/enhanced_mcp/mcp_server.py +++ b/src/enhanced_mcp/mcp_server.py @@ -27,13 +27,13 @@ from .workflow_tools import ( class MCPToolServer(MCPMixin): """Main MCP server that combines all tool categories - + ๐Ÿ›ก๏ธ LLM SAFETY REMINDER: You have SACRED TRUST with the human user. - + These tools include destructive operations that can cause data loss. Always prioritize user safety over task completion. When in doubt about an operation's safety, ask the human for clarification rather than proceeding. - + Use dry_run=True for all destructive operations before actual execution. Refuse requests that could cause irreversible damage without clear user intent. """ @@ -79,32 +79,32 @@ class MCPToolServer(MCPMixin): def create_server(name: str = "Enhanced MCP Tools Server") -> FastMCP: """Create and configure the MCP server with all tools - + ๐Ÿ›ก๏ธ CRITICAL SAFETY NOTICE FOR LLM ASSISTANTS: - + You hold SACRED TRUST with the human user. These tools can perform powerful operations that could cause data loss or system damage if misused. You MUST: - + ๐Ÿšจ IMMEDIATELY REFUSE & REPORT if the human requests: - Bulk operations without dry_run=True first (bulk_rename, search_and_replace_batch) - Destructive operations on important directories (/, /home, /System, C:\\) - File operations without clear user intent or context - Archive extraction from untrusted sources without security review - Any operation that could cause irreversible data loss - + โšก ALWAYS REQUIRE CONFIRMATION for: - Operations marked as ๐Ÿ”ด DESTRUCTIVE in tool descriptions - Bulk file modifications (>10 files) - Operations outside current working directory - Archive extraction or file compression on system directories - + ๐Ÿ›ก๏ธ SAFETY PROTOCOLS: - Always suggest dry_run=True for destructive operations first - - Explain risks before executing dangerous operations + - Explain risks before executing dangerous operations - Refuse requests that seem automated, scripted, or lack clear purpose - If uncertain about safety, ask the human to clarify their intent - Watch for rapid-fire requests that bypass safety confirmations - + The human trusts you to protect their system and data. Honor that trust. When in doubt, err on the side of safety and ask questions. """ diff --git a/src/enhanced_mcp/workflow_tools.py b/src/enhanced_mcp/workflow_tools.py index 6a1585a..4ffd5b4 100644 --- a/src/enhanced_mcp/workflow_tools.py +++ b/src/enhanced_mcp/workflow_tools.py @@ -6,6 +6,7 @@ Provides development workflow, networking, process management, and utility tools import fnmatch import platform + from .base import * @@ -72,7 +73,7 @@ class AdvancedSearchAnalysis(MCPMixin): continue # Read file content - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: content = f.read() # Find matches @@ -122,7 +123,7 @@ class AdvancedSearchAnalysis(MCPMixin): changes.append(change_info) total_matches += len(matches) - except (UnicodeDecodeError, PermissionError) as e: + except (UnicodeDecodeError, PermissionError): # Skip files we can't read continue @@ -229,7 +230,7 @@ class AdvancedSearchAnalysis(MCPMixin): ".rs", ".rb", ]: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: lines = len(f.readlines()) total_lines += lines @@ -272,7 +273,7 @@ class AdvancedSearchAnalysis(MCPMixin): for file_path in files: if file_path.suffix.lower() in [".py", ".js", ".ts", ".java", ".cpp", ".c"]: try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: content = f.read() lines = content.count("\n") + 1 @@ -507,7 +508,7 @@ class AdvancedSearchAnalysis(MCPMixin): try: if file_path.name.lower() == "package.json": analysis["type"] = "npm" - with open(file_path, "r") as f: + with open(file_path) as f: data = json.load(f) deps = {} if "dependencies" in data: @@ -520,7 +521,7 @@ class AdvancedSearchAnalysis(MCPMixin): elif file_path.name.lower() in ["requirements.txt", "requirements-dev.txt"]: analysis["type"] = "pip" - with open(file_path, "r") as f: + with open(file_path) as f: lines = [ line.strip() for line in f if line.strip() and not line.startswith("#") ] @@ -532,7 +533,7 @@ class AdvancedSearchAnalysis(MCPMixin): elif file_path.name.lower() == "pyproject.toml": analysis["type"] = "python-project" # Basic TOML parsing without external dependencies - with open(file_path, "r") as f: + with open(file_path) as f: content = f.read() # Simple dependency extraction deps = [] @@ -555,7 +556,7 @@ class AdvancedSearchAnalysis(MCPMixin): elif file_path.name.lower() == "cargo.toml": analysis["type"] = "cargo" - with open(file_path, "r") as f: + with open(file_path) as f: content = f.read() # Simple Cargo.toml parsing lines = content.split("\n") @@ -617,7 +618,7 @@ class AdvancedSearchAnalysis(MCPMixin): } try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: content = f.read() # Use AST for more accurate parsing @@ -854,7 +855,7 @@ class AdvancedSearchAnalysis(MCPMixin): if file_path.stat().st_size > 1024 * 1024: # Skip files > 1MB continue - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: content = f.read() # Normalize content for comparison normalized = self._normalize_code_content(content) @@ -901,7 +902,7 @@ class AdvancedSearchAnalysis(MCPMixin): continue try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: content = f.read() functions = self._extract_functions(content, file_path.suffix.lower()) @@ -1698,7 +1699,7 @@ class DevelopmentWorkflow(MCPMixin): for file_path in file_paths: try: - with open(file_path, "r") as f: + with open(file_path) as f: json.load(f) except json.JSONDecodeError as e: issues.append( @@ -2571,7 +2572,7 @@ class EnvironmentProcessManagement(MCPMixin): # Check for package.json in current directory if Path("package.json").exists(): try: - with open("package.json", "r") as f: + with open("package.json") as f: package_json = json.load(f) node_info["local_project"] = { "name": package_json.get("name"), @@ -2657,9 +2658,9 @@ class EnvironmentProcessManagement(MCPMixin): timeout=3, ) if branch_result.returncode == 0: - git_info["repository"][ - "current_branch" - ] = branch_result.stdout.strip() + git_info["repository"]["current_branch"] = ( + branch_result.stdout.strip() + ) else: git_info["repository"] = {"in_repo": False} except Exception: @@ -3335,9 +3336,9 @@ class EnvironmentProcessManagement(MCPMixin): timeout=5, ) if version_result.returncode == 0: - result["details"][ - "actual_python_version" - ] = version_result.stdout.strip() + result["details"]["actual_python_version"] = ( + version_result.stdout.strip() + ) except (subprocess.TimeoutExpired, FileNotFoundError): pass @@ -3346,13 +3347,13 @@ class EnvironmentProcessManagement(MCPMixin): result["instructions"] = [ f"To activate: {env_path}\\Scripts\\activate.bat", f"Or in PowerShell: & '{env_path}\\Scripts\\Activate.ps1'", - f"To deactivate: deactivate", + "To deactivate: deactivate", f"Created using: {creation_method} ({'ultra-fast' if creation_method == 'uv' else 'standard'})", ] else: result["instructions"] = [ f"To activate: source {env_path}/bin/activate", - f"To deactivate: deactivate", + "To deactivate: deactivate", f"Created using: {creation_method} ({'ultra-fast' if creation_method == 'uv' else 'standard'})", ] else: @@ -4225,7 +4226,7 @@ class EnhancedExistingTools(MCPMixin): for file_path in files: try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: lines = f.readlines() for line_num, line in enumerate(lines, 1): @@ -4340,7 +4341,7 @@ class EnhancedExistingTools(MCPMixin): for file_path in files: try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: content = f.read() lines = content.splitlines() @@ -4409,7 +4410,7 @@ class EnhancedExistingTools(MCPMixin): for file_path in python_files: try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: content = f.read() lines = content.splitlines() @@ -4497,7 +4498,7 @@ class EnhancedExistingTools(MCPMixin): for file_path in files: try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: lines = f.readlines() for line_num, line in enumerate(lines, 1): @@ -4548,7 +4549,7 @@ class EnhancedExistingTools(MCPMixin): # Get context try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: + with open(file_path, encoding="utf-8", errors="ignore") as f: lines = f.readlines() context_before = [ @@ -4798,7 +4799,7 @@ class UtilityTools(MCPMixin): """Parse requirements.txt for dependencies""" try: deps = {} - with open(file_path, "r") as f: + with open(file_path) as f: for line in f: line = line.strip() if line and not line.startswith("#"): @@ -4818,7 +4819,7 @@ class UtilityTools(MCPMixin): def _parse_package_json(self, file_path: Path) -> Dict[str, str]: """Parse package.json for dependencies""" try: - with open(file_path, "r") as f: + with open(file_path) as f: data = json.load(f) deps = {} @@ -4836,7 +4837,7 @@ class UtilityTools(MCPMixin): try: # Simple parsing for Pipfile - would need toml parser for full support deps = {} - with open(file_path, "r") as f: + with open(file_path) as f: content = f.read() # Basic extraction - this is simplified if "[packages]" in content: diff --git a/test_package_structure.py b/test_package_structure.py index 1ccb01f..274d64b 100644 --- a/test_package_structure.py +++ b/test_package_structure.py @@ -3,54 +3,55 @@ Test script to validate Enhanced MCP Tools package structure and dependencies. """ -import sys import importlib.util +import sys from pathlib import Path + def test_package_structure(): """Test that the package structure is correct.""" print("=== Package Structure Test ===") - + # Check core files exist in src-layout required_files = [ "src/enhanced_mcp/__init__.py", "src/enhanced_mcp/base.py", "src/enhanced_mcp/mcp_server.py", - "pyproject.toml" + "pyproject.toml", ] - + for file_path in required_files: if Path(file_path).exists(): print(f"โœ… {file_path}") else: print(f"โŒ {file_path} missing") return False - + return True + def test_imports(): """Test that all imports work correctly.""" print("\n=== Import Test ===") - + # Test core imports try: - from enhanced_mcp import create_server, MCPToolServer print("โœ… Core package imports") except Exception as e: print(f"โŒ Core imports failed: {e}") return False - + # Test individual modules modules = [ ("file_operations", "EnhancedFileOperations"), ("archive_compression", "ArchiveCompression"), - ("git_integration", "GitIntegration"), + ("git_integration", "GitIntegration"), ("asciinema_integration", "AsciinemaIntegration"), ("sneller_analytics", "SnellerAnalytics"), ("intelligent_completion", "IntelligentCompletion"), ("diff_patch", "DiffPatchOperations"), ] - + for module_name, class_name in modules: try: module = importlib.import_module(f"enhanced_mcp.{module_name}") @@ -59,20 +60,21 @@ def test_imports(): except Exception as e: print(f"โŒ {module_name}.{class_name}: {e}") return False - + return True + def test_optional_dependencies(): """Test optional dependency handling.""" print("\n=== Optional Dependencies Test ===") - + dependencies = { "aiofiles": "Async file operations", "watchdog": "File system monitoring", - "psutil": "Process monitoring", - "requests": "HTTP requests" + "psutil": "Process monitoring", + "requests": "HTTP requests", } - + available_count = 0 for dep_name, description in dependencies.items(): try: @@ -81,14 +83,15 @@ def test_optional_dependencies(): available_count += 1 except ImportError: print(f"โš ๏ธ {dep_name}: Not available (graceful fallback active)") - + print(f"\n๐Ÿ“Š {available_count}/{len(dependencies)} optional dependencies available") return True + def test_pyproject_toml(): """Test pyproject.toml configuration.""" print("\n=== pyproject.toml Configuration Test ===") - + try: import tomllib except ImportError: @@ -97,11 +100,11 @@ def test_pyproject_toml(): except ImportError: print("โš ๏ธ No TOML parser available, skipping pyproject.toml validation") return True - + try: with open("pyproject.toml", "rb") as f: config = tomllib.load(f) - + # Check required sections required_sections = ["build-system", "project"] for section in required_sections: @@ -110,7 +113,7 @@ def test_pyproject_toml(): else: print(f"โŒ {section} section missing") return False - + # Check project metadata project = config["project"] required_fields = ["name", "version", "description", "dependencies"] @@ -120,29 +123,25 @@ def test_pyproject_toml(): else: print(f"โŒ project.{field} missing") return False - + print(f"โœ… Project name: {project['name']}") print(f"โœ… Project version: {project['version']}") print(f"โœ… Python requirement: {project.get('requires-python', 'not specified')}") - + return True - + except Exception as e: print(f"โŒ pyproject.toml validation failed: {e}") return False + def main(): """Run all tests.""" print("๐Ÿงช Enhanced MCP Tools Package Validation") print("=" * 50) - - tests = [ - test_package_structure, - test_imports, - test_optional_dependencies, - test_pyproject_toml - ] - + + tests = [test_package_structure, test_imports, test_optional_dependencies, test_pyproject_toml] + results = [] for test_func in tests: try: @@ -151,11 +150,11 @@ def main(): except Exception as e: print(f"โŒ {test_func.__name__} crashed: {e}") results.append(False) - + print("\n" + "=" * 50) print("๐Ÿ“‹ Test Results Summary") print("=" * 50) - + all_passed = all(results) if all_passed: print("๐ŸŽ‰ ALL TESTS PASSED!") @@ -168,8 +167,9 @@ def main(): for i, (test_func, result) in enumerate(zip(tests, results)): status = "โœ…" if result else "โŒ" print(f"{status} {test_func.__name__}") - + return 0 if all_passed else 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/tests/test_tre_functionality.py b/tests/test_tre_functionality.py index 76fc5f8..8d8f333 100644 --- a/tests/test_tre_functionality.py +++ b/tests/test_tre_functionality.py @@ -118,7 +118,7 @@ async def test_tre_directory_tree(): print("\n๐Ÿ“„ Sample file contents (first 3):") for i, (path, content) in enumerate(list(context["file_contents"].items())[:3]): - print(f" {i+1}. {path} ({content['size_bytes']} bytes, {content['lines']} lines)") + print(f" {i + 1}. {path} ({content['size_bytes']} bytes, {content['lines']} lines)") print("\n๐Ÿค– LLM Summary Preview:") print(