style: fix code formatting and linting issues

- Fix whitespace in docstrings and blank lines
- Remove unused variables in tests
- Rename unused loop variables to follow conventions
- All ruff checks now pass

Signed-off-by: Hal <hal.long@outlook.com>
This commit is contained in:
longhao 2025-05-27 18:58:25 +08:00 committed by Hal
parent 6b14ff6da5
commit f2b92ff0ee
6 changed files with 196 additions and 205 deletions

View File

@ -17,7 +17,7 @@ from pypi_query_mcp.tools.package_downloader import download_package_with_depend
async def analyze_pyside2_dependencies(): async def analyze_pyside2_dependencies():
"""Analyze PySide2 dependencies for Python 3.10.""" """Analyze PySide2 dependencies for Python 3.10."""
print("🔍 Analyzing PySide2 dependencies for Python 3.10...") print("🔍 Analyzing PySide2 dependencies for Python 3.10...")
try: try:
result = await resolve_package_dependencies( result = await resolve_package_dependencies(
package_name="PySide2", package_name="PySide2",
@ -26,23 +26,23 @@ async def analyze_pyside2_dependencies():
include_dev=False, include_dev=False,
max_depth=3 max_depth=3
) )
print(f"✅ Successfully resolved dependencies for {result['package_name']}") print(f"✅ Successfully resolved dependencies for {result['package_name']}")
print(f"📊 Summary:") print("📊 Summary:")
summary = result['summary'] summary = result['summary']
print(f" - Total packages: {summary['total_packages']}") print(f" - Total packages: {summary['total_packages']}")
print(f" - Runtime dependencies: {summary['total_runtime_dependencies']}") print(f" - Runtime dependencies: {summary['total_runtime_dependencies']}")
print(f" - Max depth: {summary['max_depth']}") print(f" - Max depth: {summary['max_depth']}")
print(f"\n📦 Package list:") print("\n📦 Package list:")
for i, pkg in enumerate(summary['package_list'][:10], 1): # Show first 10 for i, pkg in enumerate(summary['package_list'][:10], 1): # Show first 10
print(f" {i}. {pkg}") print(f" {i}. {pkg}")
if len(summary['package_list']) > 10: if len(summary['package_list']) > 10:
print(f" ... and {len(summary['package_list']) - 10} more packages") print(f" ... and {len(summary['package_list']) - 10} more packages")
return result return result
except Exception as e: except Exception as e:
print(f"❌ Error analyzing dependencies: {e}") print(f"❌ Error analyzing dependencies: {e}")
return None return None
@ -51,9 +51,9 @@ async def analyze_pyside2_dependencies():
async def download_pyside2_packages(): async def download_pyside2_packages():
"""Download PySide2 and its dependencies.""" """Download PySide2 and its dependencies."""
print("\n📥 Downloading PySide2 and dependencies...") print("\n📥 Downloading PySide2 and dependencies...")
download_dir = Path("./pyside2_downloads") download_dir = Path("./pyside2_downloads")
try: try:
result = await download_package_with_dependencies( result = await download_package_with_dependencies(
package_name="PySide2", package_name="PySide2",
@ -65,9 +65,9 @@ async def download_pyside2_packages():
verify_checksums=True, verify_checksums=True,
max_depth=2 # Limit depth for demo max_depth=2 # Limit depth for demo
) )
print(f"✅ Download completed!") print("✅ Download completed!")
print(f"📊 Download Summary:") print("📊 Download Summary:")
summary = result['summary'] summary = result['summary']
print(f" - Total packages: {summary['total_packages']}") print(f" - Total packages: {summary['total_packages']}")
print(f" - Successful downloads: {summary['successful_downloads']}") print(f" - Successful downloads: {summary['successful_downloads']}")
@ -75,14 +75,14 @@ async def download_pyside2_packages():
print(f" - Total size: {summary['total_downloaded_size']:,} bytes") print(f" - Total size: {summary['total_downloaded_size']:,} bytes")
print(f" - Success rate: {summary['success_rate']:.1f}%") print(f" - Success rate: {summary['success_rate']:.1f}%")
print(f" - Download directory: {summary['download_directory']}") print(f" - Download directory: {summary['download_directory']}")
if result['failed_downloads']: if result['failed_downloads']:
print(f"\n⚠️ Failed downloads:") print("\n⚠️ Failed downloads:")
for failure in result['failed_downloads']: for failure in result['failed_downloads']:
print(f" - {failure['package']}: {failure['error']}") print(f" - {failure['package']}: {failure['error']}")
return result return result
except Exception as e: except Exception as e:
print(f"❌ Error downloading packages: {e}") print(f"❌ Error downloading packages: {e}")
return None return None
@ -91,7 +91,7 @@ async def download_pyside2_packages():
async def analyze_small_package(): async def analyze_small_package():
"""Analyze a smaller package for demonstration.""" """Analyze a smaller package for demonstration."""
print("\n🔍 Analyzing 'click' package dependencies...") print("\n🔍 Analyzing 'click' package dependencies...")
try: try:
result = await resolve_package_dependencies( result = await resolve_package_dependencies(
package_name="click", package_name="click",
@ -100,26 +100,26 @@ async def analyze_small_package():
include_dev=False, include_dev=False,
max_depth=5 max_depth=5
) )
print(f"✅ Successfully resolved dependencies for {result['package_name']}") print(f"✅ Successfully resolved dependencies for {result['package_name']}")
# Show detailed dependency tree # Show detailed dependency tree
print(f"\n🌳 Dependency Tree:") print("\n🌳 Dependency Tree:")
dependency_tree = result['dependency_tree'] dependency_tree = result['dependency_tree']
for pkg_name, pkg_info in dependency_tree.items(): for _pkg_name, pkg_info in dependency_tree.items():
indent = " " * pkg_info['depth'] indent = " " * pkg_info['depth']
print(f"{indent}- {pkg_info['name']} ({pkg_info['version']})") print(f"{indent}- {pkg_info['name']} ({pkg_info['version']})")
runtime_deps = pkg_info['dependencies']['runtime'] runtime_deps = pkg_info['dependencies']['runtime']
if runtime_deps: if runtime_deps:
for dep in runtime_deps[:3]: # Show first 3 dependencies for dep in runtime_deps[:3]: # Show first 3 dependencies
print(f"{indent} └─ {dep}") print(f"{indent} └─ {dep}")
if len(runtime_deps) > 3: if len(runtime_deps) > 3:
print(f"{indent} └─ ... and {len(runtime_deps) - 3} more") print(f"{indent} └─ ... and {len(runtime_deps) - 3} more")
return result return result
except Exception as e: except Exception as e:
print(f"❌ Error analyzing dependencies: {e}") print(f"❌ Error analyzing dependencies: {e}")
return None return None
@ -129,26 +129,26 @@ async def main():
"""Main demonstration function.""" """Main demonstration function."""
print("🚀 PyPI Query MCP Server - Dependency Analysis Demo") print("🚀 PyPI Query MCP Server - Dependency Analysis Demo")
print("=" * 60) print("=" * 60)
# Analyze a small package first # Analyze a small package first
click_result = await analyze_small_package() click_result = await analyze_small_package()
# Analyze PySide2 dependencies # Analyze PySide2 dependencies
pyside2_result = await analyze_pyside2_dependencies() pyside2_result = await analyze_pyside2_dependencies()
# Optionally download packages (commented out to avoid large downloads in demo) # Optionally download packages (commented out to avoid large downloads in demo)
# download_result = await download_pyside2_packages() # download_result = await download_pyside2_packages()
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("✨ Demo completed!") print("✨ Demo completed!")
if click_result: if click_result:
print(f"📝 Click analysis saved to: click_dependencies.json") print("📝 Click analysis saved to: click_dependencies.json")
with open("click_dependencies.json", "w") as f: with open("click_dependencies.json", "w") as f:
json.dump(click_result, f, indent=2) json.dump(click_result, f, indent=2)
if pyside2_result: if pyside2_result:
print(f"📝 PySide2 analysis saved to: pyside2_dependencies.json") print("📝 PySide2 analysis saved to: pyside2_dependencies.json")
with open("pyside2_dependencies.json", "w") as f: with open("pyside2_dependencies.json", "w") as f:
json.dump(pyside2_result, f, indent=2) json.dump(pyside2_result, f, indent=2)

View File

@ -1,86 +1,86 @@
"""Dependency parsing utilities for PyPI packages.""" """Dependency parsing utilities for PyPI packages."""
import re
from typing import Any, Dict, List, Optional, Set, Tuple
from packaging.requirements import Requirement
from packaging.specifiers import SpecifierSet
from packaging.version import Version
import logging import logging
import re
from typing import Any
from packaging.requirements import Requirement
from packaging.version import Version
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DependencyParser: class DependencyParser:
"""Parser for Python package dependencies.""" """Parser for Python package dependencies."""
def __init__(self): def __init__(self):
self.parsed_cache: Dict[str, List[Requirement]] = {} self.parsed_cache: dict[str, list[Requirement]] = {}
def parse_requirements(self, requires_dist: List[str]) -> List[Requirement]: def parse_requirements(self, requires_dist: list[str]) -> list[Requirement]:
"""Parse requirements from requires_dist list. """Parse requirements from requires_dist list.
Args: Args:
requires_dist: List of requirement strings from PyPI metadata requires_dist: List of requirement strings from PyPI metadata
Returns: Returns:
List of parsed Requirement objects List of parsed Requirement objects
""" """
requirements = [] requirements = []
for req_str in requires_dist or []: for req_str in requires_dist or []:
if not req_str or not req_str.strip(): if not req_str or not req_str.strip():
continue continue
try: try:
req = Requirement(req_str) req = Requirement(req_str)
requirements.append(req) requirements.append(req)
except Exception as e: except Exception as e:
logger.warning(f"Failed to parse requirement '{req_str}': {e}") logger.warning(f"Failed to parse requirement '{req_str}': {e}")
continue continue
return requirements return requirements
def filter_requirements_by_python_version( def filter_requirements_by_python_version(
self, self,
requirements: List[Requirement], requirements: list[Requirement],
python_version: str python_version: str
) -> List[Requirement]: ) -> list[Requirement]:
"""Filter requirements based on Python version. """Filter requirements based on Python version.
Args: Args:
requirements: List of Requirement objects requirements: List of Requirement objects
python_version: Target Python version (e.g., "3.10") python_version: Target Python version (e.g., "3.10")
Returns: Returns:
Filtered list of requirements applicable to the Python version Filtered list of requirements applicable to the Python version
""" """
filtered = [] filtered = []
try: try:
target_version = Version(python_version) target_version = Version(python_version)
except Exception as e: except Exception as e:
logger.warning(f"Invalid Python version '{python_version}': {e}") logger.warning(f"Invalid Python version '{python_version}': {e}")
return requirements return requirements
for req in requirements: for req in requirements:
if self._is_requirement_applicable(req, target_version): if self._is_requirement_applicable(req, target_version):
filtered.append(req) filtered.append(req)
return filtered return filtered
def _is_requirement_applicable(self, req: Requirement, python_version: Version) -> bool: def _is_requirement_applicable(self, req: Requirement, python_version: Version) -> bool:
"""Check if a requirement is applicable for the given Python version. """Check if a requirement is applicable for the given Python version.
Args: Args:
req: Requirement object req: Requirement object
python_version: Target Python version python_version: Target Python version
Returns: Returns:
True if requirement applies to the Python version True if requirement applies to the Python version
""" """
if not req.marker: if not req.marker:
return True return True
# Create environment for marker evaluation # Create environment for marker evaluation
env = { env = {
'python_version': str(python_version), 'python_version': str(python_version),
@ -90,22 +90,22 @@ class DependencyParser:
'implementation_name': 'cpython', 'implementation_name': 'cpython',
'implementation_version': str(python_version), 'implementation_version': str(python_version),
} }
try: try:
return req.marker.evaluate(env) return req.marker.evaluate(env)
except Exception as e: except Exception as e:
logger.warning(f"Failed to evaluate marker for {req}: {e}") logger.warning(f"Failed to evaluate marker for {req}: {e}")
return True # Include by default if evaluation fails return True # Include by default if evaluation fails
def categorize_dependencies( def categorize_dependencies(
self, self,
requirements: List[Requirement] requirements: list[Requirement]
) -> Dict[str, List[Requirement]]: ) -> dict[str, list[Requirement]]:
"""Categorize dependencies into runtime, development, and optional groups. """Categorize dependencies into runtime, development, and optional groups.
Args: Args:
requirements: List of Requirement objects requirements: List of Requirement objects
Returns: Returns:
Dictionary with categorized dependencies Dictionary with categorized dependencies
""" """
@ -115,15 +115,15 @@ class DependencyParser:
'optional': {}, 'optional': {},
'extras': {} 'extras': {}
} }
for req in requirements: for req in requirements:
if not req.marker: if not req.marker:
# No marker means it's a runtime dependency # No marker means it's a runtime dependency
categories['runtime'].append(req) categories['runtime'].append(req)
continue continue
marker_str = str(req.marker) marker_str = str(req.marker)
# Check for extra dependencies # Check for extra dependencies
if 'extra ==' in marker_str: if 'extra ==' in marker_str:
extra_match = re.search(r'extra\s*==\s*["\']([^"\']+)["\']', marker_str) extra_match = re.search(r'extra\s*==\s*["\']([^"\']+)["\']', marker_str)
@ -133,45 +133,45 @@ class DependencyParser:
categories['extras'][extra_name] = [] categories['extras'][extra_name] = []
categories['extras'][extra_name].append(req) categories['extras'][extra_name].append(req)
continue continue
# Check for development dependencies # Check for development dependencies
if any(keyword in marker_str.lower() for keyword in ['dev', 'test', 'lint', 'doc']): if any(keyword in marker_str.lower() for keyword in ['dev', 'test', 'lint', 'doc']):
categories['development'].append(req) categories['development'].append(req)
else: else:
categories['runtime'].append(req) categories['runtime'].append(req)
return categories return categories
def extract_package_names(self, requirements: List[Requirement]) -> Set[str]: def extract_package_names(self, requirements: list[Requirement]) -> set[str]:
"""Extract package names from requirements. """Extract package names from requirements.
Args: Args:
requirements: List of Requirement objects requirements: List of Requirement objects
Returns: Returns:
Set of package names Set of package names
""" """
return {req.name.lower() for req in requirements} return {req.name.lower() for req in requirements}
def get_version_constraints(self, req: Requirement) -> Dict[str, Any]: def get_version_constraints(self, req: Requirement) -> dict[str, Any]:
"""Get version constraints from a requirement. """Get version constraints from a requirement.
Args: Args:
req: Requirement object req: Requirement object
Returns: Returns:
Dictionary with version constraint information Dictionary with version constraint information
""" """
if not req.specifier: if not req.specifier:
return {'constraints': [], 'allows_any': True} return {'constraints': [], 'allows_any': True}
constraints = [] constraints = []
for spec in req.specifier: for spec in req.specifier:
constraints.append({ constraints.append({
'operator': spec.operator, 'operator': spec.operator,
'version': str(spec.version) 'version': str(spec.version)
}) })
return { return {
'constraints': constraints, 'constraints': constraints,
'allows_any': len(constraints) == 0, 'allows_any': len(constraints) == 0,

View File

@ -1,13 +1,15 @@
"""Dependency resolution tools for PyPI packages.""" """Dependency resolution tools for PyPI packages."""
import asyncio
import logging import logging
from typing import Any, Dict, List, Optional, Set from typing import Any
from packaging.requirements import Requirement
from ..core import PyPIClient, PyPIError from ..core import PyPIClient, PyPIError
from ..core.dependency_parser import DependencyParser from ..core.dependency_parser import DependencyParser
from ..core.exceptions import InvalidPackageNameError, NetworkError, PackageNotFoundError from ..core.exceptions import (
InvalidPackageNameError,
NetworkError,
PackageNotFoundError,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -18,16 +20,16 @@ class DependencyResolver:
def __init__(self, max_depth: int = 10): def __init__(self, max_depth: int = 10):
self.max_depth = max_depth self.max_depth = max_depth
self.parser = DependencyParser() self.parser = DependencyParser()
self.resolved_cache: Dict[str, Dict[str, Any]] = {} self.resolved_cache: dict[str, dict[str, Any]] = {}
async def resolve_dependencies( async def resolve_dependencies(
self, self,
package_name: str, package_name: str,
python_version: Optional[str] = None, python_version: str | None = None,
include_extras: Optional[List[str]] = None, include_extras: list[str] | None = None,
include_dev: bool = False, include_dev: bool = False,
max_depth: Optional[int] = None max_depth: int | None = None
) -> Dict[str, Any]: ) -> dict[str, Any]:
"""Resolve all dependencies for a package recursively. """Resolve all dependencies for a package recursively.
Args: Args:
@ -49,7 +51,7 @@ class DependencyResolver:
logger.info(f"Resolving dependencies for {package_name} (Python {python_version})") logger.info(f"Resolving dependencies for {package_name} (Python {python_version})")
# Track visited packages to avoid circular dependencies # Track visited packages to avoid circular dependencies
visited: Set[str] = set() visited: set[str] = set()
dependency_tree = {} dependency_tree = {}
try: try:
@ -90,11 +92,11 @@ class DependencyResolver:
async def _resolve_recursive( async def _resolve_recursive(
self, self,
package_name: str, package_name: str,
python_version: Optional[str], python_version: str | None,
include_extras: List[str], include_extras: list[str],
include_dev: bool, include_dev: bool,
visited: Set[str], visited: set[str],
dependency_tree: Dict[str, Any], dependency_tree: dict[str, Any],
current_depth: int, current_depth: int,
max_depth: int max_depth: int
) -> None: ) -> None:
@ -188,7 +190,7 @@ class DependencyResolver:
logger.error(f"Error resolving {package_name}: {e}") logger.error(f"Error resolving {package_name}: {e}")
# Continue with other dependencies # Continue with other dependencies
def _generate_dependency_summary(self, dependency_tree: Dict[str, Any]) -> Dict[str, Any]: def _generate_dependency_summary(self, dependency_tree: dict[str, Any]) -> dict[str, Any]:
"""Generate summary statistics for the dependency tree.""" """Generate summary statistics for the dependency tree."""
total_packages = len(dependency_tree) total_packages = len(dependency_tree)
@ -218,11 +220,11 @@ class DependencyResolver:
async def resolve_package_dependencies( async def resolve_package_dependencies(
package_name: str, package_name: str,
python_version: Optional[str] = None, python_version: str | None = None,
include_extras: Optional[List[str]] = None, include_extras: list[str] | None = None,
include_dev: bool = False, include_dev: bool = False,
max_depth: int = 5 max_depth: int = 5
) -> Dict[str, Any]: ) -> dict[str, Any]:
"""Resolve package dependencies with comprehensive analysis. """Resolve package dependencies with comprehensive analysis.
Args: Args:

View File

@ -1,17 +1,18 @@
"""Package download tools for PyPI packages.""" """Package download tools for PyPI packages."""
import asyncio
import hashlib import hashlib
import logging import logging
import os
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Set from typing import Any
from urllib.parse import urlparse
import httpx import httpx
from ..core import PyPIClient, PyPIError from ..core import PyPIClient, PyPIError
from ..core.exceptions import InvalidPackageNameError, NetworkError, PackageNotFoundError from ..core.exceptions import (
InvalidPackageNameError,
NetworkError,
PackageNotFoundError,
)
from .dependency_resolver import DependencyResolver from .dependency_resolver import DependencyResolver
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -19,24 +20,24 @@ logger = logging.getLogger(__name__)
class PackageDownloader: class PackageDownloader:
"""Downloads PyPI packages and their dependencies.""" """Downloads PyPI packages and their dependencies."""
def __init__(self, download_dir: str = "./downloads"): def __init__(self, download_dir: str = "./downloads"):
self.download_dir = Path(download_dir) self.download_dir = Path(download_dir)
self.download_dir.mkdir(parents=True, exist_ok=True) self.download_dir.mkdir(parents=True, exist_ok=True)
self.resolver = DependencyResolver() self.resolver = DependencyResolver()
async def download_package_with_dependencies( async def download_package_with_dependencies(
self, self,
package_name: str, package_name: str,
python_version: Optional[str] = None, python_version: str | None = None,
include_extras: Optional[List[str]] = None, include_extras: list[str] | None = None,
include_dev: bool = False, include_dev: bool = False,
prefer_wheel: bool = True, prefer_wheel: bool = True,
verify_checksums: bool = True, verify_checksums: bool = True,
max_depth: int = 5 max_depth: int = 5
) -> Dict[str, Any]: ) -> dict[str, Any]:
"""Download a package and all its dependencies. """Download a package and all its dependencies.
Args: Args:
package_name: Name of the package to download package_name: Name of the package to download
python_version: Target Python version (e.g., "3.10") python_version: Target Python version (e.g., "3.10")
@ -45,15 +46,15 @@ class PackageDownloader:
prefer_wheel: Whether to prefer wheel files over source distributions prefer_wheel: Whether to prefer wheel files over source distributions
verify_checksums: Whether to verify file checksums verify_checksums: Whether to verify file checksums
max_depth: Maximum dependency resolution depth max_depth: Maximum dependency resolution depth
Returns: Returns:
Dictionary containing download results and statistics Dictionary containing download results and statistics
""" """
if not package_name or not package_name.strip(): if not package_name or not package_name.strip():
raise InvalidPackageNameError(package_name) raise InvalidPackageNameError(package_name)
logger.info(f"Starting download of {package_name} and dependencies") logger.info(f"Starting download of {package_name} and dependencies")
try: try:
# First resolve all dependencies # First resolve all dependencies
resolution_result = await self.resolver.resolve_dependencies( resolution_result = await self.resolver.resolve_dependencies(
@ -63,13 +64,13 @@ class PackageDownloader:
include_dev=include_dev, include_dev=include_dev,
max_depth=max_depth max_depth=max_depth
) )
dependency_tree = resolution_result["dependency_tree"] dependency_tree = resolution_result["dependency_tree"]
# Download all packages # Download all packages
download_results = {} download_results = {}
failed_downloads = [] failed_downloads = []
for pkg_name, pkg_info in dependency_tree.items(): for pkg_name, pkg_info in dependency_tree.items():
try: try:
result = await self._download_single_package( result = await self._download_single_package(
@ -80,17 +81,17 @@ class PackageDownloader:
verify_checksums=verify_checksums verify_checksums=verify_checksums
) )
download_results[pkg_name] = result download_results[pkg_name] = result
except Exception as e: except Exception as e:
logger.error(f"Failed to download {pkg_name}: {e}") logger.error(f"Failed to download {pkg_name}: {e}")
failed_downloads.append({ failed_downloads.append({
"package": pkg_name, "package": pkg_name,
"error": str(e) "error": str(e)
}) })
# Generate summary # Generate summary
summary = self._generate_download_summary(download_results, failed_downloads) summary = self._generate_download_summary(download_results, failed_downloads)
return { return {
"package_name": package_name, "package_name": package_name,
"python_version": python_version, "python_version": python_version,
@ -100,73 +101,73 @@ class PackageDownloader:
"failed_downloads": failed_downloads, "failed_downloads": failed_downloads,
"summary": summary "summary": summary
} }
except PyPIError: except PyPIError:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Unexpected error downloading {package_name}: {e}") logger.error(f"Unexpected error downloading {package_name}: {e}")
raise NetworkError(f"Failed to download package: {e}", e) from e raise NetworkError(f"Failed to download package: {e}", e) from e
async def _download_single_package( async def _download_single_package(
self, self,
package_name: str, package_name: str,
version: Optional[str] = None, version: str | None = None,
python_version: Optional[str] = None, python_version: str | None = None,
prefer_wheel: bool = True, prefer_wheel: bool = True,
verify_checksums: bool = True verify_checksums: bool = True
) -> Dict[str, Any]: ) -> dict[str, Any]:
"""Download a single package.""" """Download a single package."""
logger.info(f"Downloading {package_name} version {version or 'latest'}") logger.info(f"Downloading {package_name} version {version or 'latest'}")
async with PyPIClient() as client: async with PyPIClient() as client:
package_data = await client.get_package_info(package_name) package_data = await client.get_package_info(package_name)
info = package_data.get("info", {}) info = package_data.get("info", {})
releases = package_data.get("releases", {}) releases = package_data.get("releases", {})
# Determine version to download # Determine version to download
target_version = version or info.get("version") target_version = version or info.get("version")
if not target_version or target_version not in releases: if not target_version or target_version not in releases:
raise PackageNotFoundError(f"Version {target_version} not found for {package_name}") raise PackageNotFoundError(f"Version {target_version} not found for {package_name}")
# Get release files # Get release files
release_files = releases[target_version] release_files = releases[target_version]
if not release_files: if not release_files:
raise PackageNotFoundError(f"No files found for {package_name} {target_version}") raise PackageNotFoundError(f"No files found for {package_name} {target_version}")
# Select best file to download # Select best file to download
selected_file = self._select_best_file( selected_file = self._select_best_file(
release_files, python_version, prefer_wheel release_files, python_version, prefer_wheel
) )
if not selected_file: if not selected_file:
raise PackageNotFoundError(f"No suitable file found for {package_name} {target_version}") raise PackageNotFoundError(f"No suitable file found for {package_name} {target_version}")
# Download the file # Download the file
download_result = await self._download_file( download_result = await self._download_file(
selected_file, verify_checksums selected_file, verify_checksums
) )
return { return {
"package_name": package_name, "package_name": package_name,
"version": target_version, "version": target_version,
"file_info": selected_file, "file_info": selected_file,
"download_result": download_result "download_result": download_result
} }
def _select_best_file( def _select_best_file(
self, self,
release_files: List[Dict[str, Any]], release_files: list[dict[str, Any]],
python_version: Optional[str] = None, python_version: str | None = None,
prefer_wheel: bool = True prefer_wheel: bool = True
) -> Optional[Dict[str, Any]]: ) -> dict[str, Any] | None:
"""Select the best file to download from available release files.""" """Select the best file to download from available release files."""
# Separate wheels and source distributions # Separate wheels and source distributions
wheels = [f for f in release_files if f.get("packagetype") == "bdist_wheel"] wheels = [f for f in release_files if f.get("packagetype") == "bdist_wheel"]
sdists = [f for f in release_files if f.get("packagetype") == "sdist"] sdists = [f for f in release_files if f.get("packagetype") == "sdist"]
# If prefer wheel and wheels available # If prefer wheel and wheels available
if prefer_wheel and wheels: if prefer_wheel and wheels:
# Try to find compatible wheel # Try to find compatible wheel
@ -174,78 +175,78 @@ class PackageDownloader:
compatible_wheels = self._filter_compatible_wheels(wheels, python_version) compatible_wheels = self._filter_compatible_wheels(wheels, python_version)
if compatible_wheels: if compatible_wheels:
return compatible_wheels[0] return compatible_wheels[0]
# Return any wheel if no specific version or no compatible found # Return any wheel if no specific version or no compatible found
return wheels[0] return wheels[0]
# Fall back to source distribution # Fall back to source distribution
if sdists: if sdists:
return sdists[0] return sdists[0]
# Last resort: any file # Last resort: any file
return release_files[0] if release_files else None return release_files[0] if release_files else None
def _filter_compatible_wheels( def _filter_compatible_wheels(
self, self,
wheels: List[Dict[str, Any]], wheels: list[dict[str, Any]],
python_version: str python_version: str
) -> List[Dict[str, Any]]: ) -> list[dict[str, Any]]:
"""Filter wheels compatible with the specified Python version.""" """Filter wheels compatible with the specified Python version."""
# Simple compatibility check based on filename # Simple compatibility check based on filename
# This is a basic implementation - could be enhanced with proper wheel tag parsing # This is a basic implementation - could be enhanced with proper wheel tag parsing
compatible = [] compatible = []
major_minor = ".".join(python_version.split(".")[:2]) major_minor = ".".join(python_version.split(".")[:2])
major_minor_nodot = major_minor.replace(".", "") major_minor_nodot = major_minor.replace(".", "")
for wheel in wheels: for wheel in wheels:
filename = wheel.get("filename", "") filename = wheel.get("filename", "")
# Check for Python version in filename # Check for Python version in filename
if (f"py{major_minor_nodot}" in filename or if (f"py{major_minor_nodot}" in filename or
f"cp{major_minor_nodot}" in filename or f"cp{major_minor_nodot}" in filename or
"py3" in filename or "py3" in filename or
"py2.py3" in filename): "py2.py3" in filename):
compatible.append(wheel) compatible.append(wheel)
return compatible return compatible
async def _download_file( async def _download_file(
self, self,
file_info: Dict[str, Any], file_info: dict[str, Any],
verify_checksums: bool = True verify_checksums: bool = True
) -> Dict[str, Any]: ) -> dict[str, Any]:
"""Download a single file.""" """Download a single file."""
url = file_info.get("url") url = file_info.get("url")
filename = file_info.get("filename") filename = file_info.get("filename")
expected_md5 = file_info.get("md5_digest") expected_md5 = file_info.get("md5_digest")
expected_size = file_info.get("size") expected_size = file_info.get("size")
if not url or not filename: if not url or not filename:
raise ValueError("Invalid file info: missing URL or filename") raise ValueError("Invalid file info: missing URL or filename")
# Create package-specific directory # Create package-specific directory
file_path = self.download_dir / filename file_path = self.download_dir / filename
logger.info(f"Downloading {filename} from {url}") logger.info(f"Downloading {filename} from {url}")
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
async with client.stream("GET", url) as response: async with client.stream("GET", url) as response:
response.raise_for_status() response.raise_for_status()
# Download with progress tracking # Download with progress tracking
downloaded_size = 0 downloaded_size = 0
md5_hash = hashlib.md5() md5_hash = hashlib.md5()
with open(file_path, "wb") as f: with open(file_path, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=8192): async for chunk in response.aiter_bytes(chunk_size=8192):
f.write(chunk) f.write(chunk)
downloaded_size += len(chunk) downloaded_size += len(chunk)
if verify_checksums: if verify_checksums:
md5_hash.update(chunk) md5_hash.update(chunk)
# Verify download # Verify download
verification_result = {} verification_result = {}
if verify_checksums and expected_md5: if verify_checksums and expected_md5:
@ -253,12 +254,12 @@ class PackageDownloader:
verification_result["md5_match"] = actual_md5 == expected_md5 verification_result["md5_match"] = actual_md5 == expected_md5
verification_result["expected_md5"] = expected_md5 verification_result["expected_md5"] = expected_md5
verification_result["actual_md5"] = actual_md5 verification_result["actual_md5"] = actual_md5
if expected_size: if expected_size:
verification_result["size_match"] = downloaded_size == expected_size verification_result["size_match"] = downloaded_size == expected_size
verification_result["expected_size"] = expected_size verification_result["expected_size"] = expected_size
verification_result["actual_size"] = downloaded_size verification_result["actual_size"] = downloaded_size
return { return {
"filename": filename, "filename": filename,
"file_path": str(file_path), "file_path": str(file_path),
@ -266,21 +267,21 @@ class PackageDownloader:
"verification": verification_result, "verification": verification_result,
"success": True "success": True
} }
def _generate_download_summary( def _generate_download_summary(
self, self,
download_results: Dict[str, Any], download_results: dict[str, Any],
failed_downloads: List[Dict[str, Any]] failed_downloads: list[dict[str, Any]]
) -> Dict[str, Any]: ) -> dict[str, Any]:
"""Generate download summary statistics.""" """Generate download summary statistics."""
successful_downloads = len(download_results) successful_downloads = len(download_results)
failed_count = len(failed_downloads) failed_count = len(failed_downloads)
total_size = sum( total_size = sum(
result["download_result"]["downloaded_size"] result["download_result"]["downloaded_size"]
for result in download_results.values() for result in download_results.values()
) )
return { return {
"total_packages": successful_downloads + failed_count, "total_packages": successful_downloads + failed_count,
"successful_downloads": successful_downloads, "successful_downloads": successful_downloads,
@ -295,15 +296,15 @@ class PackageDownloader:
async def download_package_with_dependencies( async def download_package_with_dependencies(
package_name: str, package_name: str,
download_dir: str = "./downloads", download_dir: str = "./downloads",
python_version: Optional[str] = None, python_version: str | None = None,
include_extras: Optional[List[str]] = None, include_extras: list[str] | None = None,
include_dev: bool = False, include_dev: bool = False,
prefer_wheel: bool = True, prefer_wheel: bool = True,
verify_checksums: bool = True, verify_checksums: bool = True,
max_depth: int = 5 max_depth: int = 5
) -> Dict[str, Any]: ) -> dict[str, Any]:
"""Download a package and its dependencies to local directory. """Download a package and its dependencies to local directory.
Args: Args:
package_name: Name of the package to download package_name: Name of the package to download
download_dir: Directory to download packages to download_dir: Directory to download packages to
@ -313,7 +314,7 @@ async def download_package_with_dependencies(
prefer_wheel: Whether to prefer wheel files over source distributions prefer_wheel: Whether to prefer wheel files over source distributions
verify_checksums: Whether to verify file checksums verify_checksums: Whether to verify file checksums
max_depth: Maximum dependency resolution depth max_depth: Maximum dependency resolution depth
Returns: Returns:
Comprehensive download results Comprehensive download results
""" """

View File

@ -1,10 +1,14 @@
"""Tests for dependency resolver functionality.""" """Tests for dependency resolver functionality."""
import pytest
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
from pypi_query_mcp.tools.dependency_resolver import DependencyResolver, resolve_package_dependencies import pytest
from pypi_query_mcp.core.exceptions import InvalidPackageNameError, PackageNotFoundError from pypi_query_mcp.core.exceptions import InvalidPackageNameError, PackageNotFoundError
from pypi_query_mcp.tools.dependency_resolver import (
DependencyResolver,
resolve_package_dependencies,
)
class TestDependencyResolver: class TestDependencyResolver:
@ -71,7 +75,7 @@ class TestDependencyResolver:
mock_client.get_package_info.return_value = mock_package_data mock_client.get_package_info.return_value = mock_package_data
result = await resolver.resolve_dependencies( result = await resolver.resolve_dependencies(
"test-package", "test-package",
python_version="3.11" python_version="3.11"
) )

View File

@ -1,13 +1,16 @@
"""Tests for package downloader functionality.""" """Tests for package downloader functionality."""
import pytest
from pathlib import Path
from unittest.mock import AsyncMock, patch, mock_open
import tempfile
import shutil import shutil
import tempfile
from unittest.mock import AsyncMock, mock_open, patch
from pypi_query_mcp.tools.package_downloader import PackageDownloader, download_package_with_dependencies import pytest
from pypi_query_mcp.core.exceptions import InvalidPackageNameError, PackageNotFoundError
from pypi_query_mcp.core.exceptions import InvalidPackageNameError
from pypi_query_mcp.tools.package_downloader import (
PackageDownloader,
download_package_with_dependencies,
)
class TestPackageDownloader: class TestPackageDownloader:
@ -218,25 +221,6 @@ class TestPackageDownloader:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_download_package_with_dependencies_function(self, temp_download_dir): async def test_download_package_with_dependencies_function(self, temp_download_dir):
"""Test the standalone download_package_with_dependencies function.""" """Test the standalone download_package_with_dependencies function."""
mock_package_data = {
"info": {
"name": "test-package",
"version": "1.0.0",
"requires_python": ">=3.8",
"requires_dist": []
},
"releases": {
"1.0.0": [
{
"filename": "test_package-1.0.0-py3-none-any.whl",
"url": "https://files.pythonhosted.org/packages/test_package-1.0.0-py3-none-any.whl",
"packagetype": "bdist_wheel",
"md5_digest": "abc123",
"size": 1024
}
]
}
}
with patch('pypi_query_mcp.tools.package_downloader.PackageDownloader') as mock_downloader_class: with patch('pypi_query_mcp.tools.package_downloader.PackageDownloader') as mock_downloader_class:
# Setup downloader mock # Setup downloader mock