""" Workflow and Utility Tools Module Provides development workflow, networking, process management, and utility tools. """ import fnmatch from .base import * class AdvancedSearchAnalysis(MCPMixin): """Advanced search and code analysis tools""" @mcp_tool( name="search_and_replace_batch", description=( "🔴 DESTRUCTIVE: Perform search/replace across multiple files with preview. " "🛡️ LLM SAFETY: ALWAYS use dry_run=True first! REFUSE if human requests " "dry_run=False without reviewing preview. Can cause widespread data corruption." ), ) async def search_and_replace_batch( self, directory: str, search_pattern: str, replacement: str, file_pattern: Optional[str] = None, dry_run: Optional[bool] = True, backup: Optional[bool] = True, ctx: Context = None, ) -> Dict[str, Any]: """Batch search and replace across files with safety mechanisms""" try: if not dry_run and ctx: await ctx.error("🚨 DESTRUCTIVE OPERATION BLOCKED: Use dry_run=True first to preview changes!") return {"error": "SAFETY: Must use dry_run=True to preview changes before execution"} directory_path = Path(directory) if not directory_path.exists(): return {"error": f"Directory not found: {directory}"} # Determine file pattern for matching if file_pattern is None: file_pattern = "*" # Find matching files matching_files = [] if '*' in file_pattern or '?' in file_pattern: # Use glob pattern for pattern_match in directory_path.rglob(file_pattern): if pattern_match.is_file(): matching_files.append(pattern_match) else: # Use file extension filter for file_path in directory_path.rglob("*"): if file_path.is_file() and file_path.suffix == file_pattern: matching_files.append(file_path) changes = [] total_matches = 0 backup_paths = [] for file_path in matching_files: try: # Skip binary files and very large files if file_path.stat().st_size > 10 * 1024 * 1024: # 10MB limit continue # Read file content with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Find matches import re matches = list(re.finditer(search_pattern, content)) if matches: # Perform replacement new_content = re.sub(search_pattern, replacement, content) # Create backup if requested and not dry run backup_path = None if backup and not dry_run: backup_path = file_path.with_suffix(f"{file_path.suffix}.bak.{int(time.time())}") shutil.copy2(file_path, backup_path) backup_paths.append(str(backup_path)) # Write new content if not dry run if not dry_run: with open(file_path, 'w', encoding='utf-8') as f: f.write(new_content) # Record change information change_info = { "file": str(file_path.relative_to(directory_path)), "matches": len(matches), "backup_created": backup_path is not None, "backup_path": str(backup_path) if backup_path else None, "preview": { "first_match": { "line": content[:matches[0].start()].count('\n') + 1, "old": matches[0].group(), "new": re.sub(search_pattern, replacement, matches[0].group()) } } if matches else None } changes.append(change_info) total_matches += len(matches) except (UnicodeDecodeError, PermissionError) as e: # Skip files we can't read continue result = { "operation": "search_and_replace_batch", "directory": directory, "search_pattern": search_pattern, "replacement": replacement, "file_pattern": file_pattern, "dry_run": dry_run, "backup_enabled": backup, "summary": { "files_scanned": len(matching_files), "files_with_matches": len(changes), "total_matches": total_matches, "backups_created": len(backup_paths) }, "changes": changes, "backup_paths": backup_paths } if ctx: if dry_run: await ctx.info(f"DRY RUN: Found {total_matches} matches in {len(changes)} files. Review before setting dry_run=False") else: await ctx.info(f"Replaced {total_matches} matches in {len(changes)} files with {len(backup_paths)} backups created") return result except Exception as e: error_msg = f"Search and replace batch operation failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} @mcp_tool(name="analyze_codebase", description="Generate codebase statistics and insights") async def analyze_codebase( self, directory: str, include_metrics: List[Literal["loc", "complexity", "dependencies"]], exclude_patterns: Optional[List[str]] = None, ctx: Context = None, ) -> Dict[str, Any]: """Analyze codebase and return metrics""" try: dir_path = Path(directory) if not dir_path.exists(): return {"error": f"Directory not found: {directory}"} if ctx: await ctx.info(f"Analyzing codebase: {directory}") exclude_patterns = exclude_patterns or ["*.pyc", "__pycache__", ".git", ".venv", "node_modules"] def should_exclude(path: Path) -> bool: for pattern in exclude_patterns: if fnmatch.fnmatch(path.name, pattern) or fnmatch.fnmatch(str(path), pattern): return True return False stats = { "directory": directory, "timestamp": datetime.now().isoformat(), "metrics": {}, "files_analyzed": [], "summary": {} } # Collect files files = [] for file_path in dir_path.rglob("*"): if file_path.is_file() and not should_exclude(file_path): files.append(file_path) stats["summary"]["total_files"] = len(files) # LOC metrics if "loc" in include_metrics: total_lines = 0 file_types = {} for file_path in files: try: if file_path.suffix: ext = file_path.suffix.lower() if ext in ['.py', '.js', '.ts', '.java', '.cpp', '.c', '.go', '.rs', '.rb']: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: lines = len(f.readlines()) total_lines += lines if ext not in file_types: file_types[ext] = {"files": 0, "lines": 0} file_types[ext]["files"] += 1 file_types[ext]["lines"] += lines stats["files_analyzed"].append({ "path": str(file_path.relative_to(dir_path)), "extension": ext, "lines": lines }) except Exception: continue stats["metrics"]["loc"] = { "total_lines": total_lines, "file_types": file_types } # Complexity metrics (enhanced implementation) if "complexity" in include_metrics: complexity_data = { "total_functions": 0, "total_classes": 0, "average_function_length": 0, "largest_files": [], "cyclomatic_complexity": {"files": [], "average": 0}, "file_complexity_distribution": {"simple": 0, "moderate": 0, "complex": 0, "very_complex": 0} } function_lengths = [] all_complexity_scores = [] for file_path in files: if file_path.suffix.lower() in ['.py', '.js', '.ts', '.java', '.cpp', '.c']: try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() lines = content.count('\n') + 1 # Basic complexity analysis file_complexity = self._analyze_file_complexity(content, file_path.suffix.lower()) complexity_data["total_functions"] += file_complexity["functions"] complexity_data["total_classes"] += file_complexity["classes"] function_lengths.extend(file_complexity["function_lengths"]) # File size categorization if lines > 500: complexity_data["largest_files"].append({ "file": str(file_path.relative_to(dir_path)), "lines": lines, "functions": file_complexity["functions"], "classes": file_complexity["classes"] }) # Categorize file complexity complexity_score = file_complexity["complexity_score"] all_complexity_scores.append(complexity_score) if complexity_score < 10: complexity_data["file_complexity_distribution"]["simple"] += 1 elif complexity_score < 20: complexity_data["file_complexity_distribution"]["moderate"] += 1 elif complexity_score < 50: complexity_data["file_complexity_distribution"]["complex"] += 1 else: complexity_data["file_complexity_distribution"]["very_complex"] += 1 complexity_data["cyclomatic_complexity"]["files"].append({ "file": str(file_path.relative_to(dir_path)), "score": complexity_score }) except Exception: continue # Calculate averages if function_lengths: complexity_data["average_function_length"] = sum(function_lengths) / len(function_lengths) if all_complexity_scores: complexity_data["cyclomatic_complexity"]["average"] = sum(all_complexity_scores) / len(all_complexity_scores) # Sort largest files and keep top 10 complexity_data["largest_files"] = sorted( complexity_data["largest_files"], key=lambda x: x["lines"], reverse=True )[:10] # Sort by complexity score and keep top 10 complexity_data["cyclomatic_complexity"]["files"] = sorted( complexity_data["cyclomatic_complexity"]["files"], key=lambda x: x["score"], reverse=True )[:10] stats["metrics"]["complexity"] = complexity_data # Dependencies metrics (enhanced implementation) if "dependencies" in include_metrics: deps = { "package_files": [], "dependency_counts": {}, "dependency_details": {}, "vulnerabilities_detected": False, "outdated_deps": [], "recommendations": [] } # Find and analyze dependency files for file_path in files: file_name = file_path.name.lower() if file_name in ["requirements.txt", "package.json", "cargo.toml", "go.mod", "pyproject.toml", "pipfile", "composer.json", "gemfile"]: deps["package_files"].append(str(file_path.relative_to(dir_path))) # Analyze specific dependency files try: dep_analysis = self._analyze_dependency_file(file_path) deps["dependency_details"][file_name] = dep_analysis if "count" in dep_analysis: deps["dependency_counts"][file_name] = dep_analysis["count"] except Exception as e: deps["dependency_details"][file_name] = {"error": str(e)} # Import analysis for Python files import_counts = {"total": 0, "stdlib": 0, "third_party": 0, "local": 0} unique_imports = set() for file_path in files: if file_path.suffix.lower() == '.py': try: imports = self._extract_python_imports(file_path) import_counts["total"] += len(imports["all"]) import_counts["stdlib"] += len(imports["stdlib"]) import_counts["third_party"] += len(imports["third_party"]) import_counts["local"] += len(imports["local"]) unique_imports.update(imports["all"]) except Exception: continue deps["import_analysis"] = { "counts": import_counts, "unique_imports": len(unique_imports), "most_imported": list(unique_imports)[:20] # Top 20 } # Generate recommendations if len(deps["package_files"]) == 0: deps["recommendations"].append("No dependency files found - consider adding requirements.txt or package.json") elif len(deps["package_files"]) > 2: deps["recommendations"].append("Multiple dependency files detected - ensure consistency") if import_counts["third_party"] > 50: deps["recommendations"].append("High number of third-party dependencies - consider dependency review") stats["metrics"]["dependencies"] = deps if ctx: await ctx.info(f"Analysis complete: {len(files)} files analyzed") return stats except Exception as e: if ctx: await ctx.error(f"Codebase analysis failed: {str(e)}") return {"error": str(e)} def _analyze_file_complexity(self, content: str, extension: str) -> Dict[str, Any]: """Analyze complexity metrics for a single file""" complexity = { "functions": 0, "classes": 0, "function_lengths": [], "complexity_score": 0 } lines = content.split('\n') current_function_lines = 0 if extension == '.py': # Python complexity analysis for i, line in enumerate(lines): stripped = line.strip() # Count functions and classes if stripped.startswith('def '): complexity["functions"] += 1 if current_function_lines > 0: complexity["function_lengths"].append(current_function_lines) current_function_lines = 1 elif stripped.startswith('class '): complexity["classes"] += 1 elif current_function_lines > 0: current_function_lines += 1 # Complexity indicators if any(keyword in stripped for keyword in ['if ', 'elif ', 'for ', 'while ', 'try:', 'except:', 'with ']): complexity["complexity_score"] += 1 if any(keyword in stripped for keyword in ['and ', 'or ', '&&', '||']): complexity["complexity_score"] += 0.5 elif extension in ['.js', '.ts']: # JavaScript/TypeScript complexity analysis for line in lines: stripped = line.strip() # Count functions if 'function ' in stripped or '=>' in stripped: complexity["functions"] += 1 if 'class ' in stripped: complexity["classes"] += 1 # Complexity indicators if any(keyword in stripped for keyword in ['if ', 'else', 'for ', 'while ', 'switch', 'case', 'try', 'catch']): complexity["complexity_score"] += 1 if any(keyword in stripped for keyword in ['&&', '||', '?', ':']): complexity["complexity_score"] += 0.5 # Add final function length if we were tracking one if current_function_lines > 0: complexity["function_lengths"].append(current_function_lines) return complexity def _analyze_dependency_file(self, file_path: Path) -> Dict[str, Any]: """Analyze a specific dependency file""" analysis = {"count": 0, "dependencies": [], "type": "unknown"} try: if file_path.name.lower() == "package.json": analysis["type"] = "npm" with open(file_path, 'r') as f: data = json.load(f) deps = {} if "dependencies" in data: deps.update(data["dependencies"]) if "devDependencies" in data: deps.update(data["devDependencies"]) analysis["count"] = len(deps) analysis["dependencies"] = list(deps.keys())[:20] # Top 20 elif file_path.name.lower() in ["requirements.txt", "requirements-dev.txt"]: analysis["type"] = "pip" with open(file_path, 'r') as f: lines = [line.strip() for line in f if line.strip() and not line.startswith('#')] analysis["count"] = len(lines) analysis["dependencies"] = [line.split('==')[0].split('>=')[0].split('<=')[0] for line in lines[:20]] elif file_path.name.lower() == "pyproject.toml": analysis["type"] = "python-project" # Basic TOML parsing without external dependencies with open(file_path, 'r') as f: content = f.read() # Simple dependency extraction deps = [] if '[project.dependencies]' in content or 'dependencies = [' in content: lines = content.split('\n') in_deps = False for line in lines: if 'dependencies' in line and '[' in line: in_deps = True continue if in_deps and ']' in line: break if in_deps and '"' in line: dep = line.strip().strip(',').strip('"') if dep: deps.append(dep.split('>=')[0].split('==')[0]) analysis["count"] = len(deps) analysis["dependencies"] = deps[:20] elif file_path.name.lower() == "cargo.toml": analysis["type"] = "cargo" with open(file_path, 'r') as f: content = f.read() # Simple Cargo.toml parsing lines = content.split('\n') deps = [] in_deps = False for line in lines: if '[dependencies]' in line: in_deps = True continue if in_deps and line.startswith('['): break if in_deps and '=' in line: dep_name = line.split('=')[0].strip() if dep_name: deps.append(dep_name) analysis["count"] = len(deps) analysis["dependencies"] = deps[:20] except Exception as e: analysis["error"] = str(e) return analysis def _extract_python_imports(self, file_path: Path) -> Dict[str, List[str]]: """Extract import statements from Python file""" imports = {"all": [], "stdlib": [], "third_party": [], "local": []} # Standard library modules (partial list) stdlib_modules = { 'os', 'sys', 'json', 're', 'time', 'datetime', 'collections', 'itertools', 'functools', 'typing', 'pathlib', 'subprocess', 'threading', 'multiprocessing', 'urllib', 'http', 'email', 'html', 'xml', 'csv', 'sqlite3', 'logging', 'unittest', 'argparse', 'configparser', 'tempfile', 'shutil', 'glob' } try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Use AST for more accurate parsing try: tree = ast.parse(content) for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: module_name = alias.name.split('.')[0] imports["all"].append(module_name) if module_name in stdlib_modules: imports["stdlib"].append(module_name) elif module_name.startswith('.') or '.' in alias.name: imports["local"].append(module_name) else: imports["third_party"].append(module_name) elif isinstance(node, ast.ImportFrom): if node.module: module_name = node.module.split('.')[0] imports["all"].append(module_name) if module_name in stdlib_modules: imports["stdlib"].append(module_name) elif node.level > 0: # Relative import imports["local"].append(module_name) else: imports["third_party"].append(module_name) except SyntaxError: # Fallback to simple regex parsing import re import_pattern = r'^(?:from\s+(\S+)\s+import|import\s+(\S+))' for line in content.split('\n'): match = re.match(import_pattern, line.strip()) if match: module = match.group(1) or match.group(2) if module: module_name = module.split('.')[0] imports["all"].append(module_name) if module_name in stdlib_modules: imports["stdlib"].append(module_name) else: imports["third_party"].append(module_name) except Exception: pass # Remove duplicates while preserving order for key in imports: imports[key] = list(dict.fromkeys(imports[key])) return imports @mcp_tool(name="find_duplicates", description="🟡 SAFE: Detect duplicate code or files") async def find_duplicates( self, directory: str, similarity_threshold: Optional[float] = 80.0, file_types: Optional[List[str]] = None, ctx: Context = None, ) -> Dict[str, Any]: """Find duplicate code segments and identical files""" try: dir_path = Path(directory) if not dir_path.exists(): return {"error": f"Directory not found: {directory}"} if ctx: await ctx.info(f"Scanning for duplicates in: {directory}") # Default file types to analyze if file_types is None: file_types = ['.py', '.js', '.ts', '.java', '.cpp', '.c', '.cs', '.rb', '.php', '.go'] # Collect files files = [] exclude_patterns = ["*.pyc", "__pycache__", ".git", ".venv", "node_modules", "*.min.js"] def should_exclude(path: Path) -> bool: for pattern in exclude_patterns: if fnmatch.fnmatch(path.name, pattern) or fnmatch.fnmatch(str(path), pattern): return True return False for file_path in dir_path.rglob("*"): if (file_path.is_file() and not should_exclude(file_path) and file_path.suffix.lower() in file_types): files.append(file_path) results = { "directory": directory, "threshold": similarity_threshold, "file_types": file_types, "files_scanned": len(files), "identical_files": [], "similar_files": [], "duplicate_functions": [], "summary": { "identical_file_groups": 0, "similar_file_pairs": 0, "duplicate_function_groups": 0, "potential_savings_kb": 0 } } if len(files) == 0: return {**results, "message": "No files found matching the specified criteria"} # Find identical files (by content hash) identical_groups = await self._find_identical_files(files, dir_path) results["identical_files"] = identical_groups results["summary"]["identical_file_groups"] = len(identical_groups) # Find similar files (by content similarity) similar_pairs = await self._find_similar_files(files, dir_path, similarity_threshold, ctx) results["similar_files"] = similar_pairs results["summary"]["similar_file_pairs"] = len(similar_pairs) # Find duplicate functions/methods duplicate_functions = await self._find_duplicate_functions(files, dir_path, similarity_threshold) results["duplicate_functions"] = duplicate_functions results["summary"]["duplicate_function_groups"] = len(duplicate_functions) # Calculate potential space savings total_savings = 0 for group in identical_groups: if len(group["files"]) > 1: file_size = group["size_bytes"] total_savings += file_size * (len(group["files"]) - 1) results["summary"]["potential_savings_kb"] = round(total_savings / 1024, 2) # Generate recommendations results["recommendations"] = self._generate_duplicate_recommendations(results) if ctx: total_duplicates = (results["summary"]["identical_file_groups"] + results["summary"]["similar_file_pairs"] + results["summary"]["duplicate_function_groups"]) await ctx.info(f"Duplicate analysis complete: {total_duplicates} duplicate groups found") return results except Exception as e: error_msg = f"Duplicate detection failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} async def _find_identical_files(self, files: List[Path], base_path: Path) -> List[Dict[str, Any]]: """Find files with identical content using hash comparison""" import hashlib file_hashes = {} for file_path in files: try: # Skip very large files (>10MB) if file_path.stat().st_size > 10 * 1024 * 1024: continue with open(file_path, 'rb') as f: content = f.read() file_hash = hashlib.md5(content).hexdigest() if file_hash not in file_hashes: file_hashes[file_hash] = [] file_hashes[file_hash].append({ "path": str(file_path.relative_to(base_path)), "size_bytes": len(content) }) except Exception: continue # Return only groups with more than one file identical_groups = [] for file_hash, file_list in file_hashes.items(): if len(file_list) > 1: identical_groups.append({ "hash": file_hash, "files": file_list, "count": len(file_list), "size_bytes": file_list[0]["size_bytes"] }) return sorted(identical_groups, key=lambda x: x["count"], reverse=True) async def _find_similar_files(self, files: List[Path], base_path: Path, threshold: float, ctx: Context) -> List[Dict[str, Any]]: """Find files with similar content using text comparison""" similar_pairs = [] # Process files in batches to avoid memory issues batch_size = 50 for i in range(0, len(files), batch_size): batch_files = files[i:i + batch_size] # Load file contents for this batch file_contents = {} for file_path in batch_files: try: if file_path.stat().st_size > 1024 * 1024: # Skip files > 1MB continue with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Normalize content for comparison normalized = self._normalize_code_content(content) if len(normalized) > 100: # Skip very small files file_contents[file_path] = normalized except Exception: continue # Compare files in this batch with all previous files batch_paths = list(file_contents.keys()) for j in range(len(batch_paths)): for k in range(j + 1, len(batch_paths)): file1, file2 = batch_paths[j], batch_paths[k] similarity = self._calculate_text_similarity( file_contents[file1], file_contents[file2] ) if similarity >= threshold: similar_pairs.append({ "file1": str(file1.relative_to(base_path)), "file2": str(file2.relative_to(base_path)), "similarity_percent": round(similarity, 1), "file1_size": file1.stat().st_size, "file2_size": file2.stat().st_size }) return sorted(similar_pairs, key=lambda x: x["similarity_percent"], reverse=True)[:20] # Top 20 async def _find_duplicate_functions(self, files: List[Path], base_path: Path, threshold: float) -> List[Dict[str, Any]]: """Find duplicate functions/methods across files""" function_groups = {} for file_path in files: if file_path.suffix.lower() not in ['.py', '.js', '.ts', '.java']: continue try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() functions = self._extract_functions(content, file_path.suffix.lower()) for func in functions: # Create a normalized signature for comparison normalized = self._normalize_function_content(func["content"]) if len(normalized) < 50: # Skip very small functions continue # Group similar functions found_group = False for signature, group in function_groups.items(): if self._calculate_text_similarity(normalized, signature) >= threshold: group["functions"].append({ "file": str(file_path.relative_to(base_path)), "name": func["name"], "line_start": func["line_start"], "line_end": func["line_end"] }) found_group = True break if not found_group: function_groups[normalized] = { "signature": normalized[:100] + "...", "functions": [{ "file": str(file_path.relative_to(base_path)), "name": func["name"], "line_start": func["line_start"], "line_end": func["line_end"] }] } except Exception: continue # Return only groups with duplicates duplicate_groups = [] for signature, group in function_groups.items(): if len(group["functions"]) > 1: duplicate_groups.append({ "signature_preview": group["signature"], "functions": group["functions"], "count": len(group["functions"]) }) return sorted(duplicate_groups, key=lambda x: x["count"], reverse=True)[:10] # Top 10 def _normalize_code_content(self, content: str) -> str: """Normalize code content for comparison""" lines = content.split('\n') normalized_lines = [] for line in lines: # Remove leading/trailing whitespace stripped = line.strip() # Skip empty lines and comments if not stripped or stripped.startswith('#') or stripped.startswith('//'): continue # Basic normalization (could be enhanced) stripped = re.sub(r'\s+', ' ', stripped) # Normalize whitespace normalized_lines.append(stripped) return '\n'.join(normalized_lines) def _normalize_function_content(self, content: str) -> str: """Normalize function content for comparison""" # Remove function signature line and normalize body lines = content.split('\n')[1:] # Skip first line (signature) return self._normalize_code_content('\n'.join(lines)) def _calculate_text_similarity(self, text1: str, text2: str) -> float: """Calculate similarity between two text strings""" if not text1 or not text2: return 0.0 # Simple character-based similarity shorter = min(len(text1), len(text2)) longer = max(len(text1), len(text2)) if longer == 0: return 100.0 # Count matching characters in order matches = 0 for i in range(shorter): if text1[i] == text2[i]: matches += 1 # Calculate similarity as percentage return (matches / longer) * 100 def _extract_functions(self, content: str, extension: str) -> List[Dict[str, Any]]: """Extract function definitions from code""" functions = [] lines = content.split('\n') if extension == '.py': current_function = None indent_level = 0 for i, line in enumerate(lines): stripped = line.strip() if stripped.startswith('def ') and ':' in stripped: # Save previous function if current_function: current_function["line_end"] = i - 1 current_function["content"] = '\n'.join(lines[current_function["line_start"]:i]) functions.append(current_function) # Start new function func_name = stripped.split('(')[0].replace('def ', '').strip() current_function = { "name": func_name, "line_start": i, "line_end": i, "content": "" } indent_level = len(line) - len(line.lstrip()) elif current_function and line and len(line) - len(line.lstrip()) <= indent_level and stripped: # Function ended current_function["line_end"] = i - 1 current_function["content"] = '\n'.join(lines[current_function["line_start"]:i]) functions.append(current_function) current_function = None # Add last function if current_function: current_function["line_end"] = len(lines) - 1 current_function["content"] = '\n'.join(lines[current_function["line_start"]:]) functions.append(current_function) elif extension in ['.js', '.ts']: # Basic JavaScript/TypeScript function extraction for i, line in enumerate(lines): stripped = line.strip() if ('function ' in stripped or '=>' in stripped) and '{' in stripped: # Extract function name (simplified) if 'function ' in stripped: func_name = stripped.split('function ')[1].split('(')[0].strip() else: func_name = f"arrow_function_line_{i}" # Find function end (simplified - just look for next function or end) end_line = i + 10 # Limit search for j in range(i + 1, min(len(lines), i + 50)): if ('function ' in lines[j] or lines[j].strip().startswith('}')): end_line = j break functions.append({ "name": func_name, "line_start": i, "line_end": end_line, "content": '\n'.join(lines[i:end_line + 1]) }) return functions def _generate_duplicate_recommendations(self, results: Dict[str, Any]) -> List[str]: """Generate actionable recommendations for duplicate cleanup""" recommendations = [] summary = results["summary"] if (summary["identical_file_groups"] == 0 and summary["similar_file_pairs"] == 0 and summary["duplicate_function_groups"] == 0): recommendations.append("✅ No significant duplicates found! Codebase is well-organized.") return recommendations if summary["identical_file_groups"] > 0: recommendations.append(f"🔴 Found {summary['identical_file_groups']} groups of identical files - consider removing duplicates") if summary["potential_savings_kb"] > 0: recommendations.append(f"💾 Potential space savings: {summary['potential_savings_kb']} KB") if summary["similar_file_pairs"] > 0: recommendations.append(f"⚠️ Found {summary['similar_file_pairs']} pairs of similar files - review for consolidation opportunities") if summary["duplicate_function_groups"] > 0: recommendations.append(f"🔧 Found {summary['duplicate_function_groups']} groups of duplicate functions - consider refactoring into shared utilities") # Specific actions if summary["identical_file_groups"] > 0: recommendations.append("💡 Action: Remove or symlink identical files to reduce redundancy") if summary["duplicate_function_groups"] > 0: recommendations.append("💡 Action: Extract duplicate functions into a shared module or utility class") if summary["similar_file_pairs"] > 0: recommendations.append("💡 Action: Review similar files for opportunities to merge or create templates") return recommendations class DevelopmentWorkflow(MCPMixin): """Development workflow automation tools""" @mcp_tool( name="run_tests", description="🟡 SAFE: Execute test suites with intelligent framework detection" ) async def run_tests( self, test_path: str, framework: Optional[Literal["pytest", "jest", "mocha", "auto-detect"]] = "auto-detect", pattern: Optional[str] = None, coverage: Optional[bool] = False, ctx: Context = None, ) -> Dict[str, Any]: """Run tests and return results with coverage information""" try: test_path_obj = Path(test_path) if not test_path_obj.exists(): return {"error": f"Test path not found: {test_path}"} # Auto-detect framework if needed detected_framework = framework if framework == "auto-detect": # Check for Python test files and pytest if any(test_path_obj.rglob("test_*.py")) or any(test_path_obj.rglob("*_test.py")): detected_framework = "pytest" # Check for JavaScript test files elif any(test_path_obj.rglob("*.test.js")) or any(test_path_obj.rglob("*.spec.js")): detected_framework = "jest" elif test_path_obj.is_file() and test_path_obj.suffix == ".js": detected_framework = "mocha" else: # Default to pytest for directories detected_framework = "pytest" # Build command based on framework cmd = [] env_vars = os.environ.copy() if detected_framework == "pytest": cmd = ["python", "-m", "pytest"] if coverage: cmd.extend(["--cov", str(test_path_obj.parent if test_path_obj.is_file() else test_path_obj)]) cmd.extend(["--cov-report", "term-missing"]) if pattern: cmd.extend(["-k", pattern]) cmd.append(str(test_path_obj)) cmd.extend(["-v", "--tb=short"]) elif detected_framework == "jest": cmd = ["npx", "jest"] if coverage: cmd.append("--coverage") if pattern: cmd.extend(["--testNamePattern", pattern]) cmd.append(str(test_path_obj)) cmd.extend(["--verbose"]) elif detected_framework == "mocha": cmd = ["npx", "mocha"] if pattern: cmd.extend(["--grep", pattern]) cmd.append(str(test_path_obj)) cmd.append("--reporter") cmd.append("json") else: return {"error": f"Unsupported test framework: {detected_framework}"} # Run the tests start_time = time.time() result = subprocess.run( cmd, cwd=test_path_obj.parent if test_path_obj.is_file() else test_path_obj, capture_output=True, text=True, env=env_vars, timeout=300 # 5 minute timeout ) end_time = time.time() duration = round(end_time - start_time, 2) # Parse results based on framework test_results = { "framework": detected_framework, "command": " ".join(cmd), "exit_code": result.returncode, "duration_seconds": duration, "success": result.returncode == 0, "stdout": result.stdout, "stderr": result.stderr, } # Parse output for specific metrics if detected_framework == "pytest": # Parse pytest output stdout = result.stdout failed_pattern = r"(\d+) failed" passed_pattern = r"(\d+) passed" failed_match = re.search(failed_pattern, stdout) passed_match = re.search(passed_pattern, stdout) test_results.update({ "tests_passed": int(passed_match.group(1)) if passed_match else 0, "tests_failed": int(failed_match.group(1)) if failed_match else 0, "coverage_info": self._extract_coverage_info(stdout) if coverage else None }) elif detected_framework in ["jest", "mocha"]: # Basic parsing for JavaScript frameworks test_results.update({ "tests_passed": stdout.count("✓") if "✓" in stdout else 0, "tests_failed": stdout.count("✗") if "✗" in stdout else 0, }) # Summary total_tests = test_results.get("tests_passed", 0) + test_results.get("tests_failed", 0) test_results["total_tests"] = total_tests test_results["pass_rate"] = round((test_results.get("tests_passed", 0) / max(total_tests, 1)) * 100, 1) if ctx: status_emoji = "✅" if test_results["success"] else "❌" await ctx.info(f"{status_emoji} Tests completed: {test_results['tests_passed']}/{total_tests} passed ({duration}s)") return test_results except subprocess.TimeoutExpired: error_msg = "Test execution timed out after 5 minutes" if ctx: await ctx.error(error_msg) return {"error": error_msg} except FileNotFoundError: error_msg = f"Test framework '{detected_framework}' not found in PATH" if ctx: await ctx.error(error_msg) return {"error": error_msg, "suggestion": f"Install {detected_framework} or check PATH"} except Exception as e: error_msg = f"Test execution failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} def _extract_coverage_info(self, stdout: str) -> Optional[Dict[str, Any]]: """Extract coverage information from pytest output""" try: # Look for coverage summary line lines = stdout.split('\n') for line in lines: if "TOTAL" in line and "%" in line: parts = line.split() for part in parts: if part.endswith('%'): return { "total_coverage": part, "raw_line": line.strip() } return None except Exception: return None @mcp_tool(name="lint_code", description="🟡 SAFE: Run code linting with multiple linters") async def lint_code( self, file_paths: List[str], linters: Optional[List[str]] = None, fix: Optional[bool] = False, ctx: Context = None, ) -> Dict[str, Any]: """Lint code files with automatic linter detection and optional fixing""" try: if not file_paths: return {"error": "No file paths provided"} # Validate all file paths exist valid_files = [] for file_path in file_paths: path_obj = Path(file_path) if path_obj.exists() and path_obj.is_file(): valid_files.append(path_obj) else: if ctx: await ctx.warning(f"File not found: {file_path}") if not valid_files: return {"error": "No valid files found to lint"} # Group files by type for appropriate linter selection file_groups = self._group_files_by_type(valid_files) # Auto-detect linters if not specified if linters is None: linters = self._detect_available_linters(file_groups) results = { "total_files": len(valid_files), "file_groups": {k: len(v) for k, v in file_groups.items()}, "linters_used": linters, "fix_mode": fix, "lint_results": {}, "summary": { "total_issues": 0, "errors": 0, "warnings": 0, "fixed_issues": 0 } } # Run linters for each file type for file_type, files in file_groups.items(): if not files: continue type_linters = self._get_linters_for_type(file_type, linters) if not type_linters: results["lint_results"][file_type] = { "status": "skipped", "reason": f"No suitable linters available for {file_type} files" } continue # Run each applicable linter for linter in type_linters: linter_key = f"{file_type}_{linter}" try: linter_result = await self._run_linter(linter, files, fix, ctx) results["lint_results"][linter_key] = linter_result # Update summary stats if "issues" in linter_result: issues = linter_result["issues"] results["summary"]["total_issues"] += len(issues) results["summary"]["errors"] += len([i for i in issues if i.get("severity") == "error"]) results["summary"]["warnings"] += len([i for i in issues if i.get("severity") == "warning"]) if "fixed_count" in linter_result: results["summary"]["fixed_issues"] += linter_result["fixed_count"] except Exception as e: results["lint_results"][linter_key] = { "status": "failed", "error": str(e) } # Generate recommendations results["recommendations"] = self._generate_lint_recommendations(results) if ctx: total_issues = results["summary"]["total_issues"] fixed_issues = results["summary"]["fixed_issues"] status_emoji = "✅" if total_issues == 0 else "⚠️" if total_issues < 10 else "🚨" if fix and fixed_issues > 0: await ctx.info(f"{status_emoji} Linting complete: {total_issues} issues found, {fixed_issues} auto-fixed") else: await ctx.info(f"{status_emoji} Linting complete: {total_issues} issues found across {len(valid_files)} files") return results except Exception as e: error_msg = f"Code linting failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} def _group_files_by_type(self, files: List[Path]) -> Dict[str, List[Path]]: """Group files by programming language/type""" groups = { "python": [], "javascript": [], "typescript": [], "json": [], "yaml": [], "markdown": [], "other": [] } for file_path in files: suffix = file_path.suffix.lower() if suffix in ['.py', '.pyx', '.pyi']: groups["python"].append(file_path) elif suffix in ['.js', '.jsx', '.mjs']: groups["javascript"].append(file_path) elif suffix in ['.ts', '.tsx']: groups["typescript"].append(file_path) elif suffix in ['.json']: groups["json"].append(file_path) elif suffix in ['.yaml', '.yml']: groups["yaml"].append(file_path) elif suffix in ['.md', '.markdown']: groups["markdown"].append(file_path) else: groups["other"].append(file_path) return {k: v for k, v in groups.items() if v} # Remove empty groups def _detect_available_linters(self, file_groups: Dict[str, List[Path]]) -> List[str]: """Detect which linters are available on the system""" available_linters = [] # Python linters if "python" in file_groups: for linter in ["flake8", "pylint", "pycodestyle", "pyflakes"]: if self._is_command_available(linter): available_linters.append(linter) # JavaScript/TypeScript linters if "javascript" in file_groups or "typescript" in file_groups: for linter in ["eslint", "jshint"]: if self._is_command_available(linter): available_linters.append(linter) # JSON linters if "json" in file_groups: if self._is_command_available("jsonlint"): available_linters.append("jsonlint") # YAML linters if "yaml" in file_groups: if self._is_command_available("yamllint"): available_linters.append("yamllint") # Markdown linters if "markdown" in file_groups: if self._is_command_available("markdownlint"): available_linters.append("markdownlint") return available_linters def _get_linters_for_type(self, file_type: str, available_linters: List[str]) -> List[str]: """Get applicable linters for a specific file type""" type_mapping = { "python": ["flake8", "pylint", "pycodestyle", "pyflakes"], "javascript": ["eslint", "jshint"], "typescript": ["eslint"], "json": ["jsonlint"], "yaml": ["yamllint"], "markdown": ["markdownlint"] } applicable = type_mapping.get(file_type, []) return [linter for linter in applicable if linter in available_linters] def _is_command_available(self, command: str) -> bool: """Check if a command is available in PATH""" try: result = subprocess.run( [command, "--version"], capture_output=True, timeout=5 ) return result.returncode == 0 except (subprocess.TimeoutExpired, FileNotFoundError): return False async def _run_linter(self, linter: str, files: List[Path], fix: bool, ctx: Context) -> Dict[str, Any]: """Run a specific linter on files""" file_paths = [str(f) for f in files] try: if linter == "flake8": return await self._run_flake8(file_paths, fix) elif linter == "pylint": return await self._run_pylint(file_paths, fix) elif linter == "pycodestyle": return await self._run_pycodestyle(file_paths, fix) elif linter == "eslint": return await self._run_eslint(file_paths, fix) elif linter == "jsonlint": return await self._run_jsonlint(file_paths) elif linter == "yamllint": return await self._run_yamllint(file_paths) elif linter == "markdownlint": return await self._run_markdownlint(file_paths) else: return {"status": "unsupported", "linter": linter} except Exception as e: return {"status": "error", "linter": linter, "error": str(e)} async def _run_flake8(self, file_paths: List[str], fix: bool) -> Dict[str, Any]: """Run flake8 linter""" cmd = ["flake8", "--format=json"] + file_paths result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) issues = [] if result.stdout: try: # flake8 doesn't output valid JSON by default, parse line by line for line in result.stdout.strip().split('\n'): if line: # Format: filename:line:col: code message parts = line.split(':', 3) if len(parts) >= 4: issues.append({ "file": parts[0], "line": int(parts[1]), "column": int(parts[2]), "code": parts[3].split()[0], "message": parts[3].split(' ', 1)[1] if ' ' in parts[3] else parts[3], "severity": "error" if parts[3].startswith(' E') else "warning" }) except Exception: # Fallback to simple parsing issues = [{"message": result.stdout, "severity": "error"}] return { "linter": "flake8", "status": "completed", "exit_code": result.returncode, "issues": issues, "can_fix": False # flake8 doesn't auto-fix } async def _run_pylint(self, file_paths: List[str], fix: bool) -> Dict[str, Any]: """Run pylint linter""" cmd = ["pylint", "--output-format=json"] + file_paths result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) issues = [] if result.stdout: try: pylint_output = json.loads(result.stdout) for issue in pylint_output: issues.append({ "file": issue.get("path", ""), "line": issue.get("line", 0), "column": issue.get("column", 0), "code": issue.get("message-id", ""), "message": issue.get("message", ""), "severity": issue.get("type", "warning") }) except json.JSONDecodeError: issues = [{"message": "Failed to parse pylint output", "severity": "error"}] return { "linter": "pylint", "status": "completed", "exit_code": result.returncode, "issues": issues, "can_fix": False # pylint doesn't auto-fix } async def _run_pycodestyle(self, file_paths: List[str], fix: bool) -> Dict[str, Any]: """Run pycodestyle linter""" cmd = ["pycodestyle"] + file_paths result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) issues = [] fixed_count = 0 if result.stdout: for line in result.stdout.strip().split('\n'): if line: # Format: filename:line:col: code message parts = line.split(':', 3) if len(parts) >= 4: issues.append({ "file": parts[0], "line": int(parts[1]), "column": int(parts[2]), "code": parts[3].split()[0], "message": parts[3].split(' ', 1)[1] if ' ' in parts[3] else parts[3], "severity": "warning" }) # Try autopep8 for fixing if requested if fix and self._is_command_available("autopep8"): for file_path in file_paths: fix_cmd = ["autopep8", "--in-place", file_path] fix_result = subprocess.run(fix_cmd, capture_output=True, timeout=30) if fix_result.returncode == 0: fixed_count += 1 return { "linter": "pycodestyle", "status": "completed", "exit_code": result.returncode, "issues": issues, "can_fix": True, "fixed_count": fixed_count } async def _run_eslint(self, file_paths: List[str], fix: bool) -> Dict[str, Any]: """Run ESLint linter""" cmd = ["eslint", "--format=json"] if fix: cmd.append("--fix") cmd.extend(file_paths) result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) issues = [] fixed_count = 0 if result.stdout: try: eslint_output = json.loads(result.stdout) for file_result in eslint_output: fixed_count += file_result.get("fixableErrorCount", 0) + file_result.get("fixableWarningCount", 0) for message in file_result.get("messages", []): issues.append({ "file": file_result.get("filePath", ""), "line": message.get("line", 0), "column": message.get("column", 0), "code": message.get("ruleId", ""), "message": message.get("message", ""), "severity": message.get("severity", 1) == 2 and "error" or "warning" }) except json.JSONDecodeError: issues = [{"message": "Failed to parse ESLint output", "severity": "error"}] return { "linter": "eslint", "status": "completed", "exit_code": result.returncode, "issues": issues, "can_fix": True, "fixed_count": fixed_count if fix else 0 } async def _run_jsonlint(self, file_paths: List[str]) -> Dict[str, Any]: """Run JSON linter""" issues = [] for file_path in file_paths: try: with open(file_path, 'r') as f: json.load(f) except json.JSONDecodeError as e: issues.append({ "file": file_path, "line": e.lineno, "column": e.colno, "message": str(e), "severity": "error" }) except Exception as e: issues.append({ "file": file_path, "message": f"Failed to read file: {str(e)}", "severity": "error" }) return { "linter": "jsonlint", "status": "completed", "exit_code": 0 if not issues else 1, "issues": issues, "can_fix": False } async def _run_yamllint(self, file_paths: List[str]) -> Dict[str, Any]: """Run YAML linter""" cmd = ["yamllint", "--format=parsable"] + file_paths result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) issues = [] if result.stdout: for line in result.stdout.strip().split('\n'): if line and ':' in line: # Format: filename:line:col: [level] message parts = line.split(':', 3) if len(parts) >= 4: level_msg = parts[3].strip() level = "warning" if "[error]" in level_msg: level = "error" issues.append({ "file": parts[0], "line": int(parts[1]) if parts[1].isdigit() else 0, "column": int(parts[2]) if parts[2].isdigit() else 0, "message": level_msg.replace("[error]", "").replace("[warning]", "").strip(), "severity": level }) return { "linter": "yamllint", "status": "completed", "exit_code": result.returncode, "issues": issues, "can_fix": False } async def _run_markdownlint(self, file_paths: List[str]) -> Dict[str, Any]: """Run Markdown linter""" cmd = ["markdownlint"] + file_paths result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) issues = [] if result.stdout: for line in result.stdout.strip().split('\n'): if line and ':' in line: # Format: filename:line message parts = line.split(':', 2) if len(parts) >= 3: issues.append({ "file": parts[0], "line": int(parts[1]) if parts[1].isdigit() else 0, "message": parts[2].strip(), "severity": "warning" }) return { "linter": "markdownlint", "status": "completed", "exit_code": result.returncode, "issues": issues, "can_fix": False } def _generate_lint_recommendations(self, results: Dict[str, Any]) -> List[str]: """Generate actionable recommendations based on lint results""" recommendations = [] summary = results["summary"] if summary["total_issues"] == 0: recommendations.append("✅ No linting issues found! Code quality looks excellent.") return recommendations if summary["errors"] > 0: recommendations.append(f"🚨 Fix {summary['errors']} critical errors before deployment") if summary["warnings"] > 10: recommendations.append(f"⚠️ Consider addressing {summary['warnings']} warnings for better code quality") elif summary["warnings"] > 0: recommendations.append(f"Address {summary['warnings']} minor warnings when convenient") if summary["fixed_issues"] > 0: recommendations.append(f"✅ Auto-fixed {summary['fixed_issues']} issues") # Suggest auto-fixing if available can_fix_tools = [] for result_key, result in results["lint_results"].items(): if result.get("can_fix") and result.get("issues"): tool = result.get("linter", result_key) can_fix_tools.append(tool) if can_fix_tools and not results["fix_mode"]: recommendations.append(f"💡 Run with fix=True to auto-fix issues using: {', '.join(set(can_fix_tools))}") return recommendations @mcp_tool(name="format_code", description="🟡 SAFE: Auto-format code using standard formatters") async def format_code( self, file_paths: List[str], formatter: Optional[ Literal["prettier", "black", "autopep8", "auto-detect"] ] = "auto-detect", config_file: Optional[str] = None, ctx: Context = None, ) -> Dict[str, Any]: """Format code files using appropriate formatters""" try: if not file_paths: return {"error": "No file paths provided"} # Validate all file paths exist valid_files = [] for file_path in file_paths: path_obj = Path(file_path) if path_obj.exists() and path_obj.is_file(): valid_files.append(path_obj) else: if ctx: await ctx.warning(f"File not found: {file_path}") if not valid_files: return {"error": "No valid files found to format"} # Group files by type for appropriate formatter selection file_groups = self._group_files_for_formatting(valid_files) results = { "total_files": len(valid_files), "file_groups": {k: len(v) for k, v in file_groups.items()}, "formatter_mode": formatter, "config_file": config_file, "format_results": {}, "summary": { "formatted_files": 0, "unchanged_files": 0, "failed_files": 0, "total_changes": 0 } } # Format each file group with appropriate formatter for file_type, files in file_groups.items(): if not files: continue # Determine formatter for this file type selected_formatter = self._select_formatter_for_type(file_type, formatter) if not selected_formatter: results["format_results"][file_type] = { "status": "skipped", "reason": f"No suitable formatter available for {file_type} files" } continue # Check if formatter is available if not self._is_command_available(selected_formatter): results["format_results"][file_type] = { "status": "skipped", "reason": f"Formatter '{selected_formatter}' not installed", "suggestion": self._get_install_suggestion(selected_formatter) } continue # Run the formatter try: format_result = await self._run_formatter(selected_formatter, files, config_file, ctx) results["format_results"][file_type] = format_result # Update summary if "files_changed" in format_result: results["summary"]["formatted_files"] += format_result["files_changed"] results["summary"]["unchanged_files"] += format_result.get("files_unchanged", 0) results["summary"]["total_changes"] += format_result.get("total_changes", 0) except Exception as e: results["format_results"][file_type] = { "status": "failed", "formatter": selected_formatter, "error": str(e) } results["summary"]["failed_files"] += len(files) # Generate recommendations results["recommendations"] = self._generate_format_recommendations(results) if ctx: formatted = results["summary"]["formatted_files"] total = results["summary"]["formatted_files"] + results["summary"]["unchanged_files"] status_emoji = "✅" if results["summary"]["failed_files"] == 0 else "⚠️" await ctx.info(f"{status_emoji} Formatting complete: {formatted}/{total} files changed") return results except Exception as e: error_msg = f"Code formatting failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} def _group_files_for_formatting(self, files: List[Path]) -> Dict[str, List[Path]]: """Group files by type for formatting""" groups = { "python": [], "javascript": [], "typescript": [], "json": [], "yaml": [], "css": [], "html": [], "markdown": [], "other": [] } for file_path in files: suffix = file_path.suffix.lower() if suffix in ['.py', '.pyx', '.pyi']: groups["python"].append(file_path) elif suffix in ['.js', '.jsx', '.mjs']: groups["javascript"].append(file_path) elif suffix in ['.ts', '.tsx']: groups["typescript"].append(file_path) elif suffix in ['.json']: groups["json"].append(file_path) elif suffix in ['.yaml', '.yml']: groups["yaml"].append(file_path) elif suffix in ['.css', '.scss', '.sass', '.less']: groups["css"].append(file_path) elif suffix in ['.html', '.htm', '.xhtml']: groups["html"].append(file_path) elif suffix in ['.md', '.markdown']: groups["markdown"].append(file_path) else: groups["other"].append(file_path) return {k: v for k, v in groups.items() if v} # Remove empty groups def _select_formatter_for_type(self, file_type: str, requested_formatter: str) -> Optional[str]: """Select appropriate formatter for file type""" if requested_formatter != "auto-detect": # Check if requested formatter is appropriate for file type type_formatters = { "python": ["black", "autopep8"], "javascript": ["prettier"], "typescript": ["prettier"], "json": ["prettier"], "yaml": ["prettier"], "css": ["prettier"], "html": ["prettier"], "markdown": ["prettier"] } if file_type in type_formatters and requested_formatter in type_formatters[file_type]: return requested_formatter else: return None # Requested formatter not suitable for this file type # Auto-detect best formatter for file type formatter_priority = { "python": ["black", "autopep8"], "javascript": ["prettier"], "typescript": ["prettier"], "json": ["prettier"], "yaml": ["prettier"], "css": ["prettier"], "html": ["prettier"], "markdown": ["prettier"] } candidates = formatter_priority.get(file_type, []) for formatter in candidates: if self._is_command_available(formatter): return formatter return None def _get_install_suggestion(self, formatter: str) -> str: """Get installation suggestion for formatter""" suggestions = { "black": "pip install black", "autopep8": "pip install autopep8", "prettier": "npm install -g prettier" } return suggestions.get(formatter, f"Install {formatter}") async def _run_formatter(self, formatter: str, files: List[Path], config_file: Optional[str], ctx: Context) -> Dict[str, Any]: """Run a specific formatter on files""" file_paths = [str(f) for f in files] try: if formatter == "black": return await self._run_black(file_paths, config_file) elif formatter == "autopep8": return await self._run_autopep8(file_paths, config_file) elif formatter == "prettier": return await self._run_prettier(file_paths, config_file) else: return {"status": "unsupported", "formatter": formatter} except Exception as e: return {"status": "error", "formatter": formatter, "error": str(e)} async def _run_black(self, file_paths: List[str], config_file: Optional[str]) -> Dict[str, Any]: """Run Black Python formatter""" cmd = ["black", "--diff", "--color"] if config_file: cmd.extend(["--config", config_file]) # First run with --diff to see what would change diff_cmd = cmd + file_paths diff_result = subprocess.run(diff_cmd, capture_output=True, text=True, timeout=60) # Count changes by counting diff sections changes = diff_result.stdout.count("--- ") if diff_result.stdout else 0 # Run actual formatting format_cmd = ["black"] + (["--config", config_file] if config_file else []) + file_paths format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=60) # Count files that were actually changed files_changed = 0 if format_result.stderr: files_changed = format_result.stderr.count("reformatted") return { "formatter": "black", "status": "completed", "exit_code": format_result.returncode, "files_changed": files_changed, "files_unchanged": len(file_paths) - files_changed, "total_changes": changes, "diff_preview": diff_result.stdout[:1000] if diff_result.stdout else None # First 1000 chars } async def _run_autopep8(self, file_paths: List[str], config_file: Optional[str]) -> Dict[str, Any]: """Run autopep8 Python formatter""" cmd = ["autopep8", "--in-place", "--aggressive", "--aggressive"] if config_file: cmd.extend(["--global-config", config_file]) # Run diff first to see changes diff_cmd = ["autopep8", "--diff"] + file_paths diff_result = subprocess.run(diff_cmd, capture_output=True, text=True, timeout=60) changes = diff_result.stdout.count("@@") if diff_result.stdout else 0 # Run actual formatting format_cmd = cmd + file_paths format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=60) return { "formatter": "autopep8", "status": "completed", "exit_code": format_result.returncode, "files_changed": len(file_paths) if format_result.returncode == 0 else 0, "files_unchanged": 0 if format_result.returncode == 0 else len(file_paths), "total_changes": changes, "diff_preview": diff_result.stdout[:1000] if diff_result.stdout else None } async def _run_prettier(self, file_paths: List[str], config_file: Optional[str]) -> Dict[str, Any]: """Run Prettier formatter""" cmd = ["prettier", "--write"] if config_file: cmd.extend(["--config", config_file]) # Check what files would be changed check_cmd = ["prettier", "--list-different"] + file_paths check_result = subprocess.run(check_cmd, capture_output=True, text=True, timeout=60) files_to_change = len(check_result.stdout.strip().split('\n')) if check_result.stdout.strip() else 0 # Run actual formatting format_cmd = cmd + file_paths format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=60) return { "formatter": "prettier", "status": "completed", "exit_code": format_result.returncode, "files_changed": files_to_change if format_result.returncode == 0 else 0, "files_unchanged": len(file_paths) - files_to_change, "total_changes": files_to_change, "changed_files": check_result.stdout.strip().split('\n') if check_result.stdout.strip() else [] } def _generate_format_recommendations(self, results: Dict[str, Any]) -> List[str]: """Generate actionable recommendations based on format results""" recommendations = [] summary = results["summary"] if summary["formatted_files"] == 0 and summary["failed_files"] == 0: recommendations.append("✅ All files are already properly formatted!") return recommendations if summary["formatted_files"] > 0: recommendations.append(f"✅ Successfully formatted {summary['formatted_files']} files") if summary["failed_files"] > 0: recommendations.append(f"⚠️ Failed to format {summary['failed_files']} files - check error details") # Check for missing formatters skipped_types = [] for file_type, result in results["format_results"].items(): if result.get("status") == "skipped" and "not installed" in result.get("reason", ""): skipped_types.append((file_type, result.get("suggestion", ""))) if skipped_types: recommendations.append("💡 Install missing formatters:") for file_type, suggestion in skipped_types: recommendations.append(f" - {suggestion} (for {file_type} files)") if summary["total_changes"] > 50: recommendations.append("📋 Many changes applied - review diff output carefully") return recommendations class NetworkAPITools(MCPMixin): """Network and API testing tools""" @mcp_tool(name="http_request", description="🟡 SAFE: Make HTTP requests for API testing") async def http_request( self, url: str, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"], headers: Optional[Dict[str, str]] = None, body: Optional[Union[str, Dict[str, Any]]] = None, timeout: Optional[int] = 30, ctx: Context = None, ) -> Dict[str, Any]: """Make HTTP request and return detailed response information""" try: if requests is None: return { "error": "requests library not available", "install": "pip install requests" } # Prepare headers request_headers = headers or {} # Prepare body based on type request_data = None request_json = None if body is not None: if isinstance(body, dict): request_json = body if 'Content-Type' not in request_headers: request_headers['Content-Type'] = 'application/json' else: request_data = body if 'Content-Type' not in request_headers: request_headers['Content-Type'] = 'text/plain' # Make the request start_time = time.time() response = requests.request( method=method, url=url, headers=request_headers, data=request_data, json=request_json, timeout=timeout, allow_redirects=True ) end_time = time.time() response_time = round((end_time - start_time) * 1000, 2) # ms # Parse response body safely response_body = None content_type = response.headers.get('Content-Type', '').lower() try: if 'application/json' in content_type: response_body = response.json() else: response_body = response.text # Truncate very long text responses if len(response_body) > 5000: response_body = response_body[:5000] + "... [truncated]" except Exception: response_body = f"" # Build response object result = { "request": { "method": method, "url": url, "headers": request_headers, "body": body }, "response": { "status_code": response.status_code, "status_text": response.reason, "headers": dict(response.headers), "body": response_body, "size_bytes": len(response.content), "response_time_ms": response_time }, "success": 200 <= response.status_code < 300, "redirected": len(response.history) > 0, "final_url": response.url } if ctx: status_emoji = "✅" if result["success"] else "❌" await ctx.info(f"{status_emoji} {method} {url} → {response.status_code} ({response_time}ms)") return result except requests.exceptions.Timeout: error_msg = f"Request timeout after {timeout}s" if ctx: await ctx.error(error_msg) return {"error": error_msg, "type": "timeout"} except requests.exceptions.ConnectionError as e: error_msg = f"Connection error: {str(e)}" if ctx: await ctx.error(error_msg) return {"error": error_msg, "type": "connection_error"} except requests.exceptions.RequestException as e: error_msg = f"Request failed: {str(e)}" if ctx: await ctx.error(error_msg) return {"error": error_msg, "type": "request_error"} except Exception as e: error_msg = f"HTTP request failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg, "type": "unexpected_error"} @mcp_tool(name="api_mock_server", description="Start a simple mock API server") def api_mock_server( self, port: int, routes: List[Dict[str, Any]], cors: Optional[bool] = True ) -> Dict[str, Any]: """Start mock API server""" raise NotImplementedError("api_mock_server not implemented") class ProcessTracingTools(MCPMixin): """Process tracing and system call analysis tools""" @mcp_tool( name="trace_process", description="Trace system calls and signals for process debugging" ) def trace_process( self, target: Union[int, str], action: Literal["attach", "launch", "follow"], duration: Optional[int] = 30, output_format: Optional[Literal["summary", "detailed", "json", "timeline"]] = "summary", filter_calls: Optional[List[Literal["file", "network", "process"]]] = None, exclude_calls: Optional[List[str]] = None, follow_children: Optional[bool] = False, show_timestamps: Optional[bool] = True, buffer_size: Optional[int] = 10, filter_paths: Optional[List[str]] = None, ) -> Dict[str, Any]: """Trace process system calls (cross-platform strace equivalent)""" raise NotImplementedError("trace_process not implemented") @mcp_tool(name="analyze_syscalls", description="Analyze and summarize system call traces") def analyze_syscalls( self, trace_data: str, analysis_type: Literal["file_access", "network", "performance", "errors", "overview"], group_by: Optional[Literal["call_type", "file_path", "process", "time_window"]] = None, threshold_ms: Optional[float] = None, ) -> Dict[str, Any]: """Analyze system call traces with insights""" raise NotImplementedError("analyze_syscalls not implemented") @mcp_tool( name="process_monitor", description="Real-time process monitoring with system call tracking" ) def process_monitor( self, process_pattern: Union[str, int], watch_events: List[Literal["file_access", "network", "registry", "process_creation"]], duration: Optional[int] = 60, alert_threshold: Optional[Dict[str, Any]] = None, output_format: Optional[Literal["live", "summary", "alerts_only"]] = "summary", ) -> Dict[str, Any]: """Monitor process activity in real-time""" raise NotImplementedError("process_monitor not implemented") class EnvironmentProcessManagement(MCPMixin): """Environment and process management tools""" @mcp_tool( name="environment_info", description="Get comprehensive system and environment information" ) def environment_info( self, include_sections: List[Literal["system", "python", "node", "git", "env_vars"]] ) -> Dict[str, Any]: """Get detailed environment information""" raise NotImplementedError("environment_info not implemented") @mcp_tool(name="process_tree", description="Show process hierarchy and relationships") def process_tree( self, root_pid: Optional[int] = None, include_children: Optional[bool] = True ) -> Dict[str, Any]: """Show process tree with resource usage""" raise NotImplementedError("process_tree not implemented") @mcp_tool(name="manage_virtual_env", description="Create and manage virtual environments") def manage_virtual_env( self, action: Literal["create", "activate", "deactivate", "list", "remove"], env_name: str, python_version: Optional[str] = None, ) -> Dict[str, Any]: """Manage Python virtual environments""" raise NotImplementedError("manage_virtual_env not implemented") class EnhancedExistingTools(MCPMixin): """Enhanced versions of existing tools""" @mcp_tool( name="execute_command_enhanced", description="Enhanced command execution with advanced features", ) def execute_command_enhanced( self, command: Union[str, List[str]], working_directory: Optional[str] = None, environment_vars: Optional[Dict[str, str]] = None, capture_output: Optional[Literal["all", "stdout", "stderr", "none"]] = "all", stream_callback: Optional[Any] = None, # Callback function type retry_count: Optional[int] = 0, ) -> Dict[str, Any]: """Execute command with enhanced features""" raise NotImplementedError("execute_command_enhanced not implemented") @mcp_tool( name="search_code_enhanced", description="Enhanced code search with semantic and AST support", ) def search_code_enhanced( self, query: str, directory: str, search_type: Optional[Literal["text", "semantic", "ast", "cross-reference"]] = "text", file_pattern: Optional[str] = None, save_to_history: Optional[bool] = True, ) -> List[Dict[str, Any]]: """Enhanced code search with multiple search modes""" raise NotImplementedError("search_code_enhanced not implemented") @mcp_tool( name="edit_block_enhanced", description="Enhanced block editing with multi-file support" ) def edit_block_enhanced( self, edits: List[Dict[str, Any]], rollback_support: Optional[bool] = True, template_name: Optional[str] = None, conflict_resolution: Optional[Literal["manual", "theirs", "ours", "auto"]] = "manual", ) -> Dict[str, Any]: """Enhanced edit operations with advanced features""" raise NotImplementedError("edit_block_enhanced not implemented") class UtilityTools(MCPMixin): """Utility and convenience tools""" @mcp_tool(name="generate_documentation", description="Generate documentation from code") def generate_documentation( self, source_directory: str, output_format: Literal["markdown", "html", "pdf"], include_private: Optional[bool] = False, ) -> str: """Generate documentation from source code""" raise NotImplementedError("generate_documentation not implemented") @mcp_tool(name="project_template", description="Generate project templates and boilerplate") def project_template( self, template_type: Literal[ "python-package", "react-app", "node-api", "django-app", "fastapi", "cli-tool" ], project_name: str, options: Optional[Dict[str, Any]] = None, ) -> str: """Generate project from template""" raise NotImplementedError("project_template not implemented") @mcp_tool(name="dependency_check", description="🟡 SAFE: Analyze and update project dependencies") async def dependency_check( self, project_path: str, check_security: Optional[bool] = True, suggest_updates: Optional[bool] = True, ctx: Context = None, ) -> Dict[str, Any]: """Check dependencies for updates and vulnerabilities""" try: project_path_obj = Path(project_path) if not project_path_obj.exists(): return {"error": f"Project path not found: {project_path}"} results = { "project_path": project_path, "project_type": None, "dependencies": {}, "updates_available": [], "security_issues": [], "recommendations": [], "summary": {} } # Detect project type and dependency files dependency_files = [] # Python projects pyproject_toml = project_path_obj / "pyproject.toml" requirements_txt = project_path_obj / "requirements.txt" pipfile = project_path_obj / "Pipfile" # Node.js projects package_json = project_path_obj / "package.json" if pyproject_toml.exists(): results["project_type"] = "python-pyproject" dependency_files.append(("pyproject.toml", pyproject_toml)) elif requirements_txt.exists(): results["project_type"] = "python-requirements" dependency_files.append(("requirements.txt", requirements_txt)) elif pipfile.exists(): results["project_type"] = "python-pipfile" dependency_files.append(("Pipfile", pipfile)) elif package_json.exists(): results["project_type"] = "nodejs" dependency_files.append(("package.json", package_json)) else: return {"error": "No supported dependency files found (pyproject.toml, requirements.txt, package.json)"} # Parse dependency files for file_type, file_path in dependency_files: try: if file_type == "pyproject.toml": deps = self._parse_pyproject_toml(file_path) elif file_type == "requirements.txt": deps = self._parse_requirements_txt(file_path) elif file_type == "package.json": deps = self._parse_package_json(file_path) elif file_type == "Pipfile": deps = self._parse_pipfile(file_path) else: deps = {} results["dependencies"][file_type] = deps except Exception as e: results["dependencies"][file_type] = {"error": f"Failed to parse: {str(e)}"} # Check for updates if requested if suggest_updates and results["project_type"]: if results["project_type"].startswith("python"): updates = await self._check_python_updates(project_path_obj, ctx) results["updates_available"] = updates elif results["project_type"] == "nodejs": updates = await self._check_nodejs_updates(project_path_obj, ctx) results["updates_available"] = updates # Basic security checks if check_security: security_issues = await self._check_security_issues(project_path_obj, results["project_type"], ctx) results["security_issues"] = security_issues # Generate recommendations results["recommendations"] = self._generate_recommendations(results) # Create summary total_deps = sum(len(deps) if isinstance(deps, dict) and "error" not in deps else 0 for deps in results["dependencies"].values()) results["summary"] = { "total_dependencies": total_deps, "updates_available": len(results["updates_available"]), "security_issues": len(results["security_issues"]), "project_type": results["project_type"] } if ctx: await ctx.info(f"Dependency check complete: {total_deps} deps, {len(results['updates_available'])} updates, {len(results['security_issues'])} security issues") return results except Exception as e: error_msg = f"Dependency check failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} def _parse_pyproject_toml(self, file_path: Path) -> Dict[str, str]: """Parse pyproject.toml for dependencies""" try: import tomllib except ImportError: try: import tomli as tomllib except ImportError: return {"error": "tomllib/tomli not available for parsing pyproject.toml"} try: with open(file_path, 'rb') as f: data = tomllib.load(f) deps = {} # Get dependencies from different sections if 'project' in data and 'dependencies' in data['project']: for dep in data['project']['dependencies']: name = dep.split('>=')[0].split('==')[0].split('~=')[0].split('>')[0].split('<')[0].strip() deps[name] = dep if 'tool' in data and 'poetry' in data['tool'] and 'dependencies' in data['tool']['poetry']: poetry_deps = data['tool']['poetry']['dependencies'] for name, version in poetry_deps.items(): if name != 'python': deps[name] = str(version) if not isinstance(version, dict) else version.get('version', 'latest') return deps except Exception as e: return {"error": f"Failed to parse pyproject.toml: {str(e)}"} def _parse_requirements_txt(self, file_path: Path) -> Dict[str, str]: """Parse requirements.txt for dependencies""" try: deps = {} with open(file_path, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): name = line.split('>=')[0].split('==')[0].split('~=')[0].split('>')[0].split('<')[0].strip() deps[name] = line return deps except Exception as e: return {"error": f"Failed to parse requirements.txt: {str(e)}"} def _parse_package_json(self, file_path: Path) -> Dict[str, str]: """Parse package.json for dependencies""" try: with open(file_path, 'r') as f: data = json.load(f) deps = {} if 'dependencies' in data: deps.update(data['dependencies']) if 'devDependencies' in data: deps.update(data['devDependencies']) return deps except Exception as e: return {"error": f"Failed to parse package.json: {str(e)}"} def _parse_pipfile(self, file_path: Path) -> Dict[str, str]: """Parse Pipfile for dependencies""" try: # Simple parsing for Pipfile - would need toml parser for full support deps = {} with open(file_path, 'r') as f: content = f.read() # Basic extraction - this is simplified if '[packages]' in content: lines = content.split('[packages]')[1].split('[')[0].strip().split('\n') for line in lines: if '=' in line and line.strip(): name, version = line.split('=', 1) deps[name.strip()] = version.strip().strip('"') return deps except Exception as e: return {"error": f"Failed to parse Pipfile: {str(e)}"} async def _check_python_updates(self, project_path: Path, ctx: Context) -> List[Dict[str, Any]]: """Check for Python package updates using pip""" try: result = subprocess.run( ["python", "-m", "pip", "list", "--outdated", "--format=json"], cwd=project_path, capture_output=True, text=True, timeout=60 ) if result.returncode == 0: try: outdated = json.loads(result.stdout) return [ { "package": pkg["name"], "current_version": pkg["version"], "latest_version": pkg["latest_version"], "type": pkg.get("latest_filetype", "wheel") } for pkg in outdated ] except json.JSONDecodeError: return [] return [] except Exception: return [] async def _check_nodejs_updates(self, project_path: Path, ctx: Context) -> List[Dict[str, Any]]: """Check for Node.js package updates using npm""" try: result = subprocess.run( ["npm", "outdated", "--json"], cwd=project_path, capture_output=True, text=True, timeout=60 ) # npm outdated returns exit code 1 when there are outdated packages if result.stdout: try: outdated = json.loads(result.stdout) return [ { "package": name, "current_version": info.get("current"), "latest_version": info.get("latest"), "wanted_version": info.get("wanted") } for name, info in outdated.items() ] except json.JSONDecodeError: return [] return [] except Exception: return [] async def _check_security_issues(self, project_path: Path, project_type: str, ctx: Context) -> List[Dict[str, Any]]: """Check for known security vulnerabilities""" issues = [] try: if project_type.startswith("python"): # Try to use pip-audit if available result = subprocess.run( ["python", "-m", "pip", "install", "pip-audit"], cwd=project_path, capture_output=True, timeout=30 ) if result.returncode == 0: audit_result = subprocess.run( ["python", "-m", "pip-audit", "--format=json"], cwd=project_path, capture_output=True, text=True, timeout=60 ) if audit_result.returncode == 0: try: audit_data = json.loads(audit_result.stdout) if audit_data: issues.extend(audit_data) except json.JSONDecodeError: pass elif project_type == "nodejs": # Try npm audit audit_result = subprocess.run( ["npm", "audit", "--json"], cwd=project_path, capture_output=True, text=True, timeout=60 ) if audit_result.stdout: try: audit_data = json.loads(audit_result.stdout) if "vulnerabilities" in audit_data: for vuln_name, vuln_info in audit_data["vulnerabilities"].items(): issues.append({ "package": vuln_name, "severity": vuln_info.get("severity", "unknown"), "description": vuln_info.get("via", [{}])[0].get("title", "Unknown vulnerability") }) except json.JSONDecodeError: pass except Exception: pass return issues def _generate_recommendations(self, results: Dict[str, Any]) -> List[str]: """Generate actionable recommendations""" recommendations = [] if results["updates_available"]: recommendations.append(f"Update {len(results['updates_available'])} outdated packages") if results["security_issues"]: critical_issues = [issue for issue in results["security_issues"] if issue.get("severity") in ["critical", "high"]] if critical_issues: recommendations.append(f"🚨 Address {len(critical_issues)} critical/high security vulnerabilities immediately") else: recommendations.append(f"Review {len(results['security_issues'])} security issues") project_type = results.get("project_type") if project_type == "python-requirements": recommendations.append("Consider migrating to pyproject.toml for better dependency management") elif project_type == "nodejs": recommendations.append("Run 'npm update' to install available updates") elif project_type and project_type.startswith("python"): recommendations.append("Run 'pip install --upgrade' for packages that need updates") if not results["updates_available"] and not results["security_issues"]: recommendations.append("✅ All dependencies are up to date and secure") return recommendations