longhao a0c507c3ff feat: implement comprehensive configuration management system with multi-mirror support
- Add ServerSettings class with pydantic-settings for type-safe configuration
- Support multiple PyPI mirror sources with priority-based fallback mechanism
- Implement RepositoryConfig and RepositoryManager for multi-repository support
- Add environment variable support for all configuration options
- Include private repository authentication configuration
- Add advanced dependency analysis settings (max depth, concurrency, security)
- Provide secure credential management with sensitive data masking
- Update documentation and configuration examples
- Add comprehensive test suite with 23 test cases covering all features
- Include demo script showcasing multi-mirror configuration capabilities

Configuration features:
- Primary, additional, and fallback index URLs
- Automatic duplicate URL removal with priority preservation
- Runtime configuration reloading
- Integration with repository manager for seamless multi-source queries

Signed-off-by: longhao <hal.long@outlook.com>
2025-05-27 17:41:42 +08:00

246 lines
8.0 KiB
Python

"""PyPI API client for package information retrieval."""
import asyncio
import logging
import re
from typing import Any
from urllib.parse import quote
import httpx
from .exceptions import (
InvalidPackageNameError,
NetworkError,
PackageNotFoundError,
PyPIServerError,
RateLimitError,
)
logger = logging.getLogger(__name__)
class PyPIClient:
"""Async client for PyPI JSON API."""
def __init__(
self,
base_url: str = "https://pypi.org/pypi",
timeout: float = 30.0,
max_retries: int = 3,
retry_delay: float = 1.0,
):
"""Initialize PyPI client.
Args:
base_url: Base URL for PyPI API
timeout: Request timeout in seconds
max_retries: Maximum number of retry attempts
retry_delay: Delay between retries in seconds
"""
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay = retry_delay
# Simple in-memory cache
self._cache: dict[str, dict[str, Any]] = {}
self._cache_ttl = 300 # 5 minutes
# HTTP client configuration
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(timeout),
headers={
"User-Agent": "pypi-query-mcp-server/0.1.0",
"Accept": "application/json",
},
follow_redirects=True,
)
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
async def close(self):
"""Close the HTTP client."""
await self._client.aclose()
def _validate_package_name(self, package_name: str) -> str:
"""Validate and normalize package name.
Args:
package_name: Package name to validate
Returns:
Normalized package name
Raises:
InvalidPackageNameError: If package name is invalid
"""
if not package_name or not package_name.strip():
raise InvalidPackageNameError(package_name)
# Normalize package name (convert to lowercase, replace _ with -)
normalized = re.sub(r"[-_.]+", "-", package_name.lower())
# Basic validation - package names should contain only alphanumeric, hyphens, dots, underscores
if not re.match(r"^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$", package_name):
raise InvalidPackageNameError(package_name)
return normalized
def _get_cache_key(self, package_name: str, endpoint: str = "info") -> str:
"""Generate cache key for package data."""
return f"{endpoint}:{package_name}"
def _is_cache_valid(self, cache_entry: dict[str, Any]) -> bool:
"""Check if cache entry is still valid."""
import time
return time.time() - cache_entry.get("timestamp", 0) < self._cache_ttl
async def _make_request(self, url: str) -> dict[str, Any]:
"""Make HTTP request with retry logic.
Args:
url: URL to request
Returns:
JSON response data
Raises:
NetworkError: For network-related errors
PackageNotFoundError: When package is not found
RateLimitError: When rate limit is exceeded
PyPIServerError: For server errors
"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
logger.debug(f"Making request to {url} (attempt {attempt + 1})")
response = await self._client.get(url)
# Handle different HTTP status codes
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
# Extract package name from URL for better error message
package_name = url.split("/")[-2] if "/" in url else "unknown"
raise PackageNotFoundError(package_name)
elif response.status_code == 429:
retry_after = response.headers.get("Retry-After")
retry_after_int = int(retry_after) if retry_after else None
raise RateLimitError(retry_after_int)
elif response.status_code >= 500:
raise PyPIServerError(response.status_code)
else:
raise PyPIServerError(
response.status_code,
f"Unexpected status code: {response.status_code}",
)
except httpx.TimeoutException as e:
last_exception = NetworkError(f"Request timeout: {e}", e)
except httpx.NetworkError as e:
last_exception = NetworkError(f"Network error: {e}", e)
except (PackageNotFoundError, RateLimitError, PyPIServerError):
# Don't retry these errors
raise
except Exception as e:
last_exception = NetworkError(f"Unexpected error: {e}", e)
# Wait before retry (except on last attempt)
if attempt < self.max_retries:
await asyncio.sleep(
self.retry_delay * (2**attempt)
) # Exponential backoff
# If we get here, all retries failed
raise last_exception
async def get_package_info(
self, package_name: str, use_cache: bool = True
) -> dict[str, Any]:
"""Get comprehensive package information from PyPI.
Args:
package_name: Name of the package to query
use_cache: Whether to use cached data if available
Returns:
Dictionary containing package information
Raises:
InvalidPackageNameError: If package name is invalid
PackageNotFoundError: If package is not found
NetworkError: For network-related errors
"""
normalized_name = self._validate_package_name(package_name)
cache_key = self._get_cache_key(normalized_name, "info")
# Check cache first
if use_cache and cache_key in self._cache:
cache_entry = self._cache[cache_key]
if self._is_cache_valid(cache_entry):
logger.debug(f"Using cached data for package: {normalized_name}")
return cache_entry["data"]
# Make API request
url = f"{self.base_url}/{quote(normalized_name)}/json"
logger.info(f"Fetching package info for: {normalized_name}")
try:
data = await self._make_request(url)
# Cache the result
import time
self._cache[cache_key] = {"data": data, "timestamp": time.time()}
return data
except Exception as e:
logger.error(f"Failed to fetch package info for {normalized_name}: {e}")
raise
async def get_package_versions(
self, package_name: str, use_cache: bool = True
) -> list[str]:
"""Get list of available versions for a package.
Args:
package_name: Name of the package to query
use_cache: Whether to use cached data if available
Returns:
List of version strings
"""
package_info = await self.get_package_info(package_name, use_cache)
releases = package_info.get("releases", {})
return list(releases.keys())
async def get_latest_version(
self, package_name: str, use_cache: bool = True
) -> str:
"""Get the latest version of a package.
Args:
package_name: Name of the package to query
use_cache: Whether to use cached data if available
Returns:
Latest version string
"""
package_info = await self.get_package_info(package_name, use_cache)
return package_info.get("info", {}).get("version", "")
def clear_cache(self):
"""Clear the internal cache."""
self._cache.clear()
logger.debug("Cache cleared")