diff --git a/pypi_query_mcp/core/exceptions.py b/pypi_query_mcp/core/exceptions.py index b073be7..116b5ad 100644 --- a/pypi_query_mcp/core/exceptions.py +++ b/pypi_query_mcp/core/exceptions.py @@ -62,3 +62,24 @@ class SearchError(PyPIError): def __init__(self, message: str, query: str | None = None): super().__init__(message) self.query = query + + +class PyPIAuthenticationError(PyPIError): + """Raised when PyPI authentication fails.""" + + def __init__(self, message: str, status_code: int | None = None): + super().__init__(message, status_code) + + +class PyPIUploadError(PyPIError): + """Raised when PyPI upload operations fail.""" + + def __init__(self, message: str, status_code: int | None = None): + super().__init__(message, status_code) + + +class PyPIPermissionError(PyPIError): + """Raised when PyPI permission operations fail.""" + + def __init__(self, message: str, status_code: int | None = None): + super().__init__(message, status_code) diff --git a/pypi_query_mcp/server.py b/pypi_query_mcp/server.py index f6e2d07..2fb1651 100644 --- a/pypi_query_mcp/server.py +++ b/pypi_query_mcp/server.py @@ -24,20 +24,26 @@ from .prompts import ( track_package_updates, ) from .tools import ( + check_pypi_credentials, check_python_compatibility, + delete_pypi_release, download_package_with_dependencies, find_alternatives, get_compatible_python_versions, get_package_download_stats, get_package_download_trends, + get_pypi_account_info, + get_pypi_upload_history, get_top_packages_by_downloads, get_trending_packages, + manage_pypi_maintainers, query_package_dependencies, query_package_info, query_package_versions, resolve_package_dependencies, search_by_category, search_packages, + upload_package_to_pypi, ) # Configure logging @@ -806,6 +812,285 @@ async def get_trending_pypi_packages( } +# PyPI Publishing and Account Management Tools + + +@mcp.tool() +async def upload_package_to_pypi_tool( + distribution_paths: list[str], + api_token: str | None = None, + test_pypi: bool = False, + skip_existing: bool = True, + verify_uploads: bool = True, +) -> dict[str, Any]: + """Upload package distributions to PyPI or TestPyPI. + + This tool uploads Python package distributions (.whl, .tar.gz files) to PyPI, + providing comprehensive upload management with safety checks and verification. + + Args: + distribution_paths: List of paths to distribution files (.whl, .tar.gz) + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to upload to TestPyPI instead of production PyPI + skip_existing: Skip files that already exist on PyPI + verify_uploads: Verify uploads after completion + + Returns: + Dictionary containing upload results and metadata + + Raises: + PyPIAuthenticationError: If authentication fails + PyPIUploadError: If upload operations fail + NetworkError: For network-related errors + """ + try: + logger.info(f"MCP tool: Uploading {len(distribution_paths)} distributions to {'TestPyPI' if test_pypi else 'PyPI'}") + result = await upload_package_to_pypi( + distribution_paths=distribution_paths, + api_token=api_token, + test_pypi=test_pypi, + skip_existing=skip_existing, + verify_uploads=verify_uploads, + ) + logger.info(f"Upload completed: {result.get('summary', {})}") + return result + except Exception as e: + logger.error(f"Error uploading packages: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "distribution_paths": distribution_paths, + "test_pypi": test_pypi, + } + + +@mcp.tool() +async def check_pypi_credentials_tool( + api_token: str | None = None, + test_pypi: bool = False, +) -> dict[str, Any]: + """Validate PyPI API token and credentials. + + This tool checks if your PyPI API token is valid and provides information + about your account permissions and capabilities. + + Args: + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to check against TestPyPI instead of production PyPI + + Returns: + Dictionary containing credential validation results + + Raises: + PyPIAuthenticationError: If credential validation fails + NetworkError: For network-related errors + """ + try: + logger.info(f"MCP tool: Checking {'TestPyPI' if test_pypi else 'PyPI'} credentials") + result = await check_pypi_credentials(api_token=api_token, test_pypi=test_pypi) + logger.info(f"Credential check completed: valid={result.get('valid', False)}") + return result + except Exception as e: + logger.error(f"Error checking credentials: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "test_pypi": test_pypi, + } + + +@mcp.tool() +async def get_pypi_upload_history_tool( + package_name: str, + api_token: str | None = None, + test_pypi: bool = False, + limit: int = 50, +) -> dict[str, Any]: + """Get upload history for a PyPI package. + + This tool retrieves the upload history for a package, showing all versions, + files, and upload metadata with statistics and analysis. + + Args: + package_name: Name of the package to get upload history for + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to check TestPyPI instead of production PyPI + limit: Maximum number of uploads to return + + Returns: + Dictionary containing upload history and metadata + + Raises: + InvalidPackageNameError: If package name is invalid + PackageNotFoundError: If package is not found + NetworkError: For network-related errors + """ + try: + logger.info(f"MCP tool: Getting upload history for {package_name}") + result = await get_pypi_upload_history( + package_name=package_name, + api_token=api_token, + test_pypi=test_pypi, + limit=limit, + ) + upload_count = result.get('statistics', {}).get('total_uploads', 0) + logger.info(f"Retrieved {upload_count} upload records for {package_name}") + return result + except Exception as e: + logger.error(f"Error getting upload history for {package_name}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package_name": package_name, + "test_pypi": test_pypi, + } + + +@mcp.tool() +async def delete_pypi_release_tool( + package_name: str, + version: str, + api_token: str | None = None, + test_pypi: bool = False, + confirm_deletion: bool = False, + dry_run: bool = True, +) -> dict[str, Any]: + """Delete a specific release from PyPI (with safety checks). + + This tool provides safe deletion of PyPI releases with multiple safety checks, + dry-run capability, and comprehensive validation. Note that PyPI deletion is + very restricted and typically only available to package owners within a limited + time window after upload. + + Args: + package_name: Name of the package + version: Version to delete + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to use TestPyPI instead of production PyPI + confirm_deletion: Explicit confirmation required for actual deletion + dry_run: If True, only simulate the deletion without actually performing it + + Returns: + Dictionary containing deletion results and safety information + + Raises: + InvalidPackageNameError: If package name is invalid + PackageNotFoundError: If package/version is not found + PyPIPermissionError: If deletion is not permitted + NetworkError: For network-related errors + """ + try: + logger.info(f"MCP tool: {'DRY RUN: ' if dry_run else ''}Deleting {package_name}=={version}") + result = await delete_pypi_release( + package_name=package_name, + version=version, + api_token=api_token, + test_pypi=test_pypi, + confirm_deletion=confirm_deletion, + dry_run=dry_run, + ) + action = result.get('action', 'unknown') + logger.info(f"Deletion operation completed: {action}") + return result + except Exception as e: + logger.error(f"Error deleting release {package_name}=={version}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package_name": package_name, + "version": version, + "test_pypi": test_pypi, + } + + +@mcp.tool() +async def manage_pypi_maintainers_tool( + package_name: str, + action: str, + username: str | None = None, + api_token: str | None = None, + test_pypi: bool = False, +) -> dict[str, Any]: + """Manage package maintainers (add/remove/list). + + This tool helps manage package maintainers and collaborators. Note that maintainer + management typically requires package owner permissions and may need to be done + through the PyPI web interface. + + Args: + package_name: Name of the package + action: Action to perform ('list', 'add', 'remove') + username: Username to add/remove (required for add/remove actions) + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to use TestPyPI instead of production PyPI + + Returns: + Dictionary containing maintainer management results + + Raises: + InvalidPackageNameError: If package name is invalid + PackageNotFoundError: If package is not found + PyPIPermissionError: If action is not permitted + NetworkError: For network-related errors + """ + try: + logger.info(f"MCP tool: Managing maintainers for {package_name}: {action}") + result = await manage_pypi_maintainers( + package_name=package_name, + action=action, + username=username, + api_token=api_token, + test_pypi=test_pypi, + ) + maintainer_count = result.get('maintainer_count', 0) + logger.info(f"Maintainer management completed: {maintainer_count} maintainers") + return result + except Exception as e: + logger.error(f"Error managing maintainers for {package_name}: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "package_name": package_name, + "action": action, + "test_pypi": test_pypi, + } + + +@mcp.tool() +async def get_pypi_account_info_tool( + api_token: str | None = None, + test_pypi: bool = False, +) -> dict[str, Any]: + """Get PyPI account information, quotas, and limits. + + This tool retrieves information about your PyPI account including permissions, + limitations, quotas, and provides recommendations for account security and usage. + + Args: + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to use TestPyPI instead of production PyPI + + Returns: + Dictionary containing account information and limitations + + Raises: + PyPIAuthenticationError: If authentication fails + NetworkError: For network-related errors + """ + try: + logger.info(f"MCP tool: Getting account information for {'TestPyPI' if test_pypi else 'PyPI'}") + result = await get_pypi_account_info(api_token=api_token, test_pypi=test_pypi) + logger.info("Account information retrieved successfully") + return result + except Exception as e: + logger.error(f"Error getting account information: {e}") + return { + "error": str(e), + "error_type": type(e).__name__, + "test_pypi": test_pypi, + } + + # Register prompt templates following standard MCP workflow: # 1. User calls tool → MCP client sends request # 2. Tool function executes → Collects necessary data and parameters diff --git a/pypi_query_mcp/tools/__init__.py b/pypi_query_mcp/tools/__init__.py index 24e3ebd..1ae1ad7 100644 --- a/pypi_query_mcp/tools/__init__.py +++ b/pypi_query_mcp/tools/__init__.py @@ -21,6 +21,14 @@ from .package_query import ( query_package_info, query_package_versions, ) +from .publishing import ( + check_pypi_credentials, + delete_pypi_release, + get_pypi_account_info, + get_pypi_upload_history, + manage_pypi_maintainers, + upload_package_to_pypi, +) from .search import ( find_alternatives, get_trending_packages, @@ -44,4 +52,10 @@ __all__ = [ "search_by_category", "find_alternatives", "get_trending_packages", + "upload_package_to_pypi", + "check_pypi_credentials", + "get_pypi_upload_history", + "delete_pypi_release", + "manage_pypi_maintainers", + "get_pypi_account_info", ] diff --git a/pypi_query_mcp/tools/publishing.py b/pypi_query_mcp/tools/publishing.py new file mode 100644 index 0000000..af9960c --- /dev/null +++ b/pypi_query_mcp/tools/publishing.py @@ -0,0 +1,1002 @@ +"""PyPI account and publishing tools for package management and distribution.""" + +import asyncio +import base64 +import json +import logging +import os +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urljoin + +import httpx + +from ..core.exceptions import ( + InvalidPackageNameError, + NetworkError, + PackageNotFoundError, + PyPIAuthenticationError, + PyPIPermissionError, + PyPIServerError, + PyPIUploadError, + RateLimitError, +) + + +class PyPIPublishingClient: + """Async client for PyPI publishing and account management operations.""" + + def __init__( + self, + api_token: Optional[str] = None, + test_pypi: bool = False, + timeout: float = 60.0, + max_retries: int = 3, + retry_delay: float = 2.0, + ): + """Initialize PyPI publishing client. + + Args: + api_token: PyPI API token for authentication + test_pypi: Whether to use TestPyPI instead of production PyPI + timeout: Request timeout in seconds + max_retries: Maximum number of retry attempts + retry_delay: Delay between retries in seconds + """ + self.api_token = api_token or os.getenv("PYPI_API_TOKEN") + self.test_pypi = test_pypi + self.timeout = timeout + self.max_retries = max_retries + self.retry_delay = retry_delay + + # Configure base URLs + if test_pypi: + self.upload_url = "https://test.pypi.org/legacy/" + self.api_url = "https://test.pypi.org/pypi" + self.account_url = "https://test.pypi.org" + else: + self.upload_url = "https://upload.pypi.org/legacy/" + self.api_url = "https://pypi.org/pypi" + self.account_url = "https://pypi.org" + + # HTTP client configuration + headers = { + "User-Agent": "pypi-query-mcp-server/0.1.0", + "Accept": "application/json", + } + + if self.api_token: + # Use token authentication + headers["Authorization"] = f"token {self.api_token}" + + self._client = httpx.AsyncClient( + timeout=httpx.Timeout(timeout), + headers=headers, + follow_redirects=True, + ) + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() + + async def close(self): + """Close the HTTP client.""" + await self._client.aclose() + + def _validate_package_name(self, package_name: str) -> str: + """Validate and normalize package name.""" + if not package_name or not package_name.strip(): + raise InvalidPackageNameError(package_name) + + # Basic validation + if not re.match(r"^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$", package_name): + raise InvalidPackageNameError(package_name) + + return package_name.strip() + + async def _make_request( + self, + method: str, + url: str, + **kwargs + ) -> httpx.Response: + """Make HTTP request with retry logic.""" + last_exception = None + + for attempt in range(self.max_retries + 1): + try: + logger.debug(f"Making {method} request to {url} (attempt {attempt + 1})") + response = await self._client.request(method, url, **kwargs) + + # Handle authentication errors + if response.status_code == 401: + raise PyPIAuthenticationError( + "Authentication failed. Check your API token.", + status_code=401 + ) + elif response.status_code == 403: + raise PyPIPermissionError( + "Permission denied. Check your account permissions.", + status_code=403 + ) + elif response.status_code == 429: + retry_after = response.headers.get("Retry-After") + retry_after_int = int(retry_after) if retry_after else None + raise RateLimitError(retry_after_int) + + return response + + except httpx.TimeoutException as e: + last_exception = NetworkError(f"Request timeout: {e}", e) + except httpx.NetworkError as e: + last_exception = NetworkError(f"Network error: {e}", e) + except (PyPIAuthenticationError, PyPIPermissionError, RateLimitError): + # Don't retry these errors + raise + except Exception as e: + last_exception = NetworkError(f"Unexpected error: {e}", e) + + # Wait before retry (except on last attempt) + if attempt < self.max_retries: + await asyncio.sleep(self.retry_delay * (2**attempt)) + + # If we get here, all retries failed + raise last_exception + + +async def upload_package_to_pypi( + distribution_paths: List[str], + api_token: Optional[str] = None, + test_pypi: bool = False, + skip_existing: bool = True, + verify_uploads: bool = True, +) -> Dict[str, Any]: + """ + Upload package distributions to PyPI or TestPyPI. + + Args: + distribution_paths: List of paths to distribution files (.whl, .tar.gz) + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to upload to TestPyPI instead of production PyPI + skip_existing: Skip files that already exist on PyPI + verify_uploads: Verify uploads after completion + + Returns: + Dictionary containing upload results and metadata + + Raises: + PyPIAuthenticationError: If authentication fails + PyPIUploadError: If upload operations fail + NetworkError: For network-related errors + """ + logger.info(f"Starting upload of {len(distribution_paths)} distributions to {'TestPyPI' if test_pypi else 'PyPI'}") + + if not distribution_paths: + raise ValueError("No distribution paths provided") + + # Validate all distribution files exist + missing_files = [] + valid_files = [] + + for path_str in distribution_paths: + path = Path(path_str) + if not path.exists(): + missing_files.append(str(path)) + elif not path.suffix.lower() in ['.whl', '.tar.gz']: + logger.warning(f"Skipping non-distribution file: {path}") + else: + valid_files.append(path) + + if missing_files: + raise FileNotFoundError(f"Distribution files not found: {missing_files}") + + if not valid_files: + raise ValueError("No valid distribution files found") + + results = { + "upload_results": [], + "total_files": len(valid_files), + "successful_uploads": 0, + "failed_uploads": 0, + "skipped_uploads": 0, + "target_repository": "TestPyPI" if test_pypi else "PyPI", + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + async with PyPIPublishingClient( + api_token=api_token, + test_pypi=test_pypi + ) as client: + + # Verify authentication + try: + auth_result = await check_pypi_credentials(api_token, test_pypi) + if not auth_result.get("valid", False): + raise PyPIAuthenticationError("Invalid API token") + except Exception as e: + logger.error(f"Authentication check failed: {e}") + raise PyPIAuthenticationError(f"Authentication failed: {e}") + + # Upload each distribution file + for dist_file in valid_files: + file_result = { + "filename": dist_file.name, + "filepath": str(dist_file), + "size_bytes": dist_file.stat().st_size, + "status": "pending", + "error": None, + "upload_time": None, + } + + try: + logger.info(f"Uploading {dist_file.name}...") + + # Prepare upload data + with open(dist_file, 'rb') as f: + file_content = f.read() + + # Create multipart form data for upload + files = { + 'content': (dist_file.name, file_content, 'application/octet-stream') + } + + data = { + ':action': 'file_upload', + 'protocol_version': '1', + } + + # Make upload request + upload_url = urljoin(client.upload_url, "") + response = await client._make_request( + "POST", + upload_url, + files=files, + data=data, + ) + + if response.status_code == 200: + file_result["status"] = "success" + file_result["upload_time"] = datetime.now(timezone.utc).isoformat() + results["successful_uploads"] += 1 + logger.info(f"Successfully uploaded {dist_file.name}") + + elif response.status_code == 409 and skip_existing: + file_result["status"] = "skipped" + file_result["error"] = "File already exists" + results["skipped_uploads"] += 1 + logger.info(f"Skipped {dist_file.name} (already exists)") + + else: + error_msg = f"Upload failed with status {response.status_code}" + try: + error_data = response.json() + if "message" in error_data: + error_msg = error_data["message"] + except: + error_msg = response.text or error_msg + + file_result["status"] = "failed" + file_result["error"] = error_msg + results["failed_uploads"] += 1 + logger.error(f"Failed to upload {dist_file.name}: {error_msg}") + + except Exception as e: + file_result["status"] = "failed" + file_result["error"] = str(e) + results["failed_uploads"] += 1 + logger.error(f"Exception during upload of {dist_file.name}: {e}") + + results["upload_results"].append(file_result) + + # Verify uploads if requested + if verify_uploads and results["successful_uploads"] > 0: + logger.info("Verifying uploads...") + verification_results = [] + + for file_result in results["upload_results"]: + if file_result["status"] == "success": + # Extract package name from filename + filename = file_result["filename"] + if filename.endswith('.whl'): + # Parse wheel filename: name-version-python-abi-platform.whl + parts = filename[:-4].split('-') + if len(parts) >= 2: + package_name = parts[0] + else: + continue + elif filename.endswith('.tar.gz'): + # Parse sdist filename: name-version.tar.gz + parts = filename[:-7].split('-') + if len(parts) >= 2: + package_name = parts[0] + else: + continue + else: + continue + + try: + # Check if package is now available + verify_url = f"{client.api_url}/{package_name}/json" + verify_response = await client._make_request("GET", verify_url) + + if verify_response.status_code == 200: + verification_results.append({ + "filename": filename, + "package_name": package_name, + "verified": True, + }) + else: + verification_results.append({ + "filename": filename, + "package_name": package_name, + "verified": False, + "error": f"Package not found (status: {verify_response.status_code})", + }) + except Exception as e: + verification_results.append({ + "filename": filename, + "package_name": package_name, + "verified": False, + "error": str(e), + }) + + results["verification_results"] = verification_results + + # Generate summary + results["summary"] = { + "total_processed": len(valid_files), + "successful": results["successful_uploads"], + "failed": results["failed_uploads"], + "skipped": results["skipped_uploads"], + "success_rate": results["successful_uploads"] / len(valid_files) * 100 if valid_files else 0, + } + + logger.info(f"Upload completed: {results['summary']}") + return results + + +async def check_pypi_credentials( + api_token: Optional[str] = None, + test_pypi: bool = False, +) -> Dict[str, Any]: + """ + Validate PyPI API token and credentials. + + Args: + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to check against TestPyPI instead of production PyPI + + Returns: + Dictionary containing credential validation results + + Raises: + PyPIAuthenticationError: If credential validation fails + NetworkError: For network-related errors + """ + logger.info(f"Checking {'TestPyPI' if test_pypi else 'PyPI'} credentials") + + token = api_token or os.getenv("PYPI_API_TOKEN") + if not token: + return { + "valid": False, + "error": "No API token provided", + "source": "environment_variable" if not api_token else "parameter", + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + # Validate token format + if not token.startswith("pypi-"): + return { + "valid": False, + "error": "Invalid token format (should start with 'pypi-')", + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + async with PyPIPublishingClient(api_token=token, test_pypi=test_pypi) as client: + try: + # Try to access user account information + if test_pypi: + # TestPyPI doesn't have a reliable user info endpoint, try upload check instead + test_url = "https://test.pypi.org/legacy/" + else: + test_url = "https://upload.pypi.org/legacy/" + + # Make a simple authenticated request + response = await client._make_request("GET", test_url) + + if response.status_code in [200, 405]: # 405 is expected for GET on upload endpoint + return { + "valid": True, + "token_format": "valid", + "repository": "TestPyPI" if test_pypi else "PyPI", + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + elif response.status_code == 401: + return { + "valid": False, + "error": "Invalid or expired API token", + "status_code": 401, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + elif response.status_code == 403: + return { + "valid": False, + "error": "API token lacks required permissions", + "status_code": 403, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + else: + return { + "valid": False, + "error": f"Unexpected response: {response.status_code}", + "status_code": response.status_code, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + except PyPIAuthenticationError as e: + return { + "valid": False, + "error": str(e), + "status_code": e.status_code, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + except Exception as e: + logger.error(f"Error checking credentials: {e}") + return { + "valid": False, + "error": f"Credential check failed: {e}", + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + +async def get_pypi_upload_history( + package_name: str, + api_token: Optional[str] = None, + test_pypi: bool = False, + limit: int = 50, +) -> Dict[str, Any]: + """ + Get upload history for a PyPI package. + + Args: + package_name: Name of the package to get upload history for + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to check TestPyPI instead of production PyPI + limit: Maximum number of uploads to return + + Returns: + Dictionary containing upload history and metadata + + Raises: + InvalidPackageNameError: If package name is invalid + PackageNotFoundError: If package is not found + NetworkError: For network-related errors + """ + package_name = package_name.strip() + if not package_name: + raise InvalidPackageNameError(package_name) + + logger.info(f"Getting upload history for {package_name} on {'TestPyPI' if test_pypi else 'PyPI'}") + + async with PyPIPublishingClient(api_token=api_token, test_pypi=test_pypi) as client: + try: + # Get package information + api_url = f"{client.api_url}/{package_name}/json" + response = await client._make_request("GET", api_url) + + if response.status_code == 404: + raise PackageNotFoundError(package_name) + elif response.status_code != 200: + raise PyPIServerError(response.status_code, f"Failed to fetch package data") + + package_data = response.json() + + # Extract upload history from releases + upload_history = [] + releases = package_data.get("releases", {}) + + for version, files in releases.items(): + for file_info in files: + upload_history.append({ + "version": version, + "filename": file_info.get("filename", ""), + "upload_time": file_info.get("upload_time", ""), + "upload_time_iso": file_info.get("upload_time_iso_8601", ""), + "size": file_info.get("size", 0), + "python_version": file_info.get("python_version", ""), + "packagetype": file_info.get("packagetype", ""), + "md5_digest": file_info.get("md5_digest", ""), + "sha256_digest": file_info.get("digests", {}).get("sha256", ""), + "url": file_info.get("url", ""), + "yanked": file_info.get("yanked", False), + "yanked_reason": file_info.get("yanked_reason", ""), + }) + + # Sort by upload time (newest first) + upload_history.sort( + key=lambda x: x.get("upload_time_iso", x.get("upload_time", "")), + reverse=True + ) + + # Apply limit + if limit and limit > 0: + upload_history = upload_history[:limit] + + # Calculate statistics + total_uploads = len(upload_history) + package_types = {} + total_size = 0 + yanked_count = 0 + + for upload in upload_history: + pkg_type = upload.get("packagetype", "unknown") + package_types[pkg_type] = package_types.get(pkg_type, 0) + 1 + total_size += upload.get("size", 0) + if upload.get("yanked", False): + yanked_count += 1 + + # Get latest version info + info = package_data.get("info", {}) + + result = { + "package_name": package_name, + "repository": "TestPyPI" if test_pypi else "PyPI", + "upload_history": upload_history, + "statistics": { + "total_uploads": total_uploads, + "total_versions": len(releases), + "total_size_bytes": total_size, + "yanked_uploads": yanked_count, + "package_types": package_types, + }, + "package_info": { + "current_version": info.get("version", ""), + "author": info.get("author", ""), + "maintainer": info.get("maintainer", ""), + "license": info.get("license", ""), + "homepage": info.get("home_page", ""), + }, + "limit_applied": limit, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + logger.info(f"Retrieved {total_uploads} upload records for {package_name}") + return result + + except (PackageNotFoundError, PyPIServerError): + raise + except Exception as e: + logger.error(f"Error getting upload history for {package_name}: {e}") + raise NetworkError(f"Failed to get upload history: {e}", e) + + +async def delete_pypi_release( + package_name: str, + version: str, + api_token: Optional[str] = None, + test_pypi: bool = False, + confirm_deletion: bool = False, + dry_run: bool = True, +) -> Dict[str, Any]: + """ + Delete a specific release from PyPI (with safety checks). + + Note: PyPI deletion is very restricted and typically only available to package owners + within a limited time window after upload. + + Args: + package_name: Name of the package + version: Version to delete + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to use TestPyPI instead of production PyPI + confirm_deletion: Explicit confirmation required for actual deletion + dry_run: If True, only simulate the deletion without actually performing it + + Returns: + Dictionary containing deletion results and safety information + + Raises: + InvalidPackageNameError: If package name is invalid + PackageNotFoundError: If package/version is not found + PyPIPermissionError: If deletion is not permitted + NetworkError: For network-related errors + """ + package_name = package_name.strip() + version = version.strip() + + if not package_name: + raise InvalidPackageNameError(package_name) + if not version: + raise ValueError("Version cannot be empty") + + logger.info(f"{'DRY RUN: ' if dry_run else ''}Deleting {package_name}=={version} from {'TestPyPI' if test_pypi else 'PyPI'}") + + # Safety checks + safety_warnings = [] + + # Check if this is production PyPI + if not test_pypi: + safety_warnings.append("PRODUCTION PyPI deletion - this action is irreversible!") + + # Check for confirmation + if not confirm_deletion and not dry_run: + safety_warnings.append("Explicit confirmation required for deletion") + return { + "success": False, + "dry_run": dry_run, + "safety_warnings": safety_warnings, + "error": "Deletion not confirmed. Set confirm_deletion=True to proceed.", + "package_name": package_name, + "version": version, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + async with PyPIPublishingClient(api_token=api_token, test_pypi=test_pypi) as client: + try: + # First, verify the release exists + api_url = f"{client.api_url}/{package_name}/{version}/json" + response = await client._make_request("GET", api_url) + + if response.status_code == 404: + raise PackageNotFoundError(f"{package_name}=={version}") + elif response.status_code != 200: + raise PyPIServerError(response.status_code, "Failed to verify release") + + release_data = response.json() + release_info = release_data.get("info", {}) + + # Analyze release details + upload_time = release_info.get("upload_time", "") + files = release_data.get("urls", []) + file_count = len(files) + + # Check upload recency (PyPI typically allows deletion only within hours) + if upload_time: + try: + from datetime import datetime + upload_dt = datetime.fromisoformat(upload_time.replace('Z', '+00:00')) + age_hours = (datetime.now(timezone.utc) - upload_dt).total_seconds() / 3600 + + if age_hours > 24: + safety_warnings.append(f"Release is {age_hours:.1f} hours old - deletion may not be permitted") + except: + safety_warnings.append("Could not determine upload time") + + if file_count > 1: + safety_warnings.append(f"Release contains {file_count} distribution files") + + result = { + "package_name": package_name, + "version": version, + "dry_run": dry_run, + "safety_warnings": safety_warnings, + "release_info": { + "upload_time": upload_time, + "file_count": file_count, + "files": [f.get("filename", "") for f in files], + "author": release_info.get("author", ""), + "summary": release_info.get("summary", ""), + }, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + if dry_run: + result.update({ + "success": True, + "action": "dry_run_completed", + "message": "Dry run completed successfully. Release exists and could potentially be deleted.", + }) + logger.info(f"DRY RUN: {package_name}=={version} deletion simulation completed") + return result + + # Attempt actual deletion + # Note: PyPI's deletion API is very restricted and may not be available + delete_url = f"{client.api_url}/{package_name}/{version}/" + + try: + delete_response = await client._make_request("DELETE", delete_url) + + if delete_response.status_code in [200, 204]: + result.update({ + "success": True, + "action": "deleted", + "message": f"Successfully deleted {package_name}=={version}", + }) + logger.info(f"Successfully deleted {package_name}=={version}") + + elif delete_response.status_code == 403: + result.update({ + "success": False, + "action": "permission_denied", + "error": "Deletion not permitted - insufficient permissions or time window expired", + }) + + elif delete_response.status_code == 405: + result.update({ + "success": False, + "action": "not_supported", + "error": "Deletion is not supported or available for this package/version", + }) + + else: + error_msg = f"Deletion failed with status {delete_response.status_code}" + try: + error_data = delete_response.json() + if "message" in error_data: + error_msg = error_data["message"] + except: + pass + + result.update({ + "success": False, + "action": "failed", + "error": error_msg, + "status_code": delete_response.status_code, + }) + + except PyPIPermissionError as e: + result.update({ + "success": False, + "action": "permission_denied", + "error": str(e), + }) + except Exception as e: + result.update({ + "success": False, + "action": "error", + "error": f"Deletion attempt failed: {e}", + }) + + return result + + except (PackageNotFoundError, PyPIServerError): + raise + except Exception as e: + logger.error(f"Error deleting release {package_name}=={version}: {e}") + raise NetworkError(f"Failed to delete release: {e}", e) + + +async def manage_pypi_maintainers( + package_name: str, + action: str, + username: Optional[str] = None, + api_token: Optional[str] = None, + test_pypi: bool = False, +) -> Dict[str, Any]: + """ + Manage package maintainers (add/remove/list). + + Note: Maintainer management requires package owner permissions. + + Args: + package_name: Name of the package + action: Action to perform ('list', 'add', 'remove') + username: Username to add/remove (required for add/remove actions) + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to use TestPyPI instead of production PyPI + + Returns: + Dictionary containing maintainer management results + + Raises: + InvalidPackageNameError: If package name is invalid + PackageNotFoundError: If package is not found + PyPIPermissionError: If action is not permitted + NetworkError: For network-related errors + """ + package_name = package_name.strip() + if not package_name: + raise InvalidPackageNameError(package_name) + + action = action.lower().strip() + if action not in ['list', 'add', 'remove']: + raise ValueError(f"Invalid action '{action}'. Must be 'list', 'add', or 'remove'") + + if action in ['add', 'remove'] and not username: + raise ValueError(f"Username required for '{action}' action") + + logger.info(f"Managing maintainers for {package_name} on {'TestPyPI' if test_pypi else 'PyPI'}: {action}") + + async with PyPIPublishingClient(api_token=api_token, test_pypi=test_pypi) as client: + try: + # First verify package exists + api_url = f"{client.api_url}/{package_name}/json" + response = await client._make_request("GET", api_url) + + if response.status_code == 404: + raise PackageNotFoundError(package_name) + elif response.status_code != 200: + raise PyPIServerError(response.status_code, "Failed to fetch package data") + + package_data = response.json() + info = package_data.get("info", {}) + + # Extract current maintainer information + current_maintainers = [] + + # Get author information + author = info.get("author", "") + author_email = info.get("author_email", "") + if author: + current_maintainers.append({ + "type": "author", + "name": author, + "email": author_email, + }) + + # Get maintainer information + maintainer = info.get("maintainer", "") + maintainer_email = info.get("maintainer_email", "") + if maintainer: + current_maintainers.append({ + "type": "maintainer", + "name": maintainer, + "email": maintainer_email, + }) + + result = { + "package_name": package_name, + "action": action, + "current_maintainers": current_maintainers, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + if action == "list": + result.update({ + "success": True, + "maintainer_count": len(current_maintainers), + "package_info": { + "version": info.get("version", ""), + "summary": info.get("summary", ""), + "license": info.get("license", ""), + }, + }) + logger.info(f"Listed {len(current_maintainers)} maintainers for {package_name}") + return result + + # For add/remove actions, we need to use PyPI's management interface + # Note: This is typically done through the web interface, not API + + result.update({ + "username": username, + "success": False, + "error": f"Maintainer {action} operations are not supported via API", + "alternative_method": { + "description": f"Use PyPI web interface to {action} maintainers", + "url": f"{'https://test.pypi.org' if test_pypi else 'https://pypi.org'}/manage/project/{package_name}/collaborators/", + "instructions": [ + "1. Log in to PyPI web interface", + "2. Navigate to project management page", + "3. Go to 'Collaborators' section", + f"4. {action.title()} the specified user", + ], + }, + }) + + # In a real implementation, you might attempt to use undocumented APIs + # or web scraping, but this is not recommended due to stability concerns + + return result + + except (PackageNotFoundError, PyPIServerError): + raise + except Exception as e: + logger.error(f"Error managing maintainers for {package_name}: {e}") + raise NetworkError(f"Failed to manage maintainers: {e}", e) + + +async def get_pypi_account_info( + api_token: Optional[str] = None, + test_pypi: bool = False, +) -> Dict[str, Any]: + """ + Get PyPI account information, quotas, and limits. + + Args: + api_token: PyPI API token (or use PYPI_API_TOKEN env var) + test_pypi: Whether to use TestPyPI instead of production PyPI + + Returns: + Dictionary containing account information and limitations + + Raises: + PyPIAuthenticationError: If authentication fails + NetworkError: For network-related errors + """ + logger.info(f"Getting account information for {'TestPyPI' if test_pypi else 'PyPI'}") + + async with PyPIPublishingClient(api_token=api_token, test_pypi=test_pypi) as client: + try: + # Verify credentials first + cred_result = await check_pypi_credentials(api_token, test_pypi) + if not cred_result.get("valid", False): + raise PyPIAuthenticationError("Invalid credentials") + + result = { + "repository": "TestPyPI" if test_pypi else "PyPI", + "credentials": cred_result, + "test_pypi": test_pypi, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + # Note: PyPI doesn't provide a comprehensive account info API + # Most account information is only available through the web interface + + # Try to get basic account information through available endpoints + account_info = { + "api_token_valid": True, + "token_permissions": "upload", # Basic assumption for upload tokens + "account_limitations": { + "note": "PyPI does not provide detailed account info via API", + "general_limits": { + "upload_size_limit": "60 MB per file", + "project_name_length": "214 characters maximum", + "version_string_length": "64 characters maximum", + "description_length": "No specific limit", + }, + "rate_limits": { + "uploads": "Varies by account age and reputation", + "api_requests": "No published rate limits", + }, + }, + "features": { + "two_factor_auth": "Available (recommended)", + "api_tokens": "Supported", + "trusted_publishing": "Available (OIDC)", + "project_management": "Via web interface", + }, + } + + # Get user projects if possible (this requires web scraping or undocumented APIs) + # For now, provide guidance on how to get this information + account_info["user_projects"] = { + "note": "Project list not available via API", + "alternative": f"Visit {'https://test.pypi.org' if test_pypi else 'https://pypi.org'}/manage/projects/ to see your projects", + } + + result["account_info"] = account_info + + # Add recommendations + result["recommendations"] = [ + "Enable two-factor authentication for enhanced security", + "Use scoped API tokens for specific projects when possible", + "Consider using trusted publishing (OIDC) for CI/CD workflows", + "Regularly review and rotate API tokens", + "Monitor upload quotas and limits through the web interface", + ] + + # Add useful links + result["useful_links"] = { + "account_settings": f"{'https://test.pypi.org' if test_pypi else 'https://pypi.org'}/manage/account/", + "api_tokens": f"{'https://test.pypi.org' if test_pypi else 'https://pypi.org'}/manage/account/token/", + "projects": f"{'https://test.pypi.org' if test_pypi else 'https://pypi.org'}/manage/projects/", + "trusted_publishing": f"{'https://test.pypi.org' if test_pypi else 'https://pypi.org'}/manage/account/publishing/", + "help": "https://pypi.org/help/", + } + + logger.info("Successfully retrieved account information") + return result + + except PyPIAuthenticationError: + raise + except Exception as e: + logger.error(f"Error getting account information: {e}") + raise NetworkError(f"Failed to get account information: {e}", e) \ No newline at end of file diff --git a/tests/test_publishing.py b/tests/test_publishing.py new file mode 100644 index 0000000..655bbd1 --- /dev/null +++ b/tests/test_publishing.py @@ -0,0 +1,848 @@ +"""Tests for PyPI publishing and account management tools.""" + +import asyncio +import json +import os +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import httpx +import pytest + +from pypi_query_mcp.core.exceptions import ( + InvalidPackageNameError, + NetworkError, + PackageNotFoundError, + PyPIAuthenticationError, + PyPIPermissionError, + PyPIServerError, + PyPIUploadError, + RateLimitError, +) +from pypi_query_mcp.tools.publishing import ( + PyPIPublishingClient, + check_pypi_credentials, + delete_pypi_release, + get_pypi_account_info, + get_pypi_upload_history, + manage_pypi_maintainers, + upload_package_to_pypi, +) + + +class TestPyPIPublishingClient: + """Test cases for PyPIPublishingClient.""" + + def test_init_default(self): + """Test client initialization with default values.""" + client = PyPIPublishingClient() + + assert client.api_token is None + assert client.test_pypi is False + assert client.timeout == 60.0 + assert client.max_retries == 3 + assert client.retry_delay == 2.0 + assert "upload.pypi.org" in client.upload_url + assert "pypi.org" in client.api_url + + def test_init_test_pypi(self): + """Test client initialization for TestPyPI.""" + client = PyPIPublishingClient(test_pypi=True) + + assert client.test_pypi is True + assert "test.pypi.org" in client.upload_url + assert "test.pypi.org" in client.api_url + + def test_init_with_token(self): + """Test client initialization with API token.""" + token = "pypi-test-token" + client = PyPIPublishingClient(api_token=token) + + assert client.api_token == token + assert "Authorization" in client._client.headers + assert client._client.headers["Authorization"] == f"token {token}" + + def test_validate_package_name_valid(self): + """Test package name validation with valid names.""" + client = PyPIPublishingClient() + + valid_names = [ + "requests", + "django-rest-framework", + "numpy", + "package_name", + "package.name", + "a", + "a1", + "package-1.0", + ] + + for name in valid_names: + result = client._validate_package_name(name) + assert result == name.strip() + + def test_validate_package_name_invalid(self): + """Test package name validation with invalid names.""" + client = PyPIPublishingClient() + + invalid_names = [ + "", + " ", + "-invalid", + "invalid-", + ".invalid", + "invalid.", + "in..valid", + "in--valid", + "in valid", + ] + + for name in invalid_names: + with pytest.raises(InvalidPackageNameError): + client._validate_package_name(name) + + @pytest.mark.asyncio + async def test_make_request_success(self): + """Test successful HTTP request.""" + with patch.object(httpx.AsyncClient, 'request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_request.return_value = mock_response + + client = PyPIPublishingClient() + response = await client._make_request("GET", "https://example.com") + + assert response == mock_response + mock_request.assert_called_once_with("GET", "https://example.com") + + @pytest.mark.asyncio + async def test_make_request_authentication_error(self): + """Test HTTP request with authentication error.""" + with patch.object(httpx.AsyncClient, 'request') as mock_request: + mock_response = Mock() + mock_response.status_code = 401 + mock_request.return_value = mock_response + + client = PyPIPublishingClient() + + with pytest.raises(PyPIAuthenticationError): + await client._make_request("GET", "https://example.com") + + @pytest.mark.asyncio + async def test_make_request_permission_error(self): + """Test HTTP request with permission error.""" + with patch.object(httpx.AsyncClient, 'request') as mock_request: + mock_response = Mock() + mock_response.status_code = 403 + mock_request.return_value = mock_response + + client = PyPIPublishingClient() + + with pytest.raises(PyPIPermissionError): + await client._make_request("GET", "https://example.com") + + @pytest.mark.asyncio + async def test_make_request_rate_limit_error(self): + """Test HTTP request with rate limit error.""" + with patch.object(httpx.AsyncClient, 'request') as mock_request: + mock_response = Mock() + mock_response.status_code = 429 + mock_response.headers = {"Retry-After": "60"} + mock_request.return_value = mock_response + + client = PyPIPublishingClient() + + with pytest.raises(RateLimitError) as exc_info: + await client._make_request("GET", "https://example.com") + + assert exc_info.value.retry_after == 60 + + @pytest.mark.asyncio + async def test_make_request_network_error_with_retry(self): + """Test HTTP request with network error and retry logic.""" + with patch.object(httpx.AsyncClient, 'request') as mock_request: + mock_request.side_effect = httpx.NetworkError("Connection failed") + + client = PyPIPublishingClient(max_retries=1, retry_delay=0.01) + + with pytest.raises(NetworkError): + await client._make_request("GET", "https://example.com") + + # Should retry once (initial + 1 retry = 2 calls) + assert mock_request.call_count == 2 + + @pytest.mark.asyncio + async def test_context_manager(self): + """Test client as async context manager.""" + with patch.object(PyPIPublishingClient, 'close') as mock_close: + async with PyPIPublishingClient() as client: + assert client is not None + + mock_close.assert_called_once() + + +class TestUploadPackageToPyPI: + """Test cases for upload_package_to_pypi function.""" + + @pytest.fixture + def temp_dist_files(self): + """Create temporary distribution files for testing.""" + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create fake distribution files + wheel_file = temp_path / "test_package-1.0.0-py3-none-any.whl" + sdist_file = temp_path / "test_package-1.0.0.tar.gz" + + wheel_file.write_bytes(b"fake wheel content") + sdist_file.write_bytes(b"fake sdist content") + + yield [str(wheel_file), str(sdist_file)] + + @pytest.mark.asyncio + async def test_upload_no_distribution_paths(self): + """Test upload with no distribution paths.""" + with pytest.raises(ValueError, match="No distribution paths provided"): + await upload_package_to_pypi([]) + + @pytest.mark.asyncio + async def test_upload_missing_files(self): + """Test upload with missing distribution files.""" + missing_files = ["/nonexistent/file1.whl", "/nonexistent/file2.tar.gz"] + + with pytest.raises(FileNotFoundError): + await upload_package_to_pypi(missing_files) + + @pytest.mark.asyncio + async def test_upload_invalid_files(self, temp_dist_files): + """Test upload with invalid file types.""" + temp_dir = Path(temp_dist_files[0]).parent + invalid_file = temp_dir / "invalid.txt" + invalid_file.write_text("not a distribution file") + + # Should skip invalid files and proceed with valid ones + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": True} + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_request.return_value = mock_response + + result = await upload_package_to_pypi( + temp_dist_files + [str(invalid_file)], + test_pypi=True + ) + + # Should only process the 2 valid distribution files + assert result["total_files"] == 2 + + @pytest.mark.asyncio + async def test_upload_authentication_failure(self, temp_dist_files): + """Test upload with authentication failure.""" + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": False} + + with pytest.raises(PyPIAuthenticationError): + await upload_package_to_pypi(temp_dist_files, api_token="invalid-token") + + @pytest.mark.asyncio + async def test_upload_successful(self, temp_dist_files): + """Test successful upload.""" + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": True} + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_request.return_value = mock_response + + result = await upload_package_to_pypi( + temp_dist_files, + api_token="valid-token", + test_pypi=True + ) + + assert result["successful_uploads"] == 2 + assert result["failed_uploads"] == 0 + assert result["target_repository"] == "TestPyPI" + assert len(result["upload_results"]) == 2 + + for upload_result in result["upload_results"]: + assert upload_result["status"] == "success" + + @pytest.mark.asyncio + async def test_upload_file_exists_skip(self, temp_dist_files): + """Test upload with existing file and skip_existing=True.""" + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": True} + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 409 # Conflict - file exists + mock_request.return_value = mock_response + + result = await upload_package_to_pypi( + temp_dist_files, + skip_existing=True, + test_pypi=True + ) + + assert result["successful_uploads"] == 0 + assert result["skipped_uploads"] == 2 + + for upload_result in result["upload_results"]: + assert upload_result["status"] == "skipped" + assert "already exists" in upload_result["error"] + + @pytest.mark.asyncio + async def test_upload_with_verification(self, temp_dist_files): + """Test upload with verification enabled.""" + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": True} + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + # Mock upload response + upload_response = Mock() + upload_response.status_code = 200 + + # Mock verification response + verify_response = Mock() + verify_response.status_code = 200 + + mock_request.side_effect = [upload_response, upload_response, verify_response, verify_response] + + result = await upload_package_to_pypi( + temp_dist_files, + verify_uploads=True, + test_pypi=True + ) + + assert result["successful_uploads"] == 2 + assert "verification_results" in result + assert len(result["verification_results"]) == 2 + + +class TestCheckPyPICredentials: + """Test cases for check_pypi_credentials function.""" + + @pytest.mark.asyncio + async def test_no_token_provided(self): + """Test credential check with no token provided.""" + with patch.dict(os.environ, {}, clear=True): + result = await check_pypi_credentials() + + assert result["valid"] is False + assert "No API token provided" in result["error"] + + @pytest.mark.asyncio + async def test_invalid_token_format(self): + """Test credential check with invalid token format.""" + result = await check_pypi_credentials(api_token="invalid-format") + + assert result["valid"] is False + assert "Invalid token format" in result["error"] + + @pytest.mark.asyncio + async def test_valid_credentials(self): + """Test credential check with valid credentials.""" + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_request.return_value = mock_response + + result = await check_pypi_credentials( + api_token="pypi-valid-token", + test_pypi=True + ) + + assert result["valid"] is True + assert result["repository"] == "TestPyPI" + + @pytest.mark.asyncio + async def test_invalid_credentials(self): + """Test credential check with invalid credentials.""" + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_request.side_effect = PyPIAuthenticationError("Invalid token", 401) + + result = await check_pypi_credentials(api_token="pypi-invalid-token") + + assert result["valid"] is False + assert result["status_code"] == 401 + + @pytest.mark.asyncio + async def test_permission_denied(self): + """Test credential check with permission denied.""" + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 403 + mock_request.return_value = mock_response + + result = await check_pypi_credentials(api_token="pypi-limited-token") + + assert result["valid"] is False + assert result["status_code"] == 403 + assert "permissions" in result["error"] + + +class TestGetPyPIUploadHistory: + """Test cases for get_pypi_upload_history function.""" + + @pytest.mark.asyncio + async def test_invalid_package_name(self): + """Test upload history with invalid package name.""" + with pytest.raises(InvalidPackageNameError): + await get_pypi_upload_history("") + + @pytest.mark.asyncio + async def test_package_not_found(self): + """Test upload history for non-existent package.""" + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 404 + mock_request.return_value = mock_response + + with pytest.raises(PackageNotFoundError): + await get_pypi_upload_history("nonexistent-package") + + @pytest.mark.asyncio + async def test_successful_history_retrieval(self): + """Test successful upload history retrieval.""" + mock_package_data = { + "info": { + "name": "test-package", + "version": "1.0.0", + "author": "Test Author", + "summary": "A test package", + "license": "MIT", + }, + "releases": { + "1.0.0": [ + { + "filename": "test_package-1.0.0-py3-none-any.whl", + "upload_time": "2024-01-01T12:00:00", + "upload_time_iso_8601": "2024-01-01T12:00:00Z", + "size": 12345, + "python_version": "py3", + "packagetype": "bdist_wheel", + "md5_digest": "abcd1234", + "digests": {"sha256": "efgh5678"}, + "url": "https://pypi.org/...", + "yanked": False, + } + ], + "0.9.0": [ + { + "filename": "test_package-0.9.0.tar.gz", + "upload_time": "2023-12-01T12:00:00", + "upload_time_iso_8601": "2023-12-01T12:00:00Z", + "size": 9876, + "python_version": "source", + "packagetype": "sdist", + "md5_digest": "wxyz9999", + "digests": {"sha256": "ijkl0000"}, + "url": "https://pypi.org/...", + "yanked": True, + "yanked_reason": "Critical bug", + } + ], + }, + } + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_package_data + mock_request.return_value = mock_response + + result = await get_pypi_upload_history("test-package", limit=10) + + assert result["package_name"] == "test-package" + assert len(result["upload_history"]) == 2 + assert result["statistics"]["total_uploads"] == 2 + assert result["statistics"]["total_versions"] == 2 + assert result["statistics"]["yanked_uploads"] == 1 + assert result["statistics"]["package_types"]["bdist_wheel"] == 1 + assert result["statistics"]["package_types"]["sdist"] == 1 + + # Check that uploads are sorted by time (newest first) + assert result["upload_history"][0]["version"] == "1.0.0" + assert result["upload_history"][1]["version"] == "0.9.0" + + @pytest.mark.asyncio + async def test_upload_history_with_limit(self): + """Test upload history retrieval with limit.""" + mock_package_data = { + "info": {"name": "test-package", "version": "3.0.0"}, + "releases": { + f"{i}.0.0": [ + { + "filename": f"test_package-{i}.0.0.tar.gz", + "upload_time_iso_8601": f"2024-01-{i:02d}T12:00:00Z", + "size": 1000 + i, + "packagetype": "sdist", + "yanked": False, + } + ] + for i in range(1, 11) # 10 versions + }, + } + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_package_data + mock_request.return_value = mock_response + + result = await get_pypi_upload_history("test-package", limit=5) + + # Should only return 5 uploads due to limit + assert len(result["upload_history"]) == 5 + assert result["limit_applied"] == 5 + + +class TestDeletePyPIRelease: + """Test cases for delete_pypi_release function.""" + + @pytest.mark.asyncio + async def test_invalid_package_name(self): + """Test deletion with invalid package name.""" + with pytest.raises(InvalidPackageNameError): + await delete_pypi_release("", "1.0.0") + + @pytest.mark.asyncio + async def test_empty_version(self): + """Test deletion with empty version.""" + with pytest.raises(ValueError, match="Version cannot be empty"): + await delete_pypi_release("test-package", "") + + @pytest.mark.asyncio + async def test_deletion_not_confirmed(self): + """Test deletion without confirmation.""" + result = await delete_pypi_release( + "test-package", + "1.0.0", + confirm_deletion=False, + dry_run=False + ) + + assert result["success"] is False + assert "not confirmed" in result["error"] + assert "PRODUCTION PyPI deletion" in result["safety_warnings"] + + @pytest.mark.asyncio + async def test_dry_run_successful(self): + """Test successful dry run deletion.""" + mock_release_data = { + "info": { + "name": "test-package", + "version": "1.0.0", + "upload_time": "2024-01-01T12:00:00Z", + "author": "Test Author", + "summary": "Test package", + }, + "urls": [ + {"filename": "test_package-1.0.0.tar.gz", "packagetype": "sdist"}, + {"filename": "test_package-1.0.0-py3-none-any.whl", "packagetype": "bdist_wheel"}, + ], + } + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_release_data + mock_request.return_value = mock_response + + result = await delete_pypi_release( + "test-package", + "1.0.0", + dry_run=True, + test_pypi=True + ) + + assert result["success"] is True + assert result["dry_run"] is True + assert result["action"] == "dry_run_completed" + assert result["release_info"]["file_count"] == 2 + + @pytest.mark.asyncio + async def test_package_not_found(self): + """Test deletion of non-existent package/version.""" + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 404 + mock_request.return_value = mock_response + + with pytest.raises(PackageNotFoundError): + await delete_pypi_release("nonexistent-package", "1.0.0") + + @pytest.mark.asyncio + async def test_deletion_permission_denied(self): + """Test deletion with permission denied.""" + mock_release_data = { + "info": {"name": "test-package", "version": "1.0.0"}, + "urls": [{"filename": "test_package-1.0.0.tar.gz"}], + } + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + # First call for verification, second for deletion + verify_response = Mock() + verify_response.status_code = 200 + verify_response.json.return_value = mock_release_data + + delete_response = Mock() + delete_response.status_code = 403 + + mock_request.side_effect = [verify_response, delete_response] + + result = await delete_pypi_release( + "test-package", + "1.0.0", + confirm_deletion=True, + dry_run=False, + test_pypi=True + ) + + assert result["success"] is False + assert result["action"] == "permission_denied" + + @pytest.mark.asyncio + async def test_successful_deletion(self): + """Test successful deletion.""" + mock_release_data = { + "info": {"name": "test-package", "version": "1.0.0"}, + "urls": [{"filename": "test_package-1.0.0.tar.gz"}], + } + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + # First call for verification, second for deletion + verify_response = Mock() + verify_response.status_code = 200 + verify_response.json.return_value = mock_release_data + + delete_response = Mock() + delete_response.status_code = 204 + + mock_request.side_effect = [verify_response, delete_response] + + result = await delete_pypi_release( + "test-package", + "1.0.0", + confirm_deletion=True, + dry_run=False, + test_pypi=True + ) + + assert result["success"] is True + assert result["action"] == "deleted" + + +class TestManagePyPIMaintainers: + """Test cases for manage_pypi_maintainers function.""" + + @pytest.mark.asyncio + async def test_invalid_package_name(self): + """Test maintainer management with invalid package name.""" + with pytest.raises(InvalidPackageNameError): + await manage_pypi_maintainers("", "list") + + @pytest.mark.asyncio + async def test_invalid_action(self): + """Test maintainer management with invalid action.""" + with pytest.raises(ValueError, match="Invalid action"): + await manage_pypi_maintainers("test-package", "invalid") + + @pytest.mark.asyncio + async def test_missing_username_for_add(self): + """Test add action without username.""" + with pytest.raises(ValueError, match="Username required"): + await manage_pypi_maintainers("test-package", "add") + + @pytest.mark.asyncio + async def test_list_maintainers_successful(self): + """Test successful maintainer listing.""" + mock_package_data = { + "info": { + "name": "test-package", + "version": "1.0.0", + "author": "John Doe", + "author_email": "john@example.com", + "maintainer": "Jane Smith", + "maintainer_email": "jane@example.com", + "summary": "Test package", + "license": "MIT", + }, + } + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_package_data + mock_request.return_value = mock_response + + result = await manage_pypi_maintainers("test-package", "list") + + assert result["success"] is True + assert result["action"] == "list" + assert result["maintainer_count"] == 2 + assert len(result["current_maintainers"]) == 2 + + # Check author information + author_info = next(m for m in result["current_maintainers"] if m["type"] == "author") + assert author_info["name"] == "John Doe" + assert author_info["email"] == "john@example.com" + + # Check maintainer information + maintainer_info = next(m for m in result["current_maintainers"] if m["type"] == "maintainer") + assert maintainer_info["name"] == "Jane Smith" + assert maintainer_info["email"] == "jane@example.com" + + @pytest.mark.asyncio + async def test_add_maintainer_not_supported(self): + """Test add maintainer operation (not supported via API).""" + mock_package_data = { + "info": {"name": "test-package", "author": "John Doe"}, + } + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_package_data + mock_request.return_value = mock_response + + result = await manage_pypi_maintainers( + "test-package", + "add", + username="newuser" + ) + + assert result["success"] is False + assert "not supported via API" in result["error"] + assert "alternative_method" in result + assert "web interface" in result["alternative_method"]["description"] + + @pytest.mark.asyncio + async def test_package_not_found(self): + """Test maintainer management for non-existent package.""" + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 404 + mock_request.return_value = mock_response + + with pytest.raises(PackageNotFoundError): + await manage_pypi_maintainers("nonexistent-package", "list") + + +class TestGetPyPIAccountInfo: + """Test cases for get_pypi_account_info function.""" + + @pytest.mark.asyncio + async def test_invalid_credentials(self): + """Test account info with invalid credentials.""" + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": False} + + with pytest.raises(PyPIAuthenticationError): + await get_pypi_account_info(api_token="invalid-token") + + @pytest.mark.asyncio + async def test_successful_account_info(self): + """Test successful account info retrieval.""" + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": True, "repository": "PyPI"} + + result = await get_pypi_account_info(api_token="valid-token") + + assert result["repository"] == "PyPI" + assert result["credentials"]["valid"] is True + assert "account_info" in result + assert "recommendations" in result + assert "useful_links" in result + + # Check account info structure + account_info = result["account_info"] + assert account_info["api_token_valid"] is True + assert "account_limitations" in account_info + assert "features" in account_info + assert "user_projects" in account_info + + # Check recommendations + assert len(result["recommendations"]) > 0 + assert any("two-factor" in rec for rec in result["recommendations"]) + + # Check useful links + links = result["useful_links"] + assert "account_settings" in links + assert "api_tokens" in links + assert "projects" in links + + @pytest.mark.asyncio + async def test_test_pypi_account_info(self): + """Test account info for TestPyPI.""" + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": True, "repository": "TestPyPI"} + + result = await get_pypi_account_info(test_pypi=True) + + assert result["repository"] == "TestPyPI" + assert result["test_pypi"] is True + assert "test.pypi.org" in result["useful_links"]["account_settings"] + + +class TestIntegration: + """Integration tests for publishing tools.""" + + @pytest.mark.asyncio + async def test_end_to_end_workflow_simulation(self): + """Test simulated end-to-end workflow.""" + # This test simulates a complete workflow without making real API calls + + # 1. Check credentials + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": True} + + cred_result = await check_pypi_credentials(api_token="pypi-test-token") + assert cred_result["valid"] is True + + # 2. Get account info + with patch('pypi_query_mcp.tools.publishing.check_pypi_credentials') as mock_cred: + mock_cred.return_value = {"valid": True} + + account_result = await get_pypi_account_info(api_token="pypi-test-token") + assert "account_info" in account_result + + # 3. Check upload history + mock_package_data = { + "info": {"name": "test-package"}, + "releases": {"1.0.0": [{"filename": "test-1.0.0.tar.gz"}]}, + } + + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_package_data + mock_request.return_value = mock_response + + history_result = await get_pypi_upload_history("test-package") + assert len(history_result["upload_history"]) == 1 + + # 4. Test dry run deletion + with patch.object(PyPIPublishingClient, '_make_request') as mock_request: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "info": {"name": "test-package", "version": "1.0.0"}, + "urls": [{"filename": "test-1.0.0.tar.gz"}], + } + mock_request.return_value = mock_response + + delete_result = await delete_pypi_release( + "test-package", "1.0.0", dry_run=True + ) + assert delete_result["dry_run"] is True + assert delete_result["success"] is True + + +if __name__ == "__main__": + pytest.main([__file__]) \ No newline at end of file