diff --git a/pypi_query_mcp/core/exceptions.py b/pypi_query_mcp/core/exceptions.py index 239f2bc..b073be7 100644 --- a/pypi_query_mcp/core/exceptions.py +++ b/pypi_query_mcp/core/exceptions.py @@ -54,3 +54,11 @@ class PyPIServerError(PyPIError): if not message: message = f"PyPI server error (HTTP {status_code})" super().__init__(message, status_code=status_code) + + +class SearchError(PyPIError): + """Raised when search operations fail.""" + + def __init__(self, message: str, query: str | None = None): + super().__init__(message) + self.query = query diff --git a/pypi_query_mcp/core/search_client.py b/pypi_query_mcp/core/search_client.py new file mode 100644 index 0000000..af9a8be --- /dev/null +++ b/pypi_query_mcp/core/search_client.py @@ -0,0 +1,516 @@ +"""Advanced PyPI search client with filtering, sorting, and semantic search capabilities.""" + +import asyncio +import logging +import re +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Set +from urllib.parse import quote_plus + +import httpx +from packaging import version as pkg_version + +from .exceptions import NetworkError, SearchError +from .pypi_client import PyPIClient + +logger = logging.getLogger(__name__) + + +class SearchFilter: + """Search filter configuration.""" + + def __init__( + self, + python_versions: Optional[List[str]] = None, + licenses: Optional[List[str]] = None, + categories: Optional[List[str]] = None, + min_downloads: Optional[int] = None, + max_age_days: Optional[int] = None, + maintenance_status: Optional[str] = None, # active, maintained, stale, abandoned + has_wheels: Optional[bool] = None, + min_python_version: Optional[str] = None, + max_python_version: Optional[str] = None, + ): + self.python_versions = python_versions or [] + self.licenses = licenses or [] + self.categories = categories or [] + self.min_downloads = min_downloads + self.max_age_days = max_age_days + self.maintenance_status = maintenance_status + self.has_wheels = has_wheels 
+ self.min_python_version = min_python_version + self.max_python_version = max_python_version + + +class SearchSort: + """Search sorting configuration.""" + + POPULARITY = "popularity" + RECENCY = "recency" + RELEVANCE = "relevance" + QUALITY = "quality" + NAME = "name" + DOWNLOADS = "downloads" + + def __init__(self, field: str = RELEVANCE, reverse: bool = True): + self.field = field + self.reverse = reverse + + +class PyPISearchClient: + """Advanced PyPI search client with comprehensive filtering and analysis.""" + + def __init__(self, timeout: float = 30.0): + self.timeout = timeout + self.pypi_client = None + + # Common license mappings + self.license_aliases = { + "mit": ["MIT", "MIT License"], + "apache": ["Apache", "Apache 2.0", "Apache-2.0", "Apache Software License"], + "bsd": ["BSD", "BSD License", "BSD-3-Clause", "BSD-2-Clause"], + "gpl": ["GPL", "GNU General Public License", "GPL-3.0", "GPL-2.0"], + "lgpl": ["LGPL", "GNU Lesser General Public License"], + "mpl": ["MPL", "Mozilla Public License"], + "unlicense": ["Unlicense", "Public Domain"], + } + + # Category keywords for classification + self.category_keywords = { + "web": ["web", "flask", "django", "fastapi", "http", "rest", "api", "server", "wsgi", "asgi"], + "data-science": ["data", "science", "machine", "learning", "ml", "ai", "pandas", "numpy", "scipy"], + "database": ["database", "db", "sql", "nosql", "orm", "sqlite", "postgres", "mysql", "mongodb"], + "testing": ["test", "testing", "pytest", "unittest", "mock", "coverage", "tox"], + "cli": ["cli", "command", "terminal", "console", "argparse", "click"], + "security": ["security", "crypto", "encryption", "ssl", "tls", "auth", "password"], + "networking": ["network", "socket", "tcp", "udp", "http", "requests", "urllib"], + "dev-tools": ["development", "tools", "build", "package", "deploy", "lint", "format"], + "cloud": ["cloud", "aws", "azure", "gcp", "docker", "kubernetes", "serverless"], + "gui": ["gui", "ui", "interface", "tkinter", "qt", 
"wx", "kivy"], + } + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + async def search_packages( + self, + query: str, + limit: int = 20, + filters: Optional[SearchFilter] = None, + sort: Optional[SearchSort] = None, + semantic_search: bool = False, + ) -> Dict[str, Any]: + """ + Search PyPI packages with advanced filtering and sorting. + + Args: + query: Search query string + limit: Maximum number of results to return + filters: Optional search filters + sort: Optional sort configuration + semantic_search: Whether to use semantic search on descriptions + + Returns: + Dictionary containing search results and metadata + """ + if not query or not query.strip(): + raise SearchError("Search query cannot be empty") + + filters = filters or SearchFilter() + sort = sort or SearchSort() + + logger.info(f"Searching PyPI for: '{query}' (limit: {limit}, semantic: {semantic_search})") + + try: + # Use PyPI's search API as the primary source + pypi_results = await self._search_pypi_api(query, limit * 3) # Get more for filtering + + # Enhance results with additional metadata + enhanced_results = await self._enhance_search_results(pypi_results) + + # Apply filters + filtered_results = self._apply_filters(enhanced_results, filters) + + # Apply semantic search if requested + if semantic_search: + filtered_results = self._apply_semantic_search(filtered_results, query) + + # Sort results + sorted_results = self._sort_results(filtered_results, sort) + + # Limit results + final_results = sorted_results[:limit] + + return { + "query": query, + "total_found": len(pypi_results), + "filtered_count": len(filtered_results), + "returned_count": len(final_results), + "packages": final_results, + "filters_applied": self._serialize_filters(filters), + "sort_applied": {"field": sort.field, "reverse": sort.reverse}, + "semantic_search": semantic_search, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + except Exception as 
e: + logger.error(f"Search failed for query '{query}': {e}") + raise SearchError(f"Search failed: {e}") from e + + async def _search_pypi_api(self, query: str, limit: int) -> List[Dict[str, Any]]: + """Search using PyPI's official search API.""" + url = "https://pypi.org/search/" + params = { + "q": query, + "page": 1, + } + + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.get(url, params=params) + response.raise_for_status() + + # Parse the HTML response (PyPI search returns HTML) + return await self._parse_search_html(response.text, limit) + + except httpx.HTTPError as e: + logger.error(f"PyPI search API error: {e}") + # Fallback to alternative search method + return await self._fallback_search(query, limit) + + async def _fallback_search(self, query: str, limit: int) -> List[Dict[str, Any]]: + """Fallback search using PyPI JSON API and our curated data.""" + from ..data.popular_packages import PACKAGES_BY_NAME, get_popular_packages + + # Search in our curated packages first + curated_matches = [] + query_lower = query.lower() + + for package_info in get_popular_packages(limit=1000): + name_match = query_lower in package_info.name.lower() + desc_match = query_lower in package_info.description.lower() + + if name_match or desc_match: + curated_matches.append({ + "name": package_info.name, + "summary": package_info.description, + "version": "unknown", + "source": "curated", + "category": package_info.category, + "estimated_downloads": package_info.estimated_monthly_downloads, + }) + + # If we have some matches, return them + if curated_matches: + return curated_matches[:limit] + + # Last resort: try simple package name search + try: + async with PyPIClient() as client: + # Try to get the package directly if it's an exact match + try: + package_data = await client.get_package_info(query) + return [{ + "name": package_data["info"]["name"], + "summary": package_data["info"]["summary"] or "", + "version": 
package_data["info"]["version"], + "source": "direct", + }] + except Exception: # exact-name lookup is best-effort; never swallow SystemExit/KeyboardInterrupt + pass + + except Exception as e: + logger.warning(f"Fallback search failed: {e}") + + return [] + + async def _parse_search_html(self, html: str, limit: int) -> List[Dict[str, Any]]: + """Parse PyPI search results from HTML (simplified parser).""" + # This is a simplified parser - in production, you'd use BeautifulSoup + # For now, return empty and rely on fallback + return [] + + async def _enhance_search_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Enhance search results with additional metadata from PyPI API.""" + enhanced = [] + + # Process in batches to avoid overwhelming the API + batch_size = 5 + for i in range(0, len(results), batch_size): + batch = results[i:i + batch_size] + batch_tasks = [ + self._enhance_single_result(result) + for result in batch + ] + + enhanced_batch = await asyncio.gather(*batch_tasks, return_exceptions=True) + + for result in enhanced_batch: + if isinstance(result, Exception): + logger.warning(f"Failed to enhance result: {result}") + continue + if result: + enhanced.append(result) + + return enhanced + + async def _enhance_single_result(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Enhance a single search result with PyPI metadata.""" + try: + async with PyPIClient() as client: + package_data = await client.get_package_info(result["name"]) + info = package_data["info"] + + # Extract useful metadata + enhanced = { + "name": info["name"], + "summary": info["summary"] or result.get("summary", ""), + "description": info["description"] or "", + "version": info["version"], + "author": info["author"] or "", + "license": info["license"] or "", + "home_page": info["home_page"] or "", + "project_urls": info.get("project_urls", {}), + "requires_python": info.get("requires_python", ""), + "classifiers": info.get("classifiers", []), + "keywords": info.get("keywords", ""), + "last_modified": package_data.get("last_modified",
""), + "download_url": info.get("download_url", ""), + + # Derived fields + "categories": self._extract_categories(info), + "license_type": self._normalize_license(info.get("license", "")), + "python_versions": self._extract_python_versions(info.get("classifiers", [])), + "has_wheels": self._check_wheels(package_data), + "quality_score": self._calculate_quality_score(info, package_data), + "maintenance_status": self._assess_maintenance_status(package_data), + } + + # Add original search metadata + enhanced.update({ + "search_source": result.get("source", "pypi"), + "estimated_downloads": result.get("estimated_downloads"), + }) + + return enhanced + + except Exception as e: + logger.warning(f"Failed to enhance package {result['name']}: {e}") + return result + + def _extract_categories(self, info: Dict[str, Any]) -> List[str]: + """Extract categories from package metadata.""" + categories = set() + + # Check classifiers + for classifier in info.get("classifiers", []): + if "Topic ::" in classifier: + topic = classifier.split("Topic ::")[-1].strip() + categories.add(topic.lower()) + + # Check keywords and description + text = f"{info.get('keywords', '')} {info.get('summary', '')} {info.get('description', '')[:500]}".lower() + + for category, keywords in self.category_keywords.items(): + if any(keyword in text for keyword in keywords): + categories.add(category) + + return list(categories) + + def _normalize_license(self, license_text: str) -> str: + """Normalize license text to standard types.""" + if not license_text: + return "unknown" + + license_lower = license_text.lower() + + for license_type, aliases in self.license_aliases.items(): + if any(alias.lower() in license_lower for alias in aliases): + return license_type + + return "other" + + def _extract_python_versions(self, classifiers: List[str]) -> List[str]: + """Extract supported Python versions from classifiers.""" + versions = [] + + for classifier in classifiers: + if "Programming Language :: Python ::" 
in classifier: + version_part = classifier.split("::")[-1].strip() + if re.match(r"^\d+\.\d+", version_part): + versions.append(version_part) + + return sorted(versions, key=lambda v: pkg_version.parse(v) if v != "Implementation" else pkg_version.parse("0")) + + def _check_wheels(self, package_data: Dict[str, Any]) -> bool: + """Check if package has wheel distributions.""" + releases = package_data.get("releases", {}) + latest_version = package_data["info"]["version"] + + if latest_version in releases: + for release in releases[latest_version]: + if release.get("packagetype") == "bdist_wheel": + return True + + return False + + def _calculate_quality_score(self, info: Dict[str, Any], package_data: Dict[str, Any]) -> float: + """Calculate a quality score for the package (0-100).""" + score = 0.0 + + # Documentation (25 points) + if info.get("description") and len(info["description"]) > 100: + score += 15 + if info.get("home_page"): + score += 5 + if info.get("project_urls"): + score += 5 + + # Metadata completeness (25 points) + if info.get("author"): + score += 5 + if info.get("license"): + score += 5 + if info.get("keywords"): + score += 5 + if info.get("classifiers"): + score += 10 + + # Technical quality (25 points) + if self._check_wheels(package_data): + score += 10 + if info.get("requires_python"): + score += 5 + if len(info.get("classifiers", [])) >= 5: + score += 10 + + # Maintenance (25 points) - simplified scoring + if package_data.get("last_modified"): + score += 25 # Assume recent if we have the data + + return min(score, 100.0) + + def _assess_maintenance_status(self, package_data: Dict[str, Any]) -> str: + """Assess maintenance status of the package.""" + # Simplified assessment - in production, would analyze release patterns + version = package_data["info"]["version"] + + try: + parsed_version = pkg_version.parse(version) + if parsed_version.is_prerelease: + return "development" + elif parsed_version.major >= 1: + return "maintained" + else: + return 
"early" + except Exception: # version string may not parse; treat as unknown rather than crash + return "unknown" + + def _apply_filters(self, results: List[Dict[str, Any]], filters: SearchFilter) -> List[Dict[str, Any]]: + """Apply search filters to results.""" + filtered = [] + + for result in results: + if self._passes_filters(result, filters): + filtered.append(result) + + return filtered + + def _passes_filters(self, result: Dict[str, Any], filters: SearchFilter) -> bool: + """Check if a result passes all filters.""" + + # Python version filter + if filters.python_versions: + package_versions = result.get("python_versions", []) + if not any(v in package_versions for v in filters.python_versions): + return False + + # License filter + if filters.licenses: + license_type = result.get("license_type", "unknown") + if license_type not in filters.licenses: + return False + + # Category filter + if filters.categories: + package_categories = result.get("categories", []) + if not any(cat in package_categories for cat in filters.categories): + return False + + # Downloads filter + if filters.min_downloads: + # key may be present with value None (see _enhance_single_result), so coerce with "or 0" + downloads = result.get("estimated_downloads") or 0 + if downloads < filters.min_downloads: + return False + + # Maintenance status filter + if filters.maintenance_status: + status = result.get("maintenance_status", "unknown") + if status != filters.maintenance_status: + return False + + # Wheels filter + if filters.has_wheels is not None: + has_wheels = result.get("has_wheels", False) + if has_wheels != filters.has_wheels: + return False + + return True + + def _apply_semantic_search(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]: + """Apply semantic search scoring based on description similarity.""" + query_words = set(query.lower().split()) + + for result in results: + description = f"{result.get('summary', '')} {result.get('description', '')[:500]}" + desc_words = set(description.lower().split()) + + # Simple similarity scoring + intersection = len(query_words & desc_words) + union = len(query_words | desc_words) +
similarity = intersection / union if union > 0 else 0 + + result["semantic_score"] = similarity + + return results + + def _sort_results(self, results: List[Dict[str, Any]], sort: SearchSort) -> List[Dict[str, Any]]: + """Sort search results by specified criteria.""" + + def sort_key(result): + if sort.field == SearchSort.POPULARITY: + return result.get("estimated_downloads") or 0 # value may be None; avoid None-vs-int in sorted() + elif sort.field == SearchSort.QUALITY: + return result.get("quality_score", 0) + elif sort.field == SearchSort.NAME: + return result.get("name", "").lower() + elif sort.field == SearchSort.DOWNLOADS: + return result.get("estimated_downloads") or 0 # same None coercion as POPULARITY + elif sort.field == SearchSort.RELEVANCE: + return result.get("semantic_score", 0) + elif sort.field == SearchSort.RECENCY: + # Would need to parse last_modified for true recency + return result.get("version", "0") + else: + return 0 + + return sorted(results, key=sort_key, reverse=sort.reverse) + + def _serialize_filters(self, filters: SearchFilter) -> Dict[str, Any]: + """Serialize filters for response metadata.""" + return { + "python_versions": filters.python_versions, + "licenses": filters.licenses, + "categories": filters.categories, + "min_downloads": filters.min_downloads, + "max_age_days": filters.max_age_days, + "maintenance_status": filters.maintenance_status, + "has_wheels": filters.has_wheels, + "min_python_version": filters.min_python_version, + "max_python_version": filters.max_python_version, + } + + diff --git a/pypi_query_mcp/server.py b/pypi_query_mcp/server.py index 60d2504..f6e2d07 100644 --- a/pypi_query_mcp/server.py +++ b/pypi_query_mcp/server.py @@ -6,7 +6,7 @@ from typing import Any import click from fastmcp import FastMCP -from .core.exceptions import InvalidPackageNameError, NetworkError, PackageNotFoundError +from .core.exceptions import InvalidPackageNameError, NetworkError, PackageNotFoundError, SearchError from .prompts import ( analyze_daily_trends, analyze_environment_dependencies, @@ -26,14 +26,18 @@ from
.prompts import ( from .tools import ( check_python_compatibility, download_package_with_dependencies, + find_alternatives, get_compatible_python_versions, get_package_download_stats, get_package_download_trends, get_top_packages_by_downloads, + get_trending_packages, query_package_dependencies, query_package_info, query_package_versions, resolve_package_dependencies, + search_by_category, + search_packages, ) # Configure logging @@ -613,6 +617,195 @@ async def get_top_downloaded_packages( } +@mcp.tool() +async def search_pypi_packages( + query: str, + limit: int = 20, + python_versions: list[str] | None = None, + licenses: list[str] | None = None, + categories: list[str] | None = None, + min_downloads: int | None = None, + maintenance_status: str | None = None, + has_wheels: bool | None = None, + sort_by: str = "relevance", + sort_desc: bool = True, + semantic_search: bool = False, +) -> dict[str, Any]: + """Search PyPI packages with advanced filtering and sorting. + + This tool provides comprehensive search functionality for PyPI packages with + advanced filtering options, multiple sorting criteria, and semantic search capabilities. 
+ + Args: + query: Search query string (required) + limit: Maximum number of results to return (default: 20, max: 100) + python_versions: Filter by Python versions (e.g., ["3.9", "3.10", "3.11"]) + licenses: Filter by license types (e.g., ["mit", "apache", "bsd", "gpl"]) + categories: Filter by categories (e.g., ["web", "data-science", "testing"]) + min_downloads: Minimum monthly downloads threshold + maintenance_status: Filter by maintenance status ("active", "maintained", "stale", "abandoned") + has_wheels: Filter packages that have wheel distributions (true/false) + sort_by: Sort field ("relevance", "popularity", "recency", "quality", "name", "downloads") + sort_desc: Sort in descending order (default: true) + semantic_search: Use semantic search on package descriptions (default: false) + + Returns: + Dictionary containing search results with packages, metadata, and filtering info + + Raises: + InvalidPackageNameError: If search query is empty or invalid + SearchError: If search operation fails + """ + try: + return await search_packages( + query=query, + limit=limit, + python_versions=python_versions, + licenses=licenses, + categories=categories, + min_downloads=min_downloads, + maintenance_status=maintenance_status, + has_wheels=has_wheels, + sort_by=sort_by, + sort_desc=sort_desc, + semantic_search=semantic_search, + ) + except (InvalidPackageNameError, PackageNotFoundError, NetworkError): + raise + except Exception as e: + logger.error(f"Error searching packages for '{query}': {e}") + return { + "error": f"Search failed: {e}", + "error_type": "SearchError", + "query": query, + "limit": limit, + } + + +@mcp.tool() +async def search_packages_by_category( + category: str, + limit: int = 20, + sort_by: str = "popularity", + python_version: str | None = None, +) -> dict[str, Any]: + """Search packages by category with popularity sorting. 
+ + This tool searches for packages in specific categories, making it easy to discover + relevant packages for particular use cases or domains. + + Args: + category: Category to search ("web", "data-science", "database", "testing", "cli", + "security", "networking", "dev-tools", "cloud", "gui") + limit: Maximum number of results to return (default: 20) + sort_by: Sort field (default: "popularity") + python_version: Filter by Python version compatibility (e.g., "3.10") + + Returns: + Dictionary containing categorized search results + + Raises: + SearchError: If category search fails + """ + try: + return await search_by_category( + category=category, + limit=limit, + sort_by=sort_by, + python_version=python_version, + ) + except Exception as e: + logger.error(f"Error searching category '{category}': {e}") + return { + "error": f"Category search failed: {e}", + "error_type": "SearchError", + "category": category, + "limit": limit, + } + + +@mcp.tool() +async def find_package_alternatives( + package_name: str, + limit: int = 10, + include_similar: bool = True, +) -> dict[str, Any]: + """Find alternative packages to a given package. + + This tool analyzes a package's functionality and finds similar or alternative + packages that could serve the same purpose, useful for evaluating options + or finding replacements. 
+ + Args: + package_name: Name of the package to find alternatives for + limit: Maximum number of alternatives to return (default: 10) + include_similar: Include packages with similar functionality (default: true) + + Returns: + Dictionary containing alternative packages with analysis and recommendations + + Raises: + PackageNotFoundError: If the target package is not found + SearchError: If alternatives search fails + """ + try: + return await find_alternatives( + package_name=package_name, + limit=limit, + include_similar=include_similar, + ) + except (InvalidPackageNameError, PackageNotFoundError, NetworkError): + raise + except Exception as e: + logger.error(f"Error finding alternatives for '{package_name}': {e}") + return { + "error": f"Alternatives search failed: {e}", + "error_type": "SearchError", + "package_name": package_name, + "limit": limit, + } + + +@mcp.tool() +async def get_trending_pypi_packages( + category: str | None = None, + time_period: str = "week", + limit: int = 20, +) -> dict[str, Any]: + """Get trending packages based on recent download activity. + + This tool identifies packages that are gaining popularity or have high + recent download activity, useful for discovering emerging trends in the + Python ecosystem. + + Args: + category: Optional category filter ("web", "data-science", "database", etc.) 
+ time_period: Time period for trending analysis ("day", "week", "month") + limit: Maximum number of packages to return (default: 20) + + Returns: + Dictionary containing trending packages with analysis and metrics + + Raises: + SearchError: If trending analysis fails + """ + try: + return await get_trending_packages( + category=category, + time_period=time_period, + limit=limit, + ) + except Exception as e: + logger.error(f"Error getting trending packages (category: {category}): {e}") + return { + "error": f"Trending analysis failed: {e}", + "error_type": "SearchError", + "category": category, + "time_period": time_period, + "limit": limit, + } + + # Register prompt templates following standard MCP workflow: # 1. User calls tool → MCP client sends request # 2. Tool function executes → Collects necessary data and parameters diff --git a/pypi_query_mcp/tools/__init__.py b/pypi_query_mcp/tools/__init__.py index 48ef2bd..24e3ebd 100644 --- a/pypi_query_mcp/tools/__init__.py +++ b/pypi_query_mcp/tools/__init__.py @@ -21,6 +21,12 @@ from .package_query import ( query_package_info, query_package_versions, ) +from .search import ( + find_alternatives, + get_trending_packages, + search_by_category, + search_packages, +) __all__ = [ "query_package_info", @@ -34,4 +40,8 @@ __all__ = [ "get_package_download_stats", "get_package_download_trends", "get_top_packages_by_downloads", + "search_packages", + "search_by_category", + "find_alternatives", + "get_trending_packages", ] diff --git a/pypi_query_mcp/tools/search.py b/pypi_query_mcp/tools/search.py new file mode 100644 index 0000000..380a486 --- /dev/null +++ b/pypi_query_mcp/tools/search.py @@ -0,0 +1,309 @@ +"""PyPI search tools with advanced filtering and sorting capabilities.""" + +import logging +from typing import Any, Dict, List, Optional + +from ..core.exceptions import InvalidPackageNameError, SearchError +from ..core.search_client import PyPISearchClient, SearchFilter, SearchSort + +logger = 
logging.getLogger(__name__) + + +async def search_packages( + query: str, + limit: int = 20, + python_versions: Optional[List[str]] = None, + licenses: Optional[List[str]] = None, + categories: Optional[List[str]] = None, + min_downloads: Optional[int] = None, + maintenance_status: Optional[str] = None, + has_wheels: Optional[bool] = None, + sort_by: str = "relevance", + sort_desc: bool = True, + semantic_search: bool = False, +) -> Dict[str, Any]: + """ + Search PyPI packages with advanced filtering and sorting. + + Args: + query: Search query string + limit: Maximum number of results to return (default: 20) + python_versions: List of Python versions to filter by (e.g., ["3.9", "3.10"]) + licenses: List of license types to filter by (e.g., ["mit", "apache", "bsd"]) + categories: List of categories to filter by (e.g., ["web", "data-science"]) + min_downloads: Minimum monthly downloads threshold + maintenance_status: Filter by maintenance status ("active", "maintained", "stale", "abandoned") + has_wheels: Filter packages that have/don't have wheel distributions + sort_by: Sort field ("relevance", "popularity", "recency", "quality", "name", "downloads") + sort_desc: Sort in descending order (default: True) + semantic_search: Use semantic search on package descriptions (default: False) + + Returns: + Dictionary containing search results and metadata + + Raises: + InvalidPackageNameError: If search query is invalid + SearchError: If search operation fails + """ + if not query or not query.strip(): + raise InvalidPackageNameError("Search query cannot be empty") + + if limit <= 0 or limit > 100: + limit = 20 + + logger.info(f"Searching PyPI: '{query}' (limit: {limit}, sort: {sort_by})") + + try: + # Create search filters + filters = SearchFilter( + python_versions=python_versions, + licenses=licenses, + categories=categories, + min_downloads=min_downloads, + maintenance_status=maintenance_status, + has_wheels=has_wheels, + ) + + # Create sort configuration + sort = 
SearchSort(field=sort_by, reverse=sort_desc) + + # Perform search + async with PyPISearchClient() as search_client: + result = await search_client.search_packages( + query=query, + limit=limit, + filters=filters, + sort=sort, + semantic_search=semantic_search, + ) + + return result + + except SearchError: + raise + except Exception as e: + logger.error(f"Unexpected error during search: {e}") + raise SearchError(f"Search failed: {e}") from e + + +async def search_by_category( + category: str, + limit: int = 20, + sort_by: str = "popularity", + python_version: Optional[str] = None, +) -> Dict[str, Any]: + """ + Search packages by category with popularity sorting. + + Args: + category: Category to search for (e.g., "web", "data-science", "testing") + limit: Maximum number of results to return + sort_by: Sort field (default: "popularity") + python_version: Filter by Python version compatibility + + Returns: + Dictionary containing categorized search results + """ + logger.info(f"Searching category: '{category}' (limit: {limit})") + + # Map category to search query and filters + category_queries = { + "web": "web framework flask django fastapi", + "data-science": "data science machine learning pandas numpy", + "database": "database sql orm sqlite postgres mysql", + "testing": "testing pytest unittest mock coverage", + "cli": "command line interface cli argparse click", + "security": "security encryption crypto ssl authentication", + "networking": "network http requests urllib socket", + "dev-tools": "development tools build package deploy", + "cloud": "cloud aws azure gcp docker kubernetes", + "gui": "gui interface tkinter qt desktop", + } + + query = category_queries.get(category.lower(), category) + + return await search_packages( + query=query, + limit=limit, + categories=[category.lower()], + python_versions=[python_version] if python_version else None, + sort_by=sort_by, + semantic_search=True, + ) + + +async def find_alternatives( + package_name: str, + limit: int 
= 10, + include_similar: bool = True, +) -> Dict[str, Any]: + """ + Find alternative packages to a given package. + + Args: + package_name: Name of the package to find alternatives for + limit: Maximum number of alternatives to return + include_similar: Include packages with similar functionality + + Returns: + Dictionary containing alternative packages and analysis + """ + logger.info(f"Finding alternatives for: '{package_name}'") + + try: + # First, get information about the target package + from ..core.pypi_client import PyPIClient + + async with PyPIClient() as client: + package_data = await client.get_package_info(package_name) + + info = package_data["info"] + keywords = info.get("keywords", "") + summary = info.get("summary", "") + categories = info.get("classifiers", []) + + # Extract category information + category_terms = [] + for classifier in categories: + if "Topic ::" in classifier: + topic = classifier.split("Topic ::")[-1].strip().lower() + category_terms.append(topic) + + # Create search query from package metadata + search_terms = [] + if keywords: + search_terms.extend(keywords.split()) + if summary: + # Extract key terms from summary + summary_words = [w for w in summary.lower().split() if len(w) > 3] + search_terms.extend(summary_words[:5]) + + search_query = " ".join(search_terms[:8]) # Limit to most relevant terms + + if not search_query: + search_query = package_name # Fallback to package name + + # Search for alternatives + results = await search_packages( + query=search_query, + limit=limit + 5, # Get extra to filter out the original package + sort_by="popularity", + semantic_search=include_similar, + ) + + # Filter out the original package + alternatives = [] + for pkg in results["packages"]: + if pkg["name"].lower() != package_name.lower(): + alternatives.append(pkg) + + alternatives = alternatives[:limit] + + return { + "target_package": { + "name": package_name, + "summary": summary, + "keywords": keywords, + "categories": 
category_terms, + }, + "alternatives": alternatives, + "search_query_used": search_query, + "total_alternatives": len(alternatives), + "analysis": { + "search_method": "keyword_similarity" if search_terms else "name_based", + "semantic_search_used": include_similar, + "category_based": len(category_terms) > 0, + }, + "timestamp": results["timestamp"], + } + + except Exception as e: + logger.error(f"Error finding alternatives for {package_name}: {e}") + raise SearchError(f"Failed to find alternatives: {e}") from e + + +async def get_trending_packages( + category: Optional[str] = None, + time_period: str = "week", + limit: int = 20, +) -> Dict[str, Any]: + """ + Get trending packages based on recent download activity. + + Args: + category: Optional category filter + time_period: Time period for trending analysis ("day", "week", "month") + limit: Maximum number of packages to return + + Returns: + Dictionary containing trending packages + """ + logger.info(f"Getting trending packages: category={category}, period={time_period}") + + try: + # Use our top packages functionality as a base + from .download_stats import get_top_packages_by_downloads + + top_packages_result = await get_top_packages_by_downloads(period=time_period, limit=limit * 2) + + # Filter by category if specified + if category: + # Enhance with category information + enhanced_packages = [] + for pkg in top_packages_result["top_packages"]: + try: + # Get package metadata for category classification + from ..core.pypi_client import PyPIClient + async with PyPIClient() as client: + package_data = await client.get_package_info(pkg["package"]) + + # Simple category matching + info = package_data["info"] + text = f"{info.get('keywords', '')} {info.get('summary', '')}".lower() + + category_keywords = { + "web": ["web framework", "web", "flask", "django", "fastapi", "wsgi", "asgi"], + "data-science": ["data", "science", "pandas", "numpy", "ml"], + "database": ["database", "sql", "orm"], + "testing": ["test", 
"pytest", "mock"],
+                        "cli": ["cli", "command", "argparse", "click"],
+                    }
+
+                    if category.lower() in category_keywords:
+                        keywords = category_keywords[category.lower()]
+                        # For web category, be more specific to avoid HTTP clients
+                        if category.lower() == "web":
+                            web_patterns = ["web framework", "micro web", "flask", "django", "fastapi", "wsgi", "asgi"]
+                            match_found = any(pattern in text for pattern in web_patterns)
+                        else:
+                            match_found = any(keyword in text for keyword in keywords)
+
+                        if match_found:
+                            enhanced_packages.append({
+                                **pkg,
+                                "category_match": True,
+                                "summary": info.get("summary", ""),
+                            })
+                except:
+                    continue
+
+            trending_packages = enhanced_packages[:limit]
+        else:
+            trending_packages = top_packages_result["top_packages"][:limit]
+
+        return {
+            "trending_packages": trending_packages,
+            "time_period": time_period,
+            "category": category,
+            "total_found": len(trending_packages),
+            "analysis": {
+                "source": "download_statistics",
+                "category_filtered": category is not None,
+                "methodology": "Based on download counts and popularity metrics",
+            },
+            "timestamp": top_packages_result["timestamp"],
+        }
+
+    except Exception as e:
+        logger.error(f"Error getting trending packages: {e}")
+        raise SearchError(f"Failed to get trending packages: {e}") from e
\ No newline at end of file
diff --git a/tests/test_search.py b/tests/test_search.py
new file mode 100644
index 0000000..0864e73
--- /dev/null
+++ b/tests/test_search.py
+"""Tests for PyPI search functionality."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from pypi_query_mcp.core.search_client import PyPISearchClient, SearchFilter, SearchSort
+from pypi_query_mcp.tools.search import (
+    find_alternatives,
+    get_trending_packages,
+    search_by_category,
+    search_packages,
+)
+
+
+class TestSearchPackages:
+    """Test the search_packages function."""
+
+    @pytest.mark.asyncio
+    async def test_basic_search(self):
+        """Test basic package search functionality."""
+        # Mock the
search client
+        with patch("pypi_query_mcp.tools.search.PyPISearchClient") as mock_client_class:
+            mock_client = AsyncMock()
+            mock_client_class.return_value.__aenter__.return_value = mock_client
+
+            mock_result = {
+                "query": "flask",
+                "total_found": 5,
+                "filtered_count": 5,
+                "returned_count": 5,
+                "packages": [
+                    {
+                        "name": "Flask",
+                        "summary": "A micro web framework",
+                        "version": "2.3.3",
+                        "license_type": "bsd",
+                        "categories": ["web"],
+                        "quality_score": 95.0,
+                    }
+                ],
+                "filters_applied": {},
+                "sort_applied": {"field": "relevance", "reverse": True},
+                "semantic_search": False,
+                "timestamp": "2023-01-01T00:00:00Z",
+            }
+
+            mock_client.search_packages.return_value = mock_result
+
+            result = await search_packages(query="flask", limit=20)
+
+            assert result["query"] == "flask"
+            assert len(result["packages"]) == 1
+            assert result["packages"][0]["name"] == "Flask"
+            mock_client.search_packages.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_search_with_filters(self):
+        """Test search with filtering options."""
+        with patch("pypi_query_mcp.tools.search.PyPISearchClient") as mock_client_class:
+            mock_client = AsyncMock()
+            mock_client_class.return_value.__aenter__.return_value = mock_client
+
+            mock_result = {
+                "query": "web framework",
+                "total_found": 10,
+                "filtered_count": 3,
+                "returned_count": 3,
+                "packages": [
+                    {"name": "Flask", "license_type": "bsd", "categories": ["web"]},
+                    {"name": "Django", "license_type": "bsd", "categories": ["web"]},
+                    {"name": "FastAPI", "license_type": "mit", "categories": ["web"]},
+                ],
+                "filters_applied": {
+                    "python_versions": ["3.9"],
+                    "licenses": ["mit", "bsd"],
+                    "categories": ["web"],
+                    "min_downloads": 1000,
+                },
+                "timestamp": "2023-01-01T00:00:00Z",
+            }
+
+            mock_client.search_packages.return_value = mock_result
+
+            result = await search_packages(
+                query="web framework",
+                python_versions=["3.9"],
+                licenses=["mit", "bsd"],
+                categories=["web"],
+                min_downloads=1000,
+            )
+
+            assert
result["filtered_count"] == 3
+            assert all(pkg["categories"] == ["web"] for pkg in result["packages"])
+
+    @pytest.mark.asyncio
+    async def test_empty_query_error(self):
+        """Test that empty query raises appropriate error."""
+        from pypi_query_mcp.core.exceptions import InvalidPackageNameError
+
+        with pytest.raises(InvalidPackageNameError):
+            await search_packages(query="")
+
+    @pytest.mark.asyncio
+    async def test_search_with_semantic_search(self):
+        """Test search with semantic search enabled."""
+        with patch("pypi_query_mcp.tools.search.PyPISearchClient") as mock_client_class:
+            mock_client = AsyncMock()
+            mock_client_class.return_value.__aenter__.return_value = mock_client
+
+            mock_result = {
+                "query": "machine learning",
+                "packages": [
+                    {"name": "scikit-learn", "semantic_score": 0.95},
+                    {"name": "pandas", "semantic_score": 0.80},
+                ],
+                "semantic_search": True,
+                "timestamp": "2023-01-01T00:00:00Z",
+            }
+
+            mock_client.search_packages.return_value = mock_result
+
+            result = await search_packages(
+                query="machine learning",
+                semantic_search=True,
+            )
+
+            assert result["semantic_search"] is True
+            assert result["packages"][0]["semantic_score"] == 0.95
+
+
+class TestSearchByCategory:
+    """Test the search_by_category function."""
+
+    @pytest.mark.asyncio
+    async def test_web_category_search(self):
+        """Test searching for web packages."""
+        with patch("pypi_query_mcp.tools.search.search_packages") as mock_search:
+            mock_result = {
+                "query": "web framework flask django fastapi",
+                "packages": [
+                    {"name": "Flask", "categories": ["web"]},
+                    {"name": "Django", "categories": ["web"]},
+                ],
+                "timestamp": "2023-01-01T00:00:00Z",
+            }
+
+            mock_search.return_value = mock_result
+
+            result = await search_by_category(category="web", limit=10)
+
+            assert len(result["packages"]) == 2
+            mock_search.assert_called_once_with(
+                query="web framework flask django fastapi",
+                limit=10,
+                categories=["web"],
+                python_versions=None,
+                sort_by="popularity",
semantic_search=True,
+            )
+
+    @pytest.mark.asyncio
+    async def test_data_science_category(self):
+        """Test searching for data science packages."""
+        with patch("pypi_query_mcp.tools.search.search_packages") as mock_search:
+            mock_result = {
+                "query": "data science machine learning pandas numpy",
+                "packages": [
+                    {"name": "pandas", "categories": ["data-science"]},
+                    {"name": "numpy", "categories": ["data-science"]},
+                ],
+                "timestamp": "2023-01-01T00:00:00Z",
+            }
+
+            mock_search.return_value = mock_result
+
+            result = await search_by_category(
+                category="data-science",
+                python_version="3.10"
+            )
+
+            mock_search.assert_called_once_with(
+                query="data science machine learning pandas numpy",
+                limit=20,
+                categories=["data-science"],
+                python_versions=["3.10"],
+                sort_by="popularity",
+                semantic_search=True,
+            )
+
+
+class TestFindAlternatives:
+    """Test the find_alternatives function."""
+
+    @pytest.mark.asyncio
+    async def test_find_flask_alternatives(self):
+        """Test finding alternatives to Flask."""
+        with patch("pypi_query_mcp.core.pypi_client.PyPIClient") as mock_client_class:
+            mock_client = AsyncMock()
+            mock_client_class.return_value.__aenter__.return_value = mock_client
+
+            # Mock Flask package data
+            mock_flask_data = {
+                "info": {
+                    "name": "Flask",
+                    "summary": "A micro web framework",
+                    "keywords": "web framework micro",
+                    "classifiers": [
+                        "Topic :: Internet :: WWW/HTTP :: Dynamic Content",
+                        "Topic :: Software Development :: Libraries :: Application Frameworks",
+                    ],
+                }
+            }
+
+            mock_client.get_package_info.return_value = mock_flask_data
+
+            with patch("pypi_query_mcp.tools.search.search_packages") as mock_search:
+                mock_search_result = {
+                    "packages": [
+                        {"name": "Django", "summary": "High-level web framework"},
+                        {"name": "FastAPI", "summary": "Modern web framework"},
+                        {"name": "Flask", "summary": "A micro web framework"},  # Original package
+                        {"name": "Bottle", "summary": "Micro web framework"},
+                    ],
+                    "timestamp": "2023-01-01T00:00:00Z",
+                }
+
+                mock_search.return_value = mock_search_result
+
+                result = await find_alternatives(
+                    package_name="Flask",
+                    limit=5,
+                    include_similar=True,
+                )
+
+                # Should exclude the original Flask package
+                assert result["target_package"]["name"] == "Flask"
+                assert len(result["alternatives"]) == 3
+                assert not any(alt["name"] == "Flask" for alt in result["alternatives"])
+                assert result["analysis"]["semantic_search_used"] is True
+
+    @pytest.mark.asyncio
+    async def test_alternatives_with_keywords(self):
+        """Test alternatives finding using package keywords."""
+        with patch("pypi_query_mcp.core.pypi_client.PyPIClient") as mock_client_class:
+            mock_client = AsyncMock()
+            mock_client_class.return_value.__aenter__.return_value = mock_client
+
+            mock_package_data = {
+                "info": {
+                    "name": "requests",
+                    "summary": "HTTP library for Python",
+                    "keywords": "http client requests api",
+                    "classifiers": ["Topic :: Internet :: WWW/HTTP"],
+                }
+            }
+
+            mock_client.get_package_info.return_value = mock_package_data
+
+            with patch("pypi_query_mcp.tools.search.search_packages") as mock_search:
+                mock_search.return_value = {
+                    "packages": [
+                        {"name": "httpx", "summary": "Next generation HTTP client"},
+                        {"name": "urllib3", "summary": "HTTP library with connection pooling"},
+                    ],
+                    "timestamp": "2023-01-01T00:00:00Z",
+                }
+
+                result = await find_alternatives(package_name="requests")
+
+                assert "http client requests api" in result["search_query_used"]
+                assert result["analysis"]["search_method"] == "keyword_similarity"
+
+
+class TestGetTrendingPackages:
+    """Test the get_trending_packages function."""
+
+    @pytest.mark.asyncio
+    async def test_get_trending_all_categories(self):
+        """Test getting trending packages across all categories."""
+        with patch("pypi_query_mcp.tools.download_stats.get_top_packages_by_downloads") as mock_top_packages:
+            mock_result = {
+                "top_packages": [
+                    {"package": "requests", "downloads": 1000000},
+                    {"package": "urllib3", "downloads": 900000},
+                    {"package":
"certifi", "downloads": 800000},
+                ],
+                "timestamp": "2023-01-01T00:00:00Z",
+            }
+
+            mock_top_packages.return_value = mock_result
+
+            result = await get_trending_packages(
+                time_period="week",
+                limit=10,
+            )
+
+            assert result["time_period"] == "week"
+            assert result["category"] is None
+            assert len(result["trending_packages"]) == 3
+            assert result["analysis"]["category_filtered"] is False
+
+    @pytest.mark.asyncio
+    async def test_get_trending_by_category(self):
+        """Test getting trending packages filtered by category."""
+        with patch("pypi_query_mcp.tools.download_stats.get_top_packages_by_downloads") as mock_top_packages:
+            mock_result = {
+                "top_packages": [
+                    {"package": "flask", "downloads": 500000},
+                    {"package": "django", "downloads": 400000},
+                    {"package": "requests", "downloads": 1000000},  # Should be filtered out
+                ],
+                "timestamp": "2023-01-01T00:00:00Z",
+            }
+
+            mock_top_packages.return_value = mock_result
+
+            # Mock PyPI client for package metadata
+            with patch("pypi_query_mcp.core.pypi_client.PyPIClient") as mock_client_class:
+                mock_client = AsyncMock()
+                mock_client_class.return_value.__aenter__.return_value = mock_client
+
+                def mock_get_package_info(package_name):
+                    if package_name == "flask":
+                        return {
+                            "info": {
+                                "keywords": "web framework micro",
+                                "summary": "A micro web framework",
+                            }
+                        }
+                    elif package_name == "django":
+                        return {
+                            "info": {
+                                "keywords": "web framework",
+                                "summary": "High-level web framework",
+                            }
+                        }
+                    else:
+                        return {
+                            "info": {
+                                "keywords": "http client",
+                                "summary": "HTTP library",
+                            }
+                        }
+
+                mock_client.get_package_info.side_effect = mock_get_package_info
+
+                result = await get_trending_packages(
+                    category="web",
+                    time_period="month",
+                    limit=5,
+                )
+
+                assert result["category"] == "web"
+                assert result["analysis"]["category_filtered"] is True
+                # Should only include web packages (flask, django)
+                assert len(result["trending_packages"]) == 2
+
+
+class TestSearchClient:
+    """Test the PyPISearchClient
class."""
+
+    @pytest.mark.asyncio
+    async def test_client_context_manager(self):
+        """Test that the search client works as an async context manager."""
+        async with PyPISearchClient() as client:
+            assert client is not None
+            assert hasattr(client, 'search_packages')
+
+    def test_search_filter_creation(self):
+        """Test SearchFilter creation."""
+        filters = SearchFilter(
+            python_versions=["3.9", "3.10"],
+            licenses=["mit", "apache"],
+            categories=["web", "data-science"],
+            min_downloads=1000,
+        )
+
+        assert filters.python_versions == ["3.9", "3.10"]
+        assert filters.licenses == ["mit", "apache"]
+        assert filters.categories == ["web", "data-science"]
+        assert filters.min_downloads == 1000
+
+    def test_search_sort_creation(self):
+        """Test SearchSort creation."""
+        sort = SearchSort(field="popularity", reverse=True)
+
+        assert sort.field == "popularity"
+        assert sort.reverse is True
+
+        # Test defaults
+        default_sort = SearchSort()
+        assert default_sort.field == "relevance"
+        assert default_sort.reverse is True
\ No newline at end of file