diff --git a/examples/demo_mcp_asciinema.py b/examples/demo_mcp_asciinema.py
index b3ec567..ddbc73b 100644
--- a/examples/demo_mcp_asciinema.py
+++ b/examples/demo_mcp_asciinema.py
@@ -7,18 +7,18 @@ the Enhanced MCP Tools asciinema integration.
"""
import asyncio
-import json
from datetime import datetime
# Simulated MCP tool calls (these would be real when the MCP server is running)
+
async def demonstrate_mcp_asciinema_integration():
"""Demonstrate the MCP asciinema tools that we just used conceptually"""
-
+
print("๐ฌ MCP Asciinema Integration - Tool Demonstration")
print("=" * 60)
print()
-
+
# 1. Start recording
print("๐น 1. Starting asciinema recording...")
recording_result = {
@@ -28,7 +28,7 @@ async def demonstrate_mcp_asciinema_integration():
"title": "Enhanced MCP Tools Project Tour with Glow",
"max_duration": 300,
"auto_upload": False,
- "visibility": "public"
+ "visibility": "public",
},
"result": {
"recording_id": "rec_20250623_025646",
@@ -39,15 +39,15 @@ async def demonstrate_mcp_asciinema_integration():
"shell": "/bin/bash",
"user": "rpm",
"hostname": "claude-dev",
- "created_at": datetime.now().isoformat()
- }
- }
+ "created_at": datetime.now().isoformat(),
+ },
+ },
}
-
+
print(f"โ
Recording started: {recording_result['result']['recording_id']}")
print(f"๐ Path: {recording_result['result']['recording_path']}")
print()
-
+
# 2. The actual terminal session (what we just demonstrated)
print("๐ฅ๏ธ 2. Terminal session executed:")
print(" โข cd /home/rpm/claude/enhanced-mcp-tools")
@@ -56,7 +56,7 @@ async def demonstrate_mcp_asciinema_integration():
print(" โข glow README.md (viewed documentation)")
print(" โข glow docs/MODULAR_REFACTORING_SUMMARY.md")
print()
-
+
# 3. Search recordings
print("๐ 3. Searching recordings...")
search_result = {
@@ -65,7 +65,7 @@ async def demonstrate_mcp_asciinema_integration():
"query": "project tour",
"session_name_pattern": "enhanced_mcp_*",
"visibility": "all",
- "limit": 10
+ "limit": 10,
},
"result": {
"total_recordings": 15,
@@ -79,15 +79,15 @@ async def demonstrate_mcp_asciinema_integration():
"duration": 245,
"created_at": datetime.now().isoformat(),
"uploaded": False,
- "file_size": 15420
+ "file_size": 15420,
}
- ]
- }
+ ],
+ },
}
-
+
print(f"โ
Found {search_result['result']['filtered_count']} matching recordings")
print()
-
+
# 4. Generate playback URLs
print("๐ฎ 4. Generating playback information...")
playback_result = {
@@ -96,31 +96,31 @@ async def demonstrate_mcp_asciinema_integration():
"recording_id": "rec_20250623_025646",
"autoplay": False,
"theme": "solarized-dark",
- "speed": 1.0
+ "speed": 1.0,
},
"result": {
"recording_id": "rec_20250623_025646",
"playback_urls": {
"local_file": "file://~/.config/enhanced-mcp/recordings/enhanced_mcp_project_tour_20250623_025646.cast",
- "local_web": "http://localhost:8000/recordings/enhanced_mcp_project_tour_20250623_025646.cast"
+ "local_web": "http://localhost:8000/recordings/enhanced_mcp_project_tour_20250623_025646.cast",
},
"embed_code": {
"markdown": "[](https://example.com/recording)",
- "html_player": ''
+ "html_player": '',
},
"player_config": {
"autoplay": False,
"theme": "solarized-dark",
"speed": 1.0,
- "duration": 245
- }
- }
+ "duration": 245,
+ },
+ },
}
-
+
print("โ
Playback URLs generated")
print(f"๐ Local: {playback_result['result']['playback_urls']['local_file']}")
print()
-
+
# 5. Upload to asciinema.org (optional)
print("โ๏ธ 5. Upload capability available...")
upload_info = {
@@ -130,14 +130,14 @@ async def demonstrate_mcp_asciinema_integration():
"๐ Automatic metadata preservation",
"๐ฏ Custom titles and descriptions",
"๐ Direct sharing URLs",
- "๐ฎ Embeddable players"
- ]
+ "๐ฎ Embeddable players",
+ ],
}
-
+
for feature in upload_info["features"]:
print(f" {feature}")
print()
-
+
print("๐ฏ MCP Asciinema Integration Summary:")
print("=" * 60)
print("โ
Professional terminal recording with metadata")
@@ -149,5 +149,6 @@ async def demonstrate_mcp_asciinema_integration():
print()
print("๐ All tools documented in README.md with MCP Inspector guide!")
+
if __name__ == "__main__":
asyncio.run(demonstrate_mcp_asciinema_integration())
diff --git a/examples/demo_tre_llm_integration.py b/examples/demo_tre_llm_integration.py
index deb9f59..308edd3 100644
--- a/examples/demo_tre_llm_integration.py
+++ b/examples/demo_tre_llm_integration.py
@@ -80,7 +80,7 @@ async def demo_tre_llm_integration():
print("\n๐ Included Files:")
for i, (path, content) in enumerate(list(context["file_contents"].items())[:3]):
- print(f" {i+1}. {path}")
+ print(f" {i + 1}. {path}")
print(f" Size: {content['size_bytes']} bytes, Lines: {content['lines']}")
if "content" in content and len(content["content"]) > 100:
preview = content["content"][:100].replace("\n", "\\n")
diff --git a/pyproject.toml b/pyproject.toml
index 1040aaf..0a029f8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,7 +10,7 @@ readme = "README.md"
requires-python = ">=3.10"
license = "MIT"
authors = [
- {name = "Your Name", email = "your.email@example.com"},
+ {name = "Ryan Malloy", email = "ryan@supported.systems"},
]
classifiers = [
"Development Status :: 3 - Alpha",
diff --git a/src/enhanced_mcp/archive_compression.py b/src/enhanced_mcp/archive_compression.py
index b1f424c..f546fce 100644
--- a/src/enhanced_mcp/archive_compression.py
+++ b/src/enhanced_mcp/archive_compression.py
@@ -243,7 +243,9 @@ class ArchiveCompression(MCPMixin):
resolved_path.relative_to(dest_resolved)
return resolved_path
except ValueError:
- raise ValueError(f"SECURITY_VIOLATION: Path traversal attack detected: {member_path}") from None
+ raise ValueError(
+ f"SECURITY_VIOLATION: Path traversal attack detected: {member_path}"
+ ) from None
if archive_format.startswith("tar"):
with tarfile.open(archive, "r:*") as tar:
@@ -257,12 +259,10 @@ class ArchiveCompression(MCPMixin):
if safe_path.exists() and not overwrite:
if ctx:
- await ctx.warning(
- f"Skipping existing file: {member.name}"
- )
+ await ctx.warning(f"Skipping existing file: {member.name}")
continue
- tar.extract(member, dest, filter='data')
+ tar.extract(member, dest, filter="data")
extracted_files.append(member.name)
if preserve_permissions and hasattr(member, "mode"):
@@ -275,10 +275,12 @@ class ArchiveCompression(MCPMixin):
# Check if this is a security violation (path traversal attack)
if "SECURITY_VIOLATION" in str(e):
                # 🚨 EMERGENCY: Security violation detected
- emergency_msg = f"Security violation during archive extraction: {str(e)}"
+ emergency_msg = (
+ f"Security violation during archive extraction: {str(e)}"
+ )
if ctx:
# Check if emergency method exists (future-proofing)
- if hasattr(ctx, 'emergency'):
+ if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
# Fallback to error with EMERGENCY prefix
@@ -308,9 +310,7 @@ class ArchiveCompression(MCPMixin):
if safe_path.exists() and not overwrite:
if ctx:
- await ctx.warning(
- f"Skipping existing file: {member_name}"
- )
+ await ctx.warning(f"Skipping existing file: {member_name}")
continue
zip_file.extract(member_name, dest)
@@ -378,7 +378,9 @@ class ArchiveCompression(MCPMixin):
"type": (
"file"
if member.isfile()
- else "directory" if member.isdir() else "other"
+ else "directory"
+ if member.isdir()
+ else "other"
),
"size": member.size,
}
diff --git a/src/enhanced_mcp/asciinema_integration.py b/src/enhanced_mcp/asciinema_integration.py
index 88e12f2..85c5849 100644
--- a/src/enhanced_mcp/asciinema_integration.py
+++ b/src/enhanced_mcp/asciinema_integration.py
@@ -285,9 +285,7 @@ class AsciinemaIntegration(MCPMixin):
}
if ctx:
-            await ctx.info(
-                f"🔍 Search completed: {len(limited_recordings)} recordings found"
-            )
+            await ctx.info(f"🔍 Search completed: {len(limited_recordings)} recordings found")
return search_results
@@ -390,9 +388,7 @@ class AsciinemaIntegration(MCPMixin):
}
if ctx:
-            await ctx.info(
-                f"🎮 Playback URLs generated for: {recording.get('session_name')}"
-            )
+            await ctx.info(f"🎮 Playback URLs generated for: {recording.get('session_name')}")
return result
@@ -637,7 +633,7 @@ This ID connects your recordings to your account when you authenticate.
"is_public": is_public_server,
"server": upload_url,
"sharing_markdown": (
- f"[]" f"({upload_result['url']})"
+ f"[]({upload_result['url']})"
),
}
@@ -882,7 +878,7 @@ This ID connects your recordings to your account when you authenticate.
autoplay="{str(autoplay).lower()}"
loop="{str(loop).lower()}"
speed="{speed}"
- theme="{theme or 'asciinema'}"
+ theme="{theme or "asciinema"}"
cols="80"
rows="24">
@@ -907,8 +903,8 @@ This ID connects your recordings to your account when you authenticate.
- **Duration**: {duration} seconds
- **Created**: {created_at}
-- **Session**: {recording.get('session_name', 'N/A')}
-- **Command**: `{recording.get('command', 'N/A')}`
+- **Session**: {recording.get("session_name", "N/A")}
+- **Command**: `{recording.get("command", "N/A")}`
"""
@@ -921,11 +917,11 @@ This ID connects your recordings to your account when you authenticate.
markdown_content += f"""
```bash
-asciinema play {recording['path']}
+asciinema play {recording["path"]}
```
```html
-
+
```
---
diff --git a/src/enhanced_mcp/base.py b/src/enhanced_mcp/base.py
index 745f3d3..808183c 100644
--- a/src/enhanced_mcp/base.py
+++ b/src/enhanced_mcp/base.py
@@ -7,7 +7,6 @@ import ast
import asyncio
import json
import os
-import platform
import re
import shutil
import subprocess
@@ -37,24 +36,27 @@ except ImportError:
# FastMCP imports - these are REQUIRED for MCP functionality
try:
- from mcp.types import ToolAnnotations
from fastmcp import Context, FastMCP
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_prompt, mcp_resource, mcp_tool
-
+ from mcp.types import ToolAnnotations
+
# Verify that MCPMixin has the required register_all method
- if not hasattr(MCPMixin, 'register_all'):
- raise ImportError("MCPMixin is missing register_all method - FastMCP version may be incompatible")
-
+ if not hasattr(MCPMixin, "register_all"):
+ raise ImportError(
+ "MCPMixin is missing register_all method - FastMCP version may be incompatible"
+ )
+
FASTMCP_AVAILABLE = True
-
+
except ImportError as e:
# FastMCP is REQUIRED - no silent fallbacks that break functionality
import sys
+
print(f"๐จ CRITICAL: FastMCP import failed: {e}")
print("๐ Enhanced MCP Tools requires FastMCP to function.")
print("๐ง Please install with: pip install fastmcp")
print(" Or check your FastMCP installation and version compatibility.")
-
+
# Still define the imports to prevent NameError, but mark as unavailable
Context = None
FastMCP = None
@@ -64,7 +66,7 @@ except ImportError as e:
mcp_prompt = lambda **kwargs: lambda func: func
ToolAnnotations = None
FASTMCP_AVAILABLE = False
-
+
# Don't exit here - let individual modules handle the error appropriately
@@ -84,16 +86,18 @@ class MCPBase:
"""Verify that this instance is ready for MCP registration"""
if not FASTMCP_AVAILABLE:
return False
- if not hasattr(self, 'register_all'):
+ if not hasattr(self, "register_all"):
return False
return True
- def safe_register_all(self, app: 'FastMCP', prefix: str = None) -> bool:
+ def safe_register_all(self, app: "FastMCP", prefix: str = None) -> bool:
"""Safely register all tools with better error handling"""
if not self.verify_mcp_ready():
- print(f"โ Cannot register {self.__class__.__name__}: FastMCP not available or class not properly configured")
+ print(
+ f"โ Cannot register {self.__class__.__name__}: FastMCP not available or class not properly configured"
+ )
return False
-
+
try:
if prefix:
self.register_all(app, prefix=prefix)
@@ -127,44 +131,52 @@ class MCPBase:
else:
print(f"ERROR: {message}")
- async def log_critical_error(self, message: str, exception: Exception = None, ctx: Optional[Context] = None):
+ async def log_critical_error(
+ self, message: str, exception: Exception = None, ctx: Optional[Context] = None
+ ):
"""Helper to log critical error messages with enhanced detail
-
+
For critical tool failures that prevent completion but don't corrupt data.
Uses ctx.error() as the highest severity in current FastMCP.
"""
if exception:
- error_detail = f"CRITICAL: {message} | Exception: {type(exception).__name__}: {str(exception)}"
+ error_detail = (
+ f"CRITICAL: {message} | Exception: {type(exception).__name__}: {str(exception)}"
+ )
else:
error_detail = f"CRITICAL: {message}"
-
+
if ctx:
await ctx.error(error_detail)
else:
print(f"CRITICAL ERROR: {error_detail}")
- async def log_emergency(self, message: str, exception: Exception = None, ctx: Optional[Context] = None):
+ async def log_emergency(
+ self, message: str, exception: Exception = None, ctx: Optional[Context] = None
+ ):
"""Helper to log emergency-level errors
-
+
RESERVED FOR TRUE EMERGENCIES: data corruption, security breaches, system instability.
Currently uses ctx.error() with EMERGENCY prefix since FastMCP doesn't have emergency().
If FastMCP adds emergency() method in future, this will be updated.
"""
if exception:
- error_detail = f"EMERGENCY: {message} | Exception: {type(exception).__name__}: {str(exception)}"
+ error_detail = (
+ f"EMERGENCY: {message} | Exception: {type(exception).__name__}: {str(exception)}"
+ )
else:
error_detail = f"EMERGENCY: {message}"
-
+
if ctx:
# Check if emergency method exists (future-proofing)
- if hasattr(ctx, 'emergency'):
+ if hasattr(ctx, "emergency"):
await ctx.emergency(error_detail)
else:
# Fallback to error with EMERGENCY prefix
await ctx.error(error_detail)
else:
print(f"๐จ EMERGENCY: {error_detail}")
-
+
# Could also implement additional emergency actions here:
# - Write to emergency log file
# - Send alerts
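The base.py hunks above formalize a fail-loud import guard: the FastMCP symbols stay defined even when the import fails, so downstream modules raise a clear error message instead of a NameError. A condensed sketch of that pattern (simplified; the real module guards more names and assigns None to the classes):

```python
try:
    from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool

    # Fail fast if the installed FastMCP is too old to support register_all
    if not hasattr(MCPMixin, "register_all"):
        raise ImportError("MCPMixin is missing register_all method")
    FASTMCP_AVAILABLE = True
except ImportError as e:
    print(f"🚨 CRITICAL: FastMCP import failed: {e}")
    print("🔧 Please install with: pip install fastmcp")
    MCPMixin = object  # stand-in base class so class definitions still parse
    mcp_tool = lambda **kwargs: (lambda func: func)  # no-op decorator
    FASTMCP_AVAILABLE = False
```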
diff --git a/src/enhanced_mcp/file_operations.py b/src/enhanced_mcp/file_operations.py
index 711e02b..1337bc0 100644
--- a/src/enhanced_mcp/file_operations.py
+++ b/src/enhanced_mcp/file_operations.py
@@ -11,13 +11,17 @@ except ImportError:
class FileSystemEventHandler:
def __init__(self):
pass
+
def on_modified(self, event):
pass
+
def on_created(self, event):
pass
+
def on_deleted(self, event):
pass
+
import fnmatch
import subprocess
@@ -160,7 +164,7 @@ class EnhancedFileOperations(MCPMixin):
original_data = src.read()
with open(backup_path, "wb") as dst:
dst.write(gzip.compress(original_data))
-
+
                    # 🚨 EMERGENCY CHECK: Verify backup integrity for compressed files
try:
with open(backup_path, "rb") as backup_file:
@@ -169,7 +173,7 @@ class EnhancedFileOperations(MCPMixin):
# This is an emergency - backup corruption detected
emergency_msg = f"Backup integrity check failed for {file_path} - backup is corrupted"
if ctx:
- if hasattr(ctx, 'emergency'):
+ if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
await ctx.error(f"EMERGENCY: {emergency_msg}")
@@ -179,9 +183,11 @@ class EnhancedFileOperations(MCPMixin):
backup_path.unlink()
continue
except Exception as verify_error:
- emergency_msg = f"Cannot verify backup integrity for {file_path}: {verify_error}"
+ emergency_msg = (
+ f"Cannot verify backup integrity for {file_path}: {verify_error}"
+ )
if ctx:
- if hasattr(ctx, 'emergency'):
+ if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
await ctx.error(f"EMERGENCY: {emergency_msg}")
@@ -190,13 +196,15 @@ class EnhancedFileOperations(MCPMixin):
continue
else:
shutil.copy2(path, backup_path)
-
-                    # 🚨 EMERGENCY CHECK: Verify backup integrity for uncompressed files
+
+                    # 🚨 EMERGENCY CHECK: Verify backup integrity for uncompressed files
try:
if path.stat().st_size != backup_path.stat().st_size:
- emergency_msg = f"Backup size mismatch for {file_path} - data corruption detected"
+ emergency_msg = (
+ f"Backup size mismatch for {file_path} - data corruption detected"
+ )
if ctx:
- if hasattr(ctx, 'emergency'):
+ if hasattr(ctx, "emergency"):
await ctx.emergency(emergency_msg)
else:
await ctx.error(f"EMERGENCY: {emergency_msg}")
@@ -206,8 +214,8 @@ class EnhancedFileOperations(MCPMixin):
except Exception as verify_error:
emergency_msg = f"Cannot verify backup for {file_path}: {verify_error}"
if ctx:
- if hasattr(ctx, 'emergency'):
- await ctx.emergency(emergency_msg)
+ if hasattr(ctx, "emergency"):
+ await ctx.emergency(emergency_msg)
else:
await ctx.error(f"EMERGENCY: {emergency_msg}")
continue
@@ -226,7 +234,7 @@ class EnhancedFileOperations(MCPMixin):
@mcp_tool(
name="list_directory_tree",
- description="๐ Comprehensive directory tree with JSON metadata, git status, and advanced filtering"
+ description="๐ Comprehensive directory tree with JSON metadata, git status, and advanced filtering",
)
async def list_directory_tree(
self,
@@ -250,7 +258,7 @@ class EnhancedFileOperations(MCPMixin):
exclude_patterns = exclude_patterns or []
is_git_repo = (root / ".git").exists()
-
+
def should_exclude(path: Path) -> bool:
"""Check if path should be excluded based on patterns"""
for pattern in exclude_patterns:
@@ -272,13 +280,13 @@ class EnhancedFileOperations(MCPMixin):
"is_file": file_path.is_file(),
"is_link": file_path.is_symlink(),
}
-
+
if file_path.is_file():
metadata["extension"] = file_path.suffix
-
+
if size_threshold and stat_info.st_size > size_threshold:
metadata["large_file"] = True
-
+
return metadata
except Exception:
return {"error": "Could not read metadata"}
@@ -287,7 +295,7 @@ class EnhancedFileOperations(MCPMixin):
"""Get git status for file if in git repository"""
if not is_git_repo or not include_git_status:
return None
-
+
try:
rel_path = file_path.relative_to(root)
result = subprocess.run(
@@ -295,7 +303,7 @@ class EnhancedFileOperations(MCPMixin):
cwd=root,
capture_output=True,
text=True,
- timeout=5
+ timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()[:2]
@@ -307,29 +315,29 @@ class EnhancedFileOperations(MCPMixin):
"""Recursively scan directory"""
if current_depth > max_depth:
return {"error": "Max depth exceeded"}
-
+
try:
items = []
stats = {"files": 0, "directories": 0, "total_size": 0, "total_items": 0}
-
+
for item in sorted(path.iterdir()):
- if not include_hidden and item.name.startswith('.'):
+ if not include_hidden and item.name.startswith("."):
continue
-
+
if should_exclude(item):
continue
-
+
item_data = {
"name": item.name,
"path": str(item.relative_to(root)),
- "type": "directory" if item.is_dir() else "file"
+ "type": "directory" if item.is_dir() else "file",
}
-
+
if include_metadata:
item_data["metadata"] = get_file_metadata(item)
if item.is_file():
stats["total_size"] += item_data["metadata"].get("size", 0)
-
+
if include_git_status:
git_status = get_git_status(item)
if git_status:
@@ -337,7 +345,7 @@ class EnhancedFileOperations(MCPMixin):
item_data["in_git_repo"] = is_git_repo # Add this field for tests
else:
item_data["in_git_repo"] = is_git_repo # Add this field for tests
-
+
if item.is_dir() and current_depth < max_depth:
sub_result = scan_directory(item, current_depth + 1)
if "children" in sub_result:
@@ -358,18 +366,18 @@ class EnhancedFileOperations(MCPMixin):
else:
stats["files"] += 1
stats["total_items"] += 1
-
+
items.append(item_data)
-
+
return {"children": items, "stats": stats}
-
+
except PermissionError:
return {"error": "Permission denied"}
except Exception as e:
return {"error": str(e)}
result = scan_directory(root)
-
+
# Create a root node structure that tests expect
root_node = {
"name": root.name,
@@ -377,17 +385,17 @@ class EnhancedFileOperations(MCPMixin):
"path": ".",
"children": result.get("children", []),
"stats": result.get("stats", {}),
- "in_git_repo": is_git_repo # Add this field for tests
+ "in_git_repo": is_git_repo, # Add this field for tests
}
-
+
if include_metadata:
root_node["metadata"] = get_file_metadata(root)
-
+
if include_git_status:
git_status = get_git_status(root)
if git_status:
root_node["git_status"] = git_status
-
+
return {
"root_path": str(root),
"scan_depth": max_depth,
@@ -399,18 +407,20 @@ class EnhancedFileOperations(MCPMixin):
"metadata": {
"scan_time": datetime.now().isoformat(),
"git_integration": include_git_status and is_git_repo,
- "metadata_included": include_metadata
- }
+ "metadata_included": include_metadata,
+ },
}
except Exception as e:
if ctx:
- await ctx.error(f"CRITICAL: Directory tree scan failed: {str(e)} | Exception: {type(e).__name__}")
+ await ctx.error(
+ f"CRITICAL: Directory tree scan failed: {str(e)} | Exception: {type(e).__name__}"
+ )
return {"error": str(e)}
@mcp_tool(
name="tre_directory_tree",
- description="โก Lightning-fast Rust-based directory tree scanning optimized for LLM consumption"
+ description="โก Lightning-fast Rust-based directory tree scanning optimized for LLM consumption",
)
async def tre_directory_tree(
self,
@@ -433,50 +443,47 @@ class EnhancedFileOperations(MCPMixin):
# Build tre command
cmd = ["tre"]
-
+
if max_depth is not None:
cmd.extend(["-L", str(max_depth)])
-
+
if include_hidden:
cmd.append("-a")
-
+
if editor_aliases:
cmd.append("-e")
-
+
if portable_paths:
cmd.append("-p")
-
+
# Add exclude patterns
if exclude_patterns:
for pattern in exclude_patterns:
cmd.extend(["-I", pattern])
-
+
cmd.append(str(root))
-
+
start_time = time.time()
-
+
# Execute tre command
- result = subprocess.run(
- cmd,
- capture_output=True,
- text=True,
- timeout=30
- )
-
+ result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
+
execution_time = time.time() - start_time
-
+
if result.returncode != 0:
# Fallback to basic tree if tre is not available
if "command not found" in result.stderr or "No such file" in result.stderr:
if ctx:
await ctx.warning("tre command not found, using fallback tree")
- return await self._fallback_tree(root_path, max_depth, include_hidden, exclude_patterns, ctx)
+ return await self._fallback_tree(
+ root_path, max_depth, include_hidden, exclude_patterns, ctx
+ )
else:
return {"error": f"tre command failed: {result.stderr}"}
-
+
# Parse tre output
- tree_lines = result.stdout.strip().split('\n') if result.stdout else []
-
+ tree_lines = result.stdout.strip().split("\n") if result.stdout else []
+
return {
"root_path": str(root),
"command": " ".join(cmd),
@@ -485,19 +492,16 @@ class EnhancedFileOperations(MCPMixin):
"performance": {
"execution_time_seconds": round(execution_time, 3),
"lines_generated": len(tree_lines),
- "tool": "tre (Rust-based)"
+ "tool": "tre (Rust-based)",
},
"options": {
"max_depth": max_depth,
"include_hidden": include_hidden,
"exclude_patterns": exclude_patterns,
"editor_aliases": editor_aliases,
- "portable_paths": portable_paths
+ "portable_paths": portable_paths,
},
- "metadata": {
- "scan_time": datetime.now().isoformat(),
- "optimized_for_llm": True
- }
+ "metadata": {"scan_time": datetime.now().isoformat(), "optimized_for_llm": True},
}
except subprocess.TimeoutExpired:
@@ -507,33 +511,43 @@ class EnhancedFileOperations(MCPMixin):
await ctx.error(f"tre directory scan failed: {str(e)}")
return {"error": str(e)}
- async def _fallback_tree(self, root_path: str, max_depth: int, include_hidden: bool, exclude_patterns: List[str], ctx: Context) -> Dict[str, Any]:
+ async def _fallback_tree(
+ self,
+ root_path: str,
+ max_depth: int,
+ include_hidden: bool,
+ exclude_patterns: List[str],
+ ctx: Context,
+ ) -> Dict[str, Any]:
"""Fallback tree implementation when tre is not available"""
try:
cmd = ["tree"]
-
+
if max_depth is not None:
cmd.extend(["-L", str(max_depth)])
-
+
if include_hidden:
cmd.append("-a")
-
+
if exclude_patterns:
for pattern in exclude_patterns:
cmd.extend(["-I", pattern])
-
+
cmd.append(root_path)
-
+
start_time = time.time()
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
execution_time = time.time() - start_time
-
+
if result.returncode != 0:
# Final fallback to Python implementation
- return {"error": "Neither tre nor tree command available", "fallback": "Use list_directory_tree instead"}
-
- tree_lines = result.stdout.strip().split('\n') if result.stdout else []
-
+ return {
+ "error": "Neither tre nor tree command available",
+ "fallback": "Use list_directory_tree instead",
+ }
+
+ tree_lines = result.stdout.strip().split("\n") if result.stdout else []
+
return {
"root_path": root_path,
"command": " ".join(cmd),
@@ -542,20 +556,17 @@ class EnhancedFileOperations(MCPMixin):
"performance": {
"execution_time_seconds": round(execution_time, 3),
"lines_generated": len(tree_lines),
- "tool": "tree (fallback)"
+ "tool": "tree (fallback)",
},
- "metadata": {
- "scan_time": datetime.now().isoformat(),
- "fallback_used": True
- }
+ "metadata": {"scan_time": datetime.now().isoformat(), "fallback_used": True},
}
-
+
except Exception as e:
return {"error": f"Fallback tree failed: {str(e)}"}
@mcp_tool(
name="tre_llm_context",
- description="๐ค Complete LLM context generation with directory tree and file contents"
+ description="๐ค Complete LLM context generation with directory tree and file contents",
)
async def tre_llm_context(
self,
@@ -581,9 +592,9 @@ class EnhancedFileOperations(MCPMixin):
root_path=root_path,
max_depth=max_depth,
exclude_patterns=exclude_patterns or [],
- ctx=ctx
+ ctx=ctx,
)
-
+
if "error" in tree_result:
return tree_result
@@ -592,45 +603,57 @@ class EnhancedFileOperations(MCPMixin):
files_processed = 0
files_skipped = 0
total_content_size = 0
-
+
# Default to common code/config file extensions if none specified
if file_extensions is None:
- file_extensions = ['.py', '.js', '.ts', '.md', '.txt', '.json', '.yaml', '.yml', '.toml', '.cfg', '.ini']
-
+ file_extensions = [
+ ".py",
+ ".js",
+ ".ts",
+ ".md",
+ ".txt",
+ ".json",
+ ".yaml",
+ ".yml",
+ ".toml",
+ ".cfg",
+ ".ini",
+ ]
+
def should_include_file(file_path: Path) -> bool:
"""Determine if file should be included in context"""
if include_files:
return str(file_path.relative_to(root)) in include_files
-
+
if file_extensions and file_path.suffix not in file_extensions:
return False
-
+
try:
if file_path.stat().st_size > max_file_size:
return False
except:
return False
-
+
return True
-
+
# Walk through directory to collect files
- for item in root.rglob('*'):
+ for item in root.rglob("*"):
if item.is_file() and should_include_file(item):
try:
relative_path = str(item.relative_to(root))
-
+
# Read file content
try:
- content = item.read_text(encoding='utf-8', errors='ignore')
+ content = item.read_text(encoding="utf-8", errors="ignore")
file_contents[relative_path] = {
"content": content,
"size": len(content),
- "lines": content.count('\n') + 1,
- "encoding": "utf-8"
+ "lines": content.count("\n") + 1,
+ "encoding": "utf-8",
}
files_processed += 1
total_content_size += len(content)
-
+
except UnicodeDecodeError:
# Try binary read for non-text files
try:
@@ -639,17 +662,17 @@ class EnhancedFileOperations(MCPMixin):
"content": f"",
"size": len(binary_content),
"encoding": "binary",
- "binary": True
+ "binary": True,
}
files_processed += 1
except:
files_skipped += 1
-
+
except Exception:
files_skipped += 1
else:
files_skipped += 1
-
+
context = {
"root_path": str(root),
"generation_time": datetime.now().isoformat(),
@@ -659,20 +682,22 @@ class EnhancedFileOperations(MCPMixin):
"files_processed": files_processed,
"files_skipped": files_skipped,
"total_content_size": total_content_size,
- "average_file_size": total_content_size // max(files_processed, 1)
+ "average_file_size": total_content_size // max(files_processed, 1),
},
"parameters": {
"max_depth": max_depth,
"max_file_size": max_file_size,
"file_extensions": file_extensions,
- "exclude_patterns": exclude_patterns
+ "exclude_patterns": exclude_patterns,
},
- "llm_optimized": True
+ "llm_optimized": True,
}
-
+
if ctx:
- await ctx.info(f"LLM context generated: {files_processed} files, {total_content_size} chars")
-
+ await ctx.info(
+ f"LLM context generated: {files_processed} files, {total_content_size} chars"
+ )
+
return context
except Exception as e:
@@ -682,7 +707,7 @@ class EnhancedFileOperations(MCPMixin):
@mcp_tool(
name="enhanced_list_directory",
- description="๐ Enhanced directory listing with automatic git repository detection and rich metadata"
+ description="๐ Enhanced directory listing with automatic git repository detection and rich metadata",
)
async def enhanced_list_directory(
self,
@@ -710,7 +735,7 @@ class EnhancedFileOperations(MCPMixin):
git_info = None
is_git_repo = False
git_root = None
-
+
if include_git_info:
current = dir_path
while current != current.parent:
@@ -719,7 +744,7 @@ class EnhancedFileOperations(MCPMixin):
git_root = current
break
current = current.parent
-
+
if is_git_repo:
try:
# Get git info
@@ -728,28 +753,40 @@ class EnhancedFileOperations(MCPMixin):
cwd=git_root,
capture_output=True,
text=True,
- timeout=5
+ timeout=5,
)
- current_branch = branch_result.stdout.strip() if branch_result.returncode == 0 else "unknown"
-
+ current_branch = (
+ branch_result.stdout.strip()
+ if branch_result.returncode == 0
+ else "unknown"
+ )
+
remote_result = subprocess.run(
["git", "remote", "-v"],
cwd=git_root,
capture_output=True,
text=True,
- timeout=5
+ timeout=5,
)
-
+
git_info = {
"is_git_repo": True,
"git_root": str(git_root),
"current_branch": current_branch,
- "relative_to_root": str(dir_path.relative_to(git_root)) if dir_path != git_root else ".",
- "has_remotes": bool(remote_result.stdout.strip()) if remote_result.returncode == 0 else False
+ "relative_to_root": str(dir_path.relative_to(git_root))
+ if dir_path != git_root
+ else ".",
+ "has_remotes": bool(remote_result.stdout.strip())
+ if remote_result.returncode == 0
+ else False,
}
-
+
except Exception:
- git_info = {"is_git_repo": True, "git_root": str(git_root), "error": "Could not read git info"}
+ git_info = {
+ "is_git_repo": True,
+ "git_root": str(git_root),
+ "error": "Could not read git info",
+ }
else:
git_info = {"is_git_repo": False}
@@ -757,7 +794,7 @@ class EnhancedFileOperations(MCPMixin):
items = []
git_items = 0
non_git_items = 0
-
+
def get_git_status(item_path: Path) -> Optional[str]:
"""Get git status for individual item"""
if not is_git_repo:
@@ -769,26 +806,26 @@ class EnhancedFileOperations(MCPMixin):
cwd=git_root,
capture_output=True,
text=True,
- timeout=3
+ timeout=3,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()[:2]
return "clean"
except Exception:
return None
-
+
def process_directory(current_path: Path, depth: int = 0):
"""Process directory recursively"""
nonlocal git_items, non_git_items
-
+
try:
for item in current_path.iterdir():
- if not include_hidden and item.name.startswith('.'):
+ if not include_hidden and item.name.startswith("."):
continue
-
+
if file_pattern and not fnmatch.fnmatch(item.name, file_pattern):
continue
-
+
try:
stat_info = item.stat()
item_data = {
@@ -798,12 +835,12 @@ class EnhancedFileOperations(MCPMixin):
"size": stat_info.st_size,
"modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
"permissions": oct(stat_info.st_mode)[-3:],
- "depth": depth
+ "depth": depth,
}
-
+
if item.is_file():
item_data["extension"] = item.suffix
-
+
# Add git status if available
if include_git_info and is_git_repo:
git_status = get_git_status(item)
@@ -814,32 +851,32 @@ class EnhancedFileOperations(MCPMixin):
else:
item_data["in_git_repo"] = False # Add this field for tests
non_git_items += 1
-
+
items.append(item_data)
-
+
# Recurse if directory and within depth limit
if item.is_dir() and depth < recursive_depth:
process_directory(item, depth + 1)
-
+
except (PermissionError, OSError):
continue
-
+
except PermissionError:
pass
-
+
process_directory(dir_path)
-
+
# Sort items
sort_key_map = {
"name": lambda x: x["name"].lower(),
"size": lambda x: x["size"],
"modified": lambda x: x["modified"],
- "type": lambda x: (x["type"], x["name"].lower())
+ "type": lambda x: (x["type"], x["name"].lower()),
}
-
+
if sort_by in sort_key_map:
items.sort(key=sort_key_map[sort_by])
-
+
result = {
"directory_path": str(dir_path),
"items": items,
@@ -850,21 +887,21 @@ class EnhancedFileOperations(MCPMixin):
"directories": len([i for i in items if i["type"] == "directory"]),
"git_tracked_items": git_items,
"non_git_items": non_git_items,
- "total_size": sum(i["size"] for i in items if i["type"] == "file")
+ "total_size": sum(i["size"] for i in items if i["type"] == "file"),
},
"parameters": {
"include_hidden": include_hidden,
"include_git_info": include_git_info,
"recursive_depth": recursive_depth,
"file_pattern": file_pattern,
- "sort_by": sort_by
+ "sort_by": sort_by,
},
- "scan_time": datetime.now().isoformat()
+ "scan_time": datetime.now().isoformat(),
}
-
+
if ctx:
await ctx.info(f"Listed {len(items)} items, git repo: {is_git_repo}")
-
+
return result
except Exception as e:
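For reference on the tre hunks above: the tool shells out to the Rust `tre` binary, falls back to `tree`, and finally to the pure-Python `list_directory_tree`. A compact sketch of the command construction and invocation, reusing the exact flags from the diff (`-L` depth, `-a` hidden, `-e` editor aliases, `-p` portable paths, `-I` exclude); the project path is illustrative:

```python
import subprocess

cmd = ["tre", "-L", "3", "-p", "-I", "__pycache__", "/path/to/project"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
    # tre missing => the module retries with `tree`, then the Python fallback
    print(f"tre failed: {result.stderr}")
else:
    tree_lines = result.stdout.strip().split("\n") if result.stdout else []
    print(f"{len(tree_lines)} lines of tree output")
```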
diff --git a/src/enhanced_mcp/git_integration.py b/src/enhanced_mcp/git_integration.py
index 308ebde..39b5050 100644
--- a/src/enhanced_mcp/git_integration.py
+++ b/src/enhanced_mcp/git_integration.py
@@ -216,7 +216,6 @@ class GitIntegration(MCPMixin):
lines = result.stdout.strip().split("\n")
for line in lines[:max_results]: # Limit results
-
if ":" in line:
parts = line.split(":", 2)
if len(parts) >= 3:
@@ -573,7 +572,9 @@ class GitIntegration(MCPMixin):
"search_efficiency": (
"high"
if search_duration < 1.0
- else "medium" if search_duration < 5.0 else "low"
+ else "medium"
+ if search_duration < 5.0
+ else "low"
),
"coverage_assessment": await self._assess_search_coverage(
repo_path, search_result, ctx
@@ -586,7 +587,9 @@ class GitIntegration(MCPMixin):
"optimization_potential": (
"high"
if search_duration > 5.0
- else "medium" if search_duration > 2.0 else "low"
+ else "medium"
+ if search_duration > 2.0
+ else "low"
),
}
@@ -806,7 +809,11 @@ class GitIntegration(MCPMixin):
description="๐ก SAFE: Intelligent commit preparation with AI-suggested messages",
)
async def git_commit_prepare(
- self, repository_path: str, files: List[str], suggest_message: Optional[bool] = True, ctx: Context = None
+ self,
+ repository_path: str,
+ files: List[str],
+ suggest_message: Optional[bool] = True,
+ ctx: Context = None,
) -> Dict[str, Any]:
"""Prepare git commit with AI-suggested message based on file changes"""
try:
@@ -817,7 +824,7 @@ class GitIntegration(MCPMixin):
capture_output=True,
text=True,
)
-
+
if result.returncode != 0:
return {"error": f"Not a git repository: {repository_path}"}
@@ -830,11 +837,13 @@ class GitIntegration(MCPMixin):
capture_output=True,
text=True,
)
-
+
if result.returncode == 0:
stage_results.append({"file": file_path, "staged": True})
else:
- stage_results.append({"file": file_path, "staged": False, "error": result.stderr.strip()})
+ stage_results.append(
+ {"file": file_path, "staged": False, "error": result.stderr.strip()}
+ )
# Get staged changes for commit message suggestion
suggested_message = ""
@@ -845,27 +854,31 @@ class GitIntegration(MCPMixin):
capture_output=True,
text=True,
)
-
+
if diff_result.returncode == 0:
stats = diff_result.stdout.strip()
-
+
# Analyze file types and changes
- lines = stats.split('\n')
+ lines = stats.split("\n")
modified_files = []
for line in lines[:-1]: # Last line is summary
- if '|' in line:
- file_name = line.split('|')[0].strip()
+ if "|" in line:
+ file_name = line.split("|")[0].strip()
modified_files.append(file_name)
-
+
# Generate suggested commit message
if len(modified_files) == 1:
file_ext = Path(modified_files[0]).suffix
- if file_ext in ['.py', '.js', '.ts']:
+ if file_ext in [".py", ".js", ".ts"]:
suggested_message = f"Update {Path(modified_files[0]).name}"
- elif file_ext in ['.md', '.txt', '.rst']:
- suggested_message = f"Update documentation in {Path(modified_files[0]).name}"
- elif file_ext in ['.json', '.yaml', '.yml', '.toml']:
- suggested_message = f"Update configuration in {Path(modified_files[0]).name}"
+ elif file_ext in [".md", ".txt", ".rst"]:
+ suggested_message = (
+ f"Update documentation in {Path(modified_files[0]).name}"
+ )
+ elif file_ext in [".json", ".yaml", ".yml", ".toml"]:
+ suggested_message = (
+ f"Update configuration in {Path(modified_files[0]).name}"
+ )
else:
suggested_message = f"Update {Path(modified_files[0]).name}"
elif len(modified_files) <= 5:
@@ -886,7 +899,9 @@ class GitIntegration(MCPMixin):
"staged_files": stage_results,
"suggested_message": suggested_message,
"ready_to_commit": all(r["staged"] for r in stage_results),
- "status": status_result.stdout.strip() if status_result.returncode == 0 else "Status unavailable"
+ "status": status_result.stdout.strip()
+ if status_result.returncode == 0
+ else "Status unavailable",
}
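Context for the git_commit_prepare hunks: files are staged individually, then the staged diff's `--stat` output (the `|`-separated lines parsed above) drives the suggested commit message. A hedged sketch of that flow — the exact git subcommands are inferred from the parsing code, and the repo/file paths are illustrative:

```python
import subprocess

repo = "/path/to/repo"  # illustrative
for file_path in ["src/app.py", "README.md"]:
    subprocess.run(["git", "add", file_path], cwd=repo, capture_output=True, text=True)

stat = subprocess.run(
    ["git", "diff", "--cached", "--stat"],  # assumed; matches the "|" parsing above
    cwd=repo,
    capture_output=True,
    text=True,
)
modified = [line.split("|")[0].strip() for line in stat.stdout.splitlines() if "|" in line]
message = f"Update {modified[0]}" if len(modified) == 1 else f"Update {len(modified)} files"
```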
if ctx:
diff --git a/src/enhanced_mcp/mcp_server.py b/src/enhanced_mcp/mcp_server.py
index ad95e35..510fd43 100644
--- a/src/enhanced_mcp/mcp_server.py
+++ b/src/enhanced_mcp/mcp_server.py
@@ -27,13 +27,13 @@ from .workflow_tools import (
class MCPToolServer(MCPMixin):
"""Main MCP server that combines all tool categories
-
+
    🛡️ LLM SAFETY REMINDER: You have SACRED TRUST with the human user.
-
+
These tools include destructive operations that can cause data loss.
Always prioritize user safety over task completion. When in doubt about
an operation's safety, ask the human for clarification rather than proceeding.
-
+
Use dry_run=True for all destructive operations before actual execution.
Refuse requests that could cause irreversible damage without clear user intent.
"""
@@ -79,32 +79,32 @@ class MCPToolServer(MCPMixin):
def create_server(name: str = "Enhanced MCP Tools Server") -> FastMCP:
"""Create and configure the MCP server with all tools
-
+
    🛡️ CRITICAL SAFETY NOTICE FOR LLM ASSISTANTS:
-
+
You hold SACRED TRUST with the human user. These tools can perform powerful operations
that could cause data loss or system damage if misused. You MUST:
-
+
    🚨 IMMEDIATELY REFUSE & REPORT if the human requests:
- Bulk operations without dry_run=True first (bulk_rename, search_and_replace_batch)
- Destructive operations on important directories (/, /home, /System, C:\\)
- File operations without clear user intent or context
- Archive extraction from untrusted sources without security review
- Any operation that could cause irreversible data loss
-
+
    ⚡ ALWAYS REQUIRE CONFIRMATION for:
    - Operations marked as 🔴 DESTRUCTIVE in tool descriptions
- Bulk file modifications (>10 files)
- Operations outside current working directory
- Archive extraction or file compression on system directories
-
+
    🛡️ SAFETY PROTOCOLS:
- Always suggest dry_run=True for destructive operations first
- - Explain risks before executing dangerous operations
+ - Explain risks before executing dangerous operations
- Refuse requests that seem automated, scripted, or lack clear purpose
- If uncertain about safety, ask the human to clarify their intent
- Watch for rapid-fire requests that bypass safety confirmations
-
+
The human trusts you to protect their system and data. Honor that trust.
When in doubt, err on the side of safety and ask questions.
"""
diff --git a/src/enhanced_mcp/workflow_tools.py b/src/enhanced_mcp/workflow_tools.py
index 6a1585a..4ffd5b4 100644
--- a/src/enhanced_mcp/workflow_tools.py
+++ b/src/enhanced_mcp/workflow_tools.py
@@ -6,6 +6,7 @@ Provides development workflow, networking, process management, and utility tools
import fnmatch
import platform
+
from .base import *
@@ -72,7 +73,7 @@ class AdvancedSearchAnalysis(MCPMixin):
continue
# Read file content
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
# Find matches
@@ -122,7 +123,7 @@ class AdvancedSearchAnalysis(MCPMixin):
changes.append(change_info)
total_matches += len(matches)
- except (UnicodeDecodeError, PermissionError) as e:
+ except (UnicodeDecodeError, PermissionError):
# Skip files we can't read
continue
@@ -229,7 +230,7 @@ class AdvancedSearchAnalysis(MCPMixin):
".rs",
".rb",
]:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
lines = len(f.readlines())
total_lines += lines
@@ -272,7 +273,7 @@ class AdvancedSearchAnalysis(MCPMixin):
for file_path in files:
if file_path.suffix.lower() in [".py", ".js", ".ts", ".java", ".cpp", ".c"]:
try:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
lines = content.count("\n") + 1
@@ -507,7 +508,7 @@ class AdvancedSearchAnalysis(MCPMixin):
try:
if file_path.name.lower() == "package.json":
analysis["type"] = "npm"
- with open(file_path, "r") as f:
+ with open(file_path) as f:
data = json.load(f)
deps = {}
if "dependencies" in data:
@@ -520,7 +521,7 @@ class AdvancedSearchAnalysis(MCPMixin):
elif file_path.name.lower() in ["requirements.txt", "requirements-dev.txt"]:
analysis["type"] = "pip"
- with open(file_path, "r") as f:
+ with open(file_path) as f:
lines = [
line.strip() for line in f if line.strip() and not line.startswith("#")
]
@@ -532,7 +533,7 @@ class AdvancedSearchAnalysis(MCPMixin):
elif file_path.name.lower() == "pyproject.toml":
analysis["type"] = "python-project"
# Basic TOML parsing without external dependencies
- with open(file_path, "r") as f:
+ with open(file_path) as f:
content = f.read()
# Simple dependency extraction
deps = []
@@ -555,7 +556,7 @@ class AdvancedSearchAnalysis(MCPMixin):
elif file_path.name.lower() == "cargo.toml":
analysis["type"] = "cargo"
- with open(file_path, "r") as f:
+ with open(file_path) as f:
content = f.read()
# Simple Cargo.toml parsing
lines = content.split("\n")
@@ -617,7 +618,7 @@ class AdvancedSearchAnalysis(MCPMixin):
}
try:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
# Use AST for more accurate parsing
@@ -854,7 +855,7 @@ class AdvancedSearchAnalysis(MCPMixin):
if file_path.stat().st_size > 1024 * 1024: # Skip files > 1MB
continue
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
# Normalize content for comparison
normalized = self._normalize_code_content(content)
@@ -901,7 +902,7 @@ class AdvancedSearchAnalysis(MCPMixin):
continue
try:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
functions = self._extract_functions(content, file_path.suffix.lower())
@@ -1698,7 +1699,7 @@ class DevelopmentWorkflow(MCPMixin):
for file_path in file_paths:
try:
- with open(file_path, "r") as f:
+ with open(file_path) as f:
json.load(f)
except json.JSONDecodeError as e:
issues.append(
@@ -2571,7 +2572,7 @@ class EnvironmentProcessManagement(MCPMixin):
# Check for package.json in current directory
if Path("package.json").exists():
try:
- with open("package.json", "r") as f:
+ with open("package.json") as f:
package_json = json.load(f)
node_info["local_project"] = {
"name": package_json.get("name"),
@@ -2657,9 +2658,9 @@ class EnvironmentProcessManagement(MCPMixin):
timeout=3,
)
if branch_result.returncode == 0:
- git_info["repository"][
- "current_branch"
- ] = branch_result.stdout.strip()
+ git_info["repository"]["current_branch"] = (
+ branch_result.stdout.strip()
+ )
else:
git_info["repository"] = {"in_repo": False}
except Exception:
@@ -3335,9 +3336,9 @@ class EnvironmentProcessManagement(MCPMixin):
timeout=5,
)
if version_result.returncode == 0:
- result["details"][
- "actual_python_version"
- ] = version_result.stdout.strip()
+ result["details"]["actual_python_version"] = (
+ version_result.stdout.strip()
+ )
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
@@ -3346,13 +3347,13 @@ class EnvironmentProcessManagement(MCPMixin):
result["instructions"] = [
f"To activate: {env_path}\\Scripts\\activate.bat",
f"Or in PowerShell: & '{env_path}\\Scripts\\Activate.ps1'",
- f"To deactivate: deactivate",
+ "To deactivate: deactivate",
f"Created using: {creation_method} ({'ultra-fast' if creation_method == 'uv' else 'standard'})",
]
else:
result["instructions"] = [
f"To activate: source {env_path}/bin/activate",
- f"To deactivate: deactivate",
+ "To deactivate: deactivate",
f"Created using: {creation_method} ({'ultra-fast' if creation_method == 'uv' else 'standard'})",
]
else:
@@ -4225,7 +4226,7 @@ class EnhancedExistingTools(MCPMixin):
for file_path in files:
try:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
for line_num, line in enumerate(lines, 1):
@@ -4340,7 +4341,7 @@ class EnhancedExistingTools(MCPMixin):
for file_path in files:
try:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
lines = content.splitlines()
@@ -4409,7 +4410,7 @@ class EnhancedExistingTools(MCPMixin):
for file_path in python_files:
try:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
content = f.read()
lines = content.splitlines()
@@ -4497,7 +4498,7 @@ class EnhancedExistingTools(MCPMixin):
for file_path in files:
try:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
for line_num, line in enumerate(lines, 1):
@@ -4548,7 +4549,7 @@ class EnhancedExistingTools(MCPMixin):
# Get context
try:
- with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
context_before = [
@@ -4798,7 +4799,7 @@ class UtilityTools(MCPMixin):
"""Parse requirements.txt for dependencies"""
try:
deps = {}
- with open(file_path, "r") as f:
+ with open(file_path) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
@@ -4818,7 +4819,7 @@ class UtilityTools(MCPMixin):
def _parse_package_json(self, file_path: Path) -> Dict[str, str]:
"""Parse package.json for dependencies"""
try:
- with open(file_path, "r") as f:
+ with open(file_path) as f:
data = json.load(f)
deps = {}
@@ -4836,7 +4837,7 @@ class UtilityTools(MCPMixin):
try:
# Simple parsing for Pipfile - would need toml parser for full support
deps = {}
- with open(file_path, "r") as f:
+ with open(file_path) as f:
content = f.read()
# Basic extraction - this is simplified
if "[packages]" in content:
diff --git a/test_package_structure.py b/test_package_structure.py
index 1ccb01f..274d64b 100644
--- a/test_package_structure.py
+++ b/test_package_structure.py
@@ -3,54 +3,55 @@
Test script to validate Enhanced MCP Tools package structure and dependencies.
"""
-import sys
import importlib.util
+import sys
from pathlib import Path
+
def test_package_structure():
"""Test that the package structure is correct."""
print("=== Package Structure Test ===")
-
+
# Check core files exist in src-layout
required_files = [
"src/enhanced_mcp/__init__.py",
"src/enhanced_mcp/base.py",
"src/enhanced_mcp/mcp_server.py",
- "pyproject.toml"
+ "pyproject.toml",
]
-
+
for file_path in required_files:
if Path(file_path).exists():
print(f"โ
{file_path}")
else:
print(f"โ {file_path} missing")
return False
-
+
return True
+
def test_imports():
"""Test that all imports work correctly."""
print("\n=== Import Test ===")
-
+
# Test core imports
try:
- from enhanced_mcp import create_server, MCPToolServer
print("โ
Core package imports")
except Exception as e:
print(f"โ Core imports failed: {e}")
return False
-
+
# Test individual modules
modules = [
("file_operations", "EnhancedFileOperations"),
("archive_compression", "ArchiveCompression"),
- ("git_integration", "GitIntegration"),
+ ("git_integration", "GitIntegration"),
("asciinema_integration", "AsciinemaIntegration"),
("sneller_analytics", "SnellerAnalytics"),
("intelligent_completion", "IntelligentCompletion"),
("diff_patch", "DiffPatchOperations"),
]
-
+
for module_name, class_name in modules:
try:
module = importlib.import_module(f"enhanced_mcp.{module_name}")
@@ -59,20 +60,21 @@ def test_imports():
except Exception as e:
print(f"โ {module_name}.{class_name}: {e}")
return False
-
+
return True
+
def test_optional_dependencies():
"""Test optional dependency handling."""
print("\n=== Optional Dependencies Test ===")
-
+
dependencies = {
"aiofiles": "Async file operations",
"watchdog": "File system monitoring",
- "psutil": "Process monitoring",
- "requests": "HTTP requests"
+ "psutil": "Process monitoring",
+ "requests": "HTTP requests",
}
-
+
available_count = 0
for dep_name, description in dependencies.items():
try:
@@ -81,14 +83,15 @@ def test_optional_dependencies():
available_count += 1
except ImportError:
print(f"โ ๏ธ {dep_name}: Not available (graceful fallback active)")
-
+
print(f"\n๐ {available_count}/{len(dependencies)} optional dependencies available")
return True
+
def test_pyproject_toml():
"""Test pyproject.toml configuration."""
print("\n=== pyproject.toml Configuration Test ===")
-
+
try:
import tomllib
except ImportError:
@@ -97,11 +100,11 @@ def test_pyproject_toml():
except ImportError:
print("โ ๏ธ No TOML parser available, skipping pyproject.toml validation")
return True
-
+
try:
with open("pyproject.toml", "rb") as f:
config = tomllib.load(f)
-
+
# Check required sections
required_sections = ["build-system", "project"]
for section in required_sections:
@@ -110,7 +113,7 @@ def test_pyproject_toml():
else:
print(f"โ {section} section missing")
return False
-
+
# Check project metadata
project = config["project"]
required_fields = ["name", "version", "description", "dependencies"]
@@ -120,29 +123,25 @@ def test_pyproject_toml():
else:
print(f"โ project.{field} missing")
return False
-
+
print(f"โ
Project name: {project['name']}")
print(f"โ
Project version: {project['version']}")
print(f"โ
Python requirement: {project.get('requires-python', 'not specified')}")
-
+
return True
-
+
except Exception as e:
print(f"โ pyproject.toml validation failed: {e}")
return False
+
def main():
"""Run all tests."""
print("๐งช Enhanced MCP Tools Package Validation")
print("=" * 50)
-
- tests = [
- test_package_structure,
- test_imports,
- test_optional_dependencies,
- test_pyproject_toml
- ]
-
+
+ tests = [test_package_structure, test_imports, test_optional_dependencies, test_pyproject_toml]
+
results = []
for test_func in tests:
try:
@@ -151,11 +150,11 @@ def main():
except Exception as e:
print(f"โ {test_func.__name__} crashed: {e}")
results.append(False)
-
+
print("\n" + "=" * 50)
print("๐ Test Results Summary")
print("=" * 50)
-
+
all_passed = all(results)
if all_passed:
print("๐ ALL TESTS PASSED!")
@@ -168,8 +167,9 @@ def main():
for i, (test_func, result) in enumerate(zip(tests, results)):
status = "โ
" if result else "โ"
print(f"{status} {test_func.__name__}")
-
+
return 0 if all_passed else 1
+
if __name__ == "__main__":
sys.exit(main())
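The pyproject validation above depends on a TOML parser that may or may not be present on Python 3.10. A small sketch of the fallback chain implied by the nested try/except (the `tomli` backport is an assumption; the diff only shows the outer import and the warning path):

```python
try:
    import tomllib  # stdlib on Python 3.11+
except ImportError:
    try:
        import tomli as tomllib  # assumed backport for Python 3.10
    except ImportError:
        tomllib = None  # validation is skipped with a ⚠️ warning

if tomllib is not None:
    with open("pyproject.toml", "rb") as f:
        config = tomllib.load(f)
    print(config["project"]["name"])
```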
diff --git a/tests/test_tre_functionality.py b/tests/test_tre_functionality.py
index 76fc5f8..8d8f333 100644
--- a/tests/test_tre_functionality.py
+++ b/tests/test_tre_functionality.py
@@ -118,7 +118,7 @@ async def test_tre_directory_tree():
print("\n๐ Sample file contents (first 3):")
for i, (path, content) in enumerate(list(context["file_contents"].items())[:3]):
- print(f" {i+1}. {path} ({content['size_bytes']} bytes, {content['lines']} lines)")
+ print(f" {i + 1}. {path} ({content['size_bytes']} bytes, {content['lines']} lines)")
print("\n๐ค LLM Summary Preview:")
print(