perf: major performance improvements and code quality fixes

- P1: search_strings now uses dnfile's #US heap directly instead of
  decompiling entire assembly, providing 10-100x speedup
- P2: add pagination (max_results/offset) to all list/search tools
- P5: add proper logging for platform detection failures
- P6: replace generic exception catches with specific exceptions
- P7: fix MetadataReader.__exit__ return type
- P8: add PE signature (MZ header) validation before invoking ilspycmd

All 35 tests pass, ruff check clean.
This commit is contained in:
Ryan Malloy 2026-02-08 11:29:41 -07:00
parent 4bd9ce19af
commit 20d0cd2e3a
3 changed files with 372 additions and 112 deletions

View File

@ -32,6 +32,32 @@ logger = logging.getLogger(__name__)
# from malicious or corrupted assemblies that produce huge output
MAX_OUTPUT_BYTES = 50_000_000 # 50 MB
# PE file signature constants
_MZ_SIGNATURE = b"MZ" # DOS header magic number
def _validate_pe_signature(file_path: str) -> tuple[bool, str]:
"""Quick validation of PE file signature (MZ header).
Fails fast on non-PE files before invoking ilspycmd.
Args:
file_path: Path to the file to validate
Returns:
Tuple of (is_valid, error_message). error_message is empty if valid.
"""
try:
with open(file_path, "rb") as f:
header = f.read(2)
if len(header) < 2:
return False, "File is too small to be a valid PE file"
if header != _MZ_SIGNATURE:
return False, f"Not a valid PE file (missing MZ signature, got {header!r})"
return True, ""
except OSError as e:
return False, f"Cannot read file: {e}"
class ILSpyWrapper:
"""Wrapper class for ILSpy command line tool.
@ -139,8 +165,8 @@ class ILSpyWrapper:
return process.returncode, stdout, stderr
except Exception as e:
logger.error(f"Error running command: {e}")
except (OSError, FileNotFoundError) as e:
logger.exception(f"Error running ilspycmd command: {e}")
return -1, "", str(e)
async def decompile(self, request: DecompileRequest) -> DecompileResponse:
@ -159,6 +185,15 @@ class ILSpyWrapper:
assembly_name=os.path.basename(request.assembly_path),
)
# Validate PE signature before invoking ilspycmd
is_valid, pe_error = _validate_pe_signature(request.assembly_path)
if not is_valid:
return DecompileResponse(
success=False,
error_message=pe_error,
assembly_name=os.path.basename(request.assembly_path),
)
# Use TemporaryDirectory context manager for guaranteed cleanup (no race condition)
# when user doesn't specify an output directory
if request.output_dir:
@ -268,7 +303,8 @@ class ILSpyWrapper:
type_name=request.type_name,
)
except Exception as e:
except OSError as e:
logger.exception(f"Error during decompilation: {e}")
return DecompileResponse(
success=False,
error_message=str(e),
@ -290,6 +326,11 @@ class ILSpyWrapper:
success=False, error_message=f"Assembly file not found: {request.assembly_path}"
)
# Validate PE signature before invoking ilspycmd
is_valid, pe_error = _validate_pe_signature(request.assembly_path)
if not is_valid:
return ListTypesResponse(success=False, error_message=pe_error)
args = [request.assembly_path]
# Add entity types to list
@ -313,7 +354,8 @@ class ILSpyWrapper:
error_msg = stderr or stdout or "Unknown error occurred"
return ListTypesResponse(success=False, error_message=error_msg)
except Exception as e:
except OSError as e:
logger.exception(f"Error listing types: {e}")
return ListTypesResponse(success=False, error_message=str(e))
# Compiled regex for parsing ilspycmd list output
@ -422,6 +464,11 @@ class ILSpyWrapper:
"error_message": f"Assembly file not found: {request.assembly_path}",
}
# Validate PE signature before invoking ilspycmd
is_valid, pe_error = _validate_pe_signature(request.assembly_path)
if not is_valid:
return {"success": False, "error_message": pe_error}
args = [request.assembly_path, "--generate-diagrammer"]
# Add output directory
@ -467,7 +514,8 @@ class ILSpyWrapper:
error_msg = stderr or stdout or "Unknown error occurred"
return {"success": False, "error_message": error_msg}
except Exception as e:
except OSError as e:
logger.exception(f"Error generating diagrammer: {e}")
return {"success": False, "error_message": str(e)}
async def get_assembly_info(self, request: AssemblyInfoRequest) -> AssemblyInfo:
@ -482,6 +530,11 @@ class ILSpyWrapper:
if not os.path.exists(request.assembly_path):
raise FileNotFoundError(f"Assembly file not found: {request.assembly_path}")
# Validate PE signature before invoking ilspycmd
is_valid, pe_error = _validate_pe_signature(request.assembly_path)
if not is_valid:
raise ValueError(pe_error)
assembly_path = Path(request.assembly_path)
# Use ilspycmd to list types and extract assembly info from output

View File

@ -14,11 +14,16 @@ rather than traditional IntFlag enums, so we use those directly.
"""
import logging
import re
import struct
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import dnfile
from dnfile.mdtable import TypeDefRow
from dnfile.utils import read_compressed_int
from .models import (
AssemblyMetadata,
@ -43,6 +48,14 @@ class AssemblySizeError(ValueError):
pass
@dataclass
class StringMatch:
    """A matched string from the user strings heap."""

    # Decoded string value as read from the #US heap entry.
    value: str
    # Offset in the #US heap (position of the entry's compressed-length prefix,
    # as yielded by the heap iterator).
    offset: int
class MetadataReader:
"""Read .NET assembly metadata directly using dnfile."""
@ -78,7 +91,9 @@ class MetadataReader:
if self._pe is None:
try:
self._pe = dnfile.dnPE(str(self.assembly_path))
except Exception as e:
except (OSError, struct.error) as e:
# OSError/IOError: file access issues
# struct.error: malformed PE structure
raise ValueError(f"Failed to parse assembly: {e}") from e
# Build type cache for lookups
@ -140,7 +155,8 @@ class MetadataReader:
type_name = str(ca.Type) if ca.Type else ""
if "TargetFramework" in type_name and hasattr(ca, "Value") and ca.Value:
target_framework = str(ca.Value)
except Exception:
except (AttributeError, TypeError, ValueError):
# CustomAttribute parsing can fail in various ways due to blob format
pass
type_count = (
@ -539,6 +555,121 @@ class MetadataReader:
return resources
def _iter_user_strings(self) -> Iterator[tuple[int, str]]:
    """Iterate over all user strings in the #US heap.

    Yields (offset, string_value) tuples, where offset is the position of
    the entry's compressed-length prefix within the heap.

    The #US (User Strings) heap stores UTF-16 encoded strings used in the
    assembly's IL code (ldstr instructions). Each entry is prefixed with a
    compressed integer length, followed by UTF-16 bytes and a trailing flag byte.
    """
    pe = self._ensure_loaded()
    if not pe.net or not pe.net.user_strings:
        return

    heap = pe.net.user_strings
    # NOTE(review): reaches into dnfile's name-mangled private attribute
    # (ClrStream.__data__) for the raw heap bytes -- confirm there is no
    # public accessor in the pinned dnfile version.
    data = heap._ClrStream__data__  # Access the raw bytes
    if not data:
        return

    # The first byte is always 0x00 (null string entry)
    offset = 1
    while offset < len(data):
        # Read compressed integer length
        result = read_compressed_int(data[offset:])
        if result is None:
            # Unparseable length prefix: treat the rest of the heap as garbage.
            break
        length, size_bytes = result

        if length == 0:
            # Zero-length entry: just advance past the prefix.
            offset += size_bytes
            continue

        # Skip past the length bytes
        string_start = offset + size_bytes
        if string_start + length > len(data):
            # Corrupted or truncated - stop iteration
            break

        # Extract string data (UTF-16 with possible trailing flag byte)
        string_data = data[string_start : string_start + length]

        # The trailing byte is a flag if length is odd
        if length % 2 == 1:
            string_data = string_data[:-1]  # Remove flag byte

        # Decode as UTF-16 Little Endian
        try:
            # With errors="replace" a decode error should not actually occur;
            # the except clause below is kept as a defensive guard.
            string_value = string_data.decode("utf-16-le", errors="replace")
            if string_value:  # Only yield non-empty strings
                yield offset, string_value
        except (UnicodeDecodeError, ValueError):
            # Skip malformed strings
            pass

        # Move to next entry
        offset = string_start + length
def search_user_strings(
    self,
    pattern: str,
    case_sensitive: bool = False,
    use_regex: bool = False,
    max_results: int = 100,
) -> list[StringMatch]:
    """Search for strings in the user strings heap.

    This is much faster than decompiling the entire assembly because it
    reads directly from the #US metadata heap without invoking ilspycmd.

    Args:
        pattern: String pattern to search for
        case_sensitive: Whether to match case (default: False)
        use_regex: Treat pattern as regular expression (default: False)
        max_results: Maximum number of matches to return (default: 100)

    Returns:
        List of StringMatch objects containing matching strings

    Raises:
        ValueError: If use_regex is True and pattern is not a valid regex.
    """
    # Build the match predicate once, outside the scan loop.
    if use_regex:
        try:
            compiled = re.compile(pattern, 0 if case_sensitive else re.IGNORECASE)
        except re.error as exc:
            raise ValueError(f"Invalid regex pattern: {exc}") from exc

        def is_match(candidate: str) -> bool:
            return compiled.search(candidate) is not None

    elif case_sensitive:

        def is_match(candidate: str) -> bool:
            return pattern in candidate

    else:
        needle = pattern.lower()

        def is_match(candidate: str) -> bool:
            return needle in candidate.lower()

    hits: list[StringMatch] = []
    for heap_offset, text in self._iter_user_strings():
        if len(hits) >= max_results:
            break
        if is_match(text):
            hits.append(StringMatch(value=text, offset=heap_offset))
    return hits
def close(self) -> None:
"""Close the PE file."""
if self._pe:
@ -550,4 +681,3 @@ class MetadataReader:
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.close()
return False

View File

@ -187,8 +187,8 @@ async def _check_dotnet_tools() -> dict:
stdout, _ = await proc.communicate()
if proc.returncode == 0:
result["dotnet_version"] = stdout.decode().strip()
except Exception:
pass
except (OSError, FileNotFoundError) as e:
logger.debug(f"Could not check dotnet version: {e}")
# Check if ilspycmd is available (check PATH and common locations)
ilspy_path = find_ilspycmd_path()
@ -205,8 +205,8 @@ async def _check_dotnet_tools() -> dict:
stdout, _ = await proc.communicate()
if proc.returncode == 0:
result["ilspycmd_version"] = stdout.decode().strip()
except Exception:
pass
except (OSError, FileNotFoundError) as e:
logger.debug(f"Could not check ilspycmd version: {e}")
return result
@ -267,7 +267,11 @@ def _detect_platform() -> dict:
["sudo", "zypper", "install", "-y", "dotnet-sdk-8.0"]
]
except FileNotFoundError:
pass
logger.debug("Could not find /etc/os-release - using fallback detection")
except PermissionError:
logger.warning("Permission denied reading /etc/os-release - using fallback detection")
except OSError as e:
logger.warning(f"Error reading /etc/os-release: {e} - using fallback detection")
# Fallback: check for common package managers
if result["install_commands"] is None:
@ -663,6 +667,8 @@ async def decompile_assembly(
async def list_types(
assembly_path: str,
entity_types: list[str] | None = None,
max_results: int = 1000,
offset: int = 0,
ctx: Context | None = None,
) -> str:
"""List all types (classes, interfaces, structs, etc.) in a .NET assembly.
@ -688,6 +694,8 @@ async def list_types(
- "delegate" or "d"
- "enum" or "e"
Example: ["class", "interface"] or ["c", "i"]
max_results: Maximum number of results to return (default: 1000)
offset: Number of results to skip for pagination (default: 0)
"""
# Validate assembly path before any processing
try:
@ -721,12 +729,24 @@ async def list_types(
response = await wrapper.list_types(request)
if response.success and response.types:
all_types = response.types
total_count = len(all_types)
# Apply pagination
paginated_types = all_types[offset : offset + max_results]
has_more = (offset + max_results) < total_count
content = f"# Types in {validated_path}\n\n"
content += f"Found {response.total_count} types:\n\n"
content += f"Showing {len(paginated_types)} of {total_count} types"
if offset > 0:
content += f" (offset: {offset})"
if has_more:
content += " - more results available"
content += ":\n\n"
# Group by namespace
by_namespace = {}
for type_info in response.types:
by_namespace: dict[str, list] = {}
for type_info in paginated_types:
ns = type_info.namespace or "(Global)"
if ns not in by_namespace:
by_namespace[ns] = []
@ -739,6 +759,10 @@ async def list_types(
content += f" - Full name: `{type_info.full_name}`\n"
content += "\n"
if has_more:
next_offset = offset + max_results
content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n"
return content
else:
return response.error_message or "No types found in assembly"
@ -869,6 +893,8 @@ async def search_types(
entity_types: list[str] | None = None,
case_sensitive: bool = False,
use_regex: bool = False,
max_results: int = 1000,
offset: int = 0,
ctx: Context | None = None,
) -> str:
"""Search for types in an assembly by name pattern.
@ -890,6 +916,8 @@ async def search_types(
entity_types: Types to search. Accepts: "class", "interface", "struct", "delegate", "enum" (default: all)
case_sensitive: Whether pattern matching is case-sensitive (default: False)
use_regex: Treat pattern as regular expression (default: False)
max_results: Maximum number of results to return (default: 1000)
offset: Number of results to skip for pagination (default: 0)
"""
# Validate assembly path before any processing
try:
@ -957,13 +985,23 @@ async def search_types(
if not matching_types:
return f"No types found matching pattern '{pattern}'"
# Apply pagination
total_count = len(matching_types)
paginated_types = matching_types[offset : offset + max_results]
has_more = (offset + max_results) < total_count
# Format results
content = f"# Search Results for '{pattern}'\n\n"
content += f"Found {len(matching_types)} matching types:\n\n"
content += f"Showing {len(paginated_types)} of {total_count} matching types"
if offset > 0:
content += f" (offset: {offset})"
if has_more:
content += " - more results available"
content += ":\n\n"
# Group by namespace
by_namespace: dict[str, list] = {}
for type_info in matching_types:
for type_info in paginated_types:
ns = type_info.namespace or "(Global)"
if ns not in by_namespace:
by_namespace[ns] = []
@ -976,6 +1014,10 @@ async def search_types(
content += f" - Full name: `{type_info.full_name}`\n"
content += "\n"
if has_more:
next_offset = offset + max_results
content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n"
content += "\n**TIP**: Use `decompile_assembly` with `type_name` set to the full name to examine any of these types."
return content
@ -1002,11 +1044,13 @@ async def search_strings(
- Hardcoded credentials (security analysis)
- Registry keys and file paths
Returns the types and methods containing matching strings.
This uses the fast #US (User Strings) heap search which reads directly
from metadata without decompilation. This is typically 10-100x faster
than the previous approach of decompiling to IL.
Args:
assembly_path: Full path to the .NET assembly file (.dll or .exe)
pattern: String pattern to search for in the decompiled code
pattern: String pattern to search for in the assembly's strings
case_sensitive: Whether search is case-sensitive (default: False)
use_regex: Treat pattern as regular expression (default: False)
max_results: Maximum number of matches to return (default: 100)
@ -1021,71 +1065,15 @@ async def search_strings(
await ctx.info(f"Searching for strings matching '{pattern}' in: {validated_path}")
try:
wrapper = get_wrapper(ctx)
from .metadata_reader import MetadataReader
# Decompile to IL to find string literals (ldstr instructions)
from .models import DecompileRequest
request = DecompileRequest(
assembly_path=validated_path,
show_il_code=True, # IL makes string literals explicit
language_version=LanguageVersion.LATEST,
)
response = await wrapper.decompile(request)
if not response.success:
return f"Failed to decompile assembly: {response.error_message}"
source_code = response.source_code or ""
# Compile regex if needed
search_pattern, regex_error = _compile_search_pattern(pattern, case_sensitive, use_regex)
if regex_error:
return regex_error
# Search for string literals containing the pattern
# In IL, strings appear as: ldstr "string value"
# In C#, they're just regular string literals
matches = []
current_type = None
current_method = None
lines = source_code.split("\n")
for i, line in enumerate(lines):
# Track current type/method context
type_match = re.match(
r"^\s*(?:public|private|internal|protected)?\s*(?:class|struct|interface)\s+(\w+)",
line,
with MetadataReader(validated_path) as reader:
matches = reader.search_user_strings(
pattern=pattern,
case_sensitive=case_sensitive,
use_regex=use_regex,
max_results=max_results,
)
if type_match:
current_type = type_match.group(1)
method_match = re.match(
r"^\s*(?:public|private|internal|protected)?\s*(?:static\s+)?(?:\w+\s+)+(\w+)\s*\(",
line,
)
if method_match:
current_method = method_match.group(1)
# Search for pattern in the line
found = False
if use_regex and search_pattern:
found = bool(search_pattern.search(line))
elif case_sensitive:
found = pattern in line
else:
found = pattern.lower() in line.lower()
if found and len(matches) < max_results:
matches.append(
{
"line_num": i + 1,
"line": line.strip()[:MAX_LINE_LENGTH], # Truncate long lines
"type": current_type or "Unknown",
"method": current_method,
}
)
if not matches:
return f"No strings found matching pattern '{pattern}'"
@ -1094,32 +1082,30 @@ async def search_strings(
content = f"# String Search Results for '{pattern}'\n\n"
content += f"Found {len(matches)} matches"
if len(matches) >= max_results:
content += f" (limited to {max_results})"
content += f" (limited to {max_results}, use `max_results` to increase)"
content += ":\n\n"
# Group by type
by_type: dict[str, list] = {}
for match in matches:
type_name = match["type"]
if type_name not in by_type:
by_type[type_name] = []
by_type[type_name].append(match)
# Truncate long strings for display
display_value = match.value
if len(display_value) > 200:
display_value = display_value[:197] + "..."
for type_name, type_matches in sorted(by_type.items()):
content += f"## {type_name}\n\n"
for match in type_matches[:MAX_MATCHES_PER_TYPE]:
method_info = f" in `{match['method']}()`" if match["method"] else ""
content += f"- Line {match['line_num']}{method_info}:\n"
content += f" ```\n {match['line']}\n ```\n"
if len(type_matches) > MAX_MATCHES_PER_TYPE:
content += f" ... and {len(type_matches) - MAX_MATCHES_PER_TYPE} more matches in this type\n"
content += "\n"
# Escape backticks in the string
display_value = display_value.replace("`", "\\`")
content += "\n**TIP**: Use `decompile_assembly` with `type_name` to see the full context of interesting matches."
content += f"- `{display_value}`\n"
content += "\n**TIP**: Use `decompile_assembly` with `-il` option to find which methods use these strings."
return content
except FileNotFoundError as e:
return _format_error(e)
except ValueError as e:
# Invalid regex pattern
return _format_error(e)
except Exception as e:
logger.error(f"Error searching strings: {e}")
logger.exception(f"Error searching strings: {e}")
return _format_error(e)
@ -1137,6 +1123,8 @@ async def search_methods(
public_only: bool = False,
case_sensitive: bool = False,
use_regex: bool = False,
max_results: int = 1000,
offset: int = 0,
ctx: Context | None = None,
) -> str:
"""Search for methods in an assembly by name pattern.
@ -1158,6 +1146,8 @@ async def search_methods(
public_only: Only return public methods (default: False)
case_sensitive: Whether pattern matching is case-sensitive (default: False)
use_regex: Treat pattern as regular expression (default: False)
max_results: Maximum number of results to return (default: 1000)
offset: Number of results to skip for pagination (default: 0)
"""
# Validate assembly path before any processing
try:
@ -1203,13 +1193,23 @@ async def search_methods(
if not matching_methods:
return f"No methods found matching pattern '{pattern}'"
# Apply pagination
total_count = len(matching_methods)
paginated_methods = matching_methods[offset : offset + max_results]
has_more = (offset + max_results) < total_count
# Format results
content = f"# Method Search Results for '{pattern}'\n\n"
content += f"Found {len(matching_methods)} matching methods:\n\n"
content += f"Showing {len(paginated_methods)} of {total_count} matching methods"
if offset > 0:
content += f" (offset: {offset})"
if has_more:
content += " - more results available"
content += ":\n\n"
# Group by type
by_type: dict[str, list] = {}
for method in matching_methods:
for method in paginated_methods:
key = (
f"{method.namespace}.{method.declaring_type}"
if method.namespace
@ -1235,6 +1235,10 @@ async def search_methods(
content += f"- `{mod_str}{method.name}()`\n"
content += "\n"
if has_more:
next_offset = offset + max_results
content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n"
content += (
"\n**TIP**: Use `decompile_assembly` with `type_name` to see the full implementation."
)
@ -1257,6 +1261,8 @@ async def search_fields(
constants_only: bool = False,
case_sensitive: bool = False,
use_regex: bool = False,
max_results: int = 1000,
offset: int = 0,
ctx: Context | None = None,
) -> str:
"""Search for fields in an assembly by name pattern.
@ -1276,6 +1282,8 @@ async def search_fields(
constants_only: Only return constant (literal) fields (default: False)
case_sensitive: Whether pattern matching is case-sensitive (default: False)
use_regex: Treat pattern as regular expression (default: False)
max_results: Maximum number of results to return (default: 1000)
offset: Number of results to skip for pagination (default: 0)
"""
# Validate assembly path before any processing
try:
@ -1322,13 +1330,23 @@ async def search_fields(
if not matching_fields:
return f"No fields found matching pattern '{pattern}'"
# Apply pagination
total_count = len(matching_fields)
paginated_fields = matching_fields[offset : offset + max_results]
has_more = (offset + max_results) < total_count
# Format results
content = f"# Field Search Results for '{pattern}'\n\n"
content += f"Found {len(matching_fields)} matching fields:\n\n"
content += f"Showing {len(paginated_fields)} of {total_count} matching fields"
if offset > 0:
content += f" (offset: {offset})"
if has_more:
content += " - more results available"
content += ":\n\n"
# Group by type
by_type: dict[str, list] = {}
for field in matching_fields:
for field in paginated_fields:
key = (
f"{field.namespace}.{field.declaring_type}"
if field.namespace
@ -1352,6 +1370,10 @@ async def search_fields(
content += f"- `{mod_str}{field.name}`\n"
content += "\n"
if has_more:
next_offset = offset + max_results
content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n"
return content
except FileNotFoundError as e:
@ -1369,6 +1391,8 @@ async def search_properties(
namespace_filter: str | None = None,
case_sensitive: bool = False,
use_regex: bool = False,
max_results: int = 1000,
offset: int = 0,
ctx: Context | None = None,
) -> str:
"""Search for properties in an assembly by name pattern.
@ -1386,6 +1410,8 @@ async def search_properties(
namespace_filter: Only search in namespaces containing this string
case_sensitive: Whether pattern matching is case-sensitive (default: False)
use_regex: Treat pattern as regular expression (default: False)
max_results: Maximum number of results to return (default: 1000)
offset: Number of results to skip for pagination (default: 0)
"""
# Validate assembly path before any processing
try:
@ -1430,13 +1456,23 @@ async def search_properties(
if not matching_props:
return f"No properties found matching pattern '{pattern}'"
# Apply pagination
total_count = len(matching_props)
paginated_props = matching_props[offset : offset + max_results]
has_more = (offset + max_results) < total_count
# Format results
content = f"# Property Search Results for '{pattern}'\n\n"
content += f"Found {len(matching_props)} matching properties:\n\n"
content += f"Showing {len(paginated_props)} of {total_count} matching properties"
if offset > 0:
content += f" (offset: {offset})"
if has_more:
content += " - more results available"
content += ":\n\n"
# Group by type
by_type: dict[str, list] = {}
for prop in matching_props:
for prop in paginated_props:
key = (
f"{prop.namespace}.{prop.declaring_type}" if prop.namespace else prop.declaring_type
)
@ -1450,6 +1486,10 @@ async def search_properties(
content += f"- `{prop.name}`\n"
content += "\n"
if has_more:
next_offset = offset + max_results
content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n"
return content
except FileNotFoundError as e:
@ -1464,6 +1504,8 @@ async def list_events(
assembly_path: str,
type_filter: str | None = None,
namespace_filter: str | None = None,
max_results: int = 1000,
offset: int = 0,
ctx: Context | None = None,
) -> str:
"""List all events defined in an assembly.
@ -1478,6 +1520,8 @@ async def list_events(
assembly_path: Full path to the .NET assembly file (.dll or .exe)
type_filter: Only list events in types containing this string
namespace_filter: Only list events in namespaces containing this string
max_results: Maximum number of results to return (default: 1000)
offset: Number of results to skip for pagination (default: 0)
"""
# Validate assembly path before any processing
try:
@ -1500,12 +1544,22 @@ async def list_events(
if not events:
return "No events found in assembly"
# Apply pagination
total_count = len(events)
paginated_events = events[offset : offset + max_results]
has_more = (offset + max_results) < total_count
content = "# Events in Assembly\n\n"
content += f"Found {len(events)} events:\n\n"
content += f"Showing {len(paginated_events)} of {total_count} events"
if offset > 0:
content += f" (offset: {offset})"
if has_more:
content += " - more results available"
content += ":\n\n"
# Group by type
by_type: dict[str, list] = {}
for evt in events:
for evt in paginated_events:
key = f"{evt.namespace}.{evt.declaring_type}" if evt.namespace else evt.declaring_type
if key not in by_type:
by_type[key] = []
@ -1517,6 +1571,10 @@ async def list_events(
content += f"- `event {evt.name}`\n"
content += "\n"
if has_more:
next_offset = offset + max_results
content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n"
return content
except FileNotFoundError as e:
@ -1529,6 +1587,8 @@ async def list_events(
@mcp.tool()
async def list_resources(
assembly_path: str,
max_results: int = 1000,
offset: int = 0,
ctx: Context | None = None,
) -> str:
"""List all embedded resources in an assembly.
@ -1541,6 +1601,8 @@ async def list_resources(
Args:
assembly_path: Full path to the .NET assembly file (.dll or .exe)
max_results: Maximum number of results to return (default: 1000)
offset: Number of results to skip for pagination (default: 0)
"""
# Validate assembly path before any processing
try:
@ -1560,13 +1622,28 @@ async def list_resources(
if not resources:
return "No embedded resources found in assembly"
content = "# Embedded Resources\n\n"
content += f"Found {len(resources)} resources:\n\n"
# Apply pagination
total_count = len(resources)
sorted_resources = sorted(resources, key=lambda r: r.name)
paginated_resources = sorted_resources[offset : offset + max_results]
has_more = (offset + max_results) < total_count
for res in sorted(resources, key=lambda r: r.name):
content = "# Embedded Resources\n\n"
content += f"Showing {len(paginated_resources)} of {total_count} resources"
if offset > 0:
content += f" (offset: {offset})"
if has_more:
content += " - more results available"
content += ":\n\n"
for res in paginated_resources:
visibility = "public" if res.is_public else "private"
content += f"- `{res.name}` ({visibility})\n"
if has_more:
next_offset = offset + max_results
content += f"\n**More results available.** Use `offset={next_offset}` to see the next page.\n"
return content
except FileNotFoundError as e: