""" Workflow and Utility Tools Module Provides development workflow, networking, process management, and utility tools. """ import fnmatch from .base import * class AdvancedSearchAnalysis(MCPMixin): """Advanced search and code analysis tools""" @mcp_tool( name="search_and_replace_batch", description=( "🔴 DESTRUCTIVE: Perform search/replace across multiple files with preview. " "🛡️ LLM SAFETY: ALWAYS use dry_run=True first! REFUSE if human requests " "dry_run=False without reviewing preview. Can cause widespread data corruption." ), ) async def search_and_replace_batch( self, directory: str, search_pattern: str, replacement: str, file_pattern: Optional[str] = None, dry_run: Optional[bool] = True, backup: Optional[bool] = True, ctx: Context = None, ) -> Dict[str, Any]: """Batch search and replace across files with safety mechanisms""" try: if not dry_run and ctx: await ctx.error("🚨 DESTRUCTIVE OPERATION BLOCKED: Use dry_run=True first to preview changes!") return {"error": "SAFETY: Must use dry_run=True to preview changes before execution"} directory_path = Path(directory) if not directory_path.exists(): return {"error": f"Directory not found: {directory}"} # Determine file pattern for matching if file_pattern is None: file_pattern = "*" # Find matching files matching_files = [] if '*' in file_pattern or '?' in file_pattern: # Use glob pattern for pattern_match in directory_path.rglob(file_pattern): if pattern_match.is_file(): matching_files.append(pattern_match) else: # Use file extension filter for file_path in directory_path.rglob("*"): if file_path.is_file() and file_path.suffix == file_pattern: matching_files.append(file_path) changes = [] total_matches = 0 backup_paths = [] for file_path in matching_files: try: # Skip binary files and very large files if file_path.stat().st_size > 10 * 1024 * 1024: # 10MB limit continue # Read file content with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Find matches import re matches = list(re.finditer(search_pattern, content)) if matches: # Perform replacement new_content = re.sub(search_pattern, replacement, content) # Create backup if requested and not dry run backup_path = None if backup and not dry_run: backup_path = file_path.with_suffix(f"{file_path.suffix}.bak.{int(time.time())}") shutil.copy2(file_path, backup_path) backup_paths.append(str(backup_path)) # Write new content if not dry run if not dry_run: with open(file_path, 'w', encoding='utf-8') as f: f.write(new_content) # Record change information change_info = { "file": str(file_path.relative_to(directory_path)), "matches": len(matches), "backup_created": backup_path is not None, "backup_path": str(backup_path) if backup_path else None, "preview": { "first_match": { "line": content[:matches[0].start()].count('\n') + 1, "old": matches[0].group(), "new": re.sub(search_pattern, replacement, matches[0].group()) } } if matches else None } changes.append(change_info) total_matches += len(matches) except (UnicodeDecodeError, PermissionError) as e: # Skip files we can't read continue result = { "operation": "search_and_replace_batch", "directory": directory, "search_pattern": search_pattern, "replacement": replacement, "file_pattern": file_pattern, "dry_run": dry_run, "backup_enabled": backup, "summary": { "files_scanned": len(matching_files), "files_with_matches": len(changes), "total_matches": total_matches, "backups_created": len(backup_paths) }, "changes": changes, "backup_paths": backup_paths } if ctx: if dry_run: await ctx.info(f"DRY RUN: Found {total_matches} matches in {len(changes)} files. Review before setting dry_run=False") else: await ctx.info(f"Replaced {total_matches} matches in {len(changes)} files with {len(backup_paths)} backups created") return result except Exception as e: error_msg = f"Search and replace batch operation failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} @mcp_tool(name="analyze_codebase", description="Generate codebase statistics and insights") async def analyze_codebase( self, directory: str, include_metrics: List[Literal["loc", "complexity", "dependencies"]], exclude_patterns: Optional[List[str]] = None, ctx: Context = None, ) -> Dict[str, Any]: """Analyze codebase and return metrics""" try: dir_path = Path(directory) if not dir_path.exists(): return {"error": f"Directory not found: {directory}"} if ctx: await ctx.info(f"Analyzing codebase: {directory}") exclude_patterns = exclude_patterns or ["*.pyc", "__pycache__", ".git", ".venv", "node_modules"] def should_exclude(path: Path) -> bool: for pattern in exclude_patterns: if fnmatch.fnmatch(path.name, pattern) or fnmatch.fnmatch(str(path), pattern): return True return False stats = { "directory": directory, "timestamp": datetime.now().isoformat(), "metrics": {}, "files_analyzed": [], "summary": {} } # Collect files files = [] for file_path in dir_path.rglob("*"): if file_path.is_file() and not should_exclude(file_path): files.append(file_path) stats["summary"]["total_files"] = len(files) # LOC metrics if "loc" in include_metrics: total_lines = 0 file_types = {} for file_path in files: try: if file_path.suffix: ext = file_path.suffix.lower() if ext in ['.py', '.js', '.ts', '.java', '.cpp', '.c', '.go', '.rs', '.rb']: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: lines = len(f.readlines()) total_lines += lines if ext not in file_types: file_types[ext] = {"files": 0, "lines": 0} file_types[ext]["files"] += 1 file_types[ext]["lines"] += lines stats["files_analyzed"].append({ "path": str(file_path.relative_to(dir_path)), "extension": ext, "lines": lines }) except Exception: continue stats["metrics"]["loc"] = { "total_lines": total_lines, "file_types": file_types } # Complexity metrics (basic implementation) if "complexity" in include_metrics: stats["metrics"]["complexity"] = { "note": "Basic complexity analysis - full implementation pending", "average_file_size": sum(len(stats["files_analyzed"])) // max(len(files), 1) } # Dependencies metrics (basic implementation) if "dependencies" in include_metrics: deps = {"package_files": []} for file_path in files: if file_path.name in ["requirements.txt", "package.json", "Cargo.toml", "go.mod", "pyproject.toml"]: deps["package_files"].append(str(file_path.relative_to(dir_path))) stats["metrics"]["dependencies"] = deps if ctx: await ctx.info(f"Analysis complete: {len(files)} files analyzed") return stats except Exception as e: if ctx: await ctx.error(f"Codebase analysis failed: {str(e)}") return {"error": str(e)} @mcp_tool(name="find_duplicates", description="Detect duplicate code or files") def find_duplicates( self, directory: str, similarity_threshold: Optional[float] = 80.0, file_types: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: """Find duplicate code segments or files""" raise NotImplementedError("find_duplicates not implemented") class DevelopmentWorkflow(MCPMixin): """Development workflow automation tools""" @mcp_tool( name="run_tests", description="🟡 SAFE: Execute test suites with intelligent framework detection" ) async def run_tests( self, test_path: str, framework: Optional[Literal["pytest", "jest", "mocha", "auto-detect"]] = "auto-detect", pattern: Optional[str] = None, coverage: Optional[bool] = False, ctx: Context = None, ) -> Dict[str, Any]: """Run tests and return results with coverage information""" try: test_path_obj = Path(test_path) if not test_path_obj.exists(): return {"error": f"Test path not found: {test_path}"} # Auto-detect framework if needed detected_framework = framework if framework == "auto-detect": # Check for Python test files and pytest if any(test_path_obj.rglob("test_*.py")) or any(test_path_obj.rglob("*_test.py")): detected_framework = "pytest" # Check for JavaScript test files elif any(test_path_obj.rglob("*.test.js")) or any(test_path_obj.rglob("*.spec.js")): detected_framework = "jest" elif test_path_obj.is_file() and test_path_obj.suffix == ".js": detected_framework = "mocha" else: # Default to pytest for directories detected_framework = "pytest" # Build command based on framework cmd = [] env_vars = os.environ.copy() if detected_framework == "pytest": cmd = ["python", "-m", "pytest"] if coverage: cmd.extend(["--cov", str(test_path_obj.parent if test_path_obj.is_file() else test_path_obj)]) cmd.extend(["--cov-report", "term-missing"]) if pattern: cmd.extend(["-k", pattern]) cmd.append(str(test_path_obj)) cmd.extend(["-v", "--tb=short"]) elif detected_framework == "jest": cmd = ["npx", "jest"] if coverage: cmd.append("--coverage") if pattern: cmd.extend(["--testNamePattern", pattern]) cmd.append(str(test_path_obj)) cmd.extend(["--verbose"]) elif detected_framework == "mocha": cmd = ["npx", "mocha"] if pattern: cmd.extend(["--grep", pattern]) cmd.append(str(test_path_obj)) cmd.append("--reporter") cmd.append("json") else: return {"error": f"Unsupported test framework: {detected_framework}"} # Run the tests start_time = time.time() result = subprocess.run( cmd, cwd=test_path_obj.parent if test_path_obj.is_file() else test_path_obj, capture_output=True, text=True, env=env_vars, timeout=300 # 5 minute timeout ) end_time = time.time() duration = round(end_time - start_time, 2) # Parse results based on framework test_results = { "framework": detected_framework, "command": " ".join(cmd), "exit_code": result.returncode, "duration_seconds": duration, "success": result.returncode == 0, "stdout": result.stdout, "stderr": result.stderr, } # Parse output for specific metrics if detected_framework == "pytest": # Parse pytest output stdout = result.stdout failed_pattern = r"(\d+) failed" passed_pattern = r"(\d+) passed" failed_match = re.search(failed_pattern, stdout) passed_match = re.search(passed_pattern, stdout) test_results.update({ "tests_passed": int(passed_match.group(1)) if passed_match else 0, "tests_failed": int(failed_match.group(1)) if failed_match else 0, "coverage_info": self._extract_coverage_info(stdout) if coverage else None }) elif detected_framework in ["jest", "mocha"]: # Basic parsing for JavaScript frameworks test_results.update({ "tests_passed": stdout.count("✓") if "✓" in stdout else 0, "tests_failed": stdout.count("✗") if "✗" in stdout else 0, }) # Summary total_tests = test_results.get("tests_passed", 0) + test_results.get("tests_failed", 0) test_results["total_tests"] = total_tests test_results["pass_rate"] = round((test_results.get("tests_passed", 0) / max(total_tests, 1)) * 100, 1) if ctx: status_emoji = "✅" if test_results["success"] else "❌" await ctx.info(f"{status_emoji} Tests completed: {test_results['tests_passed']}/{total_tests} passed ({duration}s)") return test_results except subprocess.TimeoutExpired: error_msg = "Test execution timed out after 5 minutes" if ctx: await ctx.error(error_msg) return {"error": error_msg} except FileNotFoundError: error_msg = f"Test framework '{detected_framework}' not found in PATH" if ctx: await ctx.error(error_msg) return {"error": error_msg, "suggestion": f"Install {detected_framework} or check PATH"} except Exception as e: error_msg = f"Test execution failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} def _extract_coverage_info(self, stdout: str) -> Optional[Dict[str, Any]]: """Extract coverage information from pytest output""" try: # Look for coverage summary line lines = stdout.split('\n') for line in lines: if "TOTAL" in line and "%" in line: parts = line.split() for part in parts: if part.endswith('%'): return { "total_coverage": part, "raw_line": line.strip() } return None except Exception: return None @mcp_tool(name="lint_code", description="Run code linting with multiple linters") def lint_code( self, file_paths: List[str], linters: Optional[List[str]] = None, fix: Optional[bool] = False, ) -> Dict[str, Any]: """Lint code and optionally fix issues""" raise NotImplementedError("lint_code not implemented") @mcp_tool(name="format_code", description="Auto-format code using standard formatters") def format_code( self, file_paths: List[str], formatter: Optional[ Literal["prettier", "black", "autopep8", "auto-detect"] ] = "auto-detect", config_file: Optional[str] = None, ) -> List[str]: """Format code files""" raise NotImplementedError("format_code not implemented") class NetworkAPITools(MCPMixin): """Network and API testing tools""" @mcp_tool(name="http_request", description="🟡 SAFE: Make HTTP requests for API testing") async def http_request( self, url: str, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"], headers: Optional[Dict[str, str]] = None, body: Optional[Union[str, Dict[str, Any]]] = None, timeout: Optional[int] = 30, ctx: Context = None, ) -> Dict[str, Any]: """Make HTTP request and return detailed response information""" try: if requests is None: return { "error": "requests library not available", "install": "pip install requests" } # Prepare headers request_headers = headers or {} # Prepare body based on type request_data = None request_json = None if body is not None: if isinstance(body, dict): request_json = body if 'Content-Type' not in request_headers: request_headers['Content-Type'] = 'application/json' else: request_data = body if 'Content-Type' not in request_headers: request_headers['Content-Type'] = 'text/plain' # Make the request start_time = time.time() response = requests.request( method=method, url=url, headers=request_headers, data=request_data, json=request_json, timeout=timeout, allow_redirects=True ) end_time = time.time() response_time = round((end_time - start_time) * 1000, 2) # ms # Parse response body safely response_body = None content_type = response.headers.get('Content-Type', '').lower() try: if 'application/json' in content_type: response_body = response.json() else: response_body = response.text # Truncate very long text responses if len(response_body) > 5000: response_body = response_body[:5000] + "... [truncated]" except Exception: response_body = f"" # Build response object result = { "request": { "method": method, "url": url, "headers": request_headers, "body": body }, "response": { "status_code": response.status_code, "status_text": response.reason, "headers": dict(response.headers), "body": response_body, "size_bytes": len(response.content), "response_time_ms": response_time }, "success": 200 <= response.status_code < 300, "redirected": len(response.history) > 0, "final_url": response.url } if ctx: status_emoji = "✅" if result["success"] else "❌" await ctx.info(f"{status_emoji} {method} {url} → {response.status_code} ({response_time}ms)") return result except requests.exceptions.Timeout: error_msg = f"Request timeout after {timeout}s" if ctx: await ctx.error(error_msg) return {"error": error_msg, "type": "timeout"} except requests.exceptions.ConnectionError as e: error_msg = f"Connection error: {str(e)}" if ctx: await ctx.error(error_msg) return {"error": error_msg, "type": "connection_error"} except requests.exceptions.RequestException as e: error_msg = f"Request failed: {str(e)}" if ctx: await ctx.error(error_msg) return {"error": error_msg, "type": "request_error"} except Exception as e: error_msg = f"HTTP request failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg, "type": "unexpected_error"} @mcp_tool(name="api_mock_server", description="Start a simple mock API server") def api_mock_server( self, port: int, routes: List[Dict[str, Any]], cors: Optional[bool] = True ) -> Dict[str, Any]: """Start mock API server""" raise NotImplementedError("api_mock_server not implemented") class ProcessTracingTools(MCPMixin): """Process tracing and system call analysis tools""" @mcp_tool( name="trace_process", description="Trace system calls and signals for process debugging" ) def trace_process( self, target: Union[int, str], action: Literal["attach", "launch", "follow"], duration: Optional[int] = 30, output_format: Optional[Literal["summary", "detailed", "json", "timeline"]] = "summary", filter_calls: Optional[List[Literal["file", "network", "process"]]] = None, exclude_calls: Optional[List[str]] = None, follow_children: Optional[bool] = False, show_timestamps: Optional[bool] = True, buffer_size: Optional[int] = 10, filter_paths: Optional[List[str]] = None, ) -> Dict[str, Any]: """Trace process system calls (cross-platform strace equivalent)""" raise NotImplementedError("trace_process not implemented") @mcp_tool(name="analyze_syscalls", description="Analyze and summarize system call traces") def analyze_syscalls( self, trace_data: str, analysis_type: Literal["file_access", "network", "performance", "errors", "overview"], group_by: Optional[Literal["call_type", "file_path", "process", "time_window"]] = None, threshold_ms: Optional[float] = None, ) -> Dict[str, Any]: """Analyze system call traces with insights""" raise NotImplementedError("analyze_syscalls not implemented") @mcp_tool( name="process_monitor", description="Real-time process monitoring with system call tracking" ) def process_monitor( self, process_pattern: Union[str, int], watch_events: List[Literal["file_access", "network", "registry", "process_creation"]], duration: Optional[int] = 60, alert_threshold: Optional[Dict[str, Any]] = None, output_format: Optional[Literal["live", "summary", "alerts_only"]] = "summary", ) -> Dict[str, Any]: """Monitor process activity in real-time""" raise NotImplementedError("process_monitor not implemented") class EnvironmentProcessManagement(MCPMixin): """Environment and process management tools""" @mcp_tool( name="environment_info", description="Get comprehensive system and environment information" ) def environment_info( self, include_sections: List[Literal["system", "python", "node", "git", "env_vars"]] ) -> Dict[str, Any]: """Get detailed environment information""" raise NotImplementedError("environment_info not implemented") @mcp_tool(name="process_tree", description="Show process hierarchy and relationships") def process_tree( self, root_pid: Optional[int] = None, include_children: Optional[bool] = True ) -> Dict[str, Any]: """Show process tree with resource usage""" raise NotImplementedError("process_tree not implemented") @mcp_tool(name="manage_virtual_env", description="Create and manage virtual environments") def manage_virtual_env( self, action: Literal["create", "activate", "deactivate", "list", "remove"], env_name: str, python_version: Optional[str] = None, ) -> Dict[str, Any]: """Manage Python virtual environments""" raise NotImplementedError("manage_virtual_env not implemented") class EnhancedExistingTools(MCPMixin): """Enhanced versions of existing tools""" @mcp_tool( name="execute_command_enhanced", description="Enhanced command execution with advanced features", ) def execute_command_enhanced( self, command: Union[str, List[str]], working_directory: Optional[str] = None, environment_vars: Optional[Dict[str, str]] = None, capture_output: Optional[Literal["all", "stdout", "stderr", "none"]] = "all", stream_callback: Optional[Any] = None, # Callback function type retry_count: Optional[int] = 0, ) -> Dict[str, Any]: """Execute command with enhanced features""" raise NotImplementedError("execute_command_enhanced not implemented") @mcp_tool( name="search_code_enhanced", description="Enhanced code search with semantic and AST support", ) def search_code_enhanced( self, query: str, directory: str, search_type: Optional[Literal["text", "semantic", "ast", "cross-reference"]] = "text", file_pattern: Optional[str] = None, save_to_history: Optional[bool] = True, ) -> List[Dict[str, Any]]: """Enhanced code search with multiple search modes""" raise NotImplementedError("search_code_enhanced not implemented") @mcp_tool( name="edit_block_enhanced", description="Enhanced block editing with multi-file support" ) def edit_block_enhanced( self, edits: List[Dict[str, Any]], rollback_support: Optional[bool] = True, template_name: Optional[str] = None, conflict_resolution: Optional[Literal["manual", "theirs", "ours", "auto"]] = "manual", ) -> Dict[str, Any]: """Enhanced edit operations with advanced features""" raise NotImplementedError("edit_block_enhanced not implemented") class UtilityTools(MCPMixin): """Utility and convenience tools""" @mcp_tool(name="generate_documentation", description="Generate documentation from code") def generate_documentation( self, source_directory: str, output_format: Literal["markdown", "html", "pdf"], include_private: Optional[bool] = False, ) -> str: """Generate documentation from source code""" raise NotImplementedError("generate_documentation not implemented") @mcp_tool(name="project_template", description="Generate project templates and boilerplate") def project_template( self, template_type: Literal[ "python-package", "react-app", "node-api", "django-app", "fastapi", "cli-tool" ], project_name: str, options: Optional[Dict[str, Any]] = None, ) -> str: """Generate project from template""" raise NotImplementedError("project_template not implemented") @mcp_tool(name="dependency_check", description="🟡 SAFE: Analyze and update project dependencies") async def dependency_check( self, project_path: str, check_security: Optional[bool] = True, suggest_updates: Optional[bool] = True, ctx: Context = None, ) -> Dict[str, Any]: """Check dependencies for updates and vulnerabilities""" try: project_path_obj = Path(project_path) if not project_path_obj.exists(): return {"error": f"Project path not found: {project_path}"} results = { "project_path": project_path, "project_type": None, "dependencies": {}, "updates_available": [], "security_issues": [], "recommendations": [], "summary": {} } # Detect project type and dependency files dependency_files = [] # Python projects pyproject_toml = project_path_obj / "pyproject.toml" requirements_txt = project_path_obj / "requirements.txt" pipfile = project_path_obj / "Pipfile" # Node.js projects package_json = project_path_obj / "package.json" if pyproject_toml.exists(): results["project_type"] = "python-pyproject" dependency_files.append(("pyproject.toml", pyproject_toml)) elif requirements_txt.exists(): results["project_type"] = "python-requirements" dependency_files.append(("requirements.txt", requirements_txt)) elif pipfile.exists(): results["project_type"] = "python-pipfile" dependency_files.append(("Pipfile", pipfile)) elif package_json.exists(): results["project_type"] = "nodejs" dependency_files.append(("package.json", package_json)) else: return {"error": "No supported dependency files found (pyproject.toml, requirements.txt, package.json)"} # Parse dependency files for file_type, file_path in dependency_files: try: if file_type == "pyproject.toml": deps = self._parse_pyproject_toml(file_path) elif file_type == "requirements.txt": deps = self._parse_requirements_txt(file_path) elif file_type == "package.json": deps = self._parse_package_json(file_path) elif file_type == "Pipfile": deps = self._parse_pipfile(file_path) else: deps = {} results["dependencies"][file_type] = deps except Exception as e: results["dependencies"][file_type] = {"error": f"Failed to parse: {str(e)}"} # Check for updates if requested if suggest_updates and results["project_type"]: if results["project_type"].startswith("python"): updates = await self._check_python_updates(project_path_obj, ctx) results["updates_available"] = updates elif results["project_type"] == "nodejs": updates = await self._check_nodejs_updates(project_path_obj, ctx) results["updates_available"] = updates # Basic security checks if check_security: security_issues = await self._check_security_issues(project_path_obj, results["project_type"], ctx) results["security_issues"] = security_issues # Generate recommendations results["recommendations"] = self._generate_recommendations(results) # Create summary total_deps = sum(len(deps) if isinstance(deps, dict) and "error" not in deps else 0 for deps in results["dependencies"].values()) results["summary"] = { "total_dependencies": total_deps, "updates_available": len(results["updates_available"]), "security_issues": len(results["security_issues"]), "project_type": results["project_type"] } if ctx: await ctx.info(f"Dependency check complete: {total_deps} deps, {len(results['updates_available'])} updates, {len(results['security_issues'])} security issues") return results except Exception as e: error_msg = f"Dependency check failed: {str(e)}" if ctx: await self.log_critical(error_msg, exception=e, ctx=ctx) return {"error": error_msg} def _parse_pyproject_toml(self, file_path: Path) -> Dict[str, str]: """Parse pyproject.toml for dependencies""" try: import tomllib except ImportError: try: import tomli as tomllib except ImportError: return {"error": "tomllib/tomli not available for parsing pyproject.toml"} try: with open(file_path, 'rb') as f: data = tomllib.load(f) deps = {} # Get dependencies from different sections if 'project' in data and 'dependencies' in data['project']: for dep in data['project']['dependencies']: name = dep.split('>=')[0].split('==')[0].split('~=')[0].split('>')[0].split('<')[0].strip() deps[name] = dep if 'tool' in data and 'poetry' in data['tool'] and 'dependencies' in data['tool']['poetry']: poetry_deps = data['tool']['poetry']['dependencies'] for name, version in poetry_deps.items(): if name != 'python': deps[name] = str(version) if not isinstance(version, dict) else version.get('version', 'latest') return deps except Exception as e: return {"error": f"Failed to parse pyproject.toml: {str(e)}"} def _parse_requirements_txt(self, file_path: Path) -> Dict[str, str]: """Parse requirements.txt for dependencies""" try: deps = {} with open(file_path, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): name = line.split('>=')[0].split('==')[0].split('~=')[0].split('>')[0].split('<')[0].strip() deps[name] = line return deps except Exception as e: return {"error": f"Failed to parse requirements.txt: {str(e)}"} def _parse_package_json(self, file_path: Path) -> Dict[str, str]: """Parse package.json for dependencies""" try: with open(file_path, 'r') as f: data = json.load(f) deps = {} if 'dependencies' in data: deps.update(data['dependencies']) if 'devDependencies' in data: deps.update(data['devDependencies']) return deps except Exception as e: return {"error": f"Failed to parse package.json: {str(e)}"} def _parse_pipfile(self, file_path: Path) -> Dict[str, str]: """Parse Pipfile for dependencies""" try: # Simple parsing for Pipfile - would need toml parser for full support deps = {} with open(file_path, 'r') as f: content = f.read() # Basic extraction - this is simplified if '[packages]' in content: lines = content.split('[packages]')[1].split('[')[0].strip().split('\n') for line in lines: if '=' in line and line.strip(): name, version = line.split('=', 1) deps[name.strip()] = version.strip().strip('"') return deps except Exception as e: return {"error": f"Failed to parse Pipfile: {str(e)}"} async def _check_python_updates(self, project_path: Path, ctx: Context) -> List[Dict[str, Any]]: """Check for Python package updates using pip""" try: result = subprocess.run( ["python", "-m", "pip", "list", "--outdated", "--format=json"], cwd=project_path, capture_output=True, text=True, timeout=60 ) if result.returncode == 0: try: outdated = json.loads(result.stdout) return [ { "package": pkg["name"], "current_version": pkg["version"], "latest_version": pkg["latest_version"], "type": pkg.get("latest_filetype", "wheel") } for pkg in outdated ] except json.JSONDecodeError: return [] return [] except Exception: return [] async def _check_nodejs_updates(self, project_path: Path, ctx: Context) -> List[Dict[str, Any]]: """Check for Node.js package updates using npm""" try: result = subprocess.run( ["npm", "outdated", "--json"], cwd=project_path, capture_output=True, text=True, timeout=60 ) # npm outdated returns exit code 1 when there are outdated packages if result.stdout: try: outdated = json.loads(result.stdout) return [ { "package": name, "current_version": info.get("current"), "latest_version": info.get("latest"), "wanted_version": info.get("wanted") } for name, info in outdated.items() ] except json.JSONDecodeError: return [] return [] except Exception: return [] async def _check_security_issues(self, project_path: Path, project_type: str, ctx: Context) -> List[Dict[str, Any]]: """Check for known security vulnerabilities""" issues = [] try: if project_type.startswith("python"): # Try to use pip-audit if available result = subprocess.run( ["python", "-m", "pip", "install", "pip-audit"], cwd=project_path, capture_output=True, timeout=30 ) if result.returncode == 0: audit_result = subprocess.run( ["python", "-m", "pip-audit", "--format=json"], cwd=project_path, capture_output=True, text=True, timeout=60 ) if audit_result.returncode == 0: try: audit_data = json.loads(audit_result.stdout) if audit_data: issues.extend(audit_data) except json.JSONDecodeError: pass elif project_type == "nodejs": # Try npm audit audit_result = subprocess.run( ["npm", "audit", "--json"], cwd=project_path, capture_output=True, text=True, timeout=60 ) if audit_result.stdout: try: audit_data = json.loads(audit_result.stdout) if "vulnerabilities" in audit_data: for vuln_name, vuln_info in audit_data["vulnerabilities"].items(): issues.append({ "package": vuln_name, "severity": vuln_info.get("severity", "unknown"), "description": vuln_info.get("via", [{}])[0].get("title", "Unknown vulnerability") }) except json.JSONDecodeError: pass except Exception: pass return issues def _generate_recommendations(self, results: Dict[str, Any]) -> List[str]: """Generate actionable recommendations""" recommendations = [] if results["updates_available"]: recommendations.append(f"Update {len(results['updates_available'])} outdated packages") if results["security_issues"]: critical_issues = [issue for issue in results["security_issues"] if issue.get("severity") in ["critical", "high"]] if critical_issues: recommendations.append(f"🚨 Address {len(critical_issues)} critical/high security vulnerabilities immediately") else: recommendations.append(f"Review {len(results['security_issues'])} security issues") project_type = results.get("project_type") if project_type == "python-requirements": recommendations.append("Consider migrating to pyproject.toml for better dependency management") elif project_type == "nodejs": recommendations.append("Run 'npm update' to install available updates") elif project_type and project_type.startswith("python"): recommendations.append("Run 'pip install --upgrade' for packages that need updates") if not results["updates_available"] and not results["security_issues"]: recommendations.append("✅ All dependencies are up to date and secure") return recommendations