diff --git a/poetry.lock b/poetry.lock index ebd88d3..95aae06 100644 --- a/poetry.lock +++ b/poetry.lock @@ -691,6 +691,21 @@ rich = ">=13.9.4" [package.extras] websockets = ["websockets (>=15.0.1)"] +[[package]] +name = "feedparser" +version = "6.0.11" +description = "Universal feed parser, handles RSS 0.9x, RSS 1.0, RSS 2.0, CDF, Atom 0.3, and Atom 1.0 feeds" +optional = false +python-versions = ">=3.6" +groups = ["main"] +files = [ + {file = "feedparser-6.0.11-py3-none-any.whl", hash = "sha256:0be7ee7b395572b19ebeb1d6aafb0028dee11169f1c934e0ed67d54992f4ad45"}, + {file = "feedparser-6.0.11.tar.gz", hash = "sha256:c9d0407b64c6f2a065d0ebb292c2b35c01050cc0dc33757461aaabdc4c4184d5"}, +] + +[package.dependencies] +sgmllib3k = "*" + [[package]] name = "filelock" version = "3.19.1" @@ -1994,6 +2009,17 @@ files = [ {file = "ruff-0.12.9.tar.gz", hash = "sha256:fbd94b2e3c623f659962934e52c2bea6fc6da11f667a427a368adaf3af2c866a"}, ] +[[package]] +name = "sgmllib3k" +version = "1.0.0" +description = "Py3k port of sgmllib." 
+optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "sgmllib3k-1.0.0.tar.gz", hash = "sha256:7868fb1c8bfa764c1ac563d3cf369c381d1325d36124933a726f29fcdaa812e9"}, +] + [[package]] name = "six" version = "1.17.0" @@ -2250,4 +2276,4 @@ watchdog = ["watchdog (>=2.3)"] [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "9785e18d2d996f5e58e1b06c722f6de31c445a1a83528f39227d1c373b91f989" +content-hash = "13bc4176d567d6738ca9ca5ebd67565f8526853434911137f4b51b39e275a546" diff --git a/pypi_query_mcp/core/search_client.py b/pypi_query_mcp/core/search_client.py index af9a8be..a861aba 100644 --- a/pypi_query_mcp/core/search_client.py +++ b/pypi_query_mcp/core/search_client.py @@ -126,20 +126,42 @@ class PyPISearchClient: try: # Use PyPI's search API as the primary source - pypi_results = await self._search_pypi_api(query, limit * 3) # Get more for filtering + try: + pypi_results = await self._search_pypi_api(query, limit * 3) # Get more for filtering + logger.info(f"Got {len(pypi_results)} raw results from PyPI API") + except Exception as e: + logger.error(f"PyPI API search failed: {e}") + pypi_results = [] # Enhance results with additional metadata - enhanced_results = await self._enhance_search_results(pypi_results) + try: + enhanced_results = await self._enhance_search_results(pypi_results) + logger.info(f"Enhanced to {len(enhanced_results)} results") + except Exception as e: + logger.error(f"Enhancement failed: {e}") + enhanced_results = pypi_results # Apply filters - filtered_results = self._apply_filters(enhanced_results, filters) + try: + filtered_results = self._apply_filters(enhanced_results, filters) + logger.info(f"Filtered to {len(filtered_results)} results") + except Exception as e: + logger.error(f"Filtering failed: {e}") + filtered_results = enhanced_results # Apply semantic search if requested if semantic_search: - filtered_results = self._apply_semantic_search(filtered_results, query) + try: + filtered_results = 
self._apply_semantic_search(filtered_results, query) + except Exception as e: + logger.error(f"Semantic search failed: {e}") # Sort results - sorted_results = self._sort_results(filtered_results, sort) + try: + sorted_results = self._sort_results(filtered_results, sort) + except Exception as e: + logger.error(f"Sorting failed: {e}") + sorted_results = filtered_results # Limit results final_results = sorted_results[:limit] @@ -161,72 +183,318 @@ class PyPISearchClient: raise SearchError(f"Search failed: {e}") from e async def _search_pypi_api(self, query: str, limit: int) -> List[Dict[str, Any]]: - """Search using PyPI's official search API.""" - url = "https://pypi.org/search/" - params = { - "q": query, - "page": 1, - } + """Search using available PyPI methods - no native search API exists.""" + logger.info(f"PyPI has no native search API, using curated search for: '{query}'") - async with httpx.AsyncClient(timeout=self.timeout) as client: - try: - response = await client.get(url, params=params) - response.raise_for_status() - - # Parse the HTML response (PyPI search returns HTML) - return await self._parse_search_html(response.text, limit) - - except httpx.HTTPError as e: - logger.error(f"PyPI search API error: {e}") - # Fallback to alternative search method - return await self._fallback_search(query, limit) + # PyPI doesn't have a search API, so we'll use our curated approach + # combined with direct package lookups for exact matches + results = [] + + # First: try direct package lookup (exact match) + try: + direct_result = await self._try_direct_package_lookup(query) + if direct_result: + results.extend(direct_result) + except Exception as e: + logger.debug(f"Direct lookup failed: {e}") + + # Second: search curated packages + try: + curated_results = await self._search_curated_packages(query, limit) + # Add curated results that aren't already in the list + existing_names = {r["name"].lower() for r in results} + for result in curated_results: + if 
result["name"].lower() not in existing_names: + results.append(result) + except Exception as e: + logger.error(f"Curated search failed: {e}") + + return results[:limit] - async def _fallback_search(self, query: str, limit: int) -> List[Dict[str, Any]]: - """Fallback search using PyPI JSON API and our curated data.""" - from ..data.popular_packages import PACKAGES_BY_NAME, get_popular_packages + async def _try_direct_package_lookup(self, query: str) -> List[Dict[str, Any]]: + """Try to get package info directly using PyPI JSON API.""" + candidates = [ + query.strip(), + query.strip().lower(), + query.strip().replace(" ", "-"), + query.strip().replace(" ", "_"), + query.strip().replace("_", "-"), + query.strip().replace("-", "_"), + ] + + results = [] + for candidate in candidates: + try: + async with PyPIClient() as client: + package_data = await client.get_package_info(candidate) + + results.append({ + "name": package_data["info"]["name"], + "summary": package_data["info"]["summary"] or "", + "version": package_data["info"]["version"], + "source": "direct_api", + "description": package_data["info"]["description"] or "", + "author": package_data["info"]["author"] or "", + "license": package_data["info"]["license"] or "", + "home_page": package_data["info"]["home_page"] or "", + "requires_python": package_data["info"]["requires_python"] or "", + "classifiers": package_data["info"]["classifiers"] or [], + "keywords": package_data["info"]["keywords"] or "", + }) + break # Found exact match, stop looking + + except Exception: + continue # Try next candidate + + return results + + async def _search_curated_packages(self, query: str, limit: int) -> List[Dict[str, Any]]: + """Search our curated package database.""" + from ..data.popular_packages import ALL_POPULAR_PACKAGES - # Search in our curated packages first curated_matches = [] query_lower = query.lower() - for package_info in get_popular_packages(limit=1000): - name_match = query_lower in package_info.name.lower() - 
desc_match = query_lower in package_info.description.lower() - - if name_match or desc_match: + logger.info(f"Searching {len(ALL_POPULAR_PACKAGES)} curated packages for '{query}'") + + # First: exact name matches + for pkg in ALL_POPULAR_PACKAGES: + if query_lower == pkg.name.lower(): curated_matches.append({ - "name": package_info.name, - "summary": package_info.description, - "version": "unknown", - "source": "curated", - "category": package_info.category, - "estimated_downloads": package_info.estimated_monthly_downloads, + "name": pkg.name, + "summary": pkg.description, + "version": "latest", + "source": "curated_exact", + "category": pkg.category, + "estimated_downloads": pkg.estimated_monthly_downloads, + "github_stars": pkg.github_stars, + "primary_use_case": pkg.primary_use_case, }) - # If we have some matches, return them - if curated_matches: - return curated_matches[:limit] + # Second: name contains query (if not too many exact matches) + if len(curated_matches) < limit: + for pkg in ALL_POPULAR_PACKAGES: + if (query_lower in pkg.name.lower() and + pkg.name not in [m["name"] for m in curated_matches]): + curated_matches.append({ + "name": pkg.name, + "summary": pkg.description, + "version": "latest", + "source": "curated_name", + "category": pkg.category, + "estimated_downloads": pkg.estimated_monthly_downloads, + "github_stars": pkg.github_stars, + "primary_use_case": pkg.primary_use_case, + }) - # Last resort: try simple package name search + # Third: description or use case matches (if still need more results) + if len(curated_matches) < limit: + for pkg in ALL_POPULAR_PACKAGES: + if ((query_lower in pkg.description.lower() or + query_lower in pkg.primary_use_case.lower()) and + pkg.name not in [m["name"] for m in curated_matches]): + curated_matches.append({ + "name": pkg.name, + "summary": pkg.description, + "version": "latest", + "source": "curated_desc", + "category": pkg.category, + "estimated_downloads": pkg.estimated_monthly_downloads, + 
"github_stars": pkg.github_stars, + "primary_use_case": pkg.primary_use_case, + }) + + # Sort by popularity (downloads) + curated_matches.sort(key=lambda x: x.get("estimated_downloads", 0), reverse=True) + + logger.info(f"Found {len(curated_matches)} curated matches") + return curated_matches[:limit] + + async def _fallback_search(self, query: str, limit: int) -> List[Dict[str, Any]]: + """Fallback search using PyPI JSON API and our curated data.""" try: - async with PyPIClient() as client: - # Try to get the package directly if it's an exact match - try: + from ..data.popular_packages import PACKAGES_BY_NAME, get_popular_packages, ALL_POPULAR_PACKAGES + + # Search in our curated packages first + curated_matches = [] + query_lower = query.lower() + + logger.info(f"Searching in {len(ALL_POPULAR_PACKAGES)} curated packages for '{query}'") + + # First: exact name matches + for package_info in ALL_POPULAR_PACKAGES: + if query_lower == package_info.name.lower(): + curated_matches.append({ + "name": package_info.name, + "summary": package_info.description, + "version": "latest", + "source": "curated_exact", + "category": package_info.category, + "estimated_downloads": package_info.estimated_monthly_downloads, + "github_stars": package_info.github_stars, + }) + + # Second: name contains query + for package_info in ALL_POPULAR_PACKAGES: + if (query_lower in package_info.name.lower() and + package_info.name not in [m["name"] for m in curated_matches]): + curated_matches.append({ + "name": package_info.name, + "summary": package_info.description, + "version": "latest", + "source": "curated_name", + "category": package_info.category, + "estimated_downloads": package_info.estimated_monthly_downloads, + "github_stars": package_info.github_stars, + }) + + # Third: description or use case matches + for package_info in ALL_POPULAR_PACKAGES: + if ((query_lower in package_info.description.lower() or + query_lower in package_info.primary_use_case.lower()) and + package_info.name not 
in [m["name"] for m in curated_matches]): + curated_matches.append({ + "name": package_info.name, + "summary": package_info.description, + "version": "latest", + "source": "curated_desc", + "category": package_info.category, + "estimated_downloads": package_info.estimated_monthly_downloads, + "github_stars": package_info.github_stars, + }) + + logger.info(f"Found {len(curated_matches)} curated matches") + + # If we have some matches, return them (sorted by popularity) + if curated_matches: + curated_matches.sort(key=lambda x: x.get("estimated_downloads", 0), reverse=True) + return curated_matches[:limit] + + # Last resort: try direct package lookup + logger.info("No curated matches, trying direct package lookup") + try: + async with PyPIClient() as client: package_data = await client.get_package_info(query) return [{ "name": package_data["info"]["name"], "summary": package_data["info"]["summary"] or "", "version": package_data["info"]["version"], - "source": "direct", + "source": "direct_fallback", + "description": package_data["info"]["description"] or "", + "author": package_data["info"]["author"] or "", }] - except: - pass + except Exception as e: + logger.info(f"Direct lookup failed: {e}") except Exception as e: - logger.warning(f"Fallback search failed: {e}") + logger.error(f"Fallback search failed: {e}") return [] + async def _search_xmlrpc(self, query: str, limit: int) -> List[Dict[str, Any]]: + """Search using enhanced curated search with fuzzy matching.""" + # Since PyPI XML-RPC search is deprecated, use our enhanced curated search + try: + from ..data.popular_packages import get_popular_packages, ALL_POPULAR_PACKAGES + + query_lower = query.lower() + results = [] + + # First pass: exact name matches + for pkg in ALL_POPULAR_PACKAGES: + if query_lower == pkg.name.lower(): + results.append({ + "name": pkg.name, + "summary": pkg.description, + "version": "latest", + "source": "curated_exact", + "category": pkg.category, + "estimated_downloads": 
pkg.estimated_monthly_downloads, + "github_stars": pkg.github_stars, + }) + + # Second pass: name contains query + for pkg in ALL_POPULAR_PACKAGES: + if query_lower in pkg.name.lower() and pkg.name not in [r["name"] for r in results]: + results.append({ + "name": pkg.name, + "summary": pkg.description, + "version": "latest", + "source": "curated_name", + "category": pkg.category, + "estimated_downloads": pkg.estimated_monthly_downloads, + "github_stars": pkg.github_stars, + }) + + # Third pass: description contains query + for pkg in ALL_POPULAR_PACKAGES: + if (query_lower in pkg.description.lower() or + query_lower in pkg.primary_use_case.lower()) and pkg.name not in [r["name"] for r in results]: + results.append({ + "name": pkg.name, + "summary": pkg.description, + "version": "latest", + "source": "curated_desc", + "category": pkg.category, + "estimated_downloads": pkg.estimated_monthly_downloads, + "github_stars": pkg.github_stars, + }) + + # Sort by popularity (downloads) + results.sort(key=lambda x: x.get("estimated_downloads", 0), reverse=True) + + return results[:limit] + + except Exception as e: + logger.debug(f"Enhanced curated search error: {e}") + + return [] + + async def _search_simple_api(self, query: str, limit: int) -> List[Dict[str, Any]]: + """Search using direct PyPI JSON API for specific packages.""" + try: + # Try direct package lookup if query looks like a package name + query_clean = query.strip().lower().replace(" ", "-") + + # Try variations of the query as package names + candidates = [ + query_clean, + query_clean.replace("-", "_"), + query_clean.replace("_", "-"), + query.strip(), # Original query + ] + + results = [] + + for candidate in candidates: + if len(results) >= limit: + break + + try: + async with PyPIClient() as client: + package_data = await client.get_package_info(candidate) + + results.append({ + "name": package_data["info"]["name"], + "summary": package_data["info"]["summary"] or "", + "version": 
package_data["info"]["version"], + "source": "direct_api", + "description": package_data["info"]["description"] or "", + "author": package_data["info"]["author"] or "", + "license": package_data["info"]["license"] or "", + }) + + except Exception: + # Package doesn't exist, continue to next candidate + continue + + return results + + except Exception as e: + logger.debug(f"Simple API search error: {e}") + + return [] + async def _parse_search_html(self, html: str, limit: int) -> List[Dict[str, Any]]: """Parse PyPI search results from HTML (simplified parser).""" # This is a simplified parser - in production, you'd use BeautifulSoup @@ -237,9 +505,19 @@ class PyPISearchClient: """Enhance search results with additional metadata from PyPI API.""" enhanced = [] - # Process in batches to avoid overwhelming the API - batch_size = 5 - for i in range(0, len(results), batch_size): + # Skip enhancement if results already have good metadata from curated source + if results and results[0].get("source", "").startswith("curated"): + logger.info("Using curated results without enhancement") + return results + + # For direct API results, they're already enhanced + if results and results[0].get("source") == "direct_api": + logger.info("Using direct API results without additional enhancement") + return results + + # Process in small batches to avoid overwhelming the API + batch_size = 3 + for i in range(0, min(len(results), 10), batch_size): # Limit to first 10 results batch = results[i:i + batch_size] batch_tasks = [ self._enhance_single_result(result) diff --git a/pypi_query_mcp/server.py b/pypi_query_mcp/server.py index 7154d3f..3d16723 100644 --- a/pypi_query_mcp/server.py +++ b/pypi_query_mcp/server.py @@ -70,6 +70,18 @@ from .tools import ( get_pypi_package_reviews, manage_pypi_package_discussions, get_pypi_maintainer_contacts, + # Security tools + bulk_scan_package_security, + scan_pypi_package_security, + # License tools + analyze_pypi_package_license, + 
check_bulk_license_compliance, + # Health tools + assess_package_health_score, + compare_packages_health_scores, + # Requirements tools + analyze_requirements_file_tool, + compare_multiple_requirements_files, ) # Configure logging @@ -1929,6 +1941,390 @@ async def get_pypi_maintainer_contacts_tool( } +@mcp.tool() +async def scan_pypi_package_security_tool( + package_name: str, + version: str | None = None, + include_dependencies: bool = True, + severity_filter: str | None = None +) -> dict[str, Any]: + """Scan a PyPI package for security vulnerabilities. + + This tool performs comprehensive security vulnerability scanning of PyPI packages, + checking against multiple vulnerability databases including OSV (Open Source Vulnerabilities), + GitHub Security Advisories, and analyzing package metadata for security indicators. + + Args: + package_name: Name of the package to scan for vulnerabilities + version: Specific version to scan (optional, defaults to latest version) + include_dependencies: Whether to scan package dependencies for vulnerabilities + severity_filter: Filter results by severity level (low, medium, high, critical) + + Returns: + Dictionary containing comprehensive security scan results including: + - Total vulnerability count and severity breakdown + - Direct package vulnerabilities vs dependency vulnerabilities + - Risk score and level assessment (minimal, low, medium, high, critical) + - Detailed vulnerability information with IDs, descriptions, and references + - Package metadata security analysis + - Actionable security recommendations + + Raises: + InvalidPackageNameError: If package name is empty or invalid + PackageNotFoundError: If package is not found on PyPI + NetworkError: For network-related errors + SearchError: If security scanning fails + """ + try: + logger.info(f"MCP tool: Scanning security vulnerabilities for {package_name}") + result = await scan_pypi_package_security( + package_name, version, include_dependencies, severity_filter + ) 
+ logger.info(f"Security scan completed for {package_name} - found {result.get('security_summary', {}).get('total_vulnerabilities', 0)} vulnerabilities") + return result + except Exception as e: + logger.error(f"Error scanning security for {package_name}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package": package_name, + "version": version, + } + + +@mcp.tool() +async def bulk_scan_package_security_tool( + package_names: list[str], + include_dependencies: bool = False, + severity_threshold: str = "medium" +) -> dict[str, Any]: + """Perform bulk security scanning of multiple PyPI packages. + + This tool scans multiple packages simultaneously for security vulnerabilities, + providing a consolidated report with summary statistics and prioritized + recommendations for addressing security issues across your package ecosystem. + + Args: + package_names: List of package names to scan for vulnerabilities + include_dependencies: Whether to include dependency vulnerability scanning + severity_threshold: Minimum severity level to report (low, medium, high, critical) + + Returns: + Dictionary containing bulk scan results including: + - Summary statistics (total packages, packages with vulnerabilities, high-risk packages) + - Detailed scan results for each package + - Prioritized recommendations for security remediation + - Scan timestamp and completion status + + Raises: + ValueError: If package_names list is empty + NetworkError: For network-related errors during scanning + SearchError: If bulk scanning fails + """ + try: + logger.info(f"MCP tool: Starting bulk security scan of {len(package_names)} packages") + result = await bulk_scan_package_security( + package_names, include_dependencies, severity_threshold + ) + logger.info(f"Bulk security scan completed - {result.get('summary', {}).get('packages_with_vulnerabilities', 0)} packages have vulnerabilities") + return result + except Exception as e: + logger.error(f"Error in bulk security scan: 
{e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package_names": package_names, + } + + +@mcp.tool() +async def analyze_pypi_package_license_tool( + package_name: str, + version: str | None = None, + include_dependencies: bool = True +) -> dict[str, Any]: + """Analyze license compatibility for a PyPI package. + + This tool provides comprehensive license analysis including license identification, + dependency license scanning, compatibility checking, and risk assessment to help + ensure your project complies with open source license requirements. + + Args: + package_name: Name of the package to analyze for license compatibility + version: Specific version to analyze (optional, defaults to latest version) + include_dependencies: Whether to analyze dependency licenses for compatibility + + Returns: + Dictionary containing comprehensive license analysis including: + - License identification and normalization (SPDX format) + - License categorization (permissive, copyleft, proprietary, etc.) 
+ - Dependency license analysis and compatibility matrix + - Risk assessment with score and risk level (minimal, low, medium, high, critical) + - Compatibility analysis highlighting conflicts and review-required combinations + - Actionable recommendations for license compliance + + Raises: + InvalidPackageNameError: If package name is empty or invalid + PackageNotFoundError: If package is not found on PyPI + NetworkError: For network-related errors + SearchError: If license analysis fails + """ + try: + logger.info(f"MCP tool: Analyzing license compatibility for {package_name}") + result = await analyze_pypi_package_license( + package_name, version, include_dependencies + ) + logger.info(f"License analysis completed for {package_name} - {result.get('analysis_summary', {}).get('license_conflicts', 0)} conflicts found") + return result + except Exception as e: + logger.error(f"Error analyzing license for {package_name}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package": package_name, + "version": version, + } + + +@mcp.tool() +async def check_bulk_license_compliance_tool( + package_names: list[str], + target_license: str | None = None +) -> dict[str, Any]: + """Check license compliance for multiple PyPI packages. + + This tool performs bulk license compliance checking across multiple packages, + providing a consolidated report to help ensure your entire package ecosystem + complies with license requirements and identifying potential legal risks. 
+ + Args: + package_names: List of package names to check for license compliance + target_license: Target license for compatibility checking (optional) + + Returns: + Dictionary containing bulk compliance analysis including: + - Summary statistics (total packages, compliant/non-compliant counts) + - Detailed license analysis for each package + - High-risk packages requiring immediate attention + - Unknown license packages needing investigation + - Prioritized recommendations for compliance remediation + + Raises: + ValueError: If package_names list is empty + NetworkError: For network-related errors during analysis + SearchError: If bulk compliance checking fails + """ + try: + logger.info(f"MCP tool: Starting bulk license compliance check for {len(package_names)} packages") + result = await check_bulk_license_compliance( + package_names, target_license + ) + logger.info(f"Bulk license compliance completed - {result.get('summary', {}).get('non_compliant_packages', 0)} non-compliant packages found") + return result + except Exception as e: + logger.error(f"Error in bulk license compliance check: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package_names": package_names, + } + + +@mcp.tool() +async def assess_package_health_score_tool( + package_name: str, + version: str | None = None, + include_github_metrics: bool = True +) -> dict[str, Any]: + """Assess comprehensive health and quality of a PyPI package. + + This tool evaluates package health across multiple dimensions including maintenance, + popularity, documentation, testing, security practices, compatibility, and metadata + completeness to provide an overall health score and actionable recommendations. 
+ + Args: + package_name: Name of the package to assess for health and quality + version: Specific version to assess (optional, defaults to latest version) + include_github_metrics: Whether to fetch GitHub repository metrics for analysis + + Returns: + Dictionary containing comprehensive health assessment including: + - Overall health score (0-100) and level (excellent/good/fair/poor/critical) + - Category-specific scores (maintenance, popularity, documentation, testing, etc.) + - Detailed assessment breakdown with indicators and issues for each category + - GitHub repository metrics (stars, forks, activity) if available + - Actionable recommendations for health improvements + - Strengths, weaknesses, and improvement priorities analysis + + Raises: + InvalidPackageNameError: If package name is empty or invalid + PackageNotFoundError: If package is not found on PyPI + NetworkError: For network-related errors + SearchError: If health assessment fails + """ + try: + logger.info(f"MCP tool: Assessing health for {package_name}") + result = await assess_package_health_score( + package_name, version, include_github_metrics + ) + overall_score = result.get("overall_health", {}).get("score", 0) + health_level = result.get("overall_health", {}).get("level", "unknown") + logger.info(f"Health assessment completed for {package_name} - score: {overall_score:.1f}/100 ({health_level})") + return result + except Exception as e: + logger.error(f"Error assessing health for {package_name}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package": package_name, + "version": version, + } + + +@mcp.tool() +async def compare_packages_health_scores_tool( + package_names: list[str], + include_github_metrics: bool = False +) -> dict[str, Any]: + """Compare health scores across multiple PyPI packages. 
+ + This tool performs comparative health analysis across multiple packages, + providing rankings, insights, and recommendations to help evaluate + package ecosystem quality and identify the best options. + + Args: + package_names: List of package names to compare for health and quality + include_github_metrics: Whether to include GitHub metrics in the comparison + + Returns: + Dictionary containing comparative health analysis including: + - Detailed health results for each package + - Health score rankings with best/worst package identification + - Comparison insights (average scores, score ranges, rankings) + - Recommendations for package selection and improvements + - Statistical analysis of health across the package set + + Raises: + ValueError: If package_names list is empty + NetworkError: For network-related errors during analysis + SearchError: If health comparison fails + """ + try: + logger.info(f"MCP tool: Starting health comparison for {len(package_names)} packages") + result = await compare_packages_health_scores( + package_names, include_github_metrics + ) + comparison_insights = result.get("comparison_insights", {}) + best_package = comparison_insights.get("best_package", {}) + packages_compared = result.get("packages_compared", 0) + logger.info(f"Health comparison completed for {packages_compared} packages - best: {best_package.get('name', 'unknown')} ({best_package.get('score', 0):.1f}/100)") + return result + except Exception as e: + logger.error(f"Error in health comparison: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package_names": package_names, + } + + +@mcp.tool() +async def analyze_requirements_file_tool_mcp( + file_path: str, + check_updates: bool = True, + security_scan: bool = True, + compatibility_check: bool = True +) -> dict[str, Any]: + """Analyze project requirements file for dependencies, security, and compatibility. 
+ + This tool provides comprehensive analysis of Python project requirements files + including dependency parsing, version checking, security vulnerability scanning, + Python compatibility assessment, and actionable recommendations for improvements. + + Args: + file_path: Path to the requirements file (requirements.txt, pyproject.toml, setup.py, etc.) + check_updates: Whether to check for available package updates + security_scan: Whether to perform security vulnerability scanning on dependencies + compatibility_check: Whether to check Python version compatibility for all dependencies + + Returns: + Dictionary containing comprehensive requirements analysis including: + - File information and detected format (requirements.txt, pyproject.toml, etc.) + - Parsed dependencies with version specifiers and extras + - Dependency health analysis with specification issues and recommendations + - Package update analysis showing outdated packages and latest versions + - Security vulnerability scan results for all dependencies + - Python version compatibility assessment + - Overall risk level and actionable improvement recommendations + + Raises: + FileNotFoundError: If the requirements file is not found + NetworkError: For network-related errors during analysis + SearchError: If requirements analysis fails + """ + try: + logger.info(f"MCP tool: Analyzing requirements file {file_path}") + result = await analyze_requirements_file_tool( + file_path, check_updates, security_scan, compatibility_check + ) + summary = result.get("analysis_summary", {}) + total_deps = summary.get("total_dependencies", 0) + risk_level = summary.get("overall_risk_level", "unknown") + logger.info(f"Requirements analysis completed for {file_path} - {total_deps} dependencies, risk level: {risk_level}") + return result + except Exception as e: + logger.error(f"Error analyzing requirements file {file_path}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "file_path": file_path, + } + + 
+@mcp.tool() +async def compare_multiple_requirements_files_mcp( + file_paths: list[str] +) -> dict[str, Any]: + """Compare multiple requirements files to identify differences and conflicts. + + This tool analyzes multiple requirements files simultaneously to identify + version conflicts, unique dependencies, and inconsistencies across different + project configurations or environments. + + Args: + file_paths: List of paths to requirements files to compare and analyze + + Returns: + Dictionary containing comparative requirements analysis including: + - Detailed analysis results for each individual file + - Common packages shared across all files + - Conflicting package versions between files with specific version details + - Packages unique to specific files + - Recommendations for resolving conflicts and standardizing requirements + - Statistics on package overlap and conflict rates + + Raises: + ValueError: If file_paths list is empty + NetworkError: For network-related errors during analysis + SearchError: If requirements comparison fails + """ + try: + logger.info(f"MCP tool: Comparing {len(file_paths)} requirements files") + result = await compare_multiple_requirements_files(file_paths) + comparison_results = result.get("comparison_results", {}) + conflicts = len(comparison_results.get("conflicting_packages", [])) + total_packages = comparison_results.get("total_unique_packages", 0) + logger.info(f"Requirements comparison completed - {total_packages} unique packages, {conflicts} conflicts found") + return result + except Exception as e: + logger.error(f"Error comparing requirements files: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "file_paths": file_paths, + } + + # Register prompt templates following standard MCP workflow: # 1. User calls tool → MCP client sends request # 2. 
Tool function executes → Collects necessary data and parameters diff --git a/pypi_query_mcp/tools/__init__.py b/pypi_query_mcp/tools/__init__.py index b1b3fe6..ce0615b 100644 --- a/pypi_query_mcp/tools/__init__.py +++ b/pypi_query_mcp/tools/__init__.py @@ -64,6 +64,22 @@ from .search import ( search_by_category, search_packages, ) +from .security_tools import ( + bulk_scan_package_security, + scan_pypi_package_security, +) +from .license_tools import ( + analyze_pypi_package_license, + check_bulk_license_compliance, +) +from .health_tools import ( + assess_package_health_score, + compare_packages_health_scores, +) +from .requirements_tools import ( + analyze_requirements_file_tool, + compare_multiple_requirements_files, +) __all__ = [ # Core package tools @@ -114,4 +130,16 @@ __all__ = [ "get_pypi_package_reviews", "manage_pypi_package_discussions", "get_pypi_maintainer_contacts", + # Security tools + "scan_pypi_package_security", + "bulk_scan_package_security", + # License tools + "analyze_pypi_package_license", + "check_bulk_license_compliance", + # Health tools + "assess_package_health_score", + "compare_packages_health_scores", + # Requirements tools + "analyze_requirements_file_tool", + "compare_multiple_requirements_files", ] diff --git a/pypi_query_mcp/tools/health_scorer.py b/pypi_query_mcp/tools/health_scorer.py new file mode 100644 index 0000000..838abe7 --- /dev/null +++ b/pypi_query_mcp/tools/health_scorer.py @@ -0,0 +1,974 @@ +"""Package health scoring and quality assessment tools for PyPI packages.""" + +import asyncio +import logging +import re +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse + +import httpx + +from ..core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from ..core.pypi_client import PyPIClient + +logger = logging.getLogger(__name__) + + +class PackageHealthScorer: + """Comprehensive health and quality scorer for PyPI packages.""" + + def 
__init__(self): + self.timeout = 30.0 + + # Health scoring weights (total = 100) + self.weights = { + "maintenance": 25, # Maintenance indicators + "popularity": 20, # Download stats, stars, usage + "documentation": 15, # Documentation quality + "testing": 15, # Testing and CI indicators + "security": 10, # Security practices + "compatibility": 10, # Python version support + "metadata": 5, # Metadata completeness + } + + # Quality metrics thresholds + self.thresholds = { + "downloads_monthly_excellent": 1000000, + "downloads_monthly_good": 100000, + "downloads_monthly_fair": 10000, + "version_age_days_fresh": 90, + "version_age_days_good": 365, + "version_age_days_stale": 730, + "python_versions_excellent": 4, + "python_versions_good": 3, + "python_versions_fair": 2, + } + + async def assess_package_health( + self, + package_name: str, + version: Optional[str] = None, + include_github_metrics: bool = True + ) -> Dict[str, Any]: + """ + Assess comprehensive health and quality of a PyPI package. 
+ + Args: + package_name: Name of the package to assess + version: Specific version to assess (optional) + include_github_metrics: Whether to fetch GitHub repository metrics + + Returns: + Dictionary containing health assessment results + """ + logger.info(f"Starting health assessment for package: {package_name}") + + try: + async with PyPIClient() as client: + package_data = await client.get_package_info(package_name, version) + + package_version = version or package_data["info"]["version"] + + # Run parallel health assessments + assessment_tasks = [ + self._assess_maintenance_health(package_data), + self._assess_popularity_metrics(package_data), + self._assess_documentation_quality(package_data), + self._assess_testing_indicators(package_data), + self._assess_security_practices(package_data), + self._assess_compatibility_support(package_data), + self._assess_metadata_completeness(package_data), + ] + + if include_github_metrics: + github_url = self._extract_github_url(package_data) + if github_url: + assessment_tasks.append(self._fetch_github_metrics(github_url)) + else: + assessment_tasks.append(asyncio.create_task(self._empty_github_metrics())) + else: + assessment_tasks.append(asyncio.create_task(self._empty_github_metrics())) + + results = await asyncio.gather(*assessment_tasks, return_exceptions=True) + + # Unpack results + (maintenance, popularity, documentation, testing, + security, compatibility, metadata, github_metrics) = results + + # Handle exceptions + if isinstance(github_metrics, Exception): + github_metrics = self._empty_github_metrics() + + # Calculate overall health score + health_scores = { + "maintenance": maintenance.get("score", 0) if not isinstance(maintenance, Exception) else 0, + "popularity": popularity.get("score", 0) if not isinstance(popularity, Exception) else 0, + "documentation": documentation.get("score", 0) if not isinstance(documentation, Exception) else 0, + "testing": testing.get("score", 0) if not isinstance(testing, 
Exception) else 0, + "security": security.get("score", 0) if not isinstance(security, Exception) else 0, + "compatibility": compatibility.get("score", 0) if not isinstance(compatibility, Exception) else 0, + "metadata": metadata.get("score", 0) if not isinstance(metadata, Exception) else 0, + } + + overall_score = sum( + health_scores[category] * (self.weights[category] / 100) + for category in health_scores + ) + + health_level = self._calculate_health_level(overall_score) + + # Generate recommendations + recommendations = self._generate_health_recommendations( + health_scores, maintenance, popularity, documentation, + testing, security, compatibility, metadata, github_metrics + ) + + return { + "package": package_name, + "version": package_version, + "assessment_timestamp": datetime.now(timezone.utc).isoformat(), + "overall_health": { + "score": round(overall_score, 2), + "level": health_level, + "max_score": 100, + }, + "category_scores": health_scores, + "detailed_assessment": { + "maintenance": maintenance if not isinstance(maintenance, Exception) else {"score": 0, "indicators": [], "issues": [str(maintenance)]}, + "popularity": popularity if not isinstance(popularity, Exception) else {"score": 0, "metrics": {}, "issues": [str(popularity)]}, + "documentation": documentation if not isinstance(documentation, Exception) else {"score": 0, "indicators": [], "issues": [str(documentation)]}, + "testing": testing if not isinstance(testing, Exception) else {"score": 0, "indicators": [], "issues": [str(testing)]}, + "security": security if not isinstance(security, Exception) else {"score": 0, "practices": [], "issues": [str(security)]}, + "compatibility": compatibility if not isinstance(compatibility, Exception) else {"score": 0, "support": [], "issues": [str(compatibility)]}, + "metadata": metadata if not isinstance(metadata, Exception) else {"score": 0, "completeness": {}, "issues": [str(metadata)]}, + "github_metrics": github_metrics, + }, + "recommendations": 
recommendations, + "health_summary": { + "strengths": self._identify_strengths(health_scores), + "weaknesses": self._identify_weaknesses(health_scores), + "improvement_priority": self._prioritize_improvements(health_scores), + } + } + + except Exception as e: + logger.error(f"Health assessment failed for {package_name}: {e}") + raise SearchError(f"Health assessment failed: {e}") from e + + async def _assess_maintenance_health(self, package_data: Dict[str, Any]) -> Dict[str, Any]: + """Assess package maintenance health indicators.""" + info = package_data.get("info", {}) + releases = package_data.get("releases", {}) + + score = 0 + indicators = [] + issues = [] + + # Check release frequency + if releases: + release_dates = [] + for version_releases in releases.values(): + for release in version_releases: + upload_time = release.get("upload_time_iso_8601") + if upload_time: + try: + release_dates.append(datetime.fromisoformat(upload_time.replace('Z', '+00:00'))) + except: + pass + + if release_dates: + release_dates.sort(reverse=True) + latest_release = release_dates[0] + days_since_release = (datetime.now(timezone.utc) - latest_release).days + + if days_since_release <= self.thresholds["version_age_days_fresh"]: + score += 25 + indicators.append(f"Recent release ({days_since_release} days ago)") + elif days_since_release <= self.thresholds["version_age_days_good"]: + score += 20 + indicators.append(f"Moderately recent release ({days_since_release} days ago)") + elif days_since_release <= self.thresholds["version_age_days_stale"]: + score += 10 + indicators.append(f"Older release ({days_since_release} days ago)") + else: + issues.append(f"Very old release ({days_since_release} days ago)") + + # Check release consistency (last 5 releases) + if len(release_dates) >= 5: + recent_releases = release_dates[:5] + intervals = [] + for i in range(len(recent_releases) - 1): + interval = (recent_releases[i] - recent_releases[i + 1]).days + intervals.append(interval) + + 
avg_interval = sum(intervals) / len(intervals) + if avg_interval <= 180: # Releases every 6 months or less + score += 15 + indicators.append(f"Regular releases (avg {avg_interval:.0f} days)") + elif avg_interval <= 365: + score += 10 + indicators.append(f"Periodic releases (avg {avg_interval:.0f} days)") + else: + issues.append(f"Infrequent releases (avg {avg_interval:.0f} days)") + else: + issues.append("No release history available") + + # Check for development indicators + if "dev" in info.get("version", "").lower() or "alpha" in info.get("version", "").lower(): + issues.append("Development/alpha version") + elif "beta" in info.get("version", "").lower(): + score += 5 + indicators.append("Beta version (active development)") + else: + score += 10 + indicators.append("Stable version") + + # Check for author/maintainer info + if info.get("author") or info.get("maintainer"): + score += 10 + indicators.append("Active maintainer information") + else: + issues.append("No maintainer information") + + return { + "score": min(score, 100), + "indicators": indicators, + "issues": issues, + "metrics": { + "days_since_last_release": days_since_release if 'days_since_release' in locals() else None, + "total_releases": len(releases), + } + } + + async def _assess_popularity_metrics(self, package_data: Dict[str, Any]) -> Dict[str, Any]: + """Assess package popularity and usage metrics.""" + info = package_data.get("info", {}) + + score = 0 + metrics = {} + + # Estimate download popularity (since we don't have direct access) + # Use proxy indicators: project URLs, description length, classifiers + + # Check for GitHub stars indicator + project_urls = info.get("project_urls", {}) or {} + github_url = None + for key, url in project_urls.items(): + if "github.com" in (url or "").lower(): + github_url = url + break + + if not github_url: + home_page = info.get("home_page", "") + if "github.com" in home_page: + github_url = home_page + + if github_url: + score += 15 + 
metrics["has_github_repo"] = True + else: + metrics["has_github_repo"] = False + + # Check description quality as popularity indicator + description = info.get("description", "") or "" + summary = info.get("summary", "") or "" + + if len(description) > 1000: + score += 20 + metrics["description_quality"] = "excellent" + elif len(description) > 500: + score += 15 + metrics["description_quality"] = "good" + elif len(description) > 100: + score += 10 + metrics["description_quality"] = "fair" + else: + metrics["description_quality"] = "poor" + + # Check for comprehensive metadata (popularity indicator) + if info.get("keywords"): + score += 10 + if len(info.get("classifiers", [])) > 5: + score += 15 + if info.get("project_urls") and len(info.get("project_urls", {})) > 2: + score += 10 + + # Check for documentation links + docs_indicators = ["documentation", "docs", "readthedocs", "github.io"] + has_docs = any( + any(indicator in (url or "").lower() for indicator in docs_indicators) + for url in project_urls.values() + ) + if has_docs: + score += 15 + metrics["has_documentation"] = True + else: + metrics["has_documentation"] = False + + # Check for community indicators + community_urls = ["issues", "bug", "tracker", "discussion", "forum"] + has_community = any( + any(indicator in key.lower() for indicator in community_urls) + for key in project_urls.keys() + ) + if has_community: + score += 15 + metrics["has_community_links"] = True + else: + metrics["has_community_links"] = False + + return { + "score": min(score, 100), + "metrics": metrics, + } + + async def _assess_documentation_quality(self, package_data: Dict[str, Any]) -> Dict[str, Any]: + """Assess documentation quality indicators.""" + info = package_data.get("info", {}) + + score = 0 + indicators = [] + issues = [] + + # Check description completeness + description = info.get("description", "") or "" + summary = info.get("summary", "") or "" + + if len(description) > 2000: + score += 30 + 
indicators.append("Comprehensive description") + elif len(description) > 1000: + score += 25 + indicators.append("Good description length") + elif len(description) > 500: + score += 15 + indicators.append("Adequate description") + elif len(description) > 100: + score += 10 + indicators.append("Basic description") + else: + issues.append("Very short or missing description") + + # Check for README indicators in description + readme_indicators = ["## ", "### ", "```", "# Installation", "# Usage", "# Examples"] + if any(indicator in description for indicator in readme_indicators): + score += 20 + indicators.append("Structured documentation (README-style)") + + # Check for documentation URLs + project_urls = info.get("project_urls", {}) or {} + docs_urls = [] + for key, url in project_urls.items(): + if any(term in key.lower() for term in ["doc", "guide", "manual", "wiki"]): + docs_urls.append(url) + + if docs_urls: + score += 25 + indicators.append(f"Documentation links ({len(docs_urls)} found)") + else: + issues.append("No dedicated documentation links") + + # Check for example code in description + if "```" in description or " " in description: # Code blocks + score += 15 + indicators.append("Contains code examples") + + # Check for installation instructions + install_keywords = ["install", "pip install", "setup.py", "requirements"] + if any(keyword in description.lower() for keyword in install_keywords): + score += 10 + indicators.append("Installation instructions provided") + else: + issues.append("No clear installation instructions") + + return { + "score": min(score, 100), + "indicators": indicators, + "issues": issues, + } + + async def _assess_testing_indicators(self, package_data: Dict[str, Any]) -> Dict[str, Any]: + """Assess testing and CI/CD indicators.""" + info = package_data.get("info", {}) + + score = 0 + indicators = [] + issues = [] + + # Check for testing-related classifiers + classifiers = info.get("classifiers", []) + testing_classifiers = [c for c 
in classifiers if "testing" in c.lower()] + if testing_classifiers: + score += 15 + indicators.append("Testing framework classifiers") + + # Check for CI/CD indicators in URLs + project_urls = info.get("project_urls", {}) or {} + ci_indicators = ["travis", "circleci", "appveyor", "azure", "github", "actions", "ci", "build"] + ci_urls = [] + for key, url in project_urls.items(): + if any(indicator in key.lower() or indicator in (url or "").lower() for indicator in ci_indicators): + ci_urls.append(key) + + if ci_urls: + score += 25 + indicators.append(f"CI/CD indicators ({len(ci_urls)} found)") + + # Check description for testing mentions + description = (info.get("description", "") or "").lower() + testing_keywords = ["test", "pytest", "unittest", "nose", "coverage", "tox", "ci/cd", "continuous integration"] + testing_mentions = [kw for kw in testing_keywords if kw in description] + + if testing_mentions: + score += 20 + indicators.append(f"Testing framework mentions ({len(testing_mentions)} found)") + else: + issues.append("No testing framework mentions") + + # Check for test dependencies (common patterns) + requires_dist = info.get("requires_dist", []) or [] + test_deps = [] + for req in requires_dist: + req_lower = req.lower() + if any(test_pkg in req_lower for test_pkg in ["pytest", "unittest", "nose", "coverage", "tox", "test"]): + test_deps.append(req.split()[0]) + + if test_deps: + score += 20 + indicators.append(f"Test dependencies ({len(test_deps)} found)") + else: + issues.append("No test dependencies found") + + # Check for badges (often indicate CI/testing) + badge_indicators = ["[![", "https://img.shields.io", "badge", "build status", "coverage"] + if any(indicator in description for indicator in badge_indicators): + score += 20 + indicators.append("Status badges (likely CI integration)") + + return { + "score": min(score, 100), + "indicators": indicators, + "issues": issues, + } + + async def _assess_security_practices(self, package_data: Dict[str, 
Any]) -> Dict[str, Any]: + """Assess security practices and indicators.""" + info = package_data.get("info", {}) + + score = 0 + practices = [] + issues = [] + + # Check for security-related URLs + project_urls = info.get("project_urls", {}) or {} + security_urls = [] + for key, url in project_urls.items(): + if any(term in key.lower() for term in ["security", "vulnerability", "report", "bug"]): + security_urls.append(key) + + if security_urls: + score += 25 + practices.append(f"Security reporting channels ({len(security_urls)} found)") + else: + issues.append("No security reporting channels") + + # Check for HTTPS URLs + https_urls = [url for url in project_urls.values() if (url or "").startswith("https://")] + if len(https_urls) == len([url for url in project_urls.values() if url]): + score += 15 + practices.append("All URLs use HTTPS") + elif https_urls: + score += 10 + practices.append("Some URLs use HTTPS") + else: + issues.append("No HTTPS URLs found") + + # Check for security mentions in description + description = (info.get("description", "") or "").lower() + security_keywords = ["security", "secure", "vulnerability", "encryption", "authentication", "authorization"] + security_mentions = [kw for kw in security_keywords if kw in description] + + if security_mentions: + score += 20 + practices.append(f"Security awareness ({len(security_mentions)} mentions)") + + # Check for license (security practice) + if info.get("license") or any("license" in c.lower() for c in info.get("classifiers", [])): + score += 15 + practices.append("Clear license information") + else: + issues.append("No clear license information") + + # Check for author/maintainer email (security contact) + if info.get("author_email") or info.get("maintainer_email"): + score += 10 + practices.append("Maintainer contact information") + else: + issues.append("No maintainer contact information") + + # Check for requirements specification (dependency security) + requires_dist = 
info.get("requires_dist", []) + if requires_dist: + # Check for version pinning (security practice) + pinned_deps = [req for req in requires_dist if any(op in req for op in ["==", ">=", "~="])] + if pinned_deps: + score += 15 + practices.append(f"Version-pinned dependencies ({len(pinned_deps)}/{len(requires_dist)})") + else: + issues.append("No version-pinned dependencies") + + return { + "score": min(score, 100), + "practices": practices, + "issues": issues, + } + + async def _assess_compatibility_support(self, package_data: Dict[str, Any]) -> Dict[str, Any]: + """Assess Python version and platform compatibility.""" + info = package_data.get("info", {}) + + score = 0 + support = [] + issues = [] + + # Check Python version support from classifiers + classifiers = info.get("classifiers", []) + python_versions = [] + for classifier in classifiers: + if "Programming Language :: Python ::" in classifier: + version_part = classifier.split("::")[-1].strip() + if re.match(r'^\d+\.\d+$', version_part): # Like "3.8", "3.9" + python_versions.append(version_part) + + if len(python_versions) >= self.thresholds["python_versions_excellent"]: + score += 30 + support.append(f"Excellent Python version support ({len(python_versions)} versions)") + elif len(python_versions) >= self.thresholds["python_versions_good"]: + score += 25 + support.append(f"Good Python version support ({len(python_versions)} versions)") + elif len(python_versions) >= self.thresholds["python_versions_fair"]: + score += 15 + support.append(f"Fair Python version support ({len(python_versions)} versions)") + elif python_versions: + score += 10 + support.append(f"Limited Python version support ({len(python_versions)} versions)") + else: + issues.append("No explicit Python version support") + + # Check requires_python specification + requires_python = info.get("requires_python") + if requires_python: + score += 20 + support.append(f"Python requirement specified: {requires_python}") + else: + issues.append("No 
Python version requirement specified") + + # Check platform support + platform_classifiers = [c for c in classifiers if "Operating System" in c] + if platform_classifiers: + if any("OS Independent" in c for c in platform_classifiers): + score += 20 + support.append("Cross-platform support (OS Independent)") + else: + score += 15 + support.append(f"Platform support ({len(platform_classifiers)} platforms)") + else: + issues.append("No platform support information") + + # Check for wheel distribution (compatibility indicator) + urls = info.get("urls", []) or [] + has_wheel = any(url.get("packagetype") == "bdist_wheel" for url in urls) + if has_wheel: + score += 15 + support.append("Wheel distribution available") + else: + issues.append("No wheel distribution") + + # Check development status + status_classifiers = [c for c in classifiers if "Development Status" in c] + if status_classifiers: + status = status_classifiers[0] + if "5 - Production/Stable" in status: + score += 15 + support.append("Production/Stable status") + elif "4 - Beta" in status: + score += 10 + support.append("Beta status") + elif "3 - Alpha" in status: + score += 5 + support.append("Alpha status") + else: + issues.append(f"Early development status: {status}") + + return { + "score": min(score, 100), + "support": support, + "issues": issues, + "python_versions": python_versions, + } + + async def _assess_metadata_completeness(self, package_data: Dict[str, Any]) -> Dict[str, Any]: + """Assess metadata completeness and quality.""" + info = package_data.get("info", {}) + + score = 0 + completeness = {} + + # Essential fields + essential_fields = ["name", "version", "summary", "description", "author", "license"] + present_essential = [field for field in essential_fields if info.get(field)] + score += (len(present_essential) / len(essential_fields)) * 40 + completeness["essential_fields"] = f"{len(present_essential)}/{len(essential_fields)}" + + # Additional metadata fields + additional_fields = 
["keywords", "home_page", "author_email", "classifiers", "project_urls"] + present_additional = [field for field in additional_fields if info.get(field)] + score += (len(present_additional) / len(additional_fields)) * 30 + completeness["additional_fields"] = f"{len(present_additional)}/{len(additional_fields)}" + + # Classifier completeness + classifiers = info.get("classifiers", []) + classifier_categories = set() + for classifier in classifiers: + category = classifier.split("::")[0].strip() + classifier_categories.add(category) + + expected_categories = ["Development Status", "Intended Audience", "License", "Programming Language", "Topic"] + present_categories = [cat for cat in expected_categories if cat in classifier_categories] + score += (len(present_categories) / len(expected_categories)) * 20 + completeness["classifier_categories"] = f"{len(present_categories)}/{len(expected_categories)}" + + # URLs completeness + project_urls = info.get("project_urls", {}) or {} + expected_url_types = ["homepage", "repository", "documentation", "bug tracker"] + present_url_types = [] + for expected in expected_url_types: + if any(expected.lower() in key.lower() for key in project_urls.keys()): + present_url_types.append(expected) + + score += (len(present_url_types) / len(expected_url_types)) * 10 + completeness["url_types"] = f"{len(present_url_types)}/{len(expected_url_types)}" + + return { + "score": min(score, 100), + "completeness": completeness, + } + + def _extract_github_url(self, package_data: Dict[str, Any]) -> Optional[str]: + """Extract GitHub repository URL from package data.""" + info = package_data.get("info", {}) + + # Check project URLs + project_urls = info.get("project_urls", {}) or {} + for url in project_urls.values(): + if url and "github.com" in url: + return url + + # Check home page + home_page = info.get("home_page", "") + if home_page and "github.com" in home_page: + return home_page + + return None + + async def _fetch_github_metrics(self, 
github_url: str) -> Dict[str, Any]: + """Fetch GitHub repository metrics.""" + try: + # Parse GitHub URL to get owner/repo + parsed = urlparse(github_url) + path_parts = parsed.path.strip('/').split('/') + if len(path_parts) >= 2: + owner, repo = path_parts[0], path_parts[1] + + # GitHub API call (public API, no auth required for basic info) + api_url = f"https://api.github.com/repos/{owner}/{repo}" + + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + api_url, + headers={ + "Accept": "application/vnd.github.v3+json", + "User-Agent": "PyPI-Health-Scorer/1.0" + } + ) + + if response.status_code == 200: + data = response.json() + return { + "stars": data.get("stargazers_count", 0), + "forks": data.get("forks_count", 0), + "watchers": data.get("watchers_count", 0), + "issues": data.get("open_issues_count", 0), + "has_wiki": data.get("has_wiki", False), + "has_pages": data.get("has_pages", False), + "language": data.get("language", ""), + "created_at": data.get("created_at", ""), + "updated_at": data.get("pushed_at", ""), + "default_branch": data.get("default_branch", ""), + "archived": data.get("archived", False), + "disabled": data.get("disabled", False), + } + else: + logger.warning(f"GitHub API returned status {response.status_code}") + + except Exception as e: + logger.debug(f"Failed to fetch GitHub metrics: {e}") + + return self._empty_github_metrics() + + async def _empty_github_metrics(self) -> Dict[str, Any]: + """Return empty GitHub metrics.""" + return { + "stars": 0, + "forks": 0, + "watchers": 0, + "issues": 0, + "has_wiki": False, + "has_pages": False, + "language": "", + "created_at": "", + "updated_at": "", + "default_branch": "", + "archived": False, + "disabled": False, + "available": False, + } + + def _calculate_health_level(self, score: float) -> str: + """Calculate health level from score.""" + if score >= 85: + return "excellent" + elif score >= 70: + return "good" + elif score >= 55: + return "fair" + 
elif score >= 40: + return "poor" + else: + return "critical" + + def _identify_strengths(self, health_scores: Dict[str, float]) -> List[str]: + """Identify package strengths.""" + strengths = [] + for category, score in health_scores.items(): + if score >= 80: + strengths.append(f"Excellent {category} ({score:.0f}/100)") + elif score >= 65: + strengths.append(f"Good {category} ({score:.0f}/100)") + return strengths + + def _identify_weaknesses(self, health_scores: Dict[str, float]) -> List[str]: + """Identify package weaknesses.""" + weaknesses = [] + for category, score in health_scores.items(): + if score < 40: + weaknesses.append(f"Poor {category} ({score:.0f}/100)") + elif score < 55: + weaknesses.append(f"Fair {category} ({score:.0f}/100)") + return weaknesses + + def _prioritize_improvements(self, health_scores: Dict[str, float]) -> List[str]: + """Prioritize improvement areas by weight and score.""" + weighted_gaps = [] + for category, score in health_scores.items(): + gap = 100 - score + weighted_gap = gap * (self.weights[category] / 100) + weighted_gaps.append((category, weighted_gap, score)) + + # Sort by weighted gap (highest impact first) + weighted_gaps.sort(key=lambda x: x[1], reverse=True) + + priorities = [] + for category, weighted_gap, score in weighted_gaps[:3]: # Top 3 + if weighted_gap > 5: # Only include significant gaps + priorities.append(f"Improve {category} (current: {score:.0f}/100, impact: {self.weights[category]}%)") + + return priorities + + def _generate_health_recommendations( + self, health_scores: Dict[str, float], *assessment_results + ) -> List[str]: + """Generate actionable health improvement recommendations.""" + recommendations = [] + + overall_score = sum( + health_scores[category] * (self.weights[category] / 100) + for category in health_scores + ) + + # Overall recommendations + if overall_score >= 85: + recommendations.append("🌟 Excellent package health - maintain current standards") + elif overall_score >= 70: + 
# Main health assessment functions
async def assess_pypi_package_health(
    package_name: str,
    version: Optional[str] = None,
    include_github_metrics: bool = True,
) -> Dict[str, Any]:
    """
    Assess comprehensive health and quality of a PyPI package.

    Args:
        package_name: Name of the package to assess
        version: Specific version to assess (optional)
        include_github_metrics: Whether to fetch GitHub repository metrics

    Returns:
        Comprehensive health assessment including scores and recommendations
    """
    return await PackageHealthScorer().assess_package_health(
        package_name, version, include_github_metrics
    )


async def compare_package_health(
    package_names: List[str],
    include_github_metrics: bool = False,
) -> Dict[str, Any]:
    """
    Compare health scores across multiple packages.

    Args:
        package_names: List of package names to compare
        include_github_metrics: Whether to include GitHub metrics

    Returns:
        Comparative health analysis with rankings
    """
    logger.info(f"Starting health comparison for {len(package_names)} packages")

    scorer = PackageHealthScorer()
    results: Dict[str, Any] = {}

    # Assess in small parallel batches to bound concurrent PyPI/GitHub calls.
    batch_size = 3
    for start in range(0, len(package_names), batch_size):
        chunk = package_names[start:start + batch_size]
        outcomes = await asyncio.gather(
            *(
                scorer.assess_package_health(name, include_github_metrics=include_github_metrics)
                for name in chunk
            ),
            return_exceptions=True,
        )
        for name, outcome in zip(chunk, outcomes):
            if isinstance(outcome, Exception):
                # Failed assessments get a zero-score critical stub.
                results[name] = {
                    "error": str(outcome),
                    "overall_health": {"score": 0, "level": "critical"},
                    "category_scores": {cat: 0 for cat in scorer.weights.keys()},
                }
            else:
                results[name] = outcome

    # Rank the successfully assessed packages by overall score.
    scored = [
        (name, payload.get("overall_health", {}).get("score", 0))
        for name, payload in results.items()
        if "error" not in payload
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)

    if scored:
        top_name, top_score = scored[0]
        bottom_name, bottom_score = scored[-1]
        mean_score = sum(value for _, value in scored) / len(scored)
        comparison_insights = {
            "best_package": {"name": top_name, "score": top_score},
            "worst_package": {"name": bottom_name, "score": bottom_score},
            "average_score": round(mean_score, 2),
            "score_range": top_score - bottom_score,
            "rankings": [
                {"package": name, "score": value, "rank": position + 1}
                for position, (name, value) in enumerate(scored)
            ],
        }
    else:
        comparison_insights = {
            "best_package": None,
            "worst_package": None,
            "average_score": 0,
            "score_range": 0,
            "rankings": [],
        }

    return {
        "comparison_timestamp": datetime.now(timezone.utc).isoformat(),
        "packages_compared": len(package_names),
        "detailed_results": results,
        "comparison_insights": comparison_insights,
        "recommendations": _generate_comparison_recommendations(comparison_insights, results),
    }
/ len(package_scores) + + comparison_insights = { + "best_package": {"name": best_package, "score": best_score}, + "worst_package": {"name": worst_package, "score": worst_score}, + "average_score": round(avg_score, 2), + "score_range": best_score - worst_score, + "rankings": [{"package": pkg, "score": score, "rank": i+1} + for i, (pkg, score) in enumerate(package_scores)] + } + else: + comparison_insights = { + "best_package": None, + "worst_package": None, + "average_score": 0, + "score_range": 0, + "rankings": [] + } + + return { + "comparison_timestamp": datetime.now(timezone.utc).isoformat(), + "packages_compared": len(package_names), + "detailed_results": results, + "comparison_insights": comparison_insights, + "recommendations": _generate_comparison_recommendations(comparison_insights, results) + } + + +def _generate_comparison_recommendations( + insights: Dict[str, Any], results: Dict[str, Any] +) -> List[str]: + """Generate recommendations for package comparison.""" + recommendations = [] + + if not insights.get("rankings"): + recommendations.append("❌ No successful health assessments to compare") + return recommendations + + best = insights.get("best_package") + worst = insights.get("worst_package") + avg_score = insights.get("average_score", 0) + + if best and worst: + recommendations.append( + f"🥇 Best package: {best['name']} (score: {best['score']:.1f}/100)" + ) + recommendations.append( + f"🥉 Needs improvement: {worst['name']} (score: {worst['score']:.1f}/100)" + ) + + if best['score'] - worst['score'] > 30: + recommendations.append("📊 Significant quality variation - consider standardizing practices") + + recommendations.append(f"📈 Average health score: {avg_score:.1f}/100") + + if avg_score >= 70: + recommendations.append("✅ Overall good package health across portfolio") + elif avg_score >= 55: + recommendations.append("⚠️ Mixed package health - focus on improving lower-scoring packages") + else: + recommendations.append("🚨 Poor overall package health 
- systematic improvements needed") + + return recommendations \ No newline at end of file diff --git a/pypi_query_mcp/tools/health_tools.py b/pypi_query_mcp/tools/health_tools.py new file mode 100644 index 0000000..b0f98b3 --- /dev/null +++ b/pypi_query_mcp/tools/health_tools.py @@ -0,0 +1,155 @@ +"""Package health assessment tools for PyPI packages.""" + +import logging +from typing import Any, Dict, List, Optional + +from ..core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from ..tools.health_scorer import assess_pypi_package_health, compare_package_health + +logger = logging.getLogger(__name__) + + +async def assess_package_health_score( + package_name: str, + version: Optional[str] = None, + include_github_metrics: bool = True +) -> Dict[str, Any]: + """ + Assess comprehensive health and quality of a PyPI package. + + This tool evaluates package health across multiple dimensions including maintenance, + popularity, documentation, testing, security practices, compatibility, and metadata + completeness to provide an overall health score and actionable recommendations. + + Args: + package_name: Name of the package to assess for health and quality + version: Specific version to assess (optional, defaults to latest version) + include_github_metrics: Whether to fetch GitHub repository metrics for analysis + + Returns: + Dictionary containing comprehensive health assessment including: + - Overall health score (0-100) and level (excellent/good/fair/poor/critical) + - Category-specific scores (maintenance, popularity, documentation, testing, etc.) 
+ - Detailed assessment breakdown with indicators and issues for each category + - GitHub repository metrics (stars, forks, activity) if available + - Actionable recommendations for health improvements + - Strengths, weaknesses, and improvement priorities analysis + + Raises: + InvalidPackageNameError: If package name is empty or invalid + PackageNotFoundError: If package is not found on PyPI + NetworkError: For network-related errors + SearchError: If health assessment fails + """ + if not package_name or not package_name.strip(): + raise InvalidPackageNameError(package_name) + + logger.info(f"MCP tool: Assessing health for package {package_name}") + + try: + result = await assess_pypi_package_health( + package_name=package_name, + version=version, + include_github_metrics=include_github_metrics + ) + + overall_score = result.get("overall_health", {}).get("score", 0) + health_level = result.get("overall_health", {}).get("level", "unknown") + logger.info(f"MCP tool: Health assessment completed for {package_name} - score: {overall_score:.1f}/100 ({health_level})") + return result + + except (InvalidPackageNameError, NetworkError, SearchError) as e: + logger.error(f"Error assessing health for {package_name}: {e}") + return { + "error": f"Health assessment failed: {e}", + "error_type": type(e).__name__, + "package": package_name, + "version": version, + "assessment_timestamp": "", + "overall_health": { + "score": 0, + "level": "critical", + "max_score": 100, + }, + "category_scores": { + "maintenance": 0, + "popularity": 0, + "documentation": 0, + "testing": 0, + "security": 0, + "compatibility": 0, + "metadata": 0, + }, + "detailed_assessment": {}, + "recommendations": [f"❌ Health assessment failed: {e}"], + "health_summary": { + "strengths": [], + "weaknesses": ["Assessment failure"], + "improvement_priority": ["Resolve package access issues"], + } + } + + +async def compare_packages_health_scores( + package_names: List[str], + include_github_metrics: bool = False 
+) -> Dict[str, Any]: + """ + Compare health scores across multiple PyPI packages. + + This tool performs comparative health analysis across multiple packages, + providing rankings, insights, and recommendations to help evaluate + package ecosystem quality and identify the best options. + + Args: + package_names: List of package names to compare for health and quality + include_github_metrics: Whether to include GitHub metrics in the comparison + + Returns: + Dictionary containing comparative health analysis including: + - Detailed health results for each package + - Health score rankings with best/worst package identification + - Comparison insights (average scores, score ranges, rankings) + - Recommendations for package selection and improvements + - Statistical analysis of health across the package set + + Raises: + ValueError: If package_names list is empty + NetworkError: For network-related errors during analysis + SearchError: If health comparison fails + """ + if not package_names: + raise ValueError("Package names list cannot be empty") + + logger.info(f"MCP tool: Starting health comparison for {len(package_names)} packages") + + try: + result = await compare_package_health( + package_names=package_names, + include_github_metrics=include_github_metrics + ) + + comparison_insights = result.get("comparison_insights", {}) + best_package = comparison_insights.get("best_package", {}) + packages_compared = result.get("packages_compared", 0) + + logger.info(f"MCP tool: Health comparison completed for {packages_compared} packages - best: {best_package.get('name', 'unknown')} ({best_package.get('score', 0):.1f}/100)") + return result + + except (ValueError, NetworkError, SearchError) as e: + logger.error(f"Error in health comparison: {e}") + return { + "error": f"Health comparison failed: {e}", + "error_type": type(e).__name__, + "comparison_timestamp": "", + "packages_compared": len(package_names), + "detailed_results": {}, + "comparison_insights": { + 
"best_package": None, + "worst_package": None, + "average_score": 0, + "score_range": 0, + "rankings": [] + }, + "recommendations": [f"❌ Health comparison failed: {e}"] + } \ No newline at end of file diff --git a/pypi_query_mcp/tools/license_analyzer.py b/pypi_query_mcp/tools/license_analyzer.py new file mode 100644 index 0000000..6e73bee --- /dev/null +++ b/pypi_query_mcp/tools/license_analyzer.py @@ -0,0 +1,727 @@ +"""License compatibility analysis tools for PyPI packages.""" + +import asyncio +import logging +import re +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Set, Tuple + +from ..core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from ..core.pypi_client import PyPIClient + +logger = logging.getLogger(__name__) + + +class LicenseCompatibilityAnalyzer: + """Comprehensive license compatibility analyzer for PyPI packages.""" + + def __init__(self): + self.timeout = 30.0 + + # License compatibility matrix based on common license interactions + # Key: primary license, Value: dict of compatible licenses with compatibility level + self.compatibility_matrix = { + "MIT": { + "MIT": "compatible", + "BSD": "compatible", + "Apache-2.0": "compatible", + "ISC": "compatible", + "GPL-2.0": "one-way", # MIT can be used in GPL, not vice versa + "GPL-3.0": "one-way", + "LGPL-2.1": "compatible", + "LGPL-3.0": "compatible", + "MPL-2.0": "compatible", + "Unlicense": "compatible", + "Public Domain": "compatible", + "Proprietary": "review-required", + }, + "BSD": { + "MIT": "compatible", + "BSD": "compatible", + "Apache-2.0": "compatible", + "ISC": "compatible", + "GPL-2.0": "one-way", + "GPL-3.0": "one-way", + "LGPL-2.1": "compatible", + "LGPL-3.0": "compatible", + "MPL-2.0": "compatible", + "Unlicense": "compatible", + "Public Domain": "compatible", + "Proprietary": "review-required", + }, + "Apache-2.0": { + "MIT": "compatible", + "BSD": "compatible", + "Apache-2.0": "compatible", + "ISC": "compatible", + 
"GPL-2.0": "incompatible", # Patent clause conflicts + "GPL-3.0": "one-way", # Apache can go into GPL-3.0 + "LGPL-2.1": "review-required", + "LGPL-3.0": "compatible", + "MPL-2.0": "compatible", + "Unlicense": "compatible", + "Public Domain": "compatible", + "Proprietary": "review-required", + }, + "GPL-2.0": { + "MIT": "compatible", + "BSD": "compatible", + "Apache-2.0": "incompatible", + "ISC": "compatible", + "GPL-2.0": "compatible", + "GPL-3.0": "incompatible", # GPL-2.0 and GPL-3.0 are incompatible + "LGPL-2.1": "compatible", + "LGPL-3.0": "incompatible", + "MPL-2.0": "incompatible", + "Unlicense": "compatible", + "Public Domain": "compatible", + "Proprietary": "incompatible", + }, + "GPL-3.0": { + "MIT": "compatible", + "BSD": "compatible", + "Apache-2.0": "compatible", + "ISC": "compatible", + "GPL-2.0": "incompatible", + "GPL-3.0": "compatible", + "LGPL-2.1": "review-required", + "LGPL-3.0": "compatible", + "MPL-2.0": "compatible", + "Unlicense": "compatible", + "Public Domain": "compatible", + "Proprietary": "incompatible", + }, + "LGPL-2.1": { + "MIT": "compatible", + "BSD": "compatible", + "Apache-2.0": "review-required", + "ISC": "compatible", + "GPL-2.0": "compatible", + "GPL-3.0": "review-required", + "LGPL-2.1": "compatible", + "LGPL-3.0": "compatible", + "MPL-2.0": "compatible", + "Unlicense": "compatible", + "Public Domain": "compatible", + "Proprietary": "review-required", + }, + "LGPL-3.0": { + "MIT": "compatible", + "BSD": "compatible", + "Apache-2.0": "compatible", + "ISC": "compatible", + "GPL-2.0": "incompatible", + "GPL-3.0": "compatible", + "LGPL-2.1": "compatible", + "LGPL-3.0": "compatible", + "MPL-2.0": "compatible", + "Unlicense": "compatible", + "Public Domain": "compatible", + "Proprietary": "review-required", + }, + "MPL-2.0": { + "MIT": "compatible", + "BSD": "compatible", + "Apache-2.0": "compatible", + "ISC": "compatible", + "GPL-2.0": "incompatible", + "GPL-3.0": "compatible", + "LGPL-2.1": "compatible", + "LGPL-3.0": 
"compatible", + "MPL-2.0": "compatible", + "Unlicense": "compatible", + "Public Domain": "compatible", + "Proprietary": "review-required", + }, + } + + # License categorization for easier analysis + self.license_categories = { + "permissive": ["MIT", "BSD", "Apache-2.0", "ISC", "Unlicense", "Public Domain"], + "copyleft_weak": ["LGPL-2.1", "LGPL-3.0", "MPL-2.0"], + "copyleft_strong": ["GPL-2.0", "GPL-3.0", "AGPL-3.0"], + "proprietary": ["Proprietary", "Commercial", "All Rights Reserved"], + "unknown": ["Unknown", "Other", "Custom"], + } + + # Common license normalization patterns + self.license_patterns = { + r"MIT\s*License": "MIT", + r"BSD\s*3[-\s]*Clause": "BSD", + r"BSD\s*2[-\s]*Clause": "BSD", + r"Apache\s*2\.0": "Apache-2.0", + r"Apache\s*License\s*2\.0": "Apache-2.0", + r"GNU\s*General\s*Public\s*License\s*v?2": "GPL-2.0", + r"GNU\s*General\s*Public\s*License\s*v?3": "GPL-3.0", + r"GNU\s*Lesser\s*General\s*Public\s*License\s*v?2": "LGPL-2.1", + r"GNU\s*Lesser\s*General\s*Public\s*License\s*v?3": "LGPL-3.0", + r"Mozilla\s*Public\s*License\s*2\.0": "MPL-2.0", + r"ISC\s*License": "ISC", + r"Unlicense": "Unlicense", + r"Public\s*Domain": "Public Domain", + } + + async def analyze_package_license( + self, + package_name: str, + version: Optional[str] = None, + include_dependencies: bool = True + ) -> Dict[str, Any]: + """ + Analyze license information for a PyPI package. 
+ + Args: + package_name: Name of the package to analyze + version: Specific version to analyze (optional) + include_dependencies: Whether to analyze dependency licenses + + Returns: + Dictionary containing license analysis results + """ + logger.info(f"Starting license analysis for package: {package_name}") + + try: + async with PyPIClient() as client: + package_data = await client.get_package_info(package_name, version) + + package_version = version or package_data["info"]["version"] + + # Analyze package license + license_info = self._extract_license_info(package_data) + + # Analyze dependencies if requested + dependency_licenses = [] + if include_dependencies: + dependency_licenses = await self._analyze_dependency_licenses( + package_name, package_version + ) + + # Generate compatibility analysis + compatibility_analysis = self._analyze_license_compatibility( + license_info, dependency_licenses + ) + + # Calculate risk assessment + risk_assessment = self._assess_license_risks( + license_info, dependency_licenses, compatibility_analysis + ) + + return { + "package": package_name, + "version": package_version, + "analysis_timestamp": datetime.now(timezone.utc).isoformat(), + "license_info": license_info, + "dependency_licenses": dependency_licenses, + "compatibility_analysis": compatibility_analysis, + "risk_assessment": risk_assessment, + "recommendations": self._generate_license_recommendations( + license_info, dependency_licenses, compatibility_analysis, risk_assessment + ), + "analysis_summary": { + "total_dependencies_analyzed": len(dependency_licenses), + "unique_licenses_found": len(set( + [license_info.get("normalized_license", "Unknown")] + + [dep.get("normalized_license", "Unknown") for dep in dependency_licenses] + )), + "license_conflicts": len(compatibility_analysis.get("conflicts", [])), + "review_required_count": len(compatibility_analysis.get("review_required", [])), + } + } + + except Exception as e: + logger.error(f"License analysis failed for 
{package_name}: {e}") + raise SearchError(f"License analysis failed: {e}") from e + + def _extract_license_info(self, package_data: Dict[str, Any]) -> Dict[str, Any]: + """Extract and normalize license information from package data.""" + info = package_data.get("info", {}) + + # Extract license from multiple sources + license_field = info.get("license", "") + license_classifier = self._extract_license_from_classifiers( + info.get("classifiers", []) + ) + + # Normalize license + normalized_license = self._normalize_license(license_field or license_classifier) + + # Categorize license + license_category = self._categorize_license(normalized_license) + + return { + "raw_license": license_field, + "classifier_license": license_classifier, + "normalized_license": normalized_license, + "license_category": license_category, + "license_url": self._extract_license_url(info), + "license_confidence": self._assess_license_confidence( + license_field, license_classifier, normalized_license + ), + } + + def _extract_license_from_classifiers(self, classifiers: List[str]) -> str: + """Extract license information from PyPI classifiers.""" + license_classifiers = [ + c for c in classifiers if c.startswith("License ::") + ] + + if not license_classifiers: + return "" + + # Return the most specific license classifier + return license_classifiers[-1].replace("License ::", "").strip() + + def _normalize_license(self, license_text: str) -> str: + """Normalize license text to standard SPDX identifiers.""" + if not license_text: + return "Unknown" + + license_text_clean = license_text.strip() + + # Check for exact matches first + common_licenses = { + "MIT": "MIT", + "BSD": "BSD", + "Apache": "Apache-2.0", + "GPL": "GPL-3.0", # Default to GPL-3.0 if version unspecified + "LGPL": "LGPL-3.0", + "MPL": "MPL-2.0", + } + + if license_text_clean in common_licenses: + return common_licenses[license_text_clean] + + # Pattern matching + for pattern, normalized in self.license_patterns.items(): + if 
re.search(pattern, license_text_clean, re.IGNORECASE): + return normalized + + # Check if it contains known license names + license_lower = license_text_clean.lower() + if "mit" in license_lower: + return "MIT" + elif "bsd" in license_lower: + return "BSD" + elif "apache" in license_lower: + return "Apache-2.0" + elif "gpl" in license_lower and "lgpl" not in license_lower: + return "GPL-3.0" + elif "lgpl" in license_lower: + return "LGPL-3.0" + elif "mozilla" in license_lower or "mpl" in license_lower: + return "MPL-2.0" + elif "unlicense" in license_lower: + return "Unlicense" + elif "public domain" in license_lower: + return "Public Domain" + elif any(prop in license_lower for prop in ["proprietary", "commercial", "all rights reserved"]): + return "Proprietary" + + return "Other" + + def _categorize_license(self, normalized_license: str) -> str: + """Categorize license into major categories.""" + for category, licenses in self.license_categories.items(): + if normalized_license in licenses: + return category + return "unknown" + + def _extract_license_url(self, info: Dict[str, Any]) -> str: + """Extract license URL from package info.""" + # Check project URLs + project_urls = info.get("project_urls", {}) or {} + for key, url in project_urls.items(): + if "license" in key.lower(): + return url + + # Check home page for license info + home_page = info.get("home_page", "") + if home_page and "github.com" in home_page: + return f"{home_page.rstrip('/')}/blob/main/LICENSE" + + return "" + + def _assess_license_confidence( + self, raw_license: str, classifier_license: str, normalized_license: str + ) -> str: + """Assess confidence level in license detection.""" + if not raw_license and not classifier_license: + return "low" + + if normalized_license == "Unknown" or normalized_license == "Other": + return "low" + + if raw_license and classifier_license and raw_license in classifier_license: + return "high" + elif raw_license or classifier_license: + return "medium" + 
else: + return "low" + + async def _analyze_dependency_licenses( + self, package_name: str, version: str + ) -> List[Dict[str, Any]]: + """Analyze licenses of package dependencies.""" + try: + async with PyPIClient() as client: + package_data = await client.get_package_info(package_name, version) + + # Extract dependencies + requires_dist = package_data.get("info", {}).get("requires_dist", []) or [] + dependencies = [] + + for req in requires_dist: + # Parse dependency name (simplified) + dep_name = req.split()[0].split(">=")[0].split("==")[0].split("~=")[0].split("!=")[0] + if dep_name and not dep_name.startswith("extra"): + dependencies.append(dep_name) + + # Analyze dependency licenses (limit to top 15 to avoid overwhelming) + dependency_licenses = [] + + for dep_name in dependencies[:15]: + try: + dep_data = await client.get_package_info(dep_name) + dep_license_info = self._extract_license_info(dep_data) + + dependency_licenses.append({ + "package": dep_name, + "version": dep_data.get("info", {}).get("version", ""), + **dep_license_info + }) + except Exception as e: + logger.debug(f"Failed to analyze license for dependency {dep_name}: {e}") + dependency_licenses.append({ + "package": dep_name, + "version": "", + "normalized_license": "Unknown", + "license_category": "unknown", + "license_confidence": "low", + "error": str(e) + }) + + return dependency_licenses + + except Exception as e: + logger.warning(f"Dependency license analysis failed: {e}") + return [] + + def _analyze_license_compatibility( + self, package_license: Dict[str, Any], dependency_licenses: List[Dict[str, Any]] + ) -> Dict[str, Any]: + """Analyze license compatibility between package and its dependencies.""" + main_license = package_license.get("normalized_license", "Unknown") + + compatible = [] + incompatible = [] + review_required = [] + one_way = [] + unknown = [] + + for dep in dependency_licenses: + dep_license = dep.get("normalized_license", "Unknown") + dep_package = dep.get("package", 
"unknown") + + if main_license == "Unknown" or dep_license == "Unknown": + unknown.append({ + "package": dep_package, + "license": dep_license, + "reason": "License information unavailable" + }) + continue + + compatibility = self._check_license_compatibility(main_license, dep_license) + + if compatibility == "compatible": + compatible.append({ + "package": dep_package, + "license": dep_license, + }) + elif compatibility == "incompatible": + incompatible.append({ + "package": dep_package, + "license": dep_license, + "reason": f"{main_license} and {dep_license} are incompatible" + }) + elif compatibility == "review-required": + review_required.append({ + "package": dep_package, + "license": dep_license, + "reason": f"Manual review needed for {main_license} + {dep_license}" + }) + elif compatibility == "one-way": + one_way.append({ + "package": dep_package, + "license": dep_license, + "reason": f"{dep_license} can be used in {main_license} project" + }) + + return { + "main_license": main_license, + "compatible": compatible, + "incompatible": incompatible, + "review_required": review_required, + "one_way": one_way, + "unknown": unknown, + "conflicts": incompatible, # Alias for easier access + } + + def _check_license_compatibility(self, license1: str, license2: str) -> str: + """Check compatibility between two licenses.""" + if license1 in self.compatibility_matrix: + return self.compatibility_matrix[license1].get(license2, "unknown") + + # Fallback compatibility rules + if license1 == license2: + return "compatible" + + # Default to review required for unknown combinations + return "review-required" + + def _assess_license_risks( + self, + package_license: Dict[str, Any], + dependency_licenses: List[Dict[str, Any]], + compatibility_analysis: Dict[str, Any] + ) -> Dict[str, Any]: + """Assess overall license risks for the project.""" + risks = [] + risk_score = 0 + + main_license = package_license.get("normalized_license", "Unknown") + main_category = 
package_license.get("license_category", "unknown") + + # Check for incompatible licenses + incompatible_count = len(compatibility_analysis.get("incompatible", [])) + if incompatible_count > 0: + risks.append(f"Found {incompatible_count} incompatible license(s)") + risk_score += incompatible_count * 30 + + # Check for unknown licenses + unknown_count = len(compatibility_analysis.get("unknown", [])) + if unknown_count > 0: + risks.append(f"Found {unknown_count} dependency(ies) with unknown licenses") + risk_score += unknown_count * 10 + + # Check for review-required licenses + review_count = len(compatibility_analysis.get("review_required", [])) + if review_count > 0: + risks.append(f"Found {review_count} license(s) requiring manual review") + risk_score += review_count * 15 + + # Check for copyleft contamination risk + if main_category == "permissive": + copyleft_deps = [ + dep for dep in dependency_licenses + if dep.get("license_category") in ["copyleft_weak", "copyleft_strong"] + ] + if copyleft_deps: + risks.append(f"Permissive project using {len(copyleft_deps)} copyleft dependencies") + risk_score += len(copyleft_deps) * 20 + + # Check for proprietary license risks + proprietary_deps = [ + dep for dep in dependency_licenses + if dep.get("license_category") == "proprietary" + ] + if proprietary_deps: + risks.append(f"Found {len(proprietary_deps)} proprietary dependencies") + risk_score += len(proprietary_deps) * 25 + + # Calculate risk level + if risk_score >= 80: + risk_level = "critical" + elif risk_score >= 50: + risk_level = "high" + elif risk_score >= 25: + risk_level = "medium" + elif risk_score > 0: + risk_level = "low" + else: + risk_level = "minimal" + + return { + "risk_score": min(risk_score, 100), + "risk_level": risk_level, + "risk_factors": risks, + "compliance_status": "compliant" if risk_score < 25 else "review-needed", + } + + def _generate_license_recommendations( + self, + package_license: Dict[str, Any], + dependency_licenses: List[Dict[str, 
Any]], + compatibility_analysis: Dict[str, Any], + risk_assessment: Dict[str, Any] + ) -> List[str]: + """Generate actionable license recommendations.""" + recommendations = [] + + main_license = package_license.get("normalized_license", "Unknown") + risk_level = risk_assessment.get("risk_level", "unknown") + + # High-level recommendations based on risk + if risk_level == "critical": + recommendations.append("🚨 Critical license issues detected - immediate legal review required") + elif risk_level == "high": + recommendations.append("⚠️ High license risk - review and resolve conflicts before release") + elif risk_level == "medium": + recommendations.append("⚠️ Moderate license risk - review recommendations below") + elif risk_level == "minimal": + recommendations.append("✅ License compatibility appears good") + + # Specific recommendations for incompatible licenses + incompatible = compatibility_analysis.get("incompatible", []) + if incompatible: + recommendations.append(f"🔴 Remove or replace {len(incompatible)} incompatible dependencies:") + for dep in incompatible[:3]: # Show first 3 + recommendations.append(f" - {dep['package']} ({dep['license']}): {dep.get('reason', '')}") + + # Recommendations for review-required licenses + review_required = compatibility_analysis.get("review_required", []) + if review_required: + recommendations.append(f"📋 Manual review needed for {len(review_required)} dependencies:") + for dep in review_required[:3]: + recommendations.append(f" - {dep['package']} ({dep['license']})") + + # Unknown license recommendations + unknown = compatibility_analysis.get("unknown", []) + if unknown: + recommendations.append(f"❓ Investigate {len(unknown)} dependencies with unknown licenses") + + # License confidence recommendations + if package_license.get("license_confidence") == "low": + recommendations.append("📝 Consider adding clear license information to your package") + + # Category-specific recommendations + main_category = 
package_license.get("license_category", "unknown") + if main_category == "copyleft_strong": + recommendations.append("ℹ️ GPL license requires derivative works to also be GPL") + elif main_category == "permissive": + recommendations.append("ℹ️ Permissive license allows flexible usage") + + return recommendations + + +# Main analysis functions +async def analyze_package_license_compatibility( + package_name: str, + version: Optional[str] = None, + include_dependencies: bool = True +) -> Dict[str, Any]: + """ + Analyze license compatibility for a PyPI package. + + Args: + package_name: Name of the package to analyze + version: Specific version to analyze (optional) + include_dependencies: Whether to analyze dependency licenses + + Returns: + Comprehensive license compatibility analysis + """ + analyzer = LicenseCompatibilityAnalyzer() + return await analyzer.analyze_package_license( + package_name, version, include_dependencies + ) + + +async def check_license_compliance_bulk( + package_names: List[str], + target_license: Optional[str] = None +) -> Dict[str, Any]: + """ + Check license compliance for multiple packages. 
+ + Args: + package_names: List of package names to check + target_license: Target license for compatibility checking + + Returns: + Bulk license compliance report + """ + logger.info(f"Starting bulk license compliance check for {len(package_names)} packages") + + analyzer = LicenseCompatibilityAnalyzer() + results = {} + summary = { + "total_packages": len(package_names), + "compliant_packages": 0, + "non_compliant_packages": 0, + "unknown_license_packages": 0, + "high_risk_packages": [], + "analysis_timestamp": datetime.now(timezone.utc).isoformat() + } + + # Analyze packages in parallel batches + batch_size = 5 + for i in range(0, len(package_names), batch_size): + batch = package_names[i:i + batch_size] + batch_tasks = [ + analyzer.analyze_package_license(pkg_name, include_dependencies=False) + for pkg_name in batch + ] + + batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True) + + for pkg_name, result in zip(batch, batch_results): + if isinstance(result, Exception): + results[pkg_name] = { + "error": str(result), + "analysis_status": "failed" + } + summary["unknown_license_packages"] += 1 + else: + results[pkg_name] = result + + # Update summary + risk_level = result.get("risk_assessment", {}).get("risk_level", "unknown") + if risk_level in ["minimal", "low"]: + summary["compliant_packages"] += 1 + else: + summary["non_compliant_packages"] += 1 + + if risk_level in ["high", "critical"]: + summary["high_risk_packages"].append({ + "package": pkg_name, + "license": result.get("license_info", {}).get("normalized_license", "Unknown"), + "risk_level": risk_level + }) + + return { + "summary": summary, + "detailed_results": results, + "target_license": target_license, + "recommendations": _generate_bulk_license_recommendations(summary, results) + } + + +def _generate_bulk_license_recommendations(summary: Dict[str, Any], results: Dict[str, Any]) -> List[str]: + """Generate recommendations for bulk license analysis.""" + recommendations = [] + + 
compliant = summary["compliant_packages"] + total = summary["total_packages"] + + if compliant == total: + recommendations.append("✅ All packages appear to have compliant licenses") + else: + non_compliant = summary["non_compliant_packages"] + percentage = (non_compliant / total) * 100 + recommendations.append( + f"⚠️ {non_compliant}/{total} packages ({percentage:.1f}%) have license compliance issues" + ) + + high_risk = summary["high_risk_packages"] + if high_risk: + recommendations.append( + f"🚨 {len(high_risk)} packages are high risk: {', '.join([p['package'] for p in high_risk])}" + ) + recommendations.append("Priority: Address high-risk packages immediately") + + unknown = summary["unknown_license_packages"] + if unknown > 0: + recommendations.append(f"❓ {unknown} packages have unknown or unclear licenses") + recommendations.append("Consider investigating these packages for license clarity") + + return recommendations \ No newline at end of file diff --git a/pypi_query_mcp/tools/license_tools.py b/pypi_query_mcp/tools/license_tools.py new file mode 100644 index 0000000..fa547a9 --- /dev/null +++ b/pypi_query_mcp/tools/license_tools.py @@ -0,0 +1,154 @@ +"""License compatibility analysis tools for PyPI packages.""" + +import logging +from typing import Any, Dict, List, Optional + +from ..core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from ..tools.license_analyzer import analyze_package_license_compatibility, check_license_compliance_bulk + +logger = logging.getLogger(__name__) + + +async def analyze_pypi_package_license( + package_name: str, + version: Optional[str] = None, + include_dependencies: bool = True +) -> Dict[str, Any]: + """ + Analyze license compatibility for a PyPI package. + + This tool provides comprehensive license analysis including license identification, + dependency license scanning, compatibility checking, and risk assessment to help + ensure your project complies with open source license requirements. 
async def analyze_pypi_package_license(
    package_name: str,
    version: Optional[str] = None,
    include_dependencies: bool = True
) -> Dict[str, Any]:
    """Analyze license compatibility for a single PyPI package.

    Delegates to the license analyzer and, on a known failure, degrades to a
    fully shaped error report (same top-level keys as a successful analysis)
    so MCP clients never have to special-case a missing structure.

    Args:
        package_name: Package to analyze; must be non-empty.
        version: Specific version to analyze (latest when None).
        include_dependencies: Whether dependency licenses are analyzed too.

    Returns:
        License analysis dict: license identification, dependency licenses,
        compatibility matrix, risk assessment and recommendations.

    Raises:
        InvalidPackageNameError: If package_name is empty or blank. Other
            analyzer errors are caught and reported inside the result dict.
    """
    if not package_name or not package_name.strip():
        raise InvalidPackageNameError(package_name)

    logger.info(f"MCP tool: Analyzing license compatibility for package {package_name}")

    try:
        result = await analyze_package_license_compatibility(
            package_name=package_name,
            version=version,
            include_dependencies=include_dependencies
        )
        conflict_count = result.get('analysis_summary', {}).get('license_conflicts', 0)
        logger.info(f"MCP tool: License analysis completed for {package_name} - {conflict_count} conflicts found")
        return result

    except (InvalidPackageNameError, NetworkError, SearchError) as e:
        logger.error(f"Error analyzing license for {package_name}: {e}")
        # Shaped fallback mirroring the success payload, flagged as critical.
        return {
            "error": f"License analysis failed: {e}",
            "error_type": type(e).__name__,
            "package": package_name,
            "version": version,
            "analysis_timestamp": "",
            "license_info": {
                "normalized_license": "Unknown",
                "license_category": "unknown",
                "license_confidence": "low",
            },
            "dependency_licenses": [],
            "compatibility_analysis": {
                "main_license": "Unknown",
                "compatible": [],
                "incompatible": [],
                "review_required": [],
                "conflicts": [],
            },
            "risk_assessment": {
                "risk_score": 100,
                "risk_level": "critical",
                "risk_factors": [f"License analysis failed: {e}"],
                "compliance_status": "unknown",
            },
            "recommendations": [f"❌ License analysis failed: {e}"],
            "analysis_summary": {
                "total_dependencies_analyzed": 0,
                "unique_licenses_found": 0,
                "license_conflicts": 0,
                "review_required_count": 0,
            }
        }


async def check_bulk_license_compliance(
    package_names: List[str],
    target_license: Optional[str] = None
) -> Dict[str, Any]:
    """Check license compliance for a batch of PyPI packages.

    Args:
        package_names: Packages to check; must be a non-empty list.
        target_license: License to check compatibility against (optional).

    Returns:
        Consolidated report: summary counters, per-package details, high-risk
        packages and remediation recommendations. On a known failure, a
        shaped error report with every package counted as unknown.

    Raises:
        ValueError: If package_names is empty. Analyzer errors are caught and
            reported inside the result dict.
    """
    if not package_names:
        raise ValueError("Package names list cannot be empty")

    logger.info(f"MCP tool: Starting bulk license compliance check for {len(package_names)} packages")

    try:
        result = await check_license_compliance_bulk(
            package_names=package_names,
            target_license=target_license
        )
        non_compliant = result.get('summary', {}).get('non_compliant_packages', 0)
        logger.info(f"MCP tool: Bulk license compliance completed - {non_compliant} non-compliant packages found")
        return result

    except (ValueError, NetworkError, SearchError) as e:
        logger.error(f"Error in bulk license compliance check: {e}")
        return {
            "error": f"Bulk license compliance check failed: {e}",
            "error_type": type(e).__name__,
            "summary": {
                "total_packages": len(package_names),
                "compliant_packages": 0,
                "non_compliant_packages": 0,
                "unknown_license_packages": len(package_names),
                "high_risk_packages": [],
                "analysis_timestamp": ""
            },
            "detailed_results": {},
            "target_license": target_license,
            "recommendations": [f"❌ Bulk license compliance check failed: {e}"]
        }
{}).get('non_compliant_packages', 0)} non-compliant packages found") + return result + + except (ValueError, NetworkError, SearchError) as e: + logger.error(f"Error in bulk license compliance check: {e}") + return { + "error": f"Bulk license compliance check failed: {e}", + "error_type": type(e).__name__, + "summary": { + "total_packages": len(package_names), + "compliant_packages": 0, + "non_compliant_packages": 0, + "unknown_license_packages": len(package_names), + "high_risk_packages": [], + "analysis_timestamp": "" + }, + "detailed_results": {}, + "target_license": target_license, + "recommendations": [f"❌ Bulk license compliance check failed: {e}"] + } \ No newline at end of file diff --git a/pypi_query_mcp/tools/requirements_analyzer.py b/pypi_query_mcp/tools/requirements_analyzer.py new file mode 100644 index 0000000..0aba3d0 --- /dev/null +++ b/pypi_query_mcp/tools/requirements_analyzer.py @@ -0,0 +1,947 @@ +"""Requirements file parsing and analysis tools for Python projects.""" + +import asyncio +import logging +import re +import tomllib +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + +from ..core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from ..core.pypi_client import PyPIClient + +logger = logging.getLogger(__name__) + + +class RequirementsAnalyzer: + """Comprehensive requirements file analyzer for Python projects.""" + + def __init__(self): + self.timeout = 30.0 + + # Supported requirement file patterns + self.requirement_patterns = { + "requirements.txt": r"requirements.*\.txt", + "pyproject.toml": r"pyproject\.toml", + "setup.py": r"setup\.py", + "Pipfile": r"Pipfile", + "poetry.lock": r"poetry\.lock", + "conda.yml": r"(conda|environment)\.ya?ml", + } + + # Version specifier patterns + self.version_patterns = { + "exact": r"==\s*([0-9]+(?:\.[0-9]+)*(?:[a-zA-Z][0-9]*)?)", + "gte": r">=\s*([0-9]+(?:\.[0-9]+)*(?:[a-zA-Z][0-9]*)?)", + "gt": 
r">\s*([0-9]+(?:\.[0-9]+)*(?:[a-zA-Z][0-9]*)?)", + "lte": r"<=\s*([0-9]+(?:\.[0-9]+)*(?:[a-zA-Z][0-9]*)?)", + "lt": r"<\s*([0-9]+(?:\.[0-9]+)*(?:[a-zA-Z][0-9]*)?)", + "compatible": r"~=\s*([0-9]+(?:\.[0-9]+)*(?:[a-zA-Z][0-9]*)?)", + "not_equal": r"!=\s*([0-9]+(?:\.[0-9]+)*(?:[a-zA-Z][0-9]*)?)", + } + + async def analyze_requirements_file( + self, + file_path: str, + check_updates: bool = True, + security_scan: bool = True, + compatibility_check: bool = True + ) -> Dict[str, Any]: + """ + Analyze a requirements file for dependencies, versions, security, and compatibility. + + Args: + file_path: Path to the requirements file + check_updates: Whether to check for package updates + security_scan: Whether to perform security vulnerability scanning + compatibility_check: Whether to check Python version compatibility + + Returns: + Dictionary containing comprehensive requirements analysis + """ + logger.info(f"Starting requirements analysis for: {file_path}") + + try: + # Parse requirements file + parsed_requirements = await self._parse_requirements_file(file_path) + + if not parsed_requirements["dependencies"]: + return { + "file_path": file_path, + "analysis_timestamp": datetime.now(timezone.utc).isoformat(), + "file_info": parsed_requirements["file_info"], + "dependencies": [], + "analysis_summary": { + "total_dependencies": 0, + "outdated_packages": 0, + "security_vulnerabilities": 0, + "compatibility_issues": 0, + }, + "recommendations": ["No dependencies found to analyze"], + "error": "No dependencies found in requirements file" + } + + # Analyze dependencies in parallel + analysis_tasks = [] + + # Basic dependency analysis (always done) + analysis_tasks.append(self._analyze_dependency_health(parsed_requirements["dependencies"])) + + # Optional analyses + if check_updates: + analysis_tasks.append(self._check_package_updates(parsed_requirements["dependencies"])) + else: + analysis_tasks.append(asyncio.create_task(self._empty_updates_result())) + + if security_scan: + 
async def analyze_requirements_file(
    self,
    file_path: str,
    check_updates: bool = True,
    security_scan: bool = True,
    compatibility_check: bool = True
) -> Dict[str, Any]:
    """Run the full analysis pipeline over one requirements file.

    Parses the file, then fans out concurrently into health, update,
    security and compatibility checks, and folds everything into a single
    report with a summary and recommendations.

    Args:
        file_path: Path to the requirements file on disk.
        check_updates: Query PyPI for newer releases of each dependency.
        security_scan: Run the vulnerability scanner over each dependency.
        compatibility_check: Verify Python-version compatibility.

    Returns:
        Report dict with file info, parsed dependencies, per-aspect analysis
        results, an aggregate summary and actionable recommendations.

    Raises:
        SearchError: If any stage of the analysis fails unrecoverably.
    """
    logger.info(f"Starting requirements analysis for: {file_path}")

    try:
        parsed = await self._parse_requirements_file(file_path)
        deps = parsed["dependencies"]

        if not deps:
            # Nothing to analyze: return an empty-but-well-formed report.
            return {
                "file_path": file_path,
                "analysis_timestamp": datetime.now(timezone.utc).isoformat(),
                "file_info": parsed["file_info"],
                "dependencies": [],
                "analysis_summary": {
                    "total_dependencies": 0,
                    "outdated_packages": 0,
                    "security_vulnerabilities": 0,
                    "compatibility_issues": 0,
                },
                "recommendations": ["No dependencies found to analyze"],
                "error": "No dependencies found in requirements file"
            }

        # Fixed task order: health, updates, security, compatibility.
        # Disabled aspects contribute a neutral placeholder so the positional
        # unpacking below stays valid.
        tasks = [self._analyze_dependency_health(deps)]
        tasks.append(self._check_package_updates(deps) if check_updates
                     else self._empty_updates_result())
        tasks.append(self._scan_dependencies_security(deps) if security_scan
                     else self._empty_security_result())
        if compatibility_check:
            tasks.append(self._check_dependencies_compatibility(deps, parsed.get("python_version")))
        else:
            tasks.append(self._empty_compatibility_result())

        outcome = await asyncio.gather(*tasks, return_exceptions=True)

        # Fall back to empty shapes when an individual stage blew up.
        health = outcome[0] if not isinstance(outcome[0], Exception) else {"healthy": [], "issues": []}
        updates = outcome[1] if not isinstance(outcome[1], Exception) else {"outdated": [], "current": []}
        security = outcome[2] if not isinstance(outcome[2], Exception) else {"vulnerabilities": [], "secure": []}
        compatibility = outcome[3] if not isinstance(outcome[3], Exception) else {"compatible": [], "incompatible": []}

        summary = self._generate_analysis_summary(deps, health, updates, security, compatibility)
        recommendations = self._generate_requirements_recommendations(
            parsed, health, updates, security, compatibility, summary
        )

        return {
            "file_path": file_path,
            "analysis_timestamp": datetime.now(timezone.utc).isoformat(),
            "file_info": parsed["file_info"],
            "dependencies": deps,
            "dependency_analysis": {
                "health": health,
                "updates": updates if check_updates else None,
                "security": security if security_scan else None,
                "compatibility": compatibility if compatibility_check else None,
            },
            "analysis_summary": summary,
            "recommendations": recommendations,
            "python_requirements": parsed.get("python_version"),
        }

    except Exception as e:
        logger.error(f"Requirements analysis failed for {file_path}: {e}")
        raise SearchError(f"Requirements analysis failed: {e}") from e


async def _parse_requirements_file(self, file_path: str) -> Dict[str, Any]:
    """Dispatch to the right parser for the file name and collect metadata.

    Returns:
        Dict with "file_info" (name/format/size/mtime), "dependencies" and
        the declared "python_version" (or None).

    Raises:
        FileNotFoundError: If file_path does not exist.
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"Requirements file not found: {file_path}")

    stat = path.stat()
    file_info = {
        "name": path.name,
        "format": self._detect_file_format(path.name),
        "size_bytes": stat.st_size,
        "modified_time": datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat(),
    }

    name = path.name
    if name == 'pyproject.toml':
        dependencies, python_version = await self._parse_pyproject_toml(path)
    elif name == 'setup.py':
        dependencies, python_version = await self._parse_setup_py(path)
    elif name == 'Pipfile':
        dependencies, python_version = await self._parse_pipfile(path)
    elif name.endswith(('.yml', '.yaml')):
        dependencies, python_version = await self._parse_conda_yml(path)
    else:
        # .txt files — and any unrecognised name — are treated as
        # requirements.txt format.
        dependencies, python_version = await self._parse_requirements_txt(path)

    return {
        "file_info": file_info,
        "dependencies": dependencies,
        "python_version": python_version,
    }


def _detect_file_format(self, filename: str) -> str:
    """Classify a requirements file name against the known format patterns."""
    lowered = filename.lower()
    for fmt, pattern in self.requirement_patterns.items():
        if re.match(pattern, lowered):
            return fmt
    return "unknown"
async def _parse_requirements_txt(self, path: Path) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Parse a pip requirements.txt-style file.

    Returns:
        (dependencies, python_version); python_version is always None for
        this format, kept for signature parity with the other parsers.
    """
    dependencies: List[Dict[str, Any]] = []
    python_version = None

    try:
        content = path.read_text(encoding="utf-8")
        for line_num, raw in enumerate(content.splitlines(), 1):
            line = raw.strip()

            # Skip blanks and comment lines.
            if not line or line.startswith('#'):
                continue

            # Skip ALL pip option lines (-r, -e, -c, -i, -f, --index-url, ...).
            # Previously only -r/-e/-- were skipped, so a line such as
            # "-c constraints.txt" was mis-parsed as a package named "-c".
            if line.startswith('-'):
                continue

            dep = self._parse_requirement_line(line, line_num)
            if dep:
                dependencies.append(dep)

    except Exception as e:
        logger.warning(f"Failed to parse requirements.txt {path}: {e}")

    return dependencies, python_version


async def _parse_pyproject_toml(self, path: Path) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Parse PEP 621 and Poetry dependency tables from pyproject.toml.

    NOTE(review): a project declaring both [project.dependencies] and
    [tool.poetry.dependencies] gets entries collected from both tables
    (possible duplicates) — confirm whether de-duplication is wanted.
    """
    dependencies: List[Dict[str, Any]] = []
    python_version = None

    try:
        data = tomllib.loads(path.read_text(encoding="utf-8"))
        project = data.get("project", {})
        poetry = data.get("tool", {}).get("poetry", {})

        # PEP 621 requires-python takes precedence over Poetry's constraint.
        if project.get("requires-python"):
            python_version = project["requires-python"]
        elif poetry.get("dependencies", {}).get("python"):
            python_version = poetry["dependencies"]["python"]

        # [project.dependencies] entries are already requirement strings.
        for dep_line in project.get("dependencies", []):
            dep = self._parse_requirement_line(dep_line, 0)
            if dep:
                dependencies.append(dep)

        # Poetry dependencies are a name -> spec mapping.
        for name, spec in poetry.get("dependencies", {}).items():
            if name.lower() == "python":
                continue  # the interpreter requirement, not a package

            if isinstance(spec, str):
                # Specs like "^1.2"/">=1.0" already carry an operator; bare
                # version strings are pinned with "==".
                req_line = f"{name}{spec}" if spec.startswith(('=', '<', '>', '~', '^', '!')) else f"{name}=={spec}"
            else:
                # Table form, e.g. {version = "1.2", extras = [...]}.
                req_line = f"{name}>={spec.get('version', '0.0.0')}"

            dep = self._parse_requirement_line(req_line, 0)
            if dep:
                dependencies.append(dep)

    except Exception as e:
        logger.warning(f"Failed to parse pyproject.toml {path}: {e}")

    return dependencies, python_version


async def _parse_setup_py(self, path: Path) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Best-effort regex extraction of install_requires / python_requires
    from setup.py — no code is executed, so only literal lists are found."""
    dependencies: List[Dict[str, Any]] = []
    python_version = None

    try:
        content = path.read_text(encoding="utf-8")

        # install_requires=[ "...", ... ] — dynamically built lists are ignored.
        requires_match = re.search(r"install_requires\s*=\s*\[(.*?)\]", content, re.DOTALL)
        if requires_match:
            for dep_line in re.findall(r'["\']([^"\']+)["\']', requires_match.group(1)):
                dep = self._parse_requirement_line(dep_line, 0)
                if dep:
                    dependencies.append(dep)

        python_match = re.search(r"python_requires\s*=\s*[\"']([^\"']+)[\"']", content)
        if python_match:
            python_version = python_match.group(1)

    except Exception as e:
        logger.warning(f"Failed to parse setup.py {path}: {e}")

    return dependencies, python_version
+ else: + req_line = f"{name}>={version_spec.get('version', '0.0.0')}" + + dep = self._parse_requirement_line(req_line, 0) + if dep: + dep["dev_dependency"] = (section == "dev-packages") + dependencies.append(dep) + + except Exception as e: + logger.warning(f"Failed to parse Pipfile {path}: {e}") + + return dependencies, python_version + + async def _parse_conda_yml(self, path: Path) -> Tuple[List[Dict[str, Any]], Optional[str]]: + """Parse conda environment.yml files.""" + dependencies = [] + python_version = None + + try: + import yaml + + content = path.read_text(encoding="utf-8") + data = yaml.safe_load(content) + + if "dependencies" in data: + for dep in data["dependencies"]: + if isinstance(dep, str): + if dep.startswith("python"): + # Extract Python version + python_match = re.search(r"python\s*([><=~!]+)\s*([0-9.]+)", dep) + if python_match: + python_version = f"{python_match.group(1)}{python_match.group(2)}" + else: + parsed_dep = self._parse_requirement_line(dep, 0) + if parsed_dep: + dependencies.append(parsed_dep) + + except Exception as e: + logger.warning(f"Failed to parse conda.yml {path}: {e}") + + return dependencies, python_version + + def _parse_requirement_line(self, line: str, line_number: int) -> Optional[Dict[str, Any]]: + """Parse a single requirement line.""" + try: + # Remove inline comments + if '#' in line: + line = line[:line.index('#')].strip() + + if not line: + return None + + # Handle extras (package[extra1,extra2]) + extras = [] + extras_match = re.search(r'\[([^\]]+)\]', line) + if extras_match: + extras = [e.strip() for e in extras_match.group(1).split(',')] + line = re.sub(r'\[([^\]]+)\]', '', line) + + # Parse package name and version specifiers + # Split on version operators + version_ops = ['>=', '<=', '==', '!=', '~=', '>', '<'] + package_name = line + version_specifiers = [] + + for op in version_ops: + if op in line: + parts = line.split(op) + package_name = parts[0].strip() + if len(parts) > 1: + 
version_specifiers.append({ + "operator": op, + "version": parts[1].strip().split(',')[0].strip() + }) + break + + # Handle comma-separated version specs + if ',' in line and version_specifiers: + remaining = line.split(version_specifiers[0]["operator"], 1)[1] + for spec in remaining.split(',')[1:]: + spec = spec.strip() + for op in version_ops: + if spec.startswith(op): + version_specifiers.append({ + "operator": op, + "version": spec[len(op):].strip() + }) + break + + # Clean package name + package_name = re.sub(r'[<>=!~,\s].*', '', package_name).strip() + + if not package_name: + return None + + return { + "name": package_name, + "version_specifiers": version_specifiers, + "extras": extras, + "line_number": line_number, + "raw_line": line.strip(), + } + + except Exception as e: + logger.debug(f"Failed to parse requirement line '{line}': {e}") + return None + + async def _analyze_dependency_health(self, dependencies: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze overall health of dependencies.""" + healthy = [] + issues = [] + + for dep in dependencies: + name = dep["name"] + version_specs = dep["version_specifiers"] + + # Check for problematic version specifications + health_issues = [] + + if not version_specs: + health_issues.append("No version constraint (could lead to instability)") + else: + # Check for overly restrictive versions + exact_versions = [spec for spec in version_specs if spec["operator"] == "=="] + if exact_versions: + health_issues.append("Exact version pinning (may cause conflicts)") + + # Check for very loose constraints + loose_constraints = [spec for spec in version_specs if spec["operator"] in [">", ">="]] + if loose_constraints and not any(spec["operator"] in ["<", "<="] for spec in version_specs): + health_issues.append("No upper bound (may break with future versions)") + + if health_issues: + issues.append({ + "package": name, + "issues": health_issues, + "current_spec": version_specs + }) + else: + healthy.append({ + 
async def _check_package_updates(self, dependencies: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Check PyPI for newer releases of every dependency, in bounded batches."""
    outdated: List[Dict[str, Any]] = []
    current: List[Dict[str, Any]] = []

    async with PyPIClient() as client:
        batch_size = 10  # keep concurrent PyPI requests bounded
        for start in range(0, len(dependencies), batch_size):
            batch = dependencies[start:start + batch_size]
            results = await asyncio.gather(
                *(self._check_single_package_update(client, dep) for dep in batch),
                return_exceptions=True,
            )
            for dep, result in zip(batch, results):
                if isinstance(result, Exception):
                    logger.debug(f"Failed to check updates for {dep['name']}: {result}")
                    continue
                (outdated if result["has_update"] else current).append(result)

    return {
        "outdated": outdated,
        "current": current,
        "update_percentage": len(outdated) / len(dependencies) * 100 if dependencies else 0,
    }


async def _check_single_package_update(self, client: PyPIClient, dep: Dict[str, Any]) -> Dict[str, Any]:
    """Compare one dependency's pinned version against the latest on PyPI.

    An update is reported when the latest release is strictly newer than an
    exact ("==") pin. Without an exact pin we cannot tell what is installed,
    so an update is conservatively reported as available (this matches the
    previous behaviour for unpinned requirements; the previous code used a
    hard-coded ``has_update = True`` placeholder that flagged even exactly
    pinned, up-to-date packages as outdated).
    """

    def release_key(version: str) -> tuple:
        # Coarse numeric key: "1.2.3" -> (1, 2, 3); non-numeric chunks -> 0,
        # so pre-release suffixes compare equal to their base release.
        key = []
        for chunk in re.split(r"[.\-+]", version):
            digits = re.match(r"\d+", chunk)
            key.append(int(digits.group()) if digits else 0)
        return tuple(key)

    try:
        package_data = await client.get_package_info(dep["name"])
        latest_version = package_data["info"]["version"]

        pinned = next(
            (spec["version"] for spec in dep["version_specifiers"] if spec["operator"] == "=="),
            None,
        )
        if pinned is not None:
            has_update = release_key(latest_version) > release_key(pinned)
        else:
            has_update = True  # unpinned: assume a newer release may apply

        return {
            "package": dep["name"],
            "current_spec": dep["version_specifiers"],
            "latest_version": latest_version,
            "has_update": has_update,
            "update_recommendation": f"Update to {latest_version}"
        }

    except Exception as e:
        # Lookup failures are reported per-package, never raised.
        return {
            "package": dep["name"],
            "current_spec": dep["version_specifiers"],
            "latest_version": "unknown",
            "has_update": False,
            "error": str(e)
        }


async def _scan_dependencies_security(self, dependencies: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Run the vulnerability scanner over every dependency, in small batches."""
    try:
        # Availability probe: the scanner is an optional sibling module.
        from .security import scan_package_security  # noqa: F401
    except ImportError:
        logger.warning("Security scanner not available")
        return await self._empty_security_result()

    vulnerable: List[Dict[str, Any]] = []
    secure: List[Dict[str, Any]] = []

    batch_size = 5  # security scans are heavier than metadata lookups
    for start in range(0, len(dependencies), batch_size):
        batch = dependencies[start:start + batch_size]
        results = await asyncio.gather(
            *(self._scan_single_dependency_security(dep) for dep in batch),
            return_exceptions=True,
        )
        for dep, result in zip(batch, results):
            if isinstance(result, Exception):
                logger.debug(f"Failed to scan security for {dep['name']}: {result}")
                continue
            (vulnerable if result["vulnerabilities"] else secure).append(result)

    return {
        "vulnerabilities": vulnerable,
        "secure": secure,
        "vulnerability_count": sum(len(v["vulnerabilities"]) for v in vulnerable),
    }


async def _scan_single_dependency_security(self, dep: Dict[str, Any]) -> Dict[str, Any]:
    """Scan one dependency (latest version, no transitive deps) for vulnerabilities.

    Failures are folded into the per-package result instead of raising.
    """
    try:
        from .security import scan_package_security

        report = await scan_package_security(
            dep["name"],
            version=None,  # latest version
            include_dependencies=False
        )
        summary = report.get("security_summary", {})
        return {
            "package": dep["name"],
            "vulnerabilities": report.get("vulnerabilities", {}).get("direct", []),
            "risk_level": summary.get("risk_level", "minimal"),
            "total_vulnerabilities": summary.get("total_vulnerabilities", 0)
        }

    except Exception as e:
        return {
            "package": dep["name"],
            "vulnerabilities": [],
            "risk_level": "unknown",
            "error": str(e)
        }
async def _check_dependencies_compatibility(
    self, dependencies: List[Dict[str, Any]], python_version: Optional[str]
) -> Dict[str, Any]:
    """Check every dependency against the project's Python requirement (batched)."""
    if not python_version:
        # No declared Python requirement: nothing to check against.
        return await self._empty_compatibility_result()

    compatible: List[Dict[str, Any]] = []
    incompatible: List[Dict[str, Any]] = []

    batch_size = 10
    for start in range(0, len(dependencies), batch_size):
        batch = dependencies[start:start + batch_size]
        results = await asyncio.gather(
            *(self._check_single_dependency_compatibility(dep, python_version) for dep in batch),
            return_exceptions=True,
        )
        for dep, result in zip(batch, results):
            if isinstance(result, Exception):
                logger.debug(f"Failed to check compatibility for {dep['name']}: {result}")
                continue
            (compatible if result["compatible"] else incompatible).append(result)

    return {
        "compatible": compatible,
        "incompatible": incompatible,
        "python_version": python_version,
        "compatibility_percentage": len(compatible) / len(dependencies) * 100 if dependencies else 0,
    }


async def _check_single_dependency_compatibility(
    self, dep: Dict[str, Any], python_version: str
) -> Dict[str, Any]:
    """Check one dependency's Python-version compatibility.

    On any failure the package is assumed compatible so that a single flaky
    lookup does not poison the whole report.
    """
    try:
        from .compatibility_check import check_python_compatibility

        # Reduce a requirement like ">=3.10,<4" to its first X.Y number;
        # fall back to 3.9 when no version number can be extracted.
        target_version = "3.9"
        found = re.search(r'(\d+\.\d+)', python_version)
        if found:
            target_version = found.group(1)

        result = await check_python_compatibility(dep["name"], target_version)

        return {
            "package": dep["name"],
            "compatible": result.get("compatible", False),
            "python_version": target_version,
            "details": result.get("compatibility_info", "")
        }

    except Exception as e:
        return {
            "package": dep["name"],
            "compatible": True,  # assume compatible on error
            "python_version": python_version,
            "error": str(e)
        }


# Neutral placeholder results used when an analysis aspect is disabled.
async def _empty_updates_result(self) -> Dict[str, Any]:
    """Neutral updates result (update checking disabled)."""
    return {"outdated": [], "current": [], "update_percentage": 0}


async def _empty_security_result(self) -> Dict[str, Any]:
    """Neutral security result (scanning disabled or scanner unavailable)."""
    return {"vulnerabilities": [], "secure": [], "vulnerability_count": 0}


async def _empty_compatibility_result(self) -> Dict[str, Any]:
    """Neutral compatibility result (check disabled or no Python requirement)."""
    return {"compatible": [], "incompatible": [], "python_version": None, "compatibility_percentage": 100}


def _generate_analysis_summary(
    self,
    dependencies: List[Dict[str, Any]],
    health_analysis: Dict[str, Any],
    update_analysis: Dict[str, Any],
    security_analysis: Dict[str, Any],
    compatibility_analysis: Dict[str, Any]
) -> Dict[str, Any]:
    """Condense the per-aspect analyses into headline numbers plus a risk level."""
    return {
        "total_dependencies": len(dependencies),
        "health_score": round(health_analysis.get("health_score", 0), 1),
        "packages_with_issues": len(health_analysis.get("issues", [])),
        "outdated_packages": len(update_analysis.get("outdated", [])),
        "security_vulnerabilities": security_analysis.get("vulnerability_count", 0),
        "compatibility_issues": len(compatibility_analysis.get("incompatible", [])),
        "overall_risk_level": self._calculate_overall_risk_level(
            health_analysis, update_analysis, security_analysis, compatibility_analysis
        )
    }
if incompat_count > 5: + risk_score += 25 + elif incompat_count > 0: + risk_score += 10 + + # Update risks (outdated packages) + outdated_count = len(updates.get("outdated", [])) + total_deps = len(updates.get("outdated", [])) + len(updates.get("current", [])) + if total_deps > 0: + outdated_percentage = (outdated_count / total_deps) * 100 + if outdated_percentage > 50: + risk_score += 20 + elif outdated_percentage > 25: + risk_score += 10 + + # Calculate risk level + if risk_score >= 70: + return "critical" + elif risk_score >= 50: + return "high" + elif risk_score >= 30: + return "medium" + elif risk_score > 0: + return "low" + else: + return "minimal" + + def _generate_requirements_recommendations( + self, + parsed_requirements: Dict[str, Any], + health_analysis: Dict[str, Any], + update_analysis: Dict[str, Any], + security_analysis: Dict[str, Any], + compatibility_analysis: Dict[str, Any], + summary: Dict[str, Any] + ) -> List[str]: + """Generate actionable recommendations for requirements management.""" + recommendations = [] + + risk_level = summary.get("overall_risk_level", "minimal") + + # Overall assessment + if risk_level == "critical": + recommendations.append("🚨 Critical issues detected - immediate action required") + elif risk_level == "high": + recommendations.append("⚠️ High risk dependencies - review and update urgently") + elif risk_level == "medium": + recommendations.append("⚠️ Moderate risk - address issues when possible") + elif risk_level == "minimal": + recommendations.append("✅ Requirements appear healthy") + + # Specific recommendations + health_issues = health_analysis.get("issues", []) + if health_issues: + recommendations.append(f"🔧 Fix {len(health_issues)} dependency specification issues") + + outdated_count = len(update_analysis.get("outdated", [])) + if outdated_count > 0: + recommendations.append(f"📦 Update {outdated_count} outdated packages") + + vuln_count = security_analysis.get("vulnerability_count", 0) + if vuln_count > 0: + 
recommendations.append(f"🔒 Address {vuln_count} security vulnerabilities") + + incompat_count = len(compatibility_analysis.get("incompatible", [])) + if incompat_count > 0: + recommendations.append(f"🐍 Fix {incompat_count} Python compatibility issues") + + # File format recommendations + file_format = parsed_requirements["file_info"]["format"] + if file_format == "requirements.txt": + recommendations.append("💡 Consider migrating to pyproject.toml for better dependency management") + elif file_format == "unknown": + recommendations.append("📝 Use standard requirements file formats (requirements.txt, pyproject.toml)") + + return recommendations + + +# Main analysis functions +async def analyze_project_requirements( + file_path: str, + check_updates: bool = True, + security_scan: bool = True, + compatibility_check: bool = True +) -> Dict[str, Any]: + """ + Analyze project requirements file for dependencies, security, and compatibility. + + Args: + file_path: Path to the requirements file + check_updates: Whether to check for package updates + security_scan: Whether to perform security vulnerability scanning + compatibility_check: Whether to check Python version compatibility + + Returns: + Comprehensive requirements file analysis + """ + analyzer = RequirementsAnalyzer() + return await analyzer.analyze_requirements_file( + file_path, check_updates, security_scan, compatibility_check + ) + + +async def compare_requirements_files( + file_paths: List[str] +) -> Dict[str, Any]: + """ + Compare multiple requirements files to identify differences and conflicts. 
+ + Args: + file_paths: List of paths to requirements files to compare + + Returns: + Comparative analysis of requirements files + """ + logger.info(f"Starting requirements comparison for {len(file_paths)} files") + + analyzer = RequirementsAnalyzer() + file_analyses = {} + + # Analyze each file + for file_path in file_paths: + try: + analysis = await analyzer.analyze_requirements_file( + file_path, check_updates=False, security_scan=False, compatibility_check=False + ) + file_analyses[file_path] = analysis + except Exception as e: + logger.error(f"Failed to analyze {file_path}: {e}") + file_analyses[file_path] = {"error": str(e), "dependencies": []} + + # Compare dependencies + all_packages = set() + for analysis in file_analyses.values(): + if "dependencies" in analysis: + for dep in analysis["dependencies"]: + all_packages.add(dep["name"]) + + # Generate comparison results + conflicts = [] + common_packages = [] + unique_packages = {} + + for package in all_packages: + versions_by_file = {} + for file_path, analysis in file_analyses.items(): + if "dependencies" in analysis: + for dep in analysis["dependencies"]: + if dep["name"] == package: + versions_by_file[file_path] = dep["version_specifiers"] + break + + if len(versions_by_file) == len(file_paths): + # Package is in all files + version_specs = list(versions_by_file.values()) + if len(set(str(spec) for spec in version_specs)) > 1: + conflicts.append({ + "package": package, + "versions_by_file": versions_by_file + }) + else: + common_packages.append(package) + else: + # Package is unique to some files + for file_path, versions in versions_by_file.items(): + if file_path not in unique_packages: + unique_packages[file_path] = [] + unique_packages[file_path].append({ + "package": package, + "version_specifiers": versions + }) + + return { + "comparison_timestamp": datetime.now(timezone.utc).isoformat(), + "files_compared": len(file_paths), + "file_analyses": file_analyses, + "comparison_results": { + 
"total_unique_packages": len(all_packages), + "common_packages": common_packages, + "conflicting_packages": conflicts, + "unique_to_files": unique_packages, + }, + "recommendations": _generate_comparison_recommendations(conflicts, unique_packages, file_analyses) + } + + +def _generate_comparison_recommendations( + conflicts: List[Dict[str, Any]], + unique_packages: Dict[str, List[Dict[str, Any]]], + file_analyses: Dict[str, Any] +) -> List[str]: + """Generate recommendations for requirements file comparison.""" + recommendations = [] + + if conflicts: + recommendations.append(f"🔄 Resolve {len(conflicts)} version conflicts across files") + for conflict in conflicts[:3]: # Show first 3 + recommendations.append(f" - {conflict['package']}: inconsistent versions") + + if unique_packages: + total_unique = sum(len(packages) for packages in unique_packages.values()) + recommendations.append(f"📦 {total_unique} packages are unique to specific files") + + if not conflicts and not unique_packages: + recommendations.append("✅ All requirements files are consistent") + + # File format recommendations + formats = set() + for analysis in file_analyses.values(): + if "file_info" in analysis: + formats.add(analysis["file_info"]["format"]) + + if len(formats) > 1: + recommendations.append("📝 Consider standardizing on a single requirements file format") + + return recommendations \ No newline at end of file diff --git a/pypi_query_mcp/tools/requirements_tools.py b/pypi_query_mcp/tools/requirements_tools.py new file mode 100644 index 0000000..86eb3b7 --- /dev/null +++ b/pypi_query_mcp/tools/requirements_tools.py @@ -0,0 +1,143 @@ +"""Requirements file analysis tools for Python projects.""" + +import logging +from typing import Any, Dict, List + +from ..core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from ..tools.requirements_analyzer import analyze_project_requirements, compare_requirements_files + +logger = logging.getLogger(__name__) + + +async def 
analyze_requirements_file_tool(
    file_path: str,
    check_updates: bool = True,
    security_scan: bool = True,
    compatibility_check: bool = True
) -> Dict[str, Any]:
    """
    Analyze project requirements file for dependencies, security, and compatibility.

    This tool provides comprehensive analysis of Python project requirements files
    including dependency parsing, version checking, security vulnerability scanning,
    Python compatibility assessment, and actionable recommendations for improvements.

    Args:
        file_path: Path to the requirements file (requirements.txt, pyproject.toml, setup.py, etc.)
        check_updates: Whether to check for available package updates
        security_scan: Whether to perform security vulnerability scanning on dependencies
        compatibility_check: Whether to check Python version compatibility for all dependencies

    Returns:
        Dictionary containing comprehensive requirements analysis including:
        - File information and detected format (requirements.txt, pyproject.toml, etc.)
        - Parsed dependencies with version specifiers and extras
        - Dependency health analysis with specification issues and recommendations
        - Package update analysis showing outdated packages and latest versions
        - Security vulnerability scan results for all dependencies
        - Python version compatibility assessment
        - Overall risk level and actionable improvement recommendations

    Raises:
        FileNotFoundError: If the requirements file is not found
        NetworkError: For network-related errors during analysis
        SearchError: If requirements analysis fails
    """
    logger.info(f"MCP tool: Analyzing requirements file {file_path}")

    try:
        result = await analyze_project_requirements(
            file_path=file_path,
            check_updates=check_updates,
            security_scan=security_scan,
            compatibility_check=compatibility_check
        )

        summary = result.get("analysis_summary", {})
        total_deps = summary.get("total_dependencies", 0)
        risk_level = summary.get("overall_risk_level", "unknown")
        logger.info(f"MCP tool: Requirements analysis completed for {file_path} - {total_deps} dependencies, risk level: {risk_level}")
        return result

    except (FileNotFoundError, NetworkError, SearchError) as e:
        logger.error(f"Error analyzing requirements file {file_path}: {e}")
        # Structured error payload mirroring the success shape so MCP clients
        # can consume it without special-casing failures.
        return {
            "error": f"Requirements analysis failed: {e}",
            "error_type": type(e).__name__,
            "file_path": file_path,
            "analysis_timestamp": "",
            "file_info": {"name": file_path, "format": "unknown"},
            "dependencies": [],
            "dependency_analysis": {},
            "analysis_summary": {
                "total_dependencies": 0,
                "health_score": 0,
                "packages_with_issues": 0,
                "outdated_packages": 0,
                "security_vulnerabilities": 0,
                "compatibility_issues": 0,
                "overall_risk_level": "critical",
            },
            "recommendations": [f"❌ Requirements analysis failed: {e}"],
            "python_requirements": None,
        }


async def compare_multiple_requirements_files(
    file_paths: List[str]
) -> Dict[str, Any]:
    """
    Compare multiple requirements files to identify differences and conflicts.

    This tool analyzes multiple requirements files simultaneously to identify
    version conflicts, unique dependencies, and inconsistencies across different
    project configurations or environments.

    Args:
        file_paths: List of paths to requirements files to compare and analyze

    Returns:
        Dictionary containing comparative requirements analysis including:
        - Detailed analysis results for each individual file
        - Common packages shared across all files
        - Conflicting package versions between files with specific version details
        - Packages unique to specific files
        - Recommendations for resolving conflicts and standardizing requirements
        - Statistics on package overlap and conflict rates

    Raises:
        ValueError: If file_paths list is empty
        NetworkError: For network-related errors during analysis
        SearchError: If requirements comparison fails
    """
    if not file_paths:
        raise ValueError("File paths list cannot be empty")

    logger.info(f"MCP tool: Comparing {len(file_paths)} requirements files")

    try:
        result = await compare_requirements_files(file_paths=file_paths)

        comparison_results = result.get("comparison_results", {})
        conflicts = len(comparison_results.get("conflicting_packages", []))
        total_packages = comparison_results.get("total_unique_packages", 0)

        logger.info(f"MCP tool: Requirements comparison completed - {total_packages} unique packages, {conflicts} conflicts found")
        return result

    except (ValueError, NetworkError, SearchError) as e:
        logger.error(f"Error comparing requirements files: {e}")
        # Structured error payload mirroring the success shape
        return {
            "error": f"Requirements comparison failed: {e}",
            "error_type": type(e).__name__,
            "comparison_timestamp": "",
            "files_compared": len(file_paths),
            "file_analyses": {},
            "comparison_results": {
                "total_unique_packages": 0,
                "common_packages": [],
                "conflicting_packages": [],
                "unique_to_files": {},
            },
            "recommendations": [f"❌ Requirements comparison failed: {e}"]
        }
\ No newline
at end of file
diff --git a/pypi_query_mcp/tools/security.py b/pypi_query_mcp/tools/security.py
new file mode 100644
index 0000000..5b62895
--- /dev/null
+++ b/pypi_query_mcp/tools/security.py
@@ -0,0 +1,660 @@
"""Security vulnerability scanning and analysis tools for PyPI packages."""

import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# NOTE(review): `json` and `quote` appear unused in this module — confirm
from urllib.parse import quote

import httpx

from ..core.exceptions import NetworkError, SearchError
from ..core.pypi_client import PyPIClient

logger = logging.getLogger(__name__)


class VulnerabilityScanner:
    """Comprehensive vulnerability scanner for PyPI packages."""

    def __init__(self):
        self.timeout = 30.0          # per-request timeout (seconds) for httpx calls
        self.session = None          # NOTE(review): appears never assigned/used — confirm

        # Vulnerability database endpoints
        # NOTE(review): only osv_api is queried below; safety_db_api and
        # snyk_api appear unused — confirm before relying on them.
        self.osv_api = "https://api.osv.dev/v1/query"
        self.safety_db_api = "https://pyup.io/api/v1/safety"
        self.snyk_api = "https://snyk.io/test/pip"

        # Common vulnerability patterns to look for
        self.high_risk_patterns = [
            "remote code execution", "rce", "code injection", "sql injection",
            "cross-site scripting", "xss", "csrf", "authentication bypass",
            "privilege escalation", "arbitrary file", "path traversal",
            "buffer overflow", "memory corruption", "denial of service"
        ]

    async def scan_package(
        self,
        package_name: str,
        version: Optional[str] = None,
        include_dependencies: bool = True,
        severity_filter: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Comprehensive security scan of a PyPI package.

        Args:
            package_name: Name of the package to scan
            version: Specific version to scan (optional, defaults to latest)
            include_dependencies: Whether to scan dependencies too
            severity_filter: Filter by severity level (low, medium, high, critical)

        Returns:
            Dictionary containing security analysis results
        """
        logger.info(f"Starting security scan for package: {package_name}")

        try:
            # Get package information
            async with PyPIClient() as client:
                package_data = await client.get_package_info(package_name, version)

            package_version = version or package_data["info"]["version"]

            # Run parallel vulnerability scans. asyncio.gather accepts both the
            # bare coroutines and the eagerly-created task for the disabled case.
            scan_tasks = [
                self._scan_osv_database(package_name, package_version),
                self._scan_github_advisories(package_name, package_version),
                self._analyze_package_metadata(package_data),
                self._check_dependency_vulnerabilities(package_name, package_version) if include_dependencies else asyncio.create_task(self._empty_result())
            ]

            osv_results, github_results, metadata_analysis, dependency_results = await asyncio.gather(
                *scan_tasks, return_exceptions=True
            )

            # Consolidate results; any scanner that raised is silently skipped
            vulnerabilities = []

            # Process OSV results
            if not isinstance(osv_results, Exception) and osv_results:
                vulnerabilities.extend(osv_results.get("vulnerabilities", []))

            # Process GitHub results
            if not isinstance(github_results, Exception) and github_results:
                vulnerabilities.extend(github_results.get("vulnerabilities", []))

            # Process dependency vulnerabilities
            if not isinstance(dependency_results, Exception) and dependency_results:
                vulnerabilities.extend(dependency_results.get("vulnerabilities", []))

            # Apply severity filter
            if severity_filter:
                vulnerabilities = [
                    vuln for vuln in vulnerabilities
                    if vuln.get("severity", "").lower() == severity_filter.lower()
                ]

            # Generate security report
            # NOTE(review): if _analyze_package_metadata raised, metadata_analysis
            # is an Exception instance here and is passed through unchecked — confirm.
            security_report = self._generate_security_report(
                package_name, package_version, vulnerabilities, metadata_analysis
            )

            return security_report

        except Exception as e:
            logger.error(f"Security scan failed for {package_name}: {e}")
            raise SearchError(f"Security scan failed: {e}") from e

    async def _scan_osv_database(self, package_name: str, version: str) -> Dict[str, Any]:
        """Scan package against OSV (Open Source Vulnerabilities) database."""
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                query_data = {
                    "package": {
                        "name": package_name,
                        "ecosystem": "PyPI"
                    },
                    "version": version
                }

                response = await client.post(
                    self.osv_api,
                    json=query_data,
                    headers={"Content-Type": "application/json"}
                )

                if response.status_code == 200:
                    data = response.json()
                    vulnerabilities = []

                    for vuln in data.get("vulns", []):
                        severity = self._extract_severity_from_osv(vuln)
                        vulnerabilities.append({
                            "id": vuln.get("id", ""),
                            "summary": vuln.get("summary", ""),
                            "details": vuln.get("details", ""),
                            "severity": severity,
                            "published": vuln.get("published", ""),
                            "modified": vuln.get("modified", ""),
                            "source": "OSV",
                            "references": [ref.get("url", "") for ref in vuln.get("references", [])],
                            "affected_versions": self._extract_affected_versions(vuln),
                            "fixed_versions": self._extract_fixed_versions(vuln),
                        })

                    return {"vulnerabilities": vulnerabilities, "source": "OSV"}
                else:
                    logger.warning(f"OSV API returned status {response.status_code}")

        except Exception as e:
            logger.warning(f"OSV database scan failed: {e}")

        # Fall-through: network failure or non-200 → empty result, never raises
        return {"vulnerabilities": [], "source": "OSV"}

    async def _scan_github_advisories(self, package_name: str, version: str) -> Dict[str, Any]:
        """Scan against GitHub Security Advisories."""
        try:
            # GitHub GraphQL API for security advisories
            query = """
            query($ecosystem: SecurityAdvisoryEcosystem!, $package: String!) {
                securityVulnerabilities(ecosystem: $ecosystem, package: $package, first: 100) {
                    nodes {
                        advisory {
                            ghsaId
                            summary
                            description
                            severity
                            publishedAt
                            updatedAt
                            references {
                                url
                            }
                        }
                        vulnerableVersionRange
                        firstPatchedVersion {
                            identifier
                        }
                    }
                }
            }
            """

            variables = {
                "ecosystem": "PIP",
                "package": package_name
            }

            # NOTE(review): no Authorization header is sent; GitHub's GraphQL API
            # normally rejects unauthenticated requests — confirm this path works.
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    "https://api.github.com/graphql",
                    json={"query": query, "variables": variables},
                    headers={
                        "Content-Type": "application/json",
                        "User-Agent": "PyPI-Security-Scanner/1.0"
                    }
                )

                if response.status_code == 200:
                    data = response.json()
                    vulnerabilities = []

                    for vuln_node in data.get("data", {}).get("securityVulnerabilities", {}).get("nodes", []):
                        advisory = vuln_node.get("advisory", {})

                        # Check if current version is affected
                        if self._is_version_affected(version, vuln_node.get("vulnerableVersionRange", "")):
                            vulnerabilities.append({
                                "id": advisory.get("ghsaId", ""),
                                "summary": advisory.get("summary", ""),
                                "details": advisory.get("description", ""),
                                "severity": advisory.get("severity", "").lower(),
                                "published": advisory.get("publishedAt", ""),
                                "modified": advisory.get("updatedAt", ""),
                                "source": "GitHub",
                                "references": [ref.get("url", "") for ref in advisory.get("references", [])],
                                "vulnerable_range": vuln_node.get("vulnerableVersionRange", ""),
                                "first_patched": vuln_node.get("firstPatchedVersion", {}).get("identifier", ""),
                            })

                    return {"vulnerabilities": vulnerabilities, "source": "GitHub"}

        except Exception as e:
            logger.warning(f"GitHub advisories scan failed: {e}")

        return {"vulnerabilities": [], "source": "GitHub"}

    async def _analyze_package_metadata(self, package_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze package metadata for security indicators."""
        info = package_data.get("info", {})

        security_indicators = {
            "metadata_score": 0,        # heuristic score; negative = risk signals
            "risk_factors": [],
            "security_features": [],
            "warnings": []
        }

        # Check for security-related information
        description = (info.get("description") or "").lower()
        summary = (info.get("summary") or "").lower()
        keywords = (info.get("keywords") or "").lower()

        combined_text = f"{description} {summary} {keywords}"

        # Look for security mentions
        if any(term in combined_text for term in ["security", "cryptography", "authentication", "encryption"]):
            security_indicators["security_features"].append("Contains security-related functionality")
            security_indicators["metadata_score"] += 20

        # Check for high-risk patterns
        # NOTE(review): a substring match in the description (e.g. a security
        # library documenting "sql injection" defenses) is penalized — confirm intent.
        for pattern in self.high_risk_patterns:
            if pattern in combined_text:
                security_indicators["risk_factors"].append(f"Mentions: {pattern}")
                security_indicators["metadata_score"] -= 10

        # Check package age and maintenance
        if info.get("author_email"):
            security_indicators["metadata_score"] += 10

        if info.get("home_page"):
            security_indicators["metadata_score"] += 5

        # Check for classifiers
        classifiers = info.get("classifiers", [])
        for classifier in classifiers:
            if "Development Status :: 5 - Production/Stable" in classifier:
                security_indicators["metadata_score"] += 15
                security_indicators["security_features"].append("Production stable status")
            elif "License ::" in classifier:
                security_indicators["metadata_score"] += 5

        # Check for suspicious patterns
        if not info.get("author") and not info.get("maintainer"):
            security_indicators["warnings"].append("No author or maintainer information")
            security_indicators["metadata_score"] -= 20

        if len(info.get("description", "")) < 50:
            security_indicators["warnings"].append("Very brief or missing description")
            security_indicators["metadata_score"] -= 10

        return security_indicators

    async def _check_dependency_vulnerabilities(self, package_name: str, version: str) -> Dict[str, Any]:
        """Check vulnerabilities in package dependencies."""
        try:
            # Get package dependencies
            async with PyPIClient() as client:
                package_data = await client.get_package_info(package_name, version)

            # Extract dependencies
            requires_dist = package_data.get("info", {}).get("requires_dist", []) or []
            dependencies = []

            for req in requires_dist:
                # Parse dependency name (simplified)
                # NOTE(review): this only strips >=, ==, ~=, != — specifiers like
                # "<", "<=", ">" and environment markers pass through uncleaned;
                # consider packaging.requirements.Requirement for robust parsing.
                dep_name = req.split()[0].split(">=")[0].split("==")[0].split("~=")[0].split("!=")[0]
                if dep_name and not dep_name.startswith("extra"):
                    dependencies.append(dep_name)

            # Scan top dependencies for vulnerabilities
            dependency_vulnerabilities = []

            # Limit to top 10 dependencies to avoid overwhelming the system
            for dep_name in dependencies[:10]:
                try:
                    # NOTE(review): the literal string "latest" is sent to OSV as the
                    # version; OSV expects a concrete version, so this query likely
                    # matches nothing — confirm and resolve the real latest version.
                    dep_scan = await self._scan_osv_database(dep_name, "latest")
                    for vuln in dep_scan.get("vulnerabilities", []):
                        vuln["dependency"] = dep_name
                        vuln["type"] = "dependency_vulnerability"
                        dependency_vulnerabilities.append(vuln)
                except Exception as e:
                    logger.debug(f"Failed to scan dependency {dep_name}: {e}")

            return {"vulnerabilities": dependency_vulnerabilities, "source": "dependencies"}

        except Exception as e:
            logger.warning(f"Dependency vulnerability check failed: {e}")
            return {"vulnerabilities": [], "source": "dependencies"}

    async def _empty_result(self) -> Dict[str, Any]:
        """Return empty result for disabled scans."""
        return {"vulnerabilities": [], "source": "disabled"}

    def _extract_severity_from_osv(self, vuln_data: Dict[str, Any]) -> str:
        """Extract severity from OSV vulnerability data."""
        # OSV uses CVSS scores, map to common severity levels
        severity_data = vuln_data.get("severity", [])
        if severity_data:
            score = severity_data[0].get("score", "")
            if "CVSS:" in score:
                # Extract CVSS score
                # NOTE(review): OSV "score" is typically a CVSS *vector* string
                # (e.g. "CVSS:3.1/AV:N/AC:L/..."); split("/")[1] yields "AV:N",
                # so float() raises and severity always falls through to
                # "unknown" — confirm against real OSV payloads.
                try:
                    cvss_score = float(score.split("/")[1])
                    if cvss_score >= 9.0:
                        return "critical"
                    elif cvss_score >= 7.0:
                        return "high"
                    elif cvss_score >= 4.0:
                        return "medium"
                    else:
                        return "low"
                except:  # NOTE(review): bare except — prefer except (ValueError, IndexError)
                    pass

        return "unknown"

    def
_extract_affected_versions(self, vuln_data: Dict[str, Any]) -> List[str]:
        """Extract affected version ranges from vulnerability data."""
        affected = vuln_data.get("affected", [])
        version_ranges = []

        # OSV encodes ranges as introduced/fixed events
        for affect in affected:
            ranges = affect.get("ranges", [])
            for range_data in ranges:
                events = range_data.get("events", [])
                for event in events:
                    if "introduced" in event:
                        version_ranges.append(f">= {event['introduced']}")
                    elif "fixed" in event:
                        version_ranges.append(f"< {event['fixed']}")

        return version_ranges

    def _extract_fixed_versions(self, vuln_data: Dict[str, Any]) -> List[str]:
        """Extract fixed versions from vulnerability data."""
        affected = vuln_data.get("affected", [])
        fixed_versions = []

        for affect in affected:
            ranges = affect.get("ranges", [])
            for range_data in ranges:
                events = range_data.get("events", [])
                for event in events:
                    if "fixed" in event:
                        fixed_versions.append(event["fixed"])

        return fixed_versions

    def _is_version_affected(self, version: str, vulnerable_range: str) -> bool:
        """Check if a version is affected by a vulnerability range."""
        # Simplified version checking - in production would use packaging.specifiers
        # NOTE(review): the comparisons below are lexicographic string comparisons
        # ("2.10" < "2.9" is True) — use packaging.version for correct ordering.
        if not vulnerable_range:
            return True

        # Basic patterns
        if "< " in vulnerable_range:
            try:
                limit = vulnerable_range.split("< ")[1].strip()
                return version < limit
            except:  # NOTE(review): bare except — narrow to IndexError
                pass

        if ">= " in vulnerable_range:
            try:
                limit = vulnerable_range.split(">= ")[1].strip()
                return version >= limit
            except:  # NOTE(review): bare except — narrow to IndexError
                pass

        return True  # Assume affected if we can't parse

    def _generate_security_report(
        self,
        package_name: str,
        version: str,
        vulnerabilities: List[Dict[str, Any]],
        metadata_analysis: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Generate comprehensive security report."""

        # Categorize vulnerabilities by severity
        severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0}
        dependency_vulns = []
        direct_vulns = []

        for vuln in vulnerabilities:
            severity = vuln.get("severity", "unknown")
            severity_counts[severity] = severity_counts.get(severity, 0) + 1

            if vuln.get("type") == "dependency_vulnerability":
                dependency_vulns.append(vuln)
            else:
                direct_vulns.append(vuln)

        # Calculate risk score
        risk_score = self._calculate_risk_score(severity_counts, metadata_analysis)

        # Generate recommendations
        recommendations = self._generate_security_recommendations(
            vulnerabilities, metadata_analysis, risk_score
        )

        return {
            "package": package_name,
            "version": version,
            "scan_timestamp": datetime.now(timezone.utc).isoformat(),
            "security_summary": {
                "total_vulnerabilities": len(vulnerabilities),
                "direct_vulnerabilities": len(direct_vulns),
                "dependency_vulnerabilities": len(dependency_vulns),
                "severity_breakdown": severity_counts,
                "risk_score": risk_score,
                "risk_level": self._get_risk_level(risk_score),
            },
            "vulnerabilities": {
                "direct": direct_vulns,
                "dependencies": dependency_vulns,
            },
            "metadata_analysis": metadata_analysis,
            "recommendations": recommendations,
            "scan_details": {
                "sources_checked": ["OSV", "GitHub", "Metadata"],
                # NOTE(review): this reports False even when dependencies were
                # scanned but clean — it tracks findings, not whether a scan ran.
                "dependencies_scanned": len(dependency_vulns) > 0,
                "scan_completion": "success",
            }
        }

    def _calculate_risk_score(self, severity_counts: Dict[str, int], metadata_analysis: Dict[str, Any]) -> float:
        """Calculate overall risk score (0-100)."""
        score = 0.0

        # Vulnerability scoring (0-80 points)
        score += severity_counts.get("critical", 0) * 20
        score += severity_counts.get("high", 0) * 15
        score += severity_counts.get("medium", 0) * 8
        score += severity_counts.get("low", 0) * 3

        # Metadata scoring (0-20 points)
        metadata_score = metadata_analysis.get("metadata_score", 0)
        if metadata_score < 0:
            score += abs(metadata_score) / 5  # Convert negative metadata score to risk
        else:
            score -= metadata_score / 10  # Good metadata reduces risk

        # Cap at 100
        return min(max(score, 0), 100)

    def _get_risk_level(self, risk_score: float) -> str:
        """Convert risk score to risk level."""
        if risk_score >= 80:
            return "critical"
        elif risk_score >= 60:
            return "high"
        elif risk_score >= 30:
            return "medium"
        elif risk_score > 0:
            return "low"
        else:
            return "minimal"

    def _generate_security_recommendations(
        self,
        vulnerabilities: List[Dict[str, Any]],
        metadata_analysis: Dict[str, Any],
        risk_score: float
    ) -> List[str]:
        """Generate actionable security recommendations."""
        recommendations = []

        if len(vulnerabilities) > 0:
            recommendations.append(f"🚨 Found {len(vulnerabilities)} security vulnerabilities - review and update immediately")

            # Check for critical/high severity
            critical_high = [v for v in vulnerabilities if v.get("severity") in ["critical", "high"]]
            if critical_high:
                recommendations.append(f"⚠️ {len(critical_high)} critical/high severity vulnerabilities require immediate attention")

            # Check for fixed versions
            fixed_versions = []
            for vuln in vulnerabilities:
                fixed = vuln.get("fixed_versions", []) or [vuln.get("first_patched", "")]
                fixed_versions.extend([v for v in fixed if v])

            if fixed_versions:
                # NOTE(review): max() on version strings is lexicographic
                # ("9.1" > "10.0") — use packaging.version for a correct pick.
                latest_fixed = max(fixed_versions) if fixed_versions else None
                if latest_fixed:
                    recommendations.append(f"📦 Update to version {latest_fixed} or later to fix known vulnerabilities")

        # Metadata recommendations
        warnings = metadata_analysis.get("warnings", [])
        if warnings:
            recommendations.append(f"⚠️ Package metadata issues: {', '.join(warnings)}")

        if metadata_analysis.get("metadata_score", 0) < 20:
            recommendations.append("📝 Package has poor metadata quality - verify trustworthiness before use")

        # General recommendations based on risk score
        if risk_score >= 60:
            recommendations.append("🛑 High risk package - consider alternatives or additional security review")
        elif risk_score >= 30:
            recommendations.append("⚠️ Moderate risk - monitor for updates and security patches")
        elif len(vulnerabilities) == 0:
            recommendations.append("✅ No known vulnerabilities found - package appears secure")

        return recommendations


# Main scanning functions
async def scan_package_security(
    package_name: str,
    version: Optional[str] = None,
    include_dependencies: bool = True,
    severity_filter: Optional[str] = None
) -> Dict[str, Any]:
    """
    Scan a PyPI package for security vulnerabilities.

    Args:
        package_name: Name of the package to scan
        version: Specific version to scan (optional)
        include_dependencies: Whether to scan dependencies
        severity_filter: Filter by severity (low, medium, high, critical)

    Returns:
        Comprehensive security scan results
    """
    scanner = VulnerabilityScanner()
    return await scanner.scan_package(
        package_name, version, include_dependencies, severity_filter
    )


async def bulk_security_scan(
    package_names: List[str],
    include_dependencies: bool = False,
    severity_threshold: str = "medium"
) -> Dict[str, Any]:
    """
    Perform bulk security scanning of multiple packages.

    Args:
        package_names: List of package names to scan
        include_dependencies: Whether to scan dependencies
        severity_threshold: Minimum severity to report

    Returns:
        Bulk scan results with summary
    """
    logger.info(f"Starting bulk security scan of {len(package_names)} packages")

    # NOTE(review): severity_threshold is accepted and documented but never
    # applied anywhere in this function — confirm whether filtering was intended.
    scanner = VulnerabilityScanner()
    scan_results = {}
    summary = {
        "total_packages": len(package_names),
        "packages_with_vulnerabilities": 0,
        "total_vulnerabilities": 0,
        "high_risk_packages": [],
        "scan_timestamp": datetime.now(timezone.utc).isoformat()
    }

    # Scan packages in parallel batches of 5 to bound concurrent API load
    batch_size = 5
    for i in range(0, len(package_names), batch_size):
        batch = package_names[i:i + batch_size]
        batch_tasks = [
            scanner.scan_package(pkg_name, include_dependencies=include_dependencies)
            for pkg_name in batch
        ]

        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

        for pkg_name, result in zip(batch, batch_results):
            if isinstance(result, Exception):
                scan_results[pkg_name] = {
                    "error": str(result),
                    "scan_status": "failed"
                }
            else:
                scan_results[pkg_name] = result

                # Update summary
                vuln_count = result.get("security_summary", {}).get("total_vulnerabilities", 0)
                if vuln_count > 0:
                    summary["packages_with_vulnerabilities"] += 1
                    summary["total_vulnerabilities"] += vuln_count

                risk_level = result.get("security_summary", {}).get("risk_level", "")
                if risk_level in ["high", "critical"]:
                    summary["high_risk_packages"].append({
                        "package": pkg_name,
                        "risk_level": risk_level,
                        "vulnerabilities": vuln_count
                    })

    return {
        "summary": summary,
        "detailed_results": scan_results,
        "recommendations": _generate_bulk_recommendations(summary, scan_results)
    }


def _generate_bulk_recommendations(summary: Dict[str, Any], results: Dict[str, Any]) -> List[str]:
    """Generate recommendations for bulk scan results."""
    recommendations = []

    vuln_packages = summary["packages_with_vulnerabilities"]
total_packages = summary["total_packages"] + + if vuln_packages == 0: + recommendations.append("✅ No security vulnerabilities found in any scanned packages") + else: + percentage = (vuln_packages / total_packages) * 100 + recommendations.append( + f"🚨 {vuln_packages}/{total_packages} packages ({percentage:.1f}%) have security vulnerabilities" + ) + + high_risk = summary["high_risk_packages"] + if high_risk: + recommendations.append( + f"⚠️ {len(high_risk)} packages are high/critical risk: {', '.join([p['package'] for p in high_risk])}" + ) + recommendations.append("🛑 Priority: Address high-risk packages immediately") + + if summary["total_vulnerabilities"] > 0: + recommendations.append(f"📊 Total vulnerabilities found: {summary['total_vulnerabilities']}") + recommendations.append("🔍 Review detailed results and update affected packages") + + return recommendations \ No newline at end of file diff --git a/pypi_query_mcp/tools/security_tools.py b/pypi_query_mcp/tools/security_tools.py new file mode 100644 index 0000000..fb338ea --- /dev/null +++ b/pypi_query_mcp/tools/security_tools.py @@ -0,0 +1,147 @@ +"""Security vulnerability scanning tools for PyPI packages.""" + +import logging +from typing import Any, Dict, List, Optional + +from ..core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from ..tools.security import bulk_security_scan, scan_package_security + +logger = logging.getLogger(__name__) + + +async def scan_pypi_package_security( + package_name: str, + version: Optional[str] = None, + include_dependencies: bool = True, + severity_filter: Optional[str] = None +) -> Dict[str, Any]: + """ + Scan a PyPI package for security vulnerabilities. + + This tool performs comprehensive security vulnerability scanning of PyPI packages, + checking against multiple vulnerability databases including OSV (Open Source Vulnerabilities), + GitHub Security Advisories, and analyzing package metadata for security indicators. 
+ + Args: + package_name: Name of the package to scan for vulnerabilities + version: Specific version to scan (optional, defaults to latest version) + include_dependencies: Whether to scan package dependencies for vulnerabilities + severity_filter: Filter results by severity level (low, medium, high, critical) + + Returns: + Dictionary containing comprehensive security scan results including: + - Total vulnerability count and severity breakdown + - Direct package vulnerabilities vs dependency vulnerabilities + - Risk score and level assessment (minimal, low, medium, high, critical) + - Detailed vulnerability information with IDs, descriptions, and references + - Package metadata security analysis + - Actionable security recommendations + + Raises: + InvalidPackageNameError: If package name is empty or invalid + PackageNotFoundError: If package is not found on PyPI + NetworkError: For network-related errors + SearchError: If security scanning fails + """ + if not package_name or not package_name.strip(): + raise InvalidPackageNameError(package_name) + + logger.info(f"MCP tool: Scanning security for package {package_name}") + + try: + result = await scan_package_security( + package_name=package_name, + version=version, + include_dependencies=include_dependencies, + severity_filter=severity_filter + ) + + logger.info(f"MCP tool: Security scan completed for {package_name} - found {result.get('security_summary', {}).get('total_vulnerabilities', 0)} vulnerabilities") + return result + + except (InvalidPackageNameError, NetworkError, SearchError) as e: + logger.error(f"Error scanning security for {package_name}: {e}") + return { + "error": f"Security scan failed: {e}", + "error_type": type(e).__name__, + "package": package_name, + "version": version, + "scan_timestamp": "", + "security_summary": { + "total_vulnerabilities": 0, + "direct_vulnerabilities": 0, + "dependency_vulnerabilities": 0, + "severity_breakdown": {"critical": 0, "high": 0, "medium": 0, "low": 0, 
"unknown": 0}, + "risk_score": 0, + "risk_level": "unknown", + }, + "vulnerabilities": {"direct": [], "dependencies": []}, + "metadata_analysis": {}, + "recommendations": [f"❌ Security scan failed: {e}"], + "scan_details": { + "sources_checked": [], + "dependencies_scanned": False, + "scan_completion": "error", + } + } + + +async def bulk_scan_package_security( + package_names: List[str], + include_dependencies: bool = False, + severity_threshold: str = "medium" +) -> Dict[str, Any]: + """ + Perform bulk security scanning of multiple PyPI packages. + + This tool scans multiple packages simultaneously for security vulnerabilities, + providing a consolidated report with summary statistics and prioritized + recommendations for addressing security issues across your package ecosystem. + + Args: + package_names: List of package names to scan for vulnerabilities + include_dependencies: Whether to include dependency vulnerability scanning + severity_threshold: Minimum severity level to report (low, medium, high, critical) + + Returns: + Dictionary containing bulk scan results including: + - Summary statistics (total packages, packages with vulnerabilities, high-risk packages) + - Detailed scan results for each package + - Prioritized recommendations for security remediation + - Scan timestamp and completion status + + Raises: + ValueError: If package_names list is empty + NetworkError: For network-related errors during scanning + SearchError: If bulk scanning fails + """ + if not package_names: + raise ValueError("Package names list cannot be empty") + + logger.info(f"MCP tool: Starting bulk security scan of {len(package_names)} packages") + + try: + result = await bulk_security_scan( + package_names=package_names, + include_dependencies=include_dependencies, + severity_threshold=severity_threshold + ) + + logger.info(f"MCP tool: Bulk security scan completed - {result.get('summary', {}).get('packages_with_vulnerabilities', 0)} packages have vulnerabilities") + return 
result + + except (ValueError, NetworkError, SearchError) as e: + logger.error(f"Error in bulk security scan: {e}") + return { + "error": f"Bulk security scan failed: {e}", + "error_type": type(e).__name__, + "summary": { + "total_packages": len(package_names), + "packages_with_vulnerabilities": 0, + "total_vulnerabilities": 0, + "high_risk_packages": [], + "scan_timestamp": "" + }, + "detailed_results": {}, + "recommendations": [f"❌ Bulk security scan failed: {e}"] + } \ No newline at end of file