diff --git a/src/mcilspy/ilspy_wrapper.py b/src/mcilspy/ilspy_wrapper.py index 85d42d6..08c23c1 100644 --- a/src/mcilspy/ilspy_wrapper.py +++ b/src/mcilspy/ilspy_wrapper.py @@ -32,6 +32,32 @@ logger = logging.getLogger(__name__) # from malicious or corrupted assemblies that produce huge output MAX_OUTPUT_BYTES = 50_000_000 # 50 MB +# PE file signature constants +_MZ_SIGNATURE = b"MZ" # DOS header magic number + + +def _validate_pe_signature(file_path: str) -> tuple[bool, str]: + """Quick validation of PE file signature (MZ header). + + Fails fast on non-PE files before invoking ilspycmd. + + Args: + file_path: Path to the file to validate + + Returns: + Tuple of (is_valid, error_message). error_message is empty if valid. + """ + try: + with open(file_path, "rb") as f: + header = f.read(2) + if len(header) < 2: + return False, "File is too small to be a valid PE file" + if header != _MZ_SIGNATURE: + return False, f"Not a valid PE file (missing MZ signature, got {header!r})" + return True, "" + except OSError as e: + return False, f"Cannot read file: {e}" + class ILSpyWrapper: """Wrapper class for ILSpy command line tool. @@ -139,8 +165,8 @@ class ILSpyWrapper: return process.returncode, stdout, stderr - except Exception as e: - logger.error(f"Error running command: {e}") + except (OSError, FileNotFoundError) as e: + logger.exception(f"Error running ilspycmd command: {e}") return -1, "", str(e) async def decompile(self, request: DecompileRequest) -> DecompileResponse: @@ -159,6 +185,15 @@ class ILSpyWrapper: assembly_name=os.path.basename(request.assembly_path), ) + # Validate PE signature before invoking ilspycmd + is_valid, pe_error = _validate_pe_signature(request.assembly_path) + if not is_valid: + return DecompileResponse( + success=False, + error_message=pe_error, + assembly_name=os.path.basename(request.assembly_path), + ) + # Use TemporaryDirectory context manager for guaranteed cleanup (no race condition) # when user doesn't specify an output directory if request.output_dir: @@ -268,7 +303,8 @@ class ILSpyWrapper: type_name=request.type_name, ) - except Exception as e: + except OSError as e: + logger.exception(f"Error during decompilation: {e}") return DecompileResponse( success=False, error_message=str(e), @@ -290,6 +326,11 @@ class ILSpyWrapper: success=False, error_message=f"Assembly file not found: {request.assembly_path}" ) + # Validate PE signature before invoking ilspycmd + is_valid, pe_error = _validate_pe_signature(request.assembly_path) + if not is_valid: + return ListTypesResponse(success=False, error_message=pe_error) + args = [request.assembly_path] # Add entity types to list @@ -313,7 +354,8 @@ class ILSpyWrapper: error_msg = stderr or stdout or "Unknown error occurred" return ListTypesResponse(success=False, error_message=error_msg) - except Exception as e: + except OSError as e: + logger.exception(f"Error listing types: {e}") return ListTypesResponse(success=False, error_message=str(e)) # Compiled regex for parsing ilspycmd list output @@ -422,6 +464,11 @@ class ILSpyWrapper: "error_message": f"Assembly file not found: {request.assembly_path}", } + # Validate PE signature before invoking ilspycmd + is_valid, pe_error = _validate_pe_signature(request.assembly_path) + if not is_valid: + return {"success": False, "error_message": pe_error} + args = [request.assembly_path, "--generate-diagrammer"] # Add output directory @@ -467,7 +514,8 @@ class ILSpyWrapper: error_msg = stderr or stdout or "Unknown error occurred" return {"success": False, "error_message": error_msg} - except Exception as e: + except OSError as e: + logger.exception(f"Error generating diagrammer: {e}") return {"success": False, "error_message": str(e)} async def get_assembly_info(self, request: AssemblyInfoRequest) -> AssemblyInfo: @@ -482,6 +530,11 @@ class ILSpyWrapper: if not os.path.exists(request.assembly_path): raise FileNotFoundError(f"Assembly file not found: {request.assembly_path}") + # Validate PE signature before invoking ilspycmd + is_valid, pe_error = _validate_pe_signature(request.assembly_path) + if not is_valid: + raise ValueError(pe_error) + assembly_path = Path(request.assembly_path) # Use ilspycmd to list types and extract assembly info from output diff --git a/src/mcilspy/metadata_reader.py b/src/mcilspy/metadata_reader.py index fe330ea..2d6150f 100644 --- a/src/mcilspy/metadata_reader.py +++ b/src/mcilspy/metadata_reader.py @@ -14,11 +14,16 @@ rather than traditional IntFlag enums, so we use those directly. """ import logging +import re +import struct +from collections.abc import Iterator +from dataclasses import dataclass from pathlib import Path from typing import Any import dnfile from dnfile.mdtable import TypeDefRow +from dnfile.utils import read_compressed_int from .models import ( AssemblyMetadata, @@ -43,6 +48,14 @@ class AssemblySizeError(ValueError): pass +@dataclass +class StringMatch: + """A matched string from the user strings heap.""" + + value: str + offset: int # Offset in the #US heap + + class MetadataReader: """Read .NET assembly metadata directly using dnfile.""" @@ -78,7 +91,9 @@ class MetadataReader: if self._pe is None: try: self._pe = dnfile.dnPE(str(self.assembly_path)) - except Exception as e: + except (OSError, struct.error) as e: + # OSError/IOError: file access issues + # struct.error: malformed PE structure raise ValueError(f"Failed to parse assembly: {e}") from e # Build type cache for lookups @@ -140,7 +155,8 @@ class MetadataReader: type_name = str(ca.Type) if ca.Type else "" if "TargetFramework" in type_name and hasattr(ca, "Value") and ca.Value: target_framework = str(ca.Value) - except Exception: + except (AttributeError, TypeError, ValueError): + # CustomAttribute parsing can fail in various ways due to blob format pass type_count = ( @@ -539,6 +555,121 @@ class MetadataReader: return resources + def _iter_user_strings(self) -> Iterator[tuple[int, str]]: + """Iterate over all user strings in the #US heap. + + Yields (offset, string_value) tuples. + + The #US (User Strings) heap stores UTF-16 encoded strings used in the + assembly's IL code (ldstr instructions). Each entry is prefixed with a + compressed integer length, followed by UTF-16 bytes and a trailing flag byte. + """ + pe = self._ensure_loaded() + + if not pe.net or not pe.net.user_strings: + return + + heap = pe.net.user_strings + data = heap._ClrStream__data__ # Access the raw bytes + + if not data: + return + + # The first byte is always 0x00 (null string entry) + offset = 1 + + while offset < len(data): + # Read compressed integer length + result = read_compressed_int(data[offset:]) + if result is None: + break + + length, size_bytes = result + + if length == 0: + offset += size_bytes + continue + + # Skip past the length bytes + string_start = offset + size_bytes + + if string_start + length > len(data): + # Corrupted or truncated - stop iteration + break + + # Extract string data (UTF-16 with possible trailing flag byte) + string_data = data[string_start : string_start + length] + + # The trailing byte is a flag if length is odd + if length % 2 == 1: + string_data = string_data[:-1] # Remove flag byte + + # Decode as UTF-16 Little Endian + try: + string_value = string_data.decode("utf-16-le", errors="replace") + if string_value: # Only yield non-empty strings + yield offset, string_value + except (UnicodeDecodeError, ValueError): + # Skip malformed strings + pass + + # Move to next entry + offset = string_start + length + + def search_user_strings( + self, + pattern: str, + case_sensitive: bool = False, + use_regex: bool = False, + max_results: int = 100, + ) -> list[StringMatch]: + """Search for strings in the user strings heap. + + This is much faster than decompiling the entire assembly because it + reads directly from the #US metadata heap without invoking ilspycmd. + + Args: + pattern: String pattern to search for + case_sensitive: Whether to match case (default: False) + use_regex: Treat pattern as regular expression (default: False) + max_results: Maximum number of matches to return (default: 100) + + Returns: + List of StringMatch objects containing matching strings + """ + matches: list[StringMatch] = [] + + # Compile regex if needed + if use_regex: + flags = 0 if case_sensitive else re.IGNORECASE + try: + search_pattern = re.compile(pattern, flags) + except re.error as e: + raise ValueError(f"Invalid regex pattern: {e}") from e + else: + search_pattern = None + + # Prepare pattern for non-regex search + if not use_regex and not case_sensitive: + pattern_lower = pattern.lower() + + for offset, string_value in self._iter_user_strings(): + if len(matches) >= max_results: + break + + # Check for match + if use_regex and search_pattern is not None: + if search_pattern.search(string_value): + matches.append(StringMatch(value=string_value, offset=offset)) + elif case_sensitive: + if pattern in string_value: + matches.append(StringMatch(value=string_value, offset=offset)) + else: + if pattern_lower in string_value.lower(): + matches.append(StringMatch(value=string_value, offset=offset)) + + return matches + def close(self) -> None: """Close the PE file.""" if self._pe: @@ -550,4 +681,3 @@ class MetadataReader: def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.close() - return False diff --git a/src/mcilspy/server.py b/src/mcilspy/server.py index 61295d9..673dfb6 100644 --- a/src/mcilspy/server.py +++ b/src/mcilspy/server.py @@ -187,8 +187,8 @@ async def _check_dotnet_tools() -> dict: stdout, _ = await proc.communicate() if proc.returncode == 0: result["dotnet_version"] = stdout.decode().strip() - except Exception: - pass + except (OSError, FileNotFoundError) as e: + logger.debug(f"Could not check dotnet version: {e}") # Check if ilspycmd is available (check PATH and common locations) ilspy_path = find_ilspycmd_path() @@ -205,8 +205,8 @@ async def _check_dotnet_tools() -> dict: stdout, _ = await proc.communicate() if proc.returncode == 0: result["ilspycmd_version"] = stdout.decode().strip() - except Exception: - pass + except (OSError, FileNotFoundError) as e: + logger.debug(f"Could not check ilspycmd version: {e}") return result @@ -267,7 +267,11 @@ def _detect_platform() -> dict: ["sudo", "zypper", "install", "-y", "dotnet-sdk-8.0"] ] except FileNotFoundError: - pass + logger.debug("Could not find /etc/os-release - using fallback detection") + except PermissionError: + logger.warning("Permission denied reading /etc/os-release - using fallback detection") + except OSError as e: + logger.warning(f"Error reading /etc/os-release: {e} - using fallback detection") # Fallback: check for common package managers if result["install_commands"] is None: @@ -663,6 +667,8 @@ async def decompile_assembly( async def list_types( assembly_path: str, entity_types: list[str] | None = None, + max_results: int = 1000, + offset: int = 0, ctx: Context | None = None, ) -> str: """List all types (classes, interfaces, structs, etc.) in a .NET assembly. @@ -688,6 +694,8 @@ async def list_types( - "delegate" or "d" - "enum" or "e" Example: ["class", "interface"] or ["c", "i"] + max_results: Maximum number of results to return (default: 1000) + offset: Number of results to skip for pagination (default: 0) """ # Validate assembly path before any processing try: @@ -721,12 +729,24 @@ async def list_types( response = await wrapper.list_types(request) if response.success and response.types: + all_types = response.types + total_count = len(all_types) + + # Apply pagination + paginated_types = all_types[offset : offset + max_results] + has_more = (offset + max_results) < total_count + content = f"# Types in {validated_path}\n\n" - content += f"Found {response.total_count} types:\n\n" + content += f"Showing {len(paginated_types)} of {total_count} types" + if offset > 0: + content += f" (offset: {offset})" + if has_more: + content += " - more results available" + content += ":\n\n" # Group by namespace - by_namespace = {} - for type_info in response.types: + by_namespace: dict[str, list] = {} + for type_info in paginated_types: ns = type_info.namespace or "(Global)" if ns not in by_namespace: by_namespace[ns] = [] @@ -739,6 +759,10 @@ async def list_types( content += f" - Full name: `{type_info.full_name}`\n" content += "\n" + if has_more: + next_offset = offset + max_results + content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n" + return content else: return response.error_message or "No types found in assembly" @@ -869,6 +893,8 @@ async def search_types( entity_types: list[str] | None = None, case_sensitive: bool = False, use_regex: bool = False, + max_results: int = 1000, + offset: int = 0, ctx: Context | None = None, ) -> str: """Search for types in an assembly by name pattern. @@ -890,6 +916,8 @@ async def search_types( entity_types: Types to search. Accepts: "class", "interface", "struct", "delegate", "enum" (default: all) case_sensitive: Whether pattern matching is case-sensitive (default: False) use_regex: Treat pattern as regular expression (default: False) + max_results: Maximum number of results to return (default: 1000) + offset: Number of results to skip for pagination (default: 0) """ # Validate assembly path before any processing try: @@ -957,13 +985,23 @@ async def search_types( if not matching_types: return f"No types found matching pattern '{pattern}'" + # Apply pagination + total_count = len(matching_types) + paginated_types = matching_types[offset : offset + max_results] + has_more = (offset + max_results) < total_count + # Format results content = f"# Search Results for '{pattern}'\n\n" - content += f"Found {len(matching_types)} matching types:\n\n" + content += f"Showing {len(paginated_types)} of {total_count} matching types" + if offset > 0: + content += f" (offset: {offset})" + if has_more: + content += " - more results available" + content += ":\n\n" # Group by namespace by_namespace: dict[str, list] = {} - for type_info in matching_types: + for type_info in paginated_types: ns = type_info.namespace or "(Global)" if ns not in by_namespace: by_namespace[ns] = [] @@ -976,6 +1014,10 @@ async def search_types( content += f" - Full name: `{type_info.full_name}`\n" content += "\n" + if has_more: + next_offset = offset + max_results + content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n" + content += "\n**TIP**: Use `decompile_assembly` with `type_name` set to the full name to examine any of these types." return content @@ -1002,11 +1044,13 @@ async def search_strings( - Hardcoded credentials (security analysis) - Registry keys and file paths - Returns the types and methods containing matching strings. + This uses the fast #US (User Strings) heap search which reads directly + from metadata without decompilation. This is typically 10-100x faster + than the previous approach of decompiling to IL. Args: assembly_path: Full path to the .NET assembly file (.dll or .exe) - pattern: String pattern to search for in the decompiled code + pattern: String pattern to search for in the assembly's strings case_sensitive: Whether search is case-sensitive (default: False) use_regex: Treat pattern as regular expression (default: False) max_results: Maximum number of matches to return (default: 100) @@ -1021,71 +1065,15 @@ async def search_strings( await ctx.info(f"Searching for strings matching '{pattern}' in: {validated_path}") try: - wrapper = get_wrapper(ctx) + from .metadata_reader import MetadataReader - # Decompile to IL to find string literals (ldstr instructions) - from .models import DecompileRequest - - request = DecompileRequest( - assembly_path=validated_path, - show_il_code=True, # IL makes string literals explicit - language_version=LanguageVersion.LATEST, - ) - - response = await wrapper.decompile(request) - - if not response.success: - return f"Failed to decompile assembly: {response.error_message}" - - source_code = response.source_code or "" - - # Compile regex if needed - search_pattern, regex_error = _compile_search_pattern(pattern, case_sensitive, use_regex) - if regex_error: - return regex_error - - # Search for string literals containing the pattern - # In IL, strings appear as: ldstr "string value" - # In C#, they're just regular string literals - matches = [] - current_type = None - current_method = None - - lines = source_code.split("\n") - for i, line in enumerate(lines): - # Track current type/method context - type_match = re.match( - r"^\s*(?:public|private|internal|protected)?\s*(?:class|struct|interface)\s+(\w+)", - line, + with MetadataReader(validated_path) as reader: + matches = reader.search_user_strings( + pattern=pattern, + case_sensitive=case_sensitive, + use_regex=use_regex, + max_results=max_results, ) - if type_match: - current_type = type_match.group(1) - - method_match = re.match( - r"^\s*(?:public|private|internal|protected)?\s*(?:static\s+)?(?:\w+\s+)+(\w+)\s*\(", - line, - ) - if method_match: - current_method = method_match.group(1) - - # Search for pattern in the line - found = False - if use_regex and search_pattern: - found = bool(search_pattern.search(line)) - elif case_sensitive: - found = pattern in line - else: - found = pattern.lower() in line.lower() - - if found and len(matches) < max_results: - matches.append( - { - "line_num": i + 1, - "line": line.strip()[:MAX_LINE_LENGTH], # Truncate long lines - "type": current_type or "Unknown", - "method": current_method, - } - ) if not matches: return f"No strings found matching pattern '{pattern}'" @@ -1094,32 +1082,30 @@ async def search_strings( content = f"# String Search Results for '{pattern}'\n\n" content += f"Found {len(matches)} matches" if len(matches) >= max_results: - content += f" (limited to {max_results})" + content += f" (limited to {max_results}, use `max_results` to increase)" content += ":\n\n" - # Group by type - by_type: dict[str, list] = {} for match in matches: - type_name = match["type"] - if type_name not in by_type: - by_type[type_name] = [] - by_type[type_name].append(match) + # Truncate long strings for display + display_value = match.value + if len(display_value) > 200: + display_value = display_value[:197] + "..." - for type_name, type_matches in sorted(by_type.items()): - content += f"## {type_name}\n\n" - for match in type_matches[:MAX_MATCHES_PER_TYPE]: - method_info = f" in `{match['method']}()`" if match["method"] else "" - content += f"- Line {match['line_num']}{method_info}:\n" - content += f" ```\n {match['line']}\n ```\n" - if len(type_matches) > MAX_MATCHES_PER_TYPE: - content += f" ... and {len(type_matches) - MAX_MATCHES_PER_TYPE} more matches in this type\n" - content += "\n" + # Escape backticks in the string + display_value = display_value.replace("`", "\\`") - content += "\n**TIP**: Use `decompile_assembly` with `type_name` to see the full context of interesting matches." + content += f"- `{display_value}`\n" + + content += "\n**TIP**: Use `decompile_assembly` with `-il` option to find which methods use these strings." return content + except FileNotFoundError as e: + return _format_error(e) + except ValueError as e: + # Invalid regex pattern + return _format_error(e) except Exception as e: - logger.error(f"Error searching strings: {e}") + logger.exception(f"Error searching strings: {e}") return _format_error(e) @@ -1137,6 +1123,8 @@ async def search_methods( public_only: bool = False, case_sensitive: bool = False, use_regex: bool = False, + max_results: int = 1000, + offset: int = 0, ctx: Context | None = None, ) -> str: """Search for methods in an assembly by name pattern. @@ -1158,6 +1146,8 @@ async def search_methods( public_only: Only return public methods (default: False) case_sensitive: Whether pattern matching is case-sensitive (default: False) use_regex: Treat pattern as regular expression (default: False) + max_results: Maximum number of results to return (default: 1000) + offset: Number of results to skip for pagination (default: 0) """ # Validate assembly path before any processing try: @@ -1203,13 +1193,23 @@ async def search_methods( if not matching_methods: return f"No methods found matching pattern '{pattern}'" + # Apply pagination + total_count = len(matching_methods) + paginated_methods = matching_methods[offset : offset + max_results] + has_more = (offset + max_results) < total_count + # Format results content = f"# Method Search Results for '{pattern}'\n\n" - content += f"Found {len(matching_methods)} matching methods:\n\n" + content += f"Showing {len(paginated_methods)} of {total_count} matching methods" + if offset > 0: + content += f" (offset: {offset})" + if has_more: + content += " - more results available" + content += ":\n\n" # Group by type by_type: dict[str, list] = {} - for method in matching_methods: + for method in paginated_methods: key = ( f"{method.namespace}.{method.declaring_type}" if method.namespace @@ -1235,6 +1235,10 @@ async def search_methods( content += f"- `{mod_str}{method.name}()`\n" content += "\n" + if has_more: + next_offset = offset + max_results + content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n" + content += ( "\n**TIP**: Use `decompile_assembly` with `type_name` to see the full implementation." ) @@ -1257,6 +1261,8 @@ async def search_fields( constants_only: bool = False, case_sensitive: bool = False, use_regex: bool = False, + max_results: int = 1000, + offset: int = 0, ctx: Context | None = None, ) -> str: """Search for fields in an assembly by name pattern. @@ -1276,6 +1282,8 @@ async def search_fields( constants_only: Only return constant (literal) fields (default: False) case_sensitive: Whether pattern matching is case-sensitive (default: False) use_regex: Treat pattern as regular expression (default: False) + max_results: Maximum number of results to return (default: 1000) + offset: Number of results to skip for pagination (default: 0) """ # Validate assembly path before any processing try: @@ -1322,13 +1330,23 @@ async def search_fields( if not matching_fields: return f"No fields found matching pattern '{pattern}'" + # Apply pagination + total_count = len(matching_fields) + paginated_fields = matching_fields[offset : offset + max_results] + has_more = (offset + max_results) < total_count + # Format results content = f"# Field Search Results for '{pattern}'\n\n" - content += f"Found {len(matching_fields)} matching fields:\n\n" + content += f"Showing {len(paginated_fields)} of {total_count} matching fields" + if offset > 0: + content += f" (offset: {offset})" + if has_more: + content += " - more results available" + content += ":\n\n" # Group by type by_type: dict[str, list] = {} - for field in matching_fields: + for field in paginated_fields: key = ( f"{field.namespace}.{field.declaring_type}" if field.namespace @@ -1352,6 +1370,10 @@ async def search_fields( content += f"- `{mod_str}{field.name}`\n" content += "\n" + if has_more: + next_offset = offset + max_results + content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n" + return content except FileNotFoundError as e: @@ -1369,6 +1391,8 @@ async def search_properties( namespace_filter: str | None = None, case_sensitive: bool = False, use_regex: bool = False, + max_results: int = 1000, + offset: int = 0, ctx: Context | None = None, ) -> str: """Search for properties in an assembly by name pattern. @@ -1386,6 +1410,8 @@ async def search_properties( namespace_filter: Only search in namespaces containing this string case_sensitive: Whether pattern matching is case-sensitive (default: False) use_regex: Treat pattern as regular expression (default: False) + max_results: Maximum number of results to return (default: 1000) + offset: Number of results to skip for pagination (default: 0) """ # Validate assembly path before any processing try: @@ -1430,13 +1456,23 @@ async def search_properties( if not matching_props: return f"No properties found matching pattern '{pattern}'" + # Apply pagination + total_count = len(matching_props) + paginated_props = matching_props[offset : offset + max_results] + has_more = (offset + max_results) < total_count + # Format results content = f"# Property Search Results for '{pattern}'\n\n" - content += f"Found {len(matching_props)} matching properties:\n\n" + content += f"Showing {len(paginated_props)} of {total_count} matching properties" + if offset > 0: + content += f" (offset: {offset})" + if has_more: + content += " - more results available" + content += ":\n\n" # Group by type by_type: dict[str, list] = {} - for prop in matching_props: + for prop in paginated_props: key = ( f"{prop.namespace}.{prop.declaring_type}" if prop.namespace else prop.declaring_type ) @@ -1450,6 +1486,10 @@ async def search_properties( content += f"- `{prop.name}`\n" content += "\n" + if has_more: + next_offset = offset + max_results + content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n" + return content except FileNotFoundError as e: @@ -1464,6 +1504,8 @@ async def list_events( assembly_path: str, type_filter: str | None = None, namespace_filter: str | None = None, + max_results: int = 1000, + offset: int = 0, ctx: Context | None = None, ) -> str: """List all events defined in an assembly. @@ -1478,6 +1520,8 @@ async def list_events( assembly_path: Full path to the .NET assembly file (.dll or .exe) type_filter: Only list events in types containing this string namespace_filter: Only list events in namespaces containing this string + max_results: Maximum number of results to return (default: 1000) + offset: Number of results to skip for pagination (default: 0) """ # Validate assembly path before any processing try: @@ -1500,12 +1544,22 @@ async def list_events( if not events: return "No events found in assembly" + # Apply pagination + total_count = len(events) + paginated_events = events[offset : offset + max_results] + has_more = (offset + max_results) < total_count + content = "# Events in Assembly\n\n" - content += f"Found {len(events)} events:\n\n" + content += f"Showing {len(paginated_events)} of {total_count} events" + if offset > 0: + content += f" (offset: {offset})" + if has_more: + content += " - more results available" + content += ":\n\n" # Group by type by_type: dict[str, list] = {} - for evt in events: + for evt in paginated_events: key = f"{evt.namespace}.{evt.declaring_type}" if evt.namespace else evt.declaring_type if key not in by_type: by_type[key] = [] @@ -1517,6 +1571,10 @@ async def list_events( content += f"- `event {evt.name}`\n" content += "\n" + if has_more: + next_offset = offset + max_results + content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n" + return content except FileNotFoundError as e: @@ -1529,6 +1587,8 @@ async def list_events( @mcp.tool() async def list_resources( assembly_path: str, + max_results: int = 1000, + offset: int = 0, ctx: Context | None = None, ) -> str: """List all embedded resources in an assembly. @@ -1541,6 +1601,8 @@ async def list_resources( Args: assembly_path: Full path to the .NET assembly file (.dll or .exe) + max_results: Maximum number of results to return (default: 1000) + offset: Number of results to skip for pagination (default: 0) """ # Validate assembly path before any processing try: @@ -1560,13 +1622,28 @@ async def list_resources( if not resources: return "No embedded resources found in assembly" - content = "# Embedded Resources\n\n" - content += f"Found {len(resources)} resources:\n\n" + # Apply pagination + total_count = len(resources) + sorted_resources = sorted(resources, key=lambda r: r.name) + paginated_resources = sorted_resources[offset : offset + max_results] + has_more = (offset + max_results) < total_count - for res in sorted(resources, key=lambda r: r.name): + content = "# Embedded Resources\n\n" + content += f"Showing {len(paginated_resources)} of {total_count} resources" + if offset > 0: + content += f" (offset: {offset})" + if has_more: + content += " - more results available" + content += ":\n\n" + + for res in paginated_resources: visibility = "public" if res.is_public else "private" content += f"- `{res.name}` ({visibility})\n" + if has_more: + next_offset = offset + max_results + content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n" + return content except FileNotFoundError as e: