🎨 Apply ruff formatting and code quality improvements

- Run ruff format across entire codebase
- Fix 159 automatic code quality issues
- Improve code consistency and readability
- Maintain compatibility with Python 3.8+

Changes include:
- Whitespace and blank line cleanup
- Import organization and optimization
- Code style standardization
- Removed unused imports and variables
- Enhanced readability throughout

Remaining linting issues are intentional (fallback lambdas,
compatibility code) or require manual review.
This commit is contained in:
Ryan Malloy 2025-09-22 10:38:51 -06:00
parent 3acc5fa9fd
commit 3a3f2eac3e
12 changed files with 375 additions and 311 deletions

View File

@ -7,11 +7,11 @@ the Enhanced MCP Tools asciinema integration.
"""
import asyncio
import json
from datetime import datetime
# Simulated MCP tool calls (these would be real when the MCP server is running)
async def demonstrate_mcp_asciinema_integration():
"""Demonstrate the MCP asciinema tools that we just used conceptually"""
@ -28,7 +28,7 @@ async def demonstrate_mcp_asciinema_integration():
"title": "Enhanced MCP Tools Project Tour with Glow",
"max_duration": 300,
"auto_upload": False,
"visibility": "public"
"visibility": "public",
},
"result": {
"recording_id": "rec_20250623_025646",
@ -39,9 +39,9 @@ async def demonstrate_mcp_asciinema_integration():
"shell": "/bin/bash",
"user": "rpm",
"hostname": "claude-dev",
"created_at": datetime.now().isoformat()
}
}
"created_at": datetime.now().isoformat(),
},
},
}
print(f"✅ Recording started: {recording_result['result']['recording_id']}")
@ -65,7 +65,7 @@ async def demonstrate_mcp_asciinema_integration():
"query": "project tour",
"session_name_pattern": "enhanced_mcp_*",
"visibility": "all",
"limit": 10
"limit": 10,
},
"result": {
"total_recordings": 15,
@ -79,10 +79,10 @@ async def demonstrate_mcp_asciinema_integration():
"duration": 245,
"created_at": datetime.now().isoformat(),
"uploaded": False,
"file_size": 15420
}
]
"file_size": 15420,
}
],
},
}
print(f"✅ Found {search_result['result']['filtered_count']} matching recordings")
@ -96,25 +96,25 @@ async def demonstrate_mcp_asciinema_integration():
"recording_id": "rec_20250623_025646",
"autoplay": False,
"theme": "solarized-dark",
"speed": 1.0
"speed": 1.0,
},
"result": {
"recording_id": "rec_20250623_025646",
"playback_urls": {
"local_file": "file://~/.config/enhanced-mcp/recordings/enhanced_mcp_project_tour_20250623_025646.cast",
"local_web": "http://localhost:8000/recordings/enhanced_mcp_project_tour_20250623_025646.cast"
"local_web": "http://localhost:8000/recordings/enhanced_mcp_project_tour_20250623_025646.cast",
},
"embed_code": {
"markdown": "[![asciicast](recording.svg)](https://example.com/recording)",
"html_player": '<asciinema-player src="recording.cast" autoplay="false" theme="solarized-dark"></asciinema-player>'
"html_player": '<asciinema-player src="recording.cast" autoplay="false" theme="solarized-dark"></asciinema-player>',
},
"player_config": {
"autoplay": False,
"theme": "solarized-dark",
"speed": 1.0,
"duration": 245
}
}
"duration": 245,
},
},
}
print("✅ Playback URLs generated")
@ -130,8 +130,8 @@ async def demonstrate_mcp_asciinema_integration():
"📊 Automatic metadata preservation",
"🎯 Custom titles and descriptions",
"🌐 Direct sharing URLs",
"🎮 Embeddable players"
]
"🎮 Embeddable players",
],
}
for feature in upload_info["features"]:
@ -149,5 +149,6 @@ async def demonstrate_mcp_asciinema_integration():
print()
print("📚 All tools documented in README.md with MCP Inspector guide!")
if __name__ == "__main__":
asyncio.run(demonstrate_mcp_asciinema_integration())

View File

@ -10,7 +10,7 @@ readme = "README.md"
requires-python = ">=3.10"
license = "MIT"
authors = [
{name = "Your Name", email = "your.email@example.com"},
{name = "Ryan Malloy", email = "ryan@supported.systems"},
]
classifiers = [
"Development Status :: 3 - Alpha",

View File

@ -243,7 +243,9 @@ class ArchiveCompression(MCPMixin):
resolved_path.relative_to(dest_resolved)
return resolved_path
except ValueError:
raise ValueError(f"SECURITY_VIOLATION: Path traversal attack detected: {member_path}") from None
raise ValueError(
f"SECURITY_VIOLATION: Path traversal attack detected: {member_path}"
) from None
if archive_format.startswith("tar"):
with tarfile.open(archive, "r:*") as tar:
@ -257,12 +259,10 @@ class ArchiveCompression(MCPMixin):
if safe_path.exists() and not overwrite:
if ctx:
await ctx.warning(
f"Skipping existing file: {member.name}"
)
await ctx.warning(f"Skipping existing file: {member.name}")
continue
tar.extract(member, dest, filter='data')
tar.extract(member, dest, filter="data")
extracted_files.append(member.name)
if preserve_permissions and hasattr(member, "mode"):
@ -275,10 +275,12 @@ class ArchiveCompression(MCPMixin):
# Check if this is a security violation (path traversal attack)
if "SECURITY_VIOLATION" in str(e):
# 🚨 EMERGENCY: Security violation detected
emergency_msg = f"Security violation during archive extraction: {str(e)}"
emergency_msg = (
f"Security violation during archive extraction: {str(e)}"
)
if ctx:
# Check if emergency method exists (future-proofing)
if hasattr(ctx, 'emergency'):
if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
# Fallback to error with EMERGENCY prefix
@ -308,9 +310,7 @@ class ArchiveCompression(MCPMixin):
if safe_path.exists() and not overwrite:
if ctx:
await ctx.warning(
f"Skipping existing file: {member_name}"
)
await ctx.warning(f"Skipping existing file: {member_name}")
continue
zip_file.extract(member_name, dest)
@ -378,7 +378,9 @@ class ArchiveCompression(MCPMixin):
"type": (
"file"
if member.isfile()
else "directory" if member.isdir() else "other"
else "directory"
if member.isdir()
else "other"
),
"size": member.size,
}

View File

@ -285,9 +285,7 @@ class AsciinemaIntegration(MCPMixin):
}
if ctx:
await ctx.info(
f"🔍 Search completed: {len(limited_recordings)} recordings found"
)
await ctx.info(f"🔍 Search completed: {len(limited_recordings)} recordings found")
return search_results
@ -390,9 +388,7 @@ class AsciinemaIntegration(MCPMixin):
}
if ctx:
await ctx.info(
f"🎮 Playback URLs generated for: {recording.get('session_name')}"
)
await ctx.info(f"🎮 Playback URLs generated for: {recording.get('session_name')}")
return result
@ -637,7 +633,7 @@ This ID connects your recordings to your account when you authenticate.
"is_public": is_public_server,
"server": upload_url,
"sharing_markdown": (
f"[![asciicast]({upload_result['url']}.svg)]" f"({upload_result['url']})"
f"[![asciicast]({upload_result['url']}.svg)]({upload_result['url']})"
),
}
@ -882,7 +878,7 @@ This ID connects your recordings to your account when you authenticate.
autoplay="{str(autoplay).lower()}"
loop="{str(loop).lower()}"
speed="{speed}"
theme="{theme or 'asciinema'}"
theme="{theme or "asciinema"}"
cols="80"
rows="24">
</asciinema-player>
@ -907,8 +903,8 @@ This ID connects your recordings to your account when you authenticate.
- **Duration**: {duration} seconds
- **Created**: {created_at}
- **Session**: {recording.get('session_name', 'N/A')}
- **Command**: `{recording.get('command', 'N/A')}`
- **Session**: {recording.get("session_name", "N/A")}
- **Command**: `{recording.get("command", "N/A")}`
"""
@ -921,11 +917,11 @@ This ID connects your recordings to your account when you authenticate.
markdown_content += f"""
```bash
asciinema play {recording['path']}
asciinema play {recording["path"]}
```
```html
<script src="{playback_urls.get('embed_url', playback_urls.get('remote', '#'))}.js" async></script>
<script src="{playback_urls.get("embed_url", playback_urls.get("remote", "#"))}.js" async></script>
```
---

View File

@ -7,7 +7,6 @@ import ast
import asyncio
import json
import os
import platform
import re
import shutil
import subprocess
@ -37,19 +36,22 @@ except ImportError:
# FastMCP imports - these are REQUIRED for MCP functionality
try:
from mcp.types import ToolAnnotations
from fastmcp import Context, FastMCP
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_prompt, mcp_resource, mcp_tool
from mcp.types import ToolAnnotations
# Verify that MCPMixin has the required register_all method
if not hasattr(MCPMixin, 'register_all'):
raise ImportError("MCPMixin is missing register_all method - FastMCP version may be incompatible")
if not hasattr(MCPMixin, "register_all"):
raise ImportError(
"MCPMixin is missing register_all method - FastMCP version may be incompatible"
)
FASTMCP_AVAILABLE = True
except ImportError as e:
# FastMCP is REQUIRED - no silent fallbacks that break functionality
import sys
print(f"🚨 CRITICAL: FastMCP import failed: {e}")
print("📋 Enhanced MCP Tools requires FastMCP to function.")
print("🔧 Please install with: pip install fastmcp")
@ -84,14 +86,16 @@ class MCPBase:
"""Verify that this instance is ready for MCP registration"""
if not FASTMCP_AVAILABLE:
return False
if not hasattr(self, 'register_all'):
if not hasattr(self, "register_all"):
return False
return True
def safe_register_all(self, app: 'FastMCP', prefix: str = None) -> bool:
def safe_register_all(self, app: "FastMCP", prefix: str = None) -> bool:
"""Safely register all tools with better error handling"""
if not self.verify_mcp_ready():
print(f"❌ Cannot register {self.__class__.__name__}: FastMCP not available or class not properly configured")
print(
f"❌ Cannot register {self.__class__.__name__}: FastMCP not available or class not properly configured"
)
return False
try:
@ -127,14 +131,18 @@ class MCPBase:
else:
print(f"ERROR: {message}")
async def log_critical_error(self, message: str, exception: Exception = None, ctx: Optional[Context] = None):
async def log_critical_error(
self, message: str, exception: Exception = None, ctx: Optional[Context] = None
):
"""Helper to log critical error messages with enhanced detail
For critical tool failures that prevent completion but don't corrupt data.
Uses ctx.error() as the highest severity in current FastMCP.
"""
if exception:
error_detail = f"CRITICAL: {message} | Exception: {type(exception).__name__}: {str(exception)}"
error_detail = (
f"CRITICAL: {message} | Exception: {type(exception).__name__}: {str(exception)}"
)
else:
error_detail = f"CRITICAL: {message}"
@ -143,7 +151,9 @@ class MCPBase:
else:
print(f"CRITICAL ERROR: {error_detail}")
async def log_emergency(self, message: str, exception: Exception = None, ctx: Optional[Context] = None):
async def log_emergency(
self, message: str, exception: Exception = None, ctx: Optional[Context] = None
):
"""Helper to log emergency-level errors
RESERVED FOR TRUE EMERGENCIES: data corruption, security breaches, system instability.
@ -151,13 +161,15 @@ class MCPBase:
If FastMCP adds emergency() method in future, this will be updated.
"""
if exception:
error_detail = f"EMERGENCY: {message} | Exception: {type(exception).__name__}: {str(exception)}"
error_detail = (
f"EMERGENCY: {message} | Exception: {type(exception).__name__}: {str(exception)}"
)
else:
error_detail = f"EMERGENCY: {message}"
if ctx:
# Check if emergency method exists (future-proofing)
if hasattr(ctx, 'emergency'):
if hasattr(ctx, "emergency"):
await ctx.emergency(error_detail)
else:
# Fallback to error with EMERGENCY prefix

View File

@ -11,13 +11,17 @@ except ImportError:
class FileSystemEventHandler:
def __init__(self):
pass
def on_modified(self, event):
pass
def on_created(self, event):
pass
def on_deleted(self, event):
pass
import fnmatch
import subprocess
@ -169,7 +173,7 @@ class EnhancedFileOperations(MCPMixin):
# This is an emergency - backup corruption detected
emergency_msg = f"Backup integrity check failed for {file_path} - backup is corrupted"
if ctx:
if hasattr(ctx, 'emergency'):
if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
await ctx.error(f"EMERGENCY: {emergency_msg}")
@ -179,9 +183,11 @@ class EnhancedFileOperations(MCPMixin):
backup_path.unlink()
continue
except Exception as verify_error:
emergency_msg = f"Cannot verify backup integrity for {file_path}: {verify_error}"
emergency_msg = (
f"Cannot verify backup integrity for {file_path}: {verify_error}"
)
if ctx:
if hasattr(ctx, 'emergency'):
if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
await ctx.error(f"EMERGENCY: {emergency_msg}")
@ -194,9 +200,11 @@ class EnhancedFileOperations(MCPMixin):
# 🚨 EMERGENCY CHECK: Verify backup integrity for uncompressed files
try:
if path.stat().st_size != backup_path.stat().st_size:
emergency_msg = f"Backup size mismatch for {file_path} - data corruption detected"
emergency_msg = (
f"Backup size mismatch for {file_path} - data corruption detected"
)
if ctx:
if hasattr(ctx, 'emergency'):
if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
await ctx.error(f"EMERGENCY: {emergency_msg}")
@ -206,7 +214,7 @@ class EnhancedFileOperations(MCPMixin):
except Exception as verify_error:
emergency_msg = f"Cannot verify backup for {file_path}: {verify_error}"
if ctx:
if hasattr(ctx, 'emergency'):
if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
await ctx.error(f"EMERGENCY: {emergency_msg}")
@ -226,7 +234,7 @@ class EnhancedFileOperations(MCPMixin):
@mcp_tool(
name="list_directory_tree",
description="📂 Comprehensive directory tree with JSON metadata, git status, and advanced filtering"
description="📂 Comprehensive directory tree with JSON metadata, git status, and advanced filtering",
)
async def list_directory_tree(
self,
@ -295,7 +303,7 @@ class EnhancedFileOperations(MCPMixin):
cwd=root,
capture_output=True,
text=True,
timeout=5
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()[:2]
@ -313,7 +321,7 @@ class EnhancedFileOperations(MCPMixin):
stats = {"files": 0, "directories": 0, "total_size": 0, "total_items": 0}
for item in sorted(path.iterdir()):
if not include_hidden and item.name.startswith('.'):
if not include_hidden and item.name.startswith("."):
continue
if should_exclude(item):
@ -322,7 +330,7 @@ class EnhancedFileOperations(MCPMixin):
item_data = {
"name": item.name,
"path": str(item.relative_to(root)),
"type": "directory" if item.is_dir() else "file"
"type": "directory" if item.is_dir() else "file",
}
if include_metadata:
@ -377,7 +385,7 @@ class EnhancedFileOperations(MCPMixin):
"path": ".",
"children": result.get("children", []),
"stats": result.get("stats", {}),
"in_git_repo": is_git_repo # Add this field for tests
"in_git_repo": is_git_repo, # Add this field for tests
}
if include_metadata:
@ -399,18 +407,20 @@ class EnhancedFileOperations(MCPMixin):
"metadata": {
"scan_time": datetime.now().isoformat(),
"git_integration": include_git_status and is_git_repo,
"metadata_included": include_metadata
}
"metadata_included": include_metadata,
},
}
except Exception as e:
if ctx:
await ctx.error(f"CRITICAL: Directory tree scan failed: {str(e)} | Exception: {type(e).__name__}")
await ctx.error(
f"CRITICAL: Directory tree scan failed: {str(e)} | Exception: {type(e).__name__}"
)
return {"error": str(e)}
@mcp_tool(
name="tre_directory_tree",
description="⚡ Lightning-fast Rust-based directory tree scanning optimized for LLM consumption"
description="⚡ Lightning-fast Rust-based directory tree scanning optimized for LLM consumption",
)
async def tre_directory_tree(
self,
@ -456,12 +466,7 @@ class EnhancedFileOperations(MCPMixin):
start_time = time.time()
# Execute tre command
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
execution_time = time.time() - start_time
@ -470,12 +475,14 @@ class EnhancedFileOperations(MCPMixin):
if "command not found" in result.stderr or "No such file" in result.stderr:
if ctx:
await ctx.warning("tre command not found, using fallback tree")
return await self._fallback_tree(root_path, max_depth, include_hidden, exclude_patterns, ctx)
return await self._fallback_tree(
root_path, max_depth, include_hidden, exclude_patterns, ctx
)
else:
return {"error": f"tre command failed: {result.stderr}"}
# Parse tre output
tree_lines = result.stdout.strip().split('\n') if result.stdout else []
tree_lines = result.stdout.strip().split("\n") if result.stdout else []
return {
"root_path": str(root),
@ -485,19 +492,16 @@ class EnhancedFileOperations(MCPMixin):
"performance": {
"execution_time_seconds": round(execution_time, 3),
"lines_generated": len(tree_lines),
"tool": "tre (Rust-based)"
"tool": "tre (Rust-based)",
},
"options": {
"max_depth": max_depth,
"include_hidden": include_hidden,
"exclude_patterns": exclude_patterns,
"editor_aliases": editor_aliases,
"portable_paths": portable_paths
"portable_paths": portable_paths,
},
"metadata": {
"scan_time": datetime.now().isoformat(),
"optimized_for_llm": True
}
"metadata": {"scan_time": datetime.now().isoformat(), "optimized_for_llm": True},
}
except subprocess.TimeoutExpired:
@ -507,7 +511,14 @@ class EnhancedFileOperations(MCPMixin):
await ctx.error(f"tre directory scan failed: {str(e)}")
return {"error": str(e)}
async def _fallback_tree(self, root_path: str, max_depth: int, include_hidden: bool, exclude_patterns: List[str], ctx: Context) -> Dict[str, Any]:
async def _fallback_tree(
self,
root_path: str,
max_depth: int,
include_hidden: bool,
exclude_patterns: List[str],
ctx: Context,
) -> Dict[str, Any]:
"""Fallback tree implementation when tre is not available"""
try:
cmd = ["tree"]
@ -530,9 +541,12 @@ class EnhancedFileOperations(MCPMixin):
if result.returncode != 0:
# Final fallback to Python implementation
return {"error": "Neither tre nor tree command available", "fallback": "Use list_directory_tree instead"}
return {
"error": "Neither tre nor tree command available",
"fallback": "Use list_directory_tree instead",
}
tree_lines = result.stdout.strip().split('\n') if result.stdout else []
tree_lines = result.stdout.strip().split("\n") if result.stdout else []
return {
"root_path": root_path,
@ -542,12 +556,9 @@ class EnhancedFileOperations(MCPMixin):
"performance": {
"execution_time_seconds": round(execution_time, 3),
"lines_generated": len(tree_lines),
"tool": "tree (fallback)"
"tool": "tree (fallback)",
},
"metadata": {
"scan_time": datetime.now().isoformat(),
"fallback_used": True
}
"metadata": {"scan_time": datetime.now().isoformat(), "fallback_used": True},
}
except Exception as e:
@ -555,7 +566,7 @@ class EnhancedFileOperations(MCPMixin):
@mcp_tool(
name="tre_llm_context",
description="🤖 Complete LLM context generation with directory tree and file contents"
description="🤖 Complete LLM context generation with directory tree and file contents",
)
async def tre_llm_context(
self,
@ -581,7 +592,7 @@ class EnhancedFileOperations(MCPMixin):
root_path=root_path,
max_depth=max_depth,
exclude_patterns=exclude_patterns or [],
ctx=ctx
ctx=ctx,
)
if "error" in tree_result:
@ -595,7 +606,19 @@ class EnhancedFileOperations(MCPMixin):
# Default to common code/config file extensions if none specified
if file_extensions is None:
file_extensions = ['.py', '.js', '.ts', '.md', '.txt', '.json', '.yaml', '.yml', '.toml', '.cfg', '.ini']
file_extensions = [
".py",
".js",
".ts",
".md",
".txt",
".json",
".yaml",
".yml",
".toml",
".cfg",
".ini",
]
def should_include_file(file_path: Path) -> bool:
"""Determine if file should be included in context"""
@ -614,19 +637,19 @@ class EnhancedFileOperations(MCPMixin):
return True
# Walk through directory to collect files
for item in root.rglob('*'):
for item in root.rglob("*"):
if item.is_file() and should_include_file(item):
try:
relative_path = str(item.relative_to(root))
# Read file content
try:
content = item.read_text(encoding='utf-8', errors='ignore')
content = item.read_text(encoding="utf-8", errors="ignore")
file_contents[relative_path] = {
"content": content,
"size": len(content),
"lines": content.count('\n') + 1,
"encoding": "utf-8"
"lines": content.count("\n") + 1,
"encoding": "utf-8",
}
files_processed += 1
total_content_size += len(content)
@ -639,7 +662,7 @@ class EnhancedFileOperations(MCPMixin):
"content": f"<BINARY FILE: {len(binary_content)} bytes>",
"size": len(binary_content),
"encoding": "binary",
"binary": True
"binary": True,
}
files_processed += 1
except:
@ -659,19 +682,21 @@ class EnhancedFileOperations(MCPMixin):
"files_processed": files_processed,
"files_skipped": files_skipped,
"total_content_size": total_content_size,
"average_file_size": total_content_size // max(files_processed, 1)
"average_file_size": total_content_size // max(files_processed, 1),
},
"parameters": {
"max_depth": max_depth,
"max_file_size": max_file_size,
"file_extensions": file_extensions,
"exclude_patterns": exclude_patterns
"exclude_patterns": exclude_patterns,
},
"llm_optimized": True
"llm_optimized": True,
}
if ctx:
await ctx.info(f"LLM context generated: {files_processed} files, {total_content_size} chars")
await ctx.info(
f"LLM context generated: {files_processed} files, {total_content_size} chars"
)
return context
@ -682,7 +707,7 @@ class EnhancedFileOperations(MCPMixin):
@mcp_tool(
name="enhanced_list_directory",
description="📋 Enhanced directory listing with automatic git repository detection and rich metadata"
description="📋 Enhanced directory listing with automatic git repository detection and rich metadata",
)
async def enhanced_list_directory(
self,
@ -728,28 +753,40 @@ class EnhancedFileOperations(MCPMixin):
cwd=git_root,
capture_output=True,
text=True,
timeout=5
timeout=5,
)
current_branch = (
branch_result.stdout.strip()
if branch_result.returncode == 0
else "unknown"
)
current_branch = branch_result.stdout.strip() if branch_result.returncode == 0 else "unknown"
remote_result = subprocess.run(
["git", "remote", "-v"],
cwd=git_root,
capture_output=True,
text=True,
timeout=5
timeout=5,
)
git_info = {
"is_git_repo": True,
"git_root": str(git_root),
"current_branch": current_branch,
"relative_to_root": str(dir_path.relative_to(git_root)) if dir_path != git_root else ".",
"has_remotes": bool(remote_result.stdout.strip()) if remote_result.returncode == 0 else False
"relative_to_root": str(dir_path.relative_to(git_root))
if dir_path != git_root
else ".",
"has_remotes": bool(remote_result.stdout.strip())
if remote_result.returncode == 0
else False,
}
except Exception:
git_info = {"is_git_repo": True, "git_root": str(git_root), "error": "Could not read git info"}
git_info = {
"is_git_repo": True,
"git_root": str(git_root),
"error": "Could not read git info",
}
else:
git_info = {"is_git_repo": False}
@ -769,7 +806,7 @@ class EnhancedFileOperations(MCPMixin):
cwd=git_root,
capture_output=True,
text=True,
timeout=3
timeout=3,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()[:2]
@ -783,7 +820,7 @@ class EnhancedFileOperations(MCPMixin):
try:
for item in current_path.iterdir():
if not include_hidden and item.name.startswith('.'):
if not include_hidden and item.name.startswith("."):
continue
if file_pattern and not fnmatch.fnmatch(item.name, file_pattern):
@ -798,7 +835,7 @@ class EnhancedFileOperations(MCPMixin):
"size": stat_info.st_size,
"modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
"permissions": oct(stat_info.st_mode)[-3:],
"depth": depth
"depth": depth,
}
if item.is_file():
@ -834,7 +871,7 @@ class EnhancedFileOperations(MCPMixin):
"name": lambda x: x["name"].lower(),
"size": lambda x: x["size"],
"modified": lambda x: x["modified"],
"type": lambda x: (x["type"], x["name"].lower())
"type": lambda x: (x["type"], x["name"].lower()),
}
if sort_by in sort_key_map:
@ -850,16 +887,16 @@ class EnhancedFileOperations(MCPMixin):
"directories": len([i for i in items if i["type"] == "directory"]),
"git_tracked_items": git_items,
"non_git_items": non_git_items,
"total_size": sum(i["size"] for i in items if i["type"] == "file")
"total_size": sum(i["size"] for i in items if i["type"] == "file"),
},
"parameters": {
"include_hidden": include_hidden,
"include_git_info": include_git_info,
"recursive_depth": recursive_depth,
"file_pattern": file_pattern,
"sort_by": sort_by
"sort_by": sort_by,
},
"scan_time": datetime.now().isoformat()
"scan_time": datetime.now().isoformat(),
}
if ctx:

View File

@ -216,7 +216,6 @@ class GitIntegration(MCPMixin):
lines = result.stdout.strip().split("\n")
for line in lines[:max_results]: # Limit results
if ":" in line:
parts = line.split(":", 2)
if len(parts) >= 3:
@ -573,7 +572,9 @@ class GitIntegration(MCPMixin):
"search_efficiency": (
"high"
if search_duration < 1.0
else "medium" if search_duration < 5.0 else "low"
else "medium"
if search_duration < 5.0
else "low"
),
"coverage_assessment": await self._assess_search_coverage(
repo_path, search_result, ctx
@ -586,7 +587,9 @@ class GitIntegration(MCPMixin):
"optimization_potential": (
"high"
if search_duration > 5.0
else "medium" if search_duration > 2.0 else "low"
else "medium"
if search_duration > 2.0
else "low"
),
}
@ -806,7 +809,11 @@ class GitIntegration(MCPMixin):
description="🟡 SAFE: Intelligent commit preparation with AI-suggested messages",
)
async def git_commit_prepare(
self, repository_path: str, files: List[str], suggest_message: Optional[bool] = True, ctx: Context = None
self,
repository_path: str,
files: List[str],
suggest_message: Optional[bool] = True,
ctx: Context = None,
) -> Dict[str, Any]:
"""Prepare git commit with AI-suggested message based on file changes"""
try:
@ -834,7 +841,9 @@ class GitIntegration(MCPMixin):
if result.returncode == 0:
stage_results.append({"file": file_path, "staged": True})
else:
stage_results.append({"file": file_path, "staged": False, "error": result.stderr.strip()})
stage_results.append(
{"file": file_path, "staged": False, "error": result.stderr.strip()}
)
# Get staged changes for commit message suggestion
suggested_message = ""
@ -850,22 +859,26 @@ class GitIntegration(MCPMixin):
stats = diff_result.stdout.strip()
# Analyze file types and changes
lines = stats.split('\n')
lines = stats.split("\n")
modified_files = []
for line in lines[:-1]: # Last line is summary
if '|' in line:
file_name = line.split('|')[0].strip()
if "|" in line:
file_name = line.split("|")[0].strip()
modified_files.append(file_name)
# Generate suggested commit message
if len(modified_files) == 1:
file_ext = Path(modified_files[0]).suffix
if file_ext in ['.py', '.js', '.ts']:
if file_ext in [".py", ".js", ".ts"]:
suggested_message = f"Update {Path(modified_files[0]).name}"
elif file_ext in ['.md', '.txt', '.rst']:
suggested_message = f"Update documentation in {Path(modified_files[0]).name}"
elif file_ext in ['.json', '.yaml', '.yml', '.toml']:
suggested_message = f"Update configuration in {Path(modified_files[0]).name}"
elif file_ext in [".md", ".txt", ".rst"]:
suggested_message = (
f"Update documentation in {Path(modified_files[0]).name}"
)
elif file_ext in [".json", ".yaml", ".yml", ".toml"]:
suggested_message = (
f"Update configuration in {Path(modified_files[0]).name}"
)
else:
suggested_message = f"Update {Path(modified_files[0]).name}"
elif len(modified_files) <= 5:
@ -886,7 +899,9 @@ class GitIntegration(MCPMixin):
"staged_files": stage_results,
"suggested_message": suggested_message,
"ready_to_commit": all(r["staged"] for r in stage_results),
"status": status_result.stdout.strip() if status_result.returncode == 0 else "Status unavailable"
"status": status_result.stdout.strip()
if status_result.returncode == 0
else "Status unavailable",
}
if ctx:

View File

@ -6,6 +6,7 @@ Provides development workflow, networking, process management, and utility tools
import fnmatch
import platform
from .base import *
@ -72,7 +73,7 @@ class AdvancedSearchAnalysis(MCPMixin):
continue
# Read file content
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
# Find matches
@ -122,7 +123,7 @@ class AdvancedSearchAnalysis(MCPMixin):
changes.append(change_info)
total_matches += len(matches)
except (UnicodeDecodeError, PermissionError) as e:
except (UnicodeDecodeError, PermissionError):
# Skip files we can't read
continue
@ -229,7 +230,7 @@ class AdvancedSearchAnalysis(MCPMixin):
".rs",
".rb",
]:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
lines = len(f.readlines())
total_lines += lines
@ -272,7 +273,7 @@ class AdvancedSearchAnalysis(MCPMixin):
for file_path in files:
if file_path.suffix.lower() in [".py", ".js", ".ts", ".java", ".cpp", ".c"]:
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
lines = content.count("\n") + 1
@ -507,7 +508,7 @@ class AdvancedSearchAnalysis(MCPMixin):
try:
if file_path.name.lower() == "package.json":
analysis["type"] = "npm"
with open(file_path, "r") as f:
with open(file_path) as f:
data = json.load(f)
deps = {}
if "dependencies" in data:
@ -520,7 +521,7 @@ class AdvancedSearchAnalysis(MCPMixin):
elif file_path.name.lower() in ["requirements.txt", "requirements-dev.txt"]:
analysis["type"] = "pip"
with open(file_path, "r") as f:
with open(file_path) as f:
lines = [
line.strip() for line in f if line.strip() and not line.startswith("#")
]
@ -532,7 +533,7 @@ class AdvancedSearchAnalysis(MCPMixin):
elif file_path.name.lower() == "pyproject.toml":
analysis["type"] = "python-project"
# Basic TOML parsing without external dependencies
with open(file_path, "r") as f:
with open(file_path) as f:
content = f.read()
# Simple dependency extraction
deps = []
@ -555,7 +556,7 @@ class AdvancedSearchAnalysis(MCPMixin):
elif file_path.name.lower() == "cargo.toml":
analysis["type"] = "cargo"
with open(file_path, "r") as f:
with open(file_path) as f:
content = f.read()
# Simple Cargo.toml parsing
lines = content.split("\n")
@ -617,7 +618,7 @@ class AdvancedSearchAnalysis(MCPMixin):
}
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
# Use AST for more accurate parsing
@ -854,7 +855,7 @@ class AdvancedSearchAnalysis(MCPMixin):
if file_path.stat().st_size > 1024 * 1024: # Skip files > 1MB
continue
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
# Normalize content for comparison
normalized = self._normalize_code_content(content)
@ -901,7 +902,7 @@ class AdvancedSearchAnalysis(MCPMixin):
continue
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
functions = self._extract_functions(content, file_path.suffix.lower())
@ -1698,7 +1699,7 @@ class DevelopmentWorkflow(MCPMixin):
for file_path in file_paths:
try:
with open(file_path, "r") as f:
with open(file_path) as f:
json.load(f)
except json.JSONDecodeError as e:
issues.append(
@ -2571,7 +2572,7 @@ class EnvironmentProcessManagement(MCPMixin):
# Check for package.json in current directory
if Path("package.json").exists():
try:
with open("package.json", "r") as f:
with open("package.json") as f:
package_json = json.load(f)
node_info["local_project"] = {
"name": package_json.get("name"),
@ -2657,9 +2658,9 @@ class EnvironmentProcessManagement(MCPMixin):
timeout=3,
)
if branch_result.returncode == 0:
git_info["repository"][
"current_branch"
] = branch_result.stdout.strip()
git_info["repository"]["current_branch"] = (
branch_result.stdout.strip()
)
else:
git_info["repository"] = {"in_repo": False}
except Exception:
@ -3335,9 +3336,9 @@ class EnvironmentProcessManagement(MCPMixin):
timeout=5,
)
if version_result.returncode == 0:
result["details"][
"actual_python_version"
] = version_result.stdout.strip()
result["details"]["actual_python_version"] = (
version_result.stdout.strip()
)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
@ -3346,13 +3347,13 @@ class EnvironmentProcessManagement(MCPMixin):
result["instructions"] = [
f"To activate: {env_path}\\Scripts\\activate.bat",
f"Or in PowerShell: & '{env_path}\\Scripts\\Activate.ps1'",
f"To deactivate: deactivate",
"To deactivate: deactivate",
f"Created using: {creation_method} ({'ultra-fast' if creation_method == 'uv' else 'standard'})",
]
else:
result["instructions"] = [
f"To activate: source {env_path}/bin/activate",
f"To deactivate: deactivate",
"To deactivate: deactivate",
f"Created using: {creation_method} ({'ultra-fast' if creation_method == 'uv' else 'standard'})",
]
else:
@ -4225,7 +4226,7 @@ class EnhancedExistingTools(MCPMixin):
for file_path in files:
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
for line_num, line in enumerate(lines, 1):
@ -4340,7 +4341,7 @@ class EnhancedExistingTools(MCPMixin):
for file_path in files:
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
lines = content.splitlines()
@ -4409,7 +4410,7 @@ class EnhancedExistingTools(MCPMixin):
for file_path in python_files:
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
lines = content.splitlines()
@ -4497,7 +4498,7 @@ class EnhancedExistingTools(MCPMixin):
for file_path in files:
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
for line_num, line in enumerate(lines, 1):
@ -4548,7 +4549,7 @@ class EnhancedExistingTools(MCPMixin):
# Get context
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
with open(file_path, encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
context_before = [
@ -4798,7 +4799,7 @@ class UtilityTools(MCPMixin):
"""Parse requirements.txt for dependencies"""
try:
deps = {}
with open(file_path, "r") as f:
with open(file_path) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
@ -4818,7 +4819,7 @@ class UtilityTools(MCPMixin):
def _parse_package_json(self, file_path: Path) -> Dict[str, str]:
"""Parse package.json for dependencies"""
try:
with open(file_path, "r") as f:
with open(file_path) as f:
data = json.load(f)
deps = {}
@ -4836,7 +4837,7 @@ class UtilityTools(MCPMixin):
try:
# Simple parsing for Pipfile - would need toml parser for full support
deps = {}
with open(file_path, "r") as f:
with open(file_path) as f:
content = f.read()
# Basic extraction - this is simplified
if "[packages]" in content:

View File

@ -3,10 +3,11 @@
Test script to validate Enhanced MCP Tools package structure and dependencies.
"""
import sys
import importlib.util
import sys
from pathlib import Path
def test_package_structure():
"""Test that the package structure is correct."""
print("=== Package Structure Test ===")
@ -16,7 +17,7 @@ def test_package_structure():
"src/enhanced_mcp/__init__.py",
"src/enhanced_mcp/base.py",
"src/enhanced_mcp/mcp_server.py",
"pyproject.toml"
"pyproject.toml",
]
for file_path in required_files:
@ -28,13 +29,13 @@ def test_package_structure():
return True
def test_imports():
"""Test that all imports work correctly."""
print("\n=== Import Test ===")
# Test core imports
try:
from enhanced_mcp import create_server, MCPToolServer
print("✅ Core package imports")
except Exception as e:
print(f"❌ Core imports failed: {e}")
@ -62,6 +63,7 @@ def test_imports():
return True
def test_optional_dependencies():
"""Test optional dependency handling."""
print("\n=== Optional Dependencies Test ===")
@ -70,7 +72,7 @@ def test_optional_dependencies():
"aiofiles": "Async file operations",
"watchdog": "File system monitoring",
"psutil": "Process monitoring",
"requests": "HTTP requests"
"requests": "HTTP requests",
}
available_count = 0
@ -85,6 +87,7 @@ def test_optional_dependencies():
print(f"\n📊 {available_count}/{len(dependencies)} optional dependencies available")
return True
def test_pyproject_toml():
"""Test pyproject.toml configuration."""
print("\n=== pyproject.toml Configuration Test ===")
@ -131,17 +134,13 @@ def test_pyproject_toml():
print(f"❌ pyproject.toml validation failed: {e}")
return False
def main():
"""Run all tests."""
print("🧪 Enhanced MCP Tools Package Validation")
print("=" * 50)
tests = [
test_package_structure,
test_imports,
test_optional_dependencies,
test_pyproject_toml
]
tests = [test_package_structure, test_imports, test_optional_dependencies, test_pyproject_toml]
results = []
for test_func in tests:
@ -171,5 +170,6 @@ def main():
return 0 if all_passed else 1
if __name__ == "__main__":
sys.exit(main())