"""GitHub API client for fetching repository statistics and popularity metrics.""" import asyncio import logging from typing import Any, Dict, Optional import httpx logger = logging.getLogger(__name__) class GitHubAPIClient: """Async client for GitHub API to fetch repository statistics.""" def __init__( self, timeout: float = 10.0, max_retries: int = 2, retry_delay: float = 1.0, github_token: Optional[str] = None, ): """Initialize GitHub API client. Args: timeout: Request timeout in seconds max_retries: Maximum number of retry attempts retry_delay: Delay between retries in seconds github_token: Optional GitHub API token for higher rate limits """ self.base_url = "https://api.github.com" self.timeout = timeout self.max_retries = max_retries self.retry_delay = retry_delay # Simple in-memory cache for repository data self._cache: Dict[str, Dict[str, Any]] = {} self._cache_ttl = 3600 # 1 hour cache # HTTP client configuration headers = { "Accept": "application/vnd.github.v3+json", "User-Agent": "pypi-query-mcp-server/0.1.0", } if github_token: headers["Authorization"] = f"token {github_token}" self._client = httpx.AsyncClient( timeout=httpx.Timeout(timeout), headers=headers, follow_redirects=True, ) async def __aenter__(self): """Async context manager entry.""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.close() async def close(self): """Close the HTTP client.""" await self._client.aclose() def _get_cache_key(self, repo: str) -> str: """Generate cache key for repository data.""" return f"repo:{repo}" def _is_cache_valid(self, cache_entry: Dict[str, Any]) -> bool: """Check if cache entry is still valid.""" import time return time.time() - cache_entry.get("timestamp", 0) < self._cache_ttl async def _make_request(self, url: str) -> Optional[Dict[str, Any]]: """Make HTTP request with retry logic and error handling. Args: url: URL to request Returns: JSON response data or None if failed """ last_exception = None for attempt in range(self.max_retries + 1): try: logger.debug(f"Making GitHub API request to {url} (attempt {attempt + 1})") response = await self._client.get(url) # Handle different HTTP status codes if response.status_code == 200: return response.json() elif response.status_code == 404: logger.warning(f"GitHub repository not found: {url}") return None elif response.status_code == 403: # Rate limit or permission issue logger.warning(f"GitHub API rate limit or permission denied: {url}") return None elif response.status_code >= 500: logger.warning(f"GitHub API server error {response.status_code}: {url}") if attempt < self.max_retries: continue return None else: logger.warning(f"Unexpected GitHub API status {response.status_code}: {url}") return None except httpx.TimeoutException: last_exception = f"Request timeout for {url}" logger.warning(last_exception) except httpx.NetworkError as e: last_exception = f"Network error for {url}: {e}" logger.warning(last_exception) except Exception as e: last_exception = f"Unexpected error for {url}: {e}" logger.warning(last_exception) # Wait before retry (except on last attempt) if attempt < self.max_retries: await asyncio.sleep(self.retry_delay * (2 ** attempt)) # If we get here, all retries failed logger.error(f"Failed to fetch GitHub data after {self.max_retries + 1} attempts: {last_exception}") return None async def get_repository_stats(self, repo_path: str, use_cache: bool = True) -> Optional[Dict[str, Any]]: """Get repository statistics from GitHub API. Args: repo_path: Repository path in format "owner/repo" use_cache: Whether to use cached data if available Returns: Dictionary containing repository statistics or None if failed """ cache_key = self._get_cache_key(repo_path) # Check cache first if use_cache and cache_key in self._cache: cache_entry = self._cache[cache_key] if self._is_cache_valid(cache_entry): logger.debug(f"Using cached GitHub data for: {repo_path}") return cache_entry["data"] # Make API request url = f"{self.base_url}/repos/{repo_path}" try: data = await self._make_request(url) if data: # Extract relevant statistics stats = { "stars": data.get("stargazers_count", 0), "forks": data.get("forks_count", 0), "watchers": data.get("watchers_count", 0), "open_issues": data.get("open_issues_count", 0), "size": data.get("size", 0), "language": data.get("language"), "created_at": data.get("created_at"), "updated_at": data.get("updated_at"), "pushed_at": data.get("pushed_at"), "description": data.get("description"), "topics": data.get("topics", []), "homepage": data.get("homepage"), "has_issues": data.get("has_issues", False), "has_projects": data.get("has_projects", False), "has_wiki": data.get("has_wiki", False), "archived": data.get("archived", False), "disabled": data.get("disabled", False), "license": data.get("license", {}).get("name") if data.get("license") else None, } # Cache the result import time self._cache[cache_key] = {"data": stats, "timestamp": time.time()} logger.debug(f"Fetched GitHub stats for {repo_path}: {stats['stars']} stars") return stats else: return None except Exception as e: logger.error(f"Error fetching GitHub stats for {repo_path}: {e}") return None async def get_multiple_repo_stats( self, repo_paths: list[str], use_cache: bool = True, max_concurrent: int = 5 ) -> Dict[str, Optional[Dict[str, Any]]]: """Get statistics for multiple repositories concurrently. Args: repo_paths: List of repository paths in format "owner/repo" use_cache: Whether to use cached data if available max_concurrent: Maximum number of concurrent requests Returns: Dictionary mapping repo paths to their statistics """ semaphore = asyncio.Semaphore(max_concurrent) async def fetch_repo_stats(repo_path: str) -> tuple[str, Optional[Dict[str, Any]]]: async with semaphore: stats = await self.get_repository_stats(repo_path, use_cache) return repo_path, stats # Fetch all repositories concurrently tasks = [fetch_repo_stats(repo) for repo in repo_paths] results = await asyncio.gather(*tasks, return_exceptions=True) # Process results repo_stats = {} for result in results: if isinstance(result, Exception): logger.error(f"Error in concurrent GitHub fetch: {result}") continue repo_path, stats = result repo_stats[repo_path] = stats return repo_stats def clear_cache(self): """Clear the internal cache.""" self._cache.clear() logger.debug("GitHub cache cleared") async def get_rate_limit(self) -> Optional[Dict[str, Any]]: """Get current GitHub API rate limit status. Returns: Dictionary containing rate limit information """ url = f"{self.base_url}/rate_limit" try: data = await self._make_request(url) if data: return data.get("rate", {}) return None except Exception as e: logger.error(f"Error fetching GitHub rate limit: {e}") return None