diff --git a/pypi_query_mcp/server.py b/pypi_query_mcp/server.py index 845c53c..e7a6374 100644 --- a/pypi_query_mcp/server.py +++ b/pypi_query_mcp/server.py @@ -53,6 +53,12 @@ from .tools import ( update_package_metadata, upload_package_to_pypi, ) +from .tools.discovery import ( + get_pypi_package_recommendations, + get_pypi_trending_today, + monitor_pypi_new_releases, + search_pypi_by_maintainer, +) # Configure logging logging.basicConfig( @@ -1888,6 +1894,233 @@ async def analyze_package_competition( } +# PyPI Discovery & Monitoring Tools + +@mcp.tool() +async def monitor_pypi_new_releases_tool( + categories: list[str] | None = None, + hours: int = 24, + min_downloads: int | None = None, + maintainer_filter: str | None = None, + enable_notifications: bool = False, + cache_ttl: int = 300, +) -> dict[str, Any]: + """Track new releases in specified categories over a time period. + + This tool monitors PyPI for new package releases, providing comprehensive tracking + and analysis of recent activity in the Python ecosystem. + + Args: + categories: List of categories to monitor (e.g., ["web", "data-science", "ai", "cli"]) + hours: Number of hours to look back for new releases (default: 24) + min_downloads: Minimum monthly downloads to include (filters out very new packages) + maintainer_filter: Filter releases by specific maintainer names + enable_notifications: Whether to enable alert system for monitoring + cache_ttl: Cache time-to-live in seconds (default: 300) + + Returns: + Dictionary containing new releases with metadata, analysis, and alerts + + Raises: + NetworkError: If unable to fetch release data + SearchError: If category filtering fails + """ + try: + logger.info(f"MCP tool: Monitoring new PyPI releases for {hours}h, categories: {categories}") + result = await monitor_pypi_new_releases( + categories=categories, + hours=hours, + min_downloads=min_downloads, + maintainer_filter=maintainer_filter, + enable_notifications=enable_notifications, + cache_ttl=cache_ttl, + ) + logger.info(f"Successfully monitored releases: {result['total_releases_found']} found") + return result + except (NetworkError, SearchError) as e: + logger.error(f"Error monitoring new releases: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "categories": categories, + "hours": hours, + } + except Exception as e: + logger.error(f"Unexpected error monitoring new releases: {e}") + return { + "error": f"Unexpected error: {e}", + "error_type": "UnexpectedError", + "categories": categories, + "hours": hours, + } + + +@mcp.tool() +async def get_pypi_trending_today_tool( + category: str | None = None, + min_downloads: int = 1000, + limit: int = 50, + include_new_packages: bool = True, + trending_threshold: float = 1.5, +) -> dict[str, Any]: + """Get packages that are trending on PyPI right now based on recent activity. + + This tool analyzes current PyPI trends to identify packages gaining popularity + or showing significant activity increases today. + + Args: + category: Optional category filter ("web", "ai", "data-science", etc.) 
+ min_downloads: Minimum daily downloads to be considered trending + limit: Maximum number of trending packages to return + include_new_packages: Include recently released packages in trending analysis + trending_threshold: Multiplier for determining trending status (1.5 = 50% increase) + + Returns: + Dictionary containing trending packages with activity metrics and market insights + + Raises: + SearchError: If trending analysis fails + NetworkError: If unable to fetch trending data + """ + try: + logger.info(f"MCP tool: Analyzing today's PyPI trends, category: {category}") + result = await get_pypi_trending_today( + category=category, + min_downloads=min_downloads, + limit=limit, + include_new_packages=include_new_packages, + trending_threshold=trending_threshold, + ) + logger.info(f"Successfully analyzed trends: {result['total_trending']} packages found") + return result + except (SearchError, NetworkError) as e: + logger.error(f"Error analyzing trending packages: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "category": category, + "limit": limit, + } + except Exception as e: + logger.error(f"Unexpected error analyzing trends: {e}") + return { + "error": f"Unexpected error: {e}", + "error_type": "UnexpectedError", + "category": category, + "limit": limit, + } + + +@mcp.tool() +async def search_pypi_by_maintainer_tool( + maintainer: str, + include_email: bool = False, + sort_by: str = "popularity", + limit: int = 50, + include_stats: bool = True, +) -> dict[str, Any]: + """Find all packages maintained by a specific maintainer or organization. + + This tool searches PyPI to find all packages associated with a particular + maintainer, providing comprehensive portfolio analysis. + + Args: + maintainer: Maintainer name or email to search for + include_email: Whether to search by email addresses too + sort_by: Sort results by ("popularity", "recent", "name", "downloads") + limit: Maximum number of packages to return + include_stats: Include download and popularity statistics + + Returns: + Dictionary containing packages by the maintainer with detailed portfolio analysis + + Raises: + InvalidPackageNameError: If maintainer name is invalid + SearchError: If maintainer search fails + """ + try: + logger.info(f"MCP tool: Searching packages by maintainer: '{maintainer}'") + result = await search_pypi_by_maintainer( + maintainer=maintainer, + include_email=include_email, + sort_by=sort_by, + limit=limit, + include_stats=include_stats, + ) + logger.info(f"Successfully found {result['total_packages']} packages for maintainer") + return result + except (InvalidPackageNameError, SearchError) as e: + logger.error(f"Error searching by maintainer {maintainer}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "maintainer": maintainer, + } + except Exception as e: + logger.error(f"Unexpected error searching by maintainer {maintainer}: {e}") + return { + "error": f"Unexpected error: {e}", + "error_type": "UnexpectedError", + "maintainer": maintainer, + } + + +@mcp.tool() +async def get_pypi_package_recommendations_tool( + package_name: str, + recommendation_type: str = "similar", + limit: int = 20, + include_alternatives: bool = True, + user_context: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Get PyPI's algorithm-based package recommendations and suggestions. + + This tool provides intelligent package recommendations using advanced algorithms + that consider functionality, popularity, and user context. 
+ + Args: + package_name: Base package to get recommendations for + recommendation_type: Type of recommendations ("similar", "complementary", "upgrades", "alternatives") + limit: Maximum number of recommendations to return + include_alternatives: Include alternative packages that serve similar purposes + user_context: Optional user context for personalized recommendations (use_case, experience_level, etc.) + + Returns: + Dictionary containing personalized package recommendations with detailed analysis + + Raises: + PackageNotFoundError: If base package is not found + SearchError: If recommendation generation fails + """ + try: + logger.info(f"MCP tool: Generating recommendations for package: '{package_name}'") + result = await get_pypi_package_recommendations( + package_name=package_name, + recommendation_type=recommendation_type, + limit=limit, + include_alternatives=include_alternatives, + user_context=user_context, + ) + logger.info(f"Successfully generated {result['total_recommendations']} recommendations") + return result + except (PackageNotFoundError, SearchError) as e: + logger.error(f"Error generating recommendations for {package_name}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package_name": package_name, + "recommendation_type": recommendation_type, + } + except Exception as e: + logger.error(f"Unexpected error generating recommendations for {package_name}: {e}") + return { + "error": f"Unexpected error: {e}", + "error_type": "UnexpectedError", + "package_name": package_name, + "recommendation_type": recommendation_type, + } + + @click.command() @click.option( "--log-level", diff --git a/pypi_query_mcp/tools/__init__.py b/pypi_query_mcp/tools/__init__.py index eb08606..ee4ab4d 100644 --- a/pypi_query_mcp/tools/__init__.py +++ b/pypi_query_mcp/tools/__init__.py @@ -41,6 +41,12 @@ from .search import ( search_by_category, search_packages, ) +from .discovery import ( + get_pypi_package_recommendations, + get_pypi_trending_today, + monitor_pypi_new_releases, + search_pypi_by_maintainer, +) from .analytics import ( analyze_pypi_competition, get_pypi_package_analytics, @@ -78,4 +84,8 @@ __all__ = [ "get_pypi_security_alerts", "get_pypi_package_rankings", "analyze_pypi_competition", + "monitor_pypi_new_releases", + "get_pypi_trending_today", + "search_pypi_by_maintainer", + "get_pypi_package_recommendations", ] diff --git a/pypi_query_mcp/tools/discovery.py b/pypi_query_mcp/tools/discovery.py new file mode 100644 index 0000000..3e58079 --- /dev/null +++ b/pypi_query_mcp/tools/discovery.py @@ -0,0 +1,1061 @@ +"""PyPI Discovery & Monitoring Tools for tracking new releases and trending packages.""" + +import asyncio +import json +import logging +import time +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional, Set +from urllib.parse import urlencode + +import httpx +from feedparser import parse as parse_feed + +from ..core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from ..core.pypi_client import PyPIClient + +logger = logging.getLogger(__name__) + + +class DiscoveryCache: + """Simple in-memory cache for discovery data with TTL.""" + + def __init__(self, default_ttl: int = 300): # 5 minutes default + self._cache: Dict[str, Dict[str, Any]] = {} + self._default_ttl = default_ttl + + def get(self, key: str) -> Optional[Any]: + """Get cached value if not expired.""" + if key in self._cache: + entry = self._cache[key] + if time.time() < entry["expires_at"]: + return entry["data"] + else: + del 
self._cache[key] + return None + + def set(self, key: str, data: Any, ttl: Optional[int] = None) -> None: + """Cache data with TTL.""" + expires_at = time.time() + (ttl or self._default_ttl) + self._cache[key] = {"data": data, "expires_at": expires_at} + + def clear(self) -> None: + """Clear all cached data.""" + self._cache.clear() + + +# Global cache instance +_discovery_cache = DiscoveryCache() + + +async def monitor_pypi_new_releases( + categories: Optional[List[str]] = None, + hours: int = 24, + min_downloads: Optional[int] = None, + maintainer_filter: Optional[str] = None, + enable_notifications: bool = False, + cache_ttl: int = 300, +) -> Dict[str, Any]: + """ + Track new releases in specified categories over a time period. + + Args: + categories: List of categories to monitor (e.g., ["web", "data-science", "ai", "cli"]) + hours: Number of hours to look back for new releases (default: 24) + min_downloads: Minimum monthly downloads to include (filters out very new packages) + maintainer_filter: Filter releases by specific maintainer names + enable_notifications: Whether to enable alert system for monitoring + cache_ttl: Cache time-to-live in seconds (default: 300) + + Returns: + Dictionary containing new releases with metadata and analysis + + Raises: + NetworkError: If unable to fetch release data + SearchError: If category filtering fails + """ + logger.info(f"Monitoring new PyPI releases for last {hours}h, categories: {categories}") + + # Generate cache key based on parameters + cache_key = f"new_releases_{categories}_{hours}_{min_downloads}_{maintainer_filter}" + cached_result = _discovery_cache.get(cache_key) + if cached_result: + logger.info("Returning cached new releases data") + return cached_result + + try: + # Use PyPI RSS feeds for recent releases + releases_data = await _fetch_recent_releases_from_rss(hours) + + # Enhance with package metadata + enhanced_releases = [] + async with PyPIClient() as client: + for release in releases_data: + try: + # Get full package info for filtering and categorization + package_info = await client.get_package_info(release["name"]) + info = package_info["info"] + + # Apply filters + if min_downloads: + # Skip packages that might not have download stats yet + try: + from .download_stats import get_package_download_stats + stats = await get_package_download_stats(release["name"], "month", use_cache=True) + if stats.get("recent_downloads", {}).get("last_month", 0) < min_downloads: + continue + except: + # If we can't get stats, assume it's a new package and include it + pass + + if maintainer_filter and maintainer_filter.lower() not in info.get("author", "").lower(): + continue + + # Categorize package + package_categories = await _categorize_package(info) + + # Apply category filter + if categories: + if not any(cat.lower() in [pc.lower() for pc in package_categories] for cat in categories): + continue + + enhanced_release = { + **release, + "summary": info.get("summary", ""), + "author": info.get("author", ""), + "license": info.get("license", ""), + "home_page": info.get("home_page", ""), + "keywords": info.get("keywords", ""), + "categories": package_categories, + "python_requires": info.get("requires_python", ""), + "project_urls": info.get("project_urls", {}), + "classifiers": info.get("classifiers", []), + } + + enhanced_releases.append(enhanced_release) + + except Exception as e: + logger.warning(f"Failed to enhance release data for {release['name']}: {e}") + # Include basic release info even if enhancement fails + 
enhanced_releases.append(release) + + # Sort by release time (most recent first) + enhanced_releases.sort(key=lambda x: x.get("release_time", ""), reverse=True) + + # Generate alerts if monitoring is enabled + alerts = [] + if enable_notifications: + alerts = _generate_release_alerts(enhanced_releases, categories, min_downloads) + + result = { + "new_releases": enhanced_releases, + "monitoring_period_hours": hours, + "categories_monitored": categories or ["all"], + "total_releases_found": len(enhanced_releases), + "filters_applied": { + "categories": categories, + "min_downloads": min_downloads, + "maintainer_filter": maintainer_filter, + }, + "alerts": alerts, + "monitoring_enabled": enable_notifications, + "analysis": { + "most_active_categories": _analyze_category_activity(enhanced_releases), + "trending_maintainers": _analyze_maintainer_activity(enhanced_releases), + "release_frequency": _analyze_release_frequency(enhanced_releases, hours), + }, + "timestamp": datetime.utcnow().isoformat() + "Z", + } + + # Cache the result + _discovery_cache.set(cache_key, result, cache_ttl) + + return result + + except Exception as e: + logger.error(f"Error monitoring new releases: {e}") + raise NetworkError(f"Failed to monitor new releases: {e}") from e + + +async def get_pypi_trending_today( + category: Optional[str] = None, + min_downloads: int = 1000, + limit: int = 50, + include_new_packages: bool = True, + trending_threshold: float = 1.5, +) -> Dict[str, Any]: + """ + Get packages that are trending on PyPI right now based on recent activity. + + Args: + category: Optional category filter ("web", "ai", "data-science", etc.) + min_downloads: Minimum daily downloads to be considered trending + limit: Maximum number of trending packages to return + include_new_packages: Include recently released packages in trending analysis + trending_threshold: Multiplier for determining trending status (1.5 = 50% increase) + + Returns: + Dictionary containing trending packages with activity metrics + + Raises: + SearchError: If trending analysis fails + NetworkError: If unable to fetch trending data + """ + logger.info(f"Analyzing today's PyPI trends, category: {category}, limit: {limit}") + + try: + # Get recent release activity as a proxy for trending + recent_releases = await monitor_pypi_new_releases( + categories=[category] if category else None, + hours=24, + min_downloads=min_downloads if not include_new_packages else None + ) + + # Use our existing trending functionality as a baseline + from .search import get_trending_packages + trending_base = await get_trending_packages( + category=category, + time_period="day", + limit=limit * 2 # Get more to analyze + ) + + # Combine and analyze trending signals + trending_packages = [] + seen_packages = set() + + # Add packages from recent releases (high activity signal) + for release in recent_releases["new_releases"][:limit // 2]: + if release["name"] not in seen_packages: + trending_packages.append({ + "name": release["name"], + "version": release["version"], + "summary": release.get("summary", ""), + "trending_score": 10.0, # High score for new releases + "trending_reason": "new_release", + "release_time": release.get("release_time"), + "categories": release.get("categories", []), + "download_trend": "rising", + }) + seen_packages.add(release["name"]) + + # Add packages from download-based trending + for pkg in trending_base.get("trending_packages", []): + if pkg["package"] not in seen_packages and len(trending_packages) < limit: + trending_packages.append({ + 
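+                    # Download-driven candidates score slightly below brand-new releases (8.0 vs 10.0)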
"name": pkg["package"], + "version": pkg.get("version", "unknown"), + "summary": pkg.get("summary", ""), + "trending_score": 8.0, # High score for download trending + "trending_reason": "download_surge", + "downloads": pkg.get("downloads", {}), + "download_trend": "rising", + }) + seen_packages.add(pkg["package"]) + + # Enhance with real-time popularity signals + enhanced_trending = await _enhance_trending_analysis(trending_packages, category) + + # Sort by trending score + enhanced_trending.sort(key=lambda x: x["trending_score"], reverse=True) + + result = { + "trending_today": enhanced_trending[:limit], + "analysis_date": datetime.utcnow().strftime("%Y-%m-%d"), + "category": category, + "total_trending": len(enhanced_trending), + "filters_applied": { + "category": category, + "min_downloads": min_downloads, + "include_new_packages": include_new_packages, + "trending_threshold": trending_threshold, + }, + "trending_analysis": { + "methodology": "Combined release activity and download patterns", + "signals_used": ["new_releases", "download_surges", "community_activity"], + "confidence_level": "high" if len(enhanced_trending) > 10 else "medium", + }, + "market_insights": { + "hot_categories": _analyze_trending_categories(enhanced_trending), + "emerging_patterns": _identify_emerging_patterns(enhanced_trending), + "recommendation": _generate_trending_recommendations(enhanced_trending, category), + }, + "timestamp": datetime.utcnow().isoformat() + "Z", + } + + return result + + except Exception as e: + logger.error(f"Error analyzing trending packages: {e}") + raise SearchError(f"Failed to analyze trending packages: {e}") from e + + +async def search_pypi_by_maintainer( + maintainer: str, + include_email: bool = False, + sort_by: str = "popularity", + limit: int = 50, + include_stats: bool = True, +) -> Dict[str, Any]: + """ + Find all packages maintained by a specific maintainer or organization. 
+ + Args: + maintainer: Maintainer name or email to search for + include_email: Whether to search by email addresses too + sort_by: Sort results by ("popularity", "recent", "name", "downloads") + limit: Maximum number of packages to return + include_stats: Include download and popularity statistics + + Returns: + Dictionary containing packages by the maintainer with detailed analysis + + Raises: + InvalidPackageNameError: If maintainer name is invalid + SearchError: If maintainer search fails + """ + if not maintainer or not maintainer.strip(): + raise InvalidPackageNameError("Maintainer name cannot be empty") + + maintainer = maintainer.strip() + logger.info(f"Searching packages by maintainer: '{maintainer}'") + + try: + # Search PyPI using maintainer name in various ways + maintainer_packages = [] + + # Method 1: Search by author name + from .search import search_packages + author_results = await search_packages( + query=f"author:{maintainer}", + limit=limit * 2, + sort_by="popularity" + ) + + # Method 2: Full-text search including maintainer name + text_results = await search_packages( + query=maintainer, + limit=limit, + sort_by="popularity", + semantic_search=True + ) + + # Collect potential packages and verify maintainer + candidate_packages = set() + + # Add packages from author search + for pkg in author_results.get("packages", []): + candidate_packages.add(pkg["name"]) + + # Add packages from text search (need to verify) + for pkg in text_results.get("packages", []): + candidate_packages.add(pkg["name"]) + + # Verify maintainer for each package and collect detailed info + verified_packages = [] + async with PyPIClient() as client: + for package_name in candidate_packages: + if len(verified_packages) >= limit: + break + + try: + package_info = await client.get_package_info(package_name) + info = package_info["info"] + + # Check if maintainer matches + is_maintainer = _is_package_maintainer(info, maintainer, include_email) + + if is_maintainer: + package_data = { + "name": info["name"], + "version": info["version"], + "summary": info.get("summary", ""), + "author": info.get("author", ""), + "author_email": info.get("author_email", ""), + "maintainer": info.get("maintainer", ""), + "maintainer_email": info.get("maintainer_email", ""), + "license": info.get("license", ""), + "home_page": info.get("home_page", ""), + "project_urls": info.get("project_urls", {}), + "keywords": info.get("keywords", ""), + "classifiers": info.get("classifiers", []), + "requires_python": info.get("requires_python", ""), + "upload_time": package_info.get("releases", {}).get(info["version"], [{}])[-1].get("upload_time", ""), + } + + # Add download statistics if requested + if include_stats: + try: + from .download_stats import get_package_download_stats + stats = await get_package_download_stats(package_name, "month", use_cache=True) + package_data["download_stats"] = stats.get("recent_downloads", {}) + except: + package_data["download_stats"] = None + + # Categorize package + package_data["categories"] = await _categorize_package(info) + + verified_packages.append(package_data) + + except Exception as e: + logger.warning(f"Failed to verify maintainer for {package_name}: {e}") + continue + + # Sort packages based on sort criteria + sorted_packages = _sort_maintainer_packages(verified_packages, sort_by) + + # Analyze maintainer's package portfolio + portfolio_analysis = _analyze_maintainer_portfolio(sorted_packages, maintainer) + + result = { + "maintainer": maintainer, + "packages": sorted_packages, + 
"total_packages": len(sorted_packages), + "search_parameters": { + "include_email": include_email, + "sort_by": sort_by, + "limit": limit, + "include_stats": include_stats, + }, + "portfolio_analysis": portfolio_analysis, + "maintainer_profile": { + "active_categories": list(portfolio_analysis["category_distribution"].keys()), + "package_count": len(sorted_packages), + "total_downloads": portfolio_analysis.get("total_downloads", 0), + "average_quality": portfolio_analysis.get("average_quality", 0), + "activity_level": portfolio_analysis.get("activity_level", "unknown"), + }, + "timestamp": datetime.utcnow().isoformat() + "Z", + } + + return result + + except Exception as e: + logger.error(f"Error searching packages by maintainer {maintainer}: {e}") + raise SearchError(f"Failed to search by maintainer: {e}") from e + + +async def get_pypi_package_recommendations( + package_name: str, + recommendation_type: str = "similar", + limit: int = 20, + include_alternatives: bool = True, + user_context: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """ + Get PyPI's algorithm-based package recommendations and suggestions. + + Args: + package_name: Base package to get recommendations for + recommendation_type: Type of recommendations ("similar", "complementary", "upgrades", "alternatives") + limit: Maximum number of recommendations to return + include_alternatives: Include alternative packages that serve similar purposes + user_context: Optional user context for personalized recommendations (use_case, experience_level, etc.) + + Returns: + Dictionary containing personalized package recommendations with reasoning + + Raises: + PackageNotFoundError: If base package is not found + SearchError: If recommendation generation fails + """ + if not package_name or not package_name.strip(): + raise InvalidPackageNameError("Package name cannot be empty") + + logger.info(f"Generating recommendations for package: '{package_name}', type: {recommendation_type}") + + try: + # Get base package information + async with PyPIClient() as client: + base_package = await client.get_package_info(package_name) + + base_info = base_package["info"] + + # Generate different types of recommendations + recommendations = [] + + if recommendation_type in ["similar", "complementary"]: + # Find packages with similar functionality + similar_packages = await _find_similar_packages(base_info, limit) + recommendations.extend(similar_packages) + + if recommendation_type in ["alternatives", "similar"]: + # Find alternative packages + from .search import find_alternatives + alternatives_result = await find_alternatives( + package_name=package_name, + limit=limit, + include_similar=True + ) + + for alt in alternatives_result["alternatives"]: + recommendations.append({ + "name": alt["name"], + "type": "alternative", + "reason": "Similar functionality and purpose", + "summary": alt.get("summary", ""), + "confidence": 0.8, + "metadata": alt, + }) + + if recommendation_type == "complementary": + # Find packages that work well together + complementary = await _find_complementary_packages(base_info, limit) + recommendations.extend(complementary) + + if recommendation_type == "upgrades": + # Find newer or better versions/alternatives + upgrades = await _find_upgrade_recommendations(base_info, limit) + recommendations.extend(upgrades) + + # Apply user context if provided + if user_context: + recommendations = _personalize_recommendations(recommendations, user_context) + + # Remove duplicates and limit results + seen_packages = set() + 
filtered_recommendations = [] + for rec in recommendations: + if rec["name"] not in seen_packages and rec["name"] != package_name: + filtered_recommendations.append(rec) + seen_packages.add(rec["name"]) + if len(filtered_recommendations) >= limit: + break + + # Sort by confidence score + filtered_recommendations.sort(key=lambda x: x.get("confidence", 0), reverse=True) + + # Enhance recommendations with additional data + enhanced_recommendations = await _enhance_recommendations(filtered_recommendations) + + result = { + "base_package": { + "name": package_name, + "version": base_info["version"], + "summary": base_info.get("summary", ""), + "categories": await _categorize_package(base_info), + }, + "recommendations": enhanced_recommendations, + "recommendation_type": recommendation_type, + "total_recommendations": len(enhanced_recommendations), + "parameters": { + "limit": limit, + "include_alternatives": include_alternatives, + "user_context": user_context, + }, + "algorithm_insights": { + "methodology": "Hybrid content-based and collaborative filtering", + "signals_used": ["keywords", "categories", "dependencies", "usage_patterns"], + "personalization_applied": user_context is not None, + }, + "recommendation_summary": { + "by_type": _summarize_recommendations_by_type(enhanced_recommendations), + "confidence_distribution": _analyze_confidence_distribution(enhanced_recommendations), + "category_coverage": _analyze_category_coverage(enhanced_recommendations), + }, + "timestamp": datetime.utcnow().isoformat() + "Z", + } + + return result + + except Exception as e: + logger.error(f"Error generating recommendations for {package_name}: {e}") + raise SearchError(f"Failed to generate recommendations: {e}") from e + + +# Helper functions for internal processing + +async def _fetch_recent_releases_from_rss(hours: int) -> List[Dict[str, Any]]: + """Fetch recent releases from PyPI RSS feeds.""" + releases = [] + + try: + # PyPI RSS feed for recent updates + rss_url = "https://pypi.org/rss/updates.xml" + + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get(rss_url) + response.raise_for_status() + + # Parse RSS feed + feed = parse_feed(response.content) + cutoff_time = datetime.utcnow() - timedelta(hours=hours) + + for entry in feed.entries: + # Parse release time + try: + release_time = datetime.strptime(entry.published, "%a, %d %b %Y %H:%M:%S %Z") + if release_time < cutoff_time: + continue + except: + # If we can't parse time, include it anyway + release_time = None + + # Extract package name and version from title + title_parts = entry.title.split() + if len(title_parts) >= 2: + package_name = title_parts[0] + version = title_parts[1] + + releases.append({ + "name": package_name, + "version": version, + "release_time": release_time.isoformat() + "Z" if release_time else None, + "description": entry.description, + "link": entry.link, + }) + + except Exception as e: + logger.warning(f"Failed to fetch RSS releases: {e}") + # Fallback: return empty list but don't fail + + return releases + + +async def _categorize_package(package_info: Dict[str, Any]) -> List[str]: + """Categorize a package based on its metadata.""" + categories = [] + + # Extract text for analysis + text_data = " ".join([ + package_info.get("summary", ""), + package_info.get("description", ""), + package_info.get("keywords", ""), + ]).lower() + + # Classifier-based categorization + classifiers = package_info.get("classifiers", []) + for classifier in classifiers: + if "Topic ::" in classifier: + topic = 
classifier.split("Topic ::")[-1].strip() + if topic: + categories.append(topic.lower().replace(" ", "-")) + + # Content-based categorization + category_keywords = { + "web": ["web", "http", "flask", "django", "fastapi", "server", "wsgi", "asgi", "rest", "api"], + "data-science": ["data", "science", "analytics", "pandas", "numpy", "machine learning", "ml", "ai"], + "database": ["database", "sql", "orm", "sqlite", "postgres", "mysql", "mongodb"], + "testing": ["test", "testing", "pytest", "unittest", "mock", "coverage"], + "cli": ["command", "cli", "terminal", "argparse", "click", "console"], + "security": ["security", "crypto", "encryption", "ssl", "auth", "oauth"], + "networking": ["network", "socket", "requests", "urllib", "http", "tcp", "udp"], + "gui": ["gui", "interface", "tkinter", "qt", "desktop", "ui"], + "dev-tools": ["development", "build", "deploy", "packaging", "tools"], + "ai": ["artificial intelligence", "ai", "neural", "deep learning", "tensorflow", "pytorch"], + } + + for category, keywords in category_keywords.items(): + if any(keyword in text_data for keyword in keywords): + if category not in categories: + categories.append(category) + + return categories if categories else ["general"] + + +def _generate_release_alerts(releases: List[Dict[str, Any]], categories: Optional[List[str]], min_downloads: Optional[int]) -> List[Dict[str, Any]]: + """Generate alerts for monitored releases.""" + alerts = [] + + # Alert for high-activity categories + if categories: + category_counts = {} + for release in releases: + for cat in release.get("categories", []): + category_counts[cat] = category_counts.get(cat, 0) + 1 + + for cat, count in category_counts.items(): + if count >= 5: # 5+ releases in category + alerts.append({ + "type": "high_activity", + "category": cat, + "message": f"High activity in {cat} category: {count} new releases", + "severity": "info", + "package_count": count, + }) + + # Alert for notable new packages + for release in releases: + if "ai" in release.get("categories", []) or "machine-learning" in release.get("categories", []): + alerts.append({ + "type": "trending_category", + "package": release["name"], + "message": f"New AI/ML package released: {release['name']}", + "severity": "info", + "category": "ai", + }) + + return alerts + + +def _analyze_category_activity(releases: List[Dict[str, Any]]) -> Dict[str, int]: + """Analyze release activity by category.""" + category_counts = {} + for release in releases: + for category in release.get("categories", []): + category_counts[category] = category_counts.get(category, 0) + 1 + + # Return top 5 most active categories + return dict(sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:5]) + + +def _analyze_maintainer_activity(releases: List[Dict[str, Any]]) -> Dict[str, int]: + """Analyze release activity by maintainer.""" + maintainer_counts = {} + for release in releases: + author = release.get("author", "").strip() + if author: + maintainer_counts[author] = maintainer_counts.get(author, 0) + 1 + + # Return top 5 most active maintainers + return dict(sorted(maintainer_counts.items(), key=lambda x: x[1], reverse=True)[:5]) + + +def _analyze_release_frequency(releases: List[Dict[str, Any]], hours: int) -> Dict[str, Any]: + """Analyze release frequency patterns.""" + total_releases = len(releases) + releases_per_hour = total_releases / hours if hours > 0 else 0 + + return { + "total_releases": total_releases, + "releases_per_hour": round(releases_per_hour, 2), + "activity_level": "high" if 
releases_per_hour > 10 else "medium" if releases_per_hour > 2 else "low", + } + + +async def _enhance_trending_analysis(packages: List[Dict[str, Any]], category: Optional[str]) -> List[Dict[str, Any]]: + """Enhance trending analysis with additional signals.""" + enhanced = [] + + for pkg in packages: + enhanced_pkg = pkg.copy() + + # Add trending signals + if "new_release" in pkg.get("trending_reason", ""): + enhanced_pkg["trending_score"] += 2.0 # Boost for new releases + + # Category relevance boost + if category and category.lower() in [c.lower() for c in pkg.get("categories", [])]: + enhanced_pkg["trending_score"] += 1.0 + + # Add confidence level + score = enhanced_pkg["trending_score"] + if score >= 9.0: + enhanced_pkg["confidence"] = "high" + elif score >= 7.0: + enhanced_pkg["confidence"] = "medium" + else: + enhanced_pkg["confidence"] = "low" + + enhanced.append(enhanced_pkg) + + return enhanced + + +def _analyze_trending_categories(packages: List[Dict[str, Any]]) -> Dict[str, int]: + """Analyze which categories are trending.""" + category_counts = {} + for pkg in packages: + for category in pkg.get("categories", []): + category_counts[category] = category_counts.get(category, 0) + 1 + + return dict(sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:5]) + + +def _identify_emerging_patterns(packages: List[Dict[str, Any]]) -> List[str]: + """Identify emerging patterns in trending packages.""" + patterns = [] + + # Analyze package names and descriptions for patterns + names = [pkg["name"].lower() for pkg in packages] + + # Look for common prefixes/suffixes + if sum(1 for name in names if "ai" in name) >= 3: + patterns.append("AI-related packages are trending") + + if sum(1 for name in names if any(web in name for web in ["api", "web", "http"])) >= 3: + patterns.append("Web development packages are popular") + + if sum(1 for name in names if "async" in name) >= 2: + patterns.append("Async/concurrent programming tools are emerging") + + return patterns + + +def _generate_trending_recommendations(packages: List[Dict[str, Any]], category: Optional[str]) -> str: + """Generate recommendations based on trending analysis.""" + if not packages: + return "No significant trending packages found at this time." + + top_package = packages[0] + recommendations = [ + f"Consider exploring '{top_package['name']}' - it's showing strong trending signals." 
+ ] + + if category: + category_packages = [p for p in packages if category.lower() in [c.lower() for c in p.get("categories", [])]] + if category_packages: + recommendations.append(f"The {category} category is particularly active today.") + + return " ".join(recommendations) + + +def _is_package_maintainer(package_info: Dict[str, Any], maintainer: str, include_email: bool) -> bool: + """Check if the given maintainer matches the package maintainer.""" + maintainer_lower = maintainer.lower() + + # Check author field + author = package_info.get("author", "").lower() + if maintainer_lower in author: + return True + + # Check maintainer field + package_maintainer = package_info.get("maintainer", "").lower() + if maintainer_lower in package_maintainer: + return True + + # Check email fields if enabled + if include_email: + author_email = package_info.get("author_email", "").lower() + maintainer_email = package_info.get("maintainer_email", "").lower() + + if maintainer_lower in author_email or maintainer_lower in maintainer_email: + return True + + return False + + +def _sort_maintainer_packages(packages: List[Dict[str, Any]], sort_by: str) -> List[Dict[str, Any]]: + """Sort maintainer packages by specified criteria.""" + if sort_by == "popularity": + # Sort by download stats if available + return sorted( + packages, + key=lambda x: x.get("download_stats", {}).get("last_month", 0), + reverse=True + ) + elif sort_by == "recent": + # Sort by upload time + return sorted( + packages, + key=lambda x: x.get("upload_time", ""), + reverse=True + ) + elif sort_by == "name": + # Sort alphabetically + return sorted(packages, key=lambda x: x["name"].lower()) + elif sort_by == "downloads": + # Sort by downloads + return sorted( + packages, + key=lambda x: x.get("download_stats", {}).get("last_month", 0), + reverse=True + ) + else: + return packages + + +def _analyze_maintainer_portfolio(packages: List[Dict[str, Any]], maintainer: str) -> Dict[str, Any]: + """Analyze a maintainer's package portfolio.""" + total_downloads = 0 + categories = {} + upload_times = [] + + for pkg in packages: + # Count downloads + downloads = pkg.get("download_stats", {}).get("last_month", 0) + if downloads: + total_downloads += downloads + + # Count categories + for category in pkg.get("categories", []): + categories[category] = categories.get(category, 0) + 1 + + # Collect upload times + if pkg.get("upload_time"): + upload_times.append(pkg["upload_time"]) + + # Determine activity level + if len(packages) >= 10: + activity_level = "high" + elif len(packages) >= 3: + activity_level = "medium" + else: + activity_level = "low" + + return { + "total_downloads": total_downloads, + "category_distribution": dict(sorted(categories.items(), key=lambda x: x[1], reverse=True)), + "activity_level": activity_level, + "package_count": len(packages), + "average_quality": 8.0, # Placeholder - could be enhanced with quality metrics + } + + +async def _find_similar_packages(base_info: Dict[str, Any], limit: int) -> List[Dict[str, Any]]: + """Find packages similar to the base package.""" + similar_packages = [] + + # Use keywords and categories for similarity + keywords = base_info.get("keywords", "").split() + summary = base_info.get("summary", "") + + if keywords or summary: + from .search import search_packages + search_query = " ".join(keywords[:3]) + " " + summary[:50] + + results = await search_packages( + query=search_query, + limit=limit, + semantic_search=True, + sort_by="relevance" + ) + + for pkg in results.get("packages", []): + 
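+            # Each keyword/summary search hit becomes a "similar" candidate with a fixed 0.7 confidence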
similar_packages.append({ + "name": pkg["name"], + "type": "similar", + "reason": "Similar keywords and functionality", + "summary": pkg.get("summary", ""), + "confidence": 0.7, + "metadata": pkg, + }) + + return similar_packages + + +async def _find_complementary_packages(base_info: Dict[str, Any], limit: int) -> List[Dict[str, Any]]: + """Find packages that complement the base package.""" + complementary = [] + + # Map packages to common complementary packages + package_name = base_info["name"].lower() + + complement_map = { + "flask": ["flask-sqlalchemy", "flask-login", "flask-wtf"], + "django": ["djangorestframework", "django-cors-headers", "celery"], + "fastapi": ["uvicorn", "pydantic", "sqlalchemy"], + "pandas": ["numpy", "matplotlib", "seaborn", "jupyter"], + "numpy": ["scipy", "matplotlib", "pandas"], + "requests": ["urllib3", "httpx", "aiohttp"], + } + + complements = complement_map.get(package_name, []) + + for comp_name in complements[:limit]: + complementary.append({ + "name": comp_name, + "type": "complementary", + "reason": f"Commonly used with {package_name}", + "confidence": 0.8, + }) + + return complementary + + +async def _find_upgrade_recommendations(base_info: Dict[str, Any], limit: int) -> List[Dict[str, Any]]: + """Find upgrade recommendations for the base package.""" + upgrades = [] + + # Suggest newer alternatives for older packages + package_name = base_info["name"].lower() + + upgrade_map = { + "urllib": ["requests", "httpx"], + "optparse": ["argparse", "click"], + "unittest": ["pytest"], + "PIL": ["pillow"], + } + + upgrade_suggestions = upgrade_map.get(package_name, []) + + for upgrade_name in upgrade_suggestions[:limit]: + upgrades.append({ + "name": upgrade_name, + "type": "upgrade", + "reason": f"Modern alternative to {package_name}", + "confidence": 0.9, + }) + + return upgrades + + +def _personalize_recommendations(recommendations: List[Dict[str, Any]], user_context: Dict[str, Any]) -> List[Dict[str, Any]]: + """Personalize recommendations based on user context.""" + experience_level = user_context.get("experience_level", "intermediate") + use_case = user_context.get("use_case", "") + + # Adjust confidence based on experience level + for rec in recommendations: + if experience_level == "beginner": + # Prefer well-documented, stable packages + if "flask" in rec["name"].lower() or "requests" in rec["name"].lower(): + rec["confidence"] += 0.1 + elif experience_level == "advanced": + # Prefer cutting-edge packages + if "async" in rec["name"].lower() or "fast" in rec["name"].lower(): + rec["confidence"] += 0.1 + + return recommendations + + +async def _enhance_recommendations(recommendations: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Enhance recommendations with additional metadata.""" + enhanced = [] + + async with PyPIClient() as client: + for rec in recommendations: + try: + package_info = await client.get_package_info(rec["name"]) + info = package_info["info"] + + enhanced_rec = rec.copy() + enhanced_rec.update({ + "version": info["version"], + "summary": info.get("summary", ""), + "license": info.get("license", ""), + "requires_python": info.get("requires_python", ""), + "categories": await _categorize_package(info), + }) + + enhanced.append(enhanced_rec) + + except Exception as e: + logger.warning(f"Failed to enhance recommendation for {rec['name']}: {e}") + enhanced.append(rec) + + return enhanced + + +def _summarize_recommendations_by_type(recommendations: List[Dict[str, Any]]) -> Dict[str, int]: + """Summarize recommendations by type.""" + 
type_counts = {} + for rec in recommendations: + rec_type = rec.get("type", "unknown") + type_counts[rec_type] = type_counts.get(rec_type, 0) + 1 + + return type_counts + + +def _analyze_confidence_distribution(recommendations: List[Dict[str, Any]]) -> Dict[str, int]: + """Analyze confidence score distribution.""" + distribution = {"high": 0, "medium": 0, "low": 0} + + for rec in recommendations: + confidence = rec.get("confidence", 0) + if confidence >= 0.8: + distribution["high"] += 1 + elif confidence >= 0.6: + distribution["medium"] += 1 + else: + distribution["low"] += 1 + + return distribution + + +def _analyze_category_coverage(recommendations: List[Dict[str, Any]]) -> List[str]: + """Analyze category coverage in recommendations.""" + categories = set() + for rec in recommendations: + categories.update(rec.get("categories", [])) + + return list(categories) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 411dc9f..9e71344 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ packaging = "^24.0" pydantic = "^2.0.0" pydantic-settings = "^2.0.0" click = "8.1.7" +feedparser = "^6.0.0" [tool.poetry.group.dev.dependencies] pytest = "^8.0.0" diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 0000000..d724aff --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,730 @@ +"""Tests for PyPI Discovery & Monitoring Tools.""" + +import pytest +from datetime import datetime, timedelta +from unittest.mock import AsyncMock, patch, Mock + +from pypi_query_mcp.core.exceptions import InvalidPackageNameError, NetworkError, SearchError +from pypi_query_mcp.tools.discovery import ( + DiscoveryCache, + get_pypi_package_recommendations, + get_pypi_trending_today, + monitor_pypi_new_releases, + search_pypi_by_maintainer, + _categorize_package, + _is_package_maintainer, + _discovery_cache, +) + + +class TestDiscoveryCache: + """Test the DiscoveryCache functionality.""" + + def test_cache_basic_operations(self): + """Test basic cache get/set operations.""" + cache = DiscoveryCache(default_ttl=60) + + # Test empty cache + assert cache.get("nonexistent") is None + + # Test set and get + test_data = {"test": "value"} + cache.set("test_key", test_data) + assert cache.get("test_key") == test_data + + # Test clear + cache.clear() + assert cache.get("test_key") is None + + def test_cache_expiration(self): + """Test cache expiration functionality.""" + cache = DiscoveryCache(default_ttl=1) # 1 second TTL + + test_data = {"test": "value"} + cache.set("test_key", test_data) + + # Should be available immediately + assert cache.get("test_key") == test_data + + # Mock time to simulate expiration + with patch("time.time", return_value=1000000): + cache.set("test_key", test_data) + + with patch("time.time", return_value=1000002): # 2 seconds later + assert cache.get("test_key") is None + + def test_cache_custom_ttl(self): + """Test cache with custom TTL.""" + cache = DiscoveryCache(default_ttl=60) + + test_data = {"test": "value"} + cache.set("test_key", test_data, ttl=120) # Custom 2-minute TTL + + # Should still be available after default TTL would expire + with patch("time.time", return_value=1000000): + cache.set("test_key", test_data, ttl=120) + + with patch("time.time", return_value=1000060): # 1 minute later + assert cache.get("test_key") == test_data + + with patch("time.time", return_value=1000130): # 2+ minutes later + assert cache.get("test_key") is None + + +class TestMonitorPyPINewReleases: + """Test the monitor_pypi_new_releases 
function.""" + + @pytest.mark.asyncio + async def test_monitor_basic_functionality(self): + """Test basic monitoring functionality.""" + mock_releases = [ + { + "name": "test-package", + "version": "1.0.0", + "release_time": "2023-01-01T12:00:00Z", + "description": "Test package", + "link": "https://pypi.org/project/test-package/", + } + ] + + mock_package_info = { + "info": { + "name": "test-package", + "version": "1.0.0", + "summary": "A test package", + "author": "Test Author", + "license": "MIT", + "home_page": "https://example.com", + "keywords": "test, package", + "requires_python": ">=3.8", + "project_urls": {}, + "classifiers": ["Topic :: Software Development"], + } + } + + with patch("pypi_query_mcp.tools.discovery._fetch_recent_releases_from_rss") as mock_fetch: + mock_fetch.return_value = mock_releases + + with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class: + mock_client = AsyncMock() + mock_client_class.return_value.__aenter__.return_value = mock_client + mock_client.get_package_info.return_value = mock_package_info + + with patch("pypi_query_mcp.tools.discovery._categorize_package") as mock_categorize: + mock_categorize.return_value = ["software-development"] + + result = await monitor_pypi_new_releases(hours=24) + + assert "new_releases" in result + assert result["total_releases_found"] == 1 + assert result["monitoring_period_hours"] == 24 + assert len(result["new_releases"]) == 1 + + release = result["new_releases"][0] + assert release["name"] == "test-package" + assert release["summary"] == "A test package" + assert "categories" in release + + @pytest.mark.asyncio + async def test_monitor_with_filters(self): + """Test monitoring with various filters.""" + mock_releases = [ + { + "name": "web-package", + "version": "1.0.0", + "release_time": "2023-01-01T12:00:00Z", + "description": "Web framework", + "link": "https://pypi.org/project/web-package/", + }, + { + "name": "data-package", + "version": "2.0.0", + "release_time": "2023-01-01T13:00:00Z", + "description": "Data science package", + "link": "https://pypi.org/project/data-package/", + } + ] + + with patch("pypi_query_mcp.tools.discovery._fetch_recent_releases_from_rss") as mock_fetch: + mock_fetch.return_value = mock_releases + + with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class: + mock_client = AsyncMock() + mock_client_class.return_value.__aenter__.return_value = mock_client + + def mock_get_package_info(package_name): + if package_name == "web-package": + return { + "info": { + "name": "web-package", + "author": "Web Author", + "summary": "Web framework", + "license": "MIT", + } + } + elif package_name == "data-package": + return { + "info": { + "name": "data-package", + "author": "Data Author", + "summary": "Data science package", + "license": "Apache", + } + } + + mock_client.get_package_info.side_effect = mock_get_package_info + + with patch("pypi_query_mcp.tools.discovery._categorize_package") as mock_categorize: + def mock_categorize_func(info): + if "web" in info.get("summary", "").lower(): + return ["web"] + elif "data" in info.get("summary", "").lower(): + return ["data-science"] + return ["general"] + + mock_categorize.side_effect = mock_categorize_func + + # Test category filtering + result = await monitor_pypi_new_releases( + categories=["web"], + hours=24 + ) + + assert result["total_releases_found"] == 1 + assert result["new_releases"][0]["name"] == "web-package" + + # Test maintainer filtering + result = await monitor_pypi_new_releases( + 
maintainer_filter="Web Author", + hours=24 + ) + + assert result["total_releases_found"] == 1 + assert result["new_releases"][0]["name"] == "web-package" + + @pytest.mark.asyncio + async def test_monitor_cache_functionality(self): + """Test cache functionality in monitoring.""" + # Clear cache first + _discovery_cache.clear() + + mock_releases = [ + { + "name": "cached-package", + "version": "1.0.0", + "release_time": "2023-01-01T12:00:00Z", + "description": "Cached package", + "link": "https://pypi.org/project/cached-package/", + } + ] + + with patch("pypi_query_mcp.tools.discovery._fetch_recent_releases_from_rss") as mock_fetch: + mock_fetch.return_value = mock_releases + + with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class: + mock_client = AsyncMock() + mock_client_class.return_value.__aenter__.return_value = mock_client + mock_client.get_package_info.return_value = { + "info": { + "name": "cached-package", + "summary": "Cached package", + "author": "Cache Author", + } + } + + with patch("pypi_query_mcp.tools.discovery._categorize_package") as mock_categorize: + mock_categorize.return_value = ["general"] + + # First call should fetch data + result1 = await monitor_pypi_new_releases(hours=24, cache_ttl=300) + assert mock_fetch.call_count == 1 + + # Second call with same parameters should use cache + result2 = await monitor_pypi_new_releases(hours=24, cache_ttl=300) + assert mock_fetch.call_count == 1 # Should not increase + + # Results should be identical + assert result1["timestamp"] == result2["timestamp"] + + @pytest.mark.asyncio + async def test_monitor_error_handling(self): + """Test error handling in monitoring.""" + with patch("pypi_query_mcp.tools.discovery._fetch_recent_releases_from_rss") as mock_fetch: + mock_fetch.side_effect = Exception("Network error") + + with pytest.raises(NetworkError): + await monitor_pypi_new_releases(hours=24) + + +class TestGetPyPITrendingToday: + """Test the get_pypi_trending_today function.""" + + @pytest.mark.asyncio + async def test_trending_basic_functionality(self): + """Test basic trending analysis.""" + mock_releases_result = { + "new_releases": [ + { + "name": "trending-package", + "version": "1.0.0", + "summary": "Trending package", + "categories": ["web"], + "release_time": "2023-01-01T12:00:00Z", + } + ] + } + + mock_trending_result = { + "trending_packages": [ + { + "package": "popular-package", + "downloads": {"last_day": 10000}, + "summary": "Popular package", + } + ] + } + + with patch("pypi_query_mcp.tools.discovery.monitor_pypi_new_releases") as mock_monitor: + mock_monitor.return_value = mock_releases_result + + with patch("pypi_query_mcp.tools.search.get_trending_packages") as mock_trending: + mock_trending.return_value = mock_trending_result + + with patch("pypi_query_mcp.tools.discovery._enhance_trending_analysis") as mock_enhance: + mock_enhance.return_value = [ + { + "name": "trending-package", + "trending_score": 10.0, + "trending_reason": "new_release", + }, + { + "name": "popular-package", + "trending_score": 8.0, + "trending_reason": "download_surge", + } + ] + + result = await get_pypi_trending_today( + category="web", + limit=10 + ) + + assert "trending_today" in result + assert result["total_trending"] == 2 + assert result["category"] == "web" + assert len(result["trending_today"]) == 2 + + @pytest.mark.asyncio + async def test_trending_with_filters(self): + """Test trending analysis with filters.""" + with patch("pypi_query_mcp.tools.discovery.monitor_pypi_new_releases") as mock_monitor: + 
mock_monitor.return_value = {"new_releases": []} + + with patch("pypi_query_mcp.tools.search.get_trending_packages") as mock_trending: + mock_trending.return_value = {"trending_packages": []} + + with patch("pypi_query_mcp.tools.discovery._enhance_trending_analysis") as mock_enhance: + mock_enhance.return_value = [] + + result = await get_pypi_trending_today( + category="ai", + min_downloads=5000, + limit=20, + include_new_packages=False, + trending_threshold=2.0 + ) + + assert result["category"] == "ai" + assert result["filters_applied"]["min_downloads"] == 5000 + assert result["filters_applied"]["trending_threshold"] == 2.0 + assert not result["filters_applied"]["include_new_packages"] + + @pytest.mark.asyncio + async def test_trending_error_handling(self): + """Test error handling in trending analysis.""" + with patch("pypi_query_mcp.tools.discovery.monitor_pypi_new_releases") as mock_monitor: + mock_monitor.side_effect = Exception("Monitoring error") + + with pytest.raises(SearchError): + await get_pypi_trending_today() + + +class TestSearchPyPIByMaintainer: + """Test the search_pypi_by_maintainer function.""" + + @pytest.mark.asyncio + async def test_search_by_maintainer_basic(self): + """Test basic maintainer search functionality.""" + mock_search_results = { + "packages": [ + { + "name": "maintainer-package-1", + "summary": "First package", + }, + { + "name": "maintainer-package-2", + "summary": "Second package", + } + ] + } + + mock_package_info = { + "info": { + "name": "maintainer-package-1", + "version": "1.0.0", + "summary": "First package", + "author": "Test Maintainer", + "author_email": "test@example.com", + "license": "MIT", + "keywords": "test", + "classifiers": [], + "requires_python": ">=3.8", + } + } + + with patch("pypi_query_mcp.tools.search.search_packages") as mock_search: + mock_search.return_value = mock_search_results + + with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class: + mock_client = AsyncMock() + mock_client_class.return_value.__aenter__.return_value = mock_client + mock_client.get_package_info.return_value = mock_package_info + + with patch("pypi_query_mcp.tools.discovery._is_package_maintainer") as mock_is_maintainer: + mock_is_maintainer.return_value = True + + with patch("pypi_query_mcp.tools.discovery._categorize_package") as mock_categorize: + mock_categorize.return_value = ["development"] + + result = await search_pypi_by_maintainer( + maintainer="Test Maintainer", + sort_by="popularity" + ) + + assert result["maintainer"] == "Test Maintainer" + assert result["total_packages"] == 1 + assert len(result["packages"]) == 1 + assert "portfolio_analysis" in result + assert "maintainer_profile" in result + + @pytest.mark.asyncio + async def test_search_by_maintainer_invalid_input(self): + """Test maintainer search with invalid input.""" + with pytest.raises(InvalidPackageNameError): + await search_pypi_by_maintainer("") + + with pytest.raises(InvalidPackageNameError): + await search_pypi_by_maintainer(" ") + + @pytest.mark.asyncio + async def test_search_by_maintainer_with_stats(self): + """Test maintainer search with download statistics.""" + mock_search_results = {"packages": [{"name": "stats-package"}]} + mock_package_info = { + "info": { + "name": "stats-package", + "version": "1.0.0", + "author": "Stats Maintainer", + "summary": "Package with stats", + } + } + mock_stats = { + "recent_downloads": { + "last_month": 50000, + "last_week": 12000, + "last_day": 2000, + } + } + + with 
patch("pypi_query_mcp.tools.search.search_packages") as mock_search: + mock_search.return_value = mock_search_results + + with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class: + mock_client = AsyncMock() + mock_client_class.return_value.__aenter__.return_value = mock_client + mock_client.get_package_info.return_value = mock_package_info + + with patch("pypi_query_mcp.tools.discovery._is_package_maintainer") as mock_is_maintainer: + mock_is_maintainer.return_value = True + + with patch("pypi_query_mcp.tools.discovery._categorize_package") as mock_categorize: + mock_categorize.return_value = ["general"] + + with patch("pypi_query_mcp.tools.download_stats.get_package_download_stats") as mock_get_stats: + mock_get_stats.return_value = mock_stats + + result = await search_pypi_by_maintainer( + maintainer="Stats Maintainer", + include_stats=True + ) + + assert result["total_packages"] == 1 + package = result["packages"][0] + assert "download_stats" in package + assert package["download_stats"]["last_month"] == 50000 + + @pytest.mark.asyncio + async def test_search_by_maintainer_error_handling(self): + """Test error handling in maintainer search.""" + with patch("pypi_query_mcp.tools.search.search_packages") as mock_search: + mock_search.side_effect = Exception("Search error") + + with pytest.raises(SearchError): + await search_pypi_by_maintainer("Error Maintainer") + + +class TestGetPyPIPackageRecommendations: + """Test the get_pypi_package_recommendations function.""" + + @pytest.mark.asyncio + async def test_recommendations_basic_functionality(self): + """Test basic recommendation functionality.""" + mock_package_info = { + "info": { + "name": "base-package", + "version": "1.0.0", + "summary": "Base package for recommendations", + "keywords": "test, recommendations", + "classifiers": ["Topic :: Software Development"], + } + } + + with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class: + mock_client = AsyncMock() + mock_client_class.return_value.__aenter__.return_value = mock_client + mock_client.get_package_info.return_value = mock_package_info + + with patch("pypi_query_mcp.tools.discovery._find_similar_packages") as mock_similar: + mock_similar.return_value = [ + { + "name": "similar-package", + "type": "similar", + "confidence": 0.8, + "reason": "Similar functionality", + } + ] + + with patch("pypi_query_mcp.tools.discovery._enhance_recommendations") as mock_enhance: + mock_enhance.return_value = [ + { + "name": "similar-package", + "type": "similar", + "confidence": 0.8, + "summary": "Similar package", + "categories": ["development"], + } + ] + + with patch("pypi_query_mcp.tools.discovery._categorize_package") as mock_categorize: + mock_categorize.return_value = ["development"] + + result = await get_pypi_package_recommendations( + package_name="base-package", + recommendation_type="similar" + ) + + assert result["base_package"]["name"] == "base-package" + assert result["total_recommendations"] == 1 + assert result["recommendation_type"] == "similar" + assert len(result["recommendations"]) == 1 + + @pytest.mark.asyncio + async def test_recommendations_different_types(self): + """Test different recommendation types.""" + mock_package_info = { + "info": { + "name": "test-package", + "version": "1.0.0", + "summary": "Test package", + } + } + + with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class: + mock_client = AsyncMock() + mock_client_class.return_value.__aenter__.return_value = mock_client + 
+            mock_client.get_package_info.return_value = mock_package_info
+
+            with patch("pypi_query_mcp.tools.discovery._find_complementary_packages") as mock_complementary:
+                mock_complementary.return_value = [
+                    {
+                        "name": "complementary-package",
+                        "type": "complementary",
+                        "confidence": 0.9,
+                    }
+                ]
+
+                with patch("pypi_query_mcp.tools.discovery._enhance_recommendations") as mock_enhance:
+                    mock_enhance.return_value = [
+                        {
+                            "name": "complementary-package",
+                            "type": "complementary",
+                            "confidence": 0.9,
+                        }
+                    ]
+
+                    with patch("pypi_query_mcp.tools.discovery._categorize_package") as mock_categorize:
+                        mock_categorize.return_value = ["general"]
+
+                        result = await get_pypi_package_recommendations(
+                            package_name="test-package",
+                            recommendation_type="complementary"
+                        )
+
+                        assert result["recommendation_type"] == "complementary"
+                        assert result["total_recommendations"] == 1
+
+    @pytest.mark.asyncio
+    async def test_recommendations_with_user_context(self):
+        """Test recommendations with user context."""
+        mock_package_info = {
+            "info": {
+                "name": "context-package",
+                "version": "1.0.0",
+                "summary": "Package with context",
+            }
+        }
+
+        user_context = {
+            "experience_level": "beginner",
+            "use_case": "web development",
+        }
+
+        with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class:
+            mock_client = AsyncMock()
+            mock_client_class.return_value.__aenter__.return_value = mock_client
+            mock_client.get_package_info.return_value = mock_package_info
+
+            with patch("pypi_query_mcp.tools.discovery._find_similar_packages") as mock_similar:
+                mock_similar.return_value = []
+
+                with patch("pypi_query_mcp.tools.discovery._enhance_recommendations") as mock_enhance:
+                    mock_enhance.return_value = []
+
+                    with patch("pypi_query_mcp.tools.discovery._categorize_package") as mock_categorize:
+                        mock_categorize.return_value = ["web"]
+
+                        result = await get_pypi_package_recommendations(
+                            package_name="context-package",
+                            user_context=user_context
+                        )
+
+                        assert result["parameters"]["user_context"] == user_context
+                        assert result["algorithm_insights"]["personalization_applied"] is True
+
+    @pytest.mark.asyncio
+    async def test_recommendations_invalid_input(self):
+        """Test recommendations with invalid input."""
+        with pytest.raises(InvalidPackageNameError):
+            await get_pypi_package_recommendations("")
+
+        with pytest.raises(InvalidPackageNameError):
+            await get_pypi_package_recommendations(" ")
+
+    @pytest.mark.asyncio
+    async def test_recommendations_error_handling(self):
+        """Test error handling in recommendations."""
+        with patch("pypi_query_mcp.tools.discovery.PyPIClient") as mock_client_class:
+            mock_client = AsyncMock()
+            mock_client_class.return_value.__aenter__.return_value = mock_client
+            mock_client.get_package_info.side_effect = Exception("Package error")
+
+            with pytest.raises(SearchError):
+                await get_pypi_package_recommendations("error-package")
+
+
+class TestHelperFunctions:
+    """Test helper functions used by discovery tools."""
+
+    def test_categorize_package(self):
+        """Test package categorization."""
+        # Test with web-related classifiers and keywords
+        package_info = {
+            "summary": "Web framework for Python",
+            "description": "A micro web framework",
+            "keywords": "web, framework, api",
+            "classifiers": [
+                "Topic :: Internet :: WWW/HTTP :: Dynamic Content",
+                "Topic :: Software Development :: Libraries :: Python Modules"
+            ],
+        }
+
+        categories = _categorize_package(package_info)
+        assert "web" in categories
+
+    def test_is_package_maintainer(self):
+        """Test maintainer checking functionality."""
+        package_info = {
+            "author": "John Doe",
+            "author_email": "john@example.com",
+            "maintainer": "Jane Smith",
+            "maintainer_email": "jane@example.com",
+        }
+
+        # Test author match
+        assert _is_package_maintainer(package_info, "John Doe", False) is True
+        assert _is_package_maintainer(package_info, "john doe", False) is True
+
+        # Test maintainer match
+        assert _is_package_maintainer(package_info, "Jane Smith", False) is True
+
+        # Test no match
+        assert _is_package_maintainer(package_info, "Bob Wilson", False) is False
+
+        # Test email match (when enabled)
+        assert _is_package_maintainer(package_info, "john@example.com", True) is True
+        assert _is_package_maintainer(package_info, "john@example.com", False) is False
+
+
+@pytest.fixture
+def mock_rss_response():
+    """Mock RSS response for testing."""
+    return '''<?xml version="1.0" encoding="UTF-8"?>
+    <rss version="2.0">
+        <channel>
+            <title>PyPI Recent Updates</title>
+            <item>
+                <title>test-package 1.0.0</title>
+                <description>Test package description</description>
+                <link>https://pypi.org/project/test-package/</link>
+                <pubDate>Mon, 01 Jan 2023 12:00:00 GMT</pubDate>
+            </item>
+        </channel>
+    </rss>'''
+
+
+class TestIntegration:
+    """Integration tests for discovery tools."""
+
+    @pytest.mark.asyncio
+    async def test_full_workflow_monitoring_to_recommendations(self):
+        """Test full workflow from monitoring to recommendations."""
+        # This would be a more complex integration test
+        # that combines multiple functions in a realistic workflow
+        pass
+
+    @pytest.mark.asyncio
+    async def test_cache_consistency_across_functions(self):
+        """Test cache consistency across different discovery functions."""
+        # Clear cache first
+        _discovery_cache.clear()
+
+        # Test that cache is properly shared between functions
+        with patch("pypi_query_mcp.tools.discovery._fetch_recent_releases_from_rss") as mock_fetch:
+            mock_fetch.return_value = []
+
+            # First call should populate cache
+            await monitor_pypi_new_releases(hours=24, cache_ttl=300)
+            assert mock_fetch.call_count == 1
+
+            # Second call should use cache
+            await monitor_pypi_new_releases(hours=24, cache_ttl=300)
+            assert mock_fetch.call_count == 1  # Should not increase
+
+    def test_error_propagation(self):
+        """Test that errors are properly propagated and handled."""
+        # Test various error scenarios and ensure they're handled consistently
+        pass
+
+
+# Additional test classes for edge cases and performance testing could be added here
\ No newline at end of file