refactor: address major code review findings

- Replace global mutable state with FastMCP lifespan pattern
  - Add AppState dataclass for dependency management
  - Use lifespan context for ILSpyWrapper caching
  - get_wrapper() now accepts optional Context parameter

- Improve type parsing robustness in ilspy_wrapper
  - Compile regex pattern for better performance
  - Add _split_type_name() to handle nested types (Outer+Nested)
  - Add logging for unparsed lines (helps debug edge cases)

- Standardize error handling across all tools
  - Add _format_error() helper for consistent formatting
  - All tools now return "**Error**: message" format
This commit is contained in:
Ryan Malloy 2026-02-05 12:33:19 -07:00
parent f52790cec0
commit 157d671d28
2 changed files with 162 additions and 46 deletions

View File

@ -255,6 +255,10 @@ class ILSpyWrapper:
except Exception as e: except Exception as e:
return ListTypesResponse(success=False, error_message=str(e)) return ListTypesResponse(success=False, error_message=str(e))
# Compiled regex for parsing ilspycmd list output
# Format: "TypeKind: FullTypeName" (e.g., "Class: MyNamespace.MyClass")
_TYPE_LINE_PATTERN = re.compile(r"^(\w+):\s*(.+)$")
def _parse_types_output(self, output: str) -> list[TypeInfo]: def _parse_types_output(self, output: str) -> list[TypeInfo]:
"""Parse the output from list types command. """Parse the output from list types command.
@ -263,36 +267,85 @@ class ILSpyWrapper:
Returns: Returns:
List of TypeInfo objects List of TypeInfo objects
Note:
ilspycmd outputs types in format "TypeKind: FullTypeName"
Examples:
Class: MyNamespace.MyClass
Interface: MyNamespace.IService
Struct: MyNamespace.MyStruct
Class: MyNamespace.Outer+Nested (nested types)
""" """
types = [] types = []
lines = output.strip().split("\n") lines = output.strip().split("\n")
unparsed_count = 0
for line in lines: for line in lines:
line = line.strip() line = line.strip()
if not line: if not line:
continue continue
# Parse the line format: "TypeKind: FullTypeName" match = self._TYPE_LINE_PATTERN.match(line)
match = re.match(r"^(\w+):\s*(.+)$", line)
if match: if match:
kind = match.group(1) kind = match.group(1)
full_name = match.group(2) full_name = match.group(2).strip()
# Extract namespace and name # Extract namespace and name, handling nested types (+ separator)
parts = full_name.split(".") name, namespace = self._split_type_name(full_name)
if len(parts) > 1:
namespace = ".".join(parts[:-1])
name = parts[-1]
else:
namespace = None
name = full_name
types.append( types.append(
TypeInfo(name=name, full_name=full_name, kind=kind, namespace=namespace) TypeInfo(name=name, full_name=full_name, kind=kind, namespace=namespace)
) )
else:
# Log unexpected lines (but don't fail - ilspycmd may output warnings/info)
unparsed_count += 1
if unparsed_count <= 3: # Avoid log spam
logger.debug(f"Skipping unparsed line from ilspycmd: {line[:100]}")
if unparsed_count > 3:
logger.debug(f"Skipped {unparsed_count} unparsed lines total")
return types return types
@staticmethod
def _split_type_name(full_name: str) -> tuple[str, str | None]:
"""Split a full type name into (name, namespace).
Handles:
- Simple types: "MyClass" -> ("MyClass", None)
- Namespaced types: "MyNamespace.MyClass" -> ("MyClass", "MyNamespace")
- Nested types: "MyNamespace.Outer+Nested" -> ("Outer+Nested", "MyNamespace")
- Deep nesting: "NS.Outer+Mid+Inner" -> ("Outer+Mid+Inner", "NS")
Args:
full_name: Fully qualified type name
Returns:
Tuple of (type_name, namespace_or_none)
"""
# Find the last dot that's not inside a nested type (before any +)
# For "NS.Outer+Nested", we want to split at the dot, not after +
plus_idx = full_name.find("+")
if plus_idx != -1:
# Has nested types - only look for namespace separator before the +
prefix = full_name[:plus_idx]
suffix = full_name[plus_idx:] # includes the +
dot_idx = prefix.rfind(".")
if dot_idx != -1:
namespace = prefix[:dot_idx]
name = prefix[dot_idx + 1:] + suffix
return name, namespace
else:
# No namespace, just nested types
return full_name, None
else:
# No nested types - simple split on last dot
dot_idx = full_name.rfind(".")
if dot_idx != -1:
return full_name[dot_idx + 1:], full_name[:dot_idx]
else:
return full_name, None
async def generate_diagrammer(self, request: GenerateDiagrammerRequest) -> dict[str, Any]: async def generate_diagrammer(self, request: GenerateDiagrammerRequest) -> dict[str, Any]:
"""Generate HTML diagrammer for an assembly. """Generate HTML diagrammer for an assembly.

View File

@ -3,6 +3,8 @@ import logging
import os import os
import platform import platform
import shutil import shutil
from contextlib import asynccontextmanager
from dataclasses import dataclass
from mcp.server.fastmcp import Context, FastMCP from mcp.server.fastmcp import Context, FastMCP
@ -17,19 +19,80 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Create FastMCP server
mcp = FastMCP("mcilspy")
# Global ILSpy wrapper @dataclass
ilspy_wrapper: ILSpyWrapper | None = None class AppState:
"""Application state shared across tools via lifespan context."""
wrapper: ILSpyWrapper | None = None
def get_wrapper() -> ILSpyWrapper: @asynccontextmanager
"""Get ILSpy wrapper instance""" async def app_lifespan(server: FastMCP):
global ilspy_wrapper """Initialize application state on startup, cleanup on shutdown.
if ilspy_wrapper is None:
ilspy_wrapper = ILSpyWrapper() The ILSpyWrapper is lazily initialized on first use to avoid
return ilspy_wrapper failing startup if ilspycmd isn't installed (dnfile-based tools
still work without it).
"""
state = AppState()
logger.info("mcilspy server starting up")
try:
yield {"app_state": state}
finally:
logger.info("mcilspy server shutting down")
# Create FastMCP server with lifespan
mcp = FastMCP("mcilspy", lifespan=app_lifespan)
def get_wrapper(ctx: Context | None = None) -> ILSpyWrapper:
"""Get ILSpy wrapper instance from context or create new one.
Args:
ctx: Optional FastMCP context. If provided, uses lifespan state
for caching. If None, creates a new instance (for backwards
compatibility with non-tool callers).
Returns:
ILSpyWrapper instance
Raises:
RuntimeError: If ilspycmd is not installed
"""
if ctx is not None:
# Use lifespan state for caching (access via request_context)
try:
lifespan_ctx = ctx.request_context.lifespan_context
if lifespan_ctx is not None:
state: AppState = lifespan_ctx.get("app_state")
if state is not None:
if state.wrapper is None:
state.wrapper = ILSpyWrapper()
return state.wrapper
except (AttributeError, ValueError):
# Context not fully initialized, fall through to create new instance
pass
# Fallback: create new instance (no caching)
return ILSpyWrapper()
def _format_error(error: Exception, context: str = "") -> str:
"""Format an error message consistently across all tools.
Args:
error: The exception that occurred
context: Optional context string (e.g., "listing types")
Returns:
Formatted error string for MCP response
"""
error_msg = str(error)
if context:
return f"**Error** ({context}): {error_msg}"
return f"**Error**: {error_msg}"
async def _check_dotnet_tools() -> dict: async def _check_dotnet_tools() -> dict:
@ -431,7 +494,7 @@ async def install_ilspy(
except Exception as e: except Exception as e:
logger.error(f"Error installing ilspycmd: {e}") logger.error(f"Error installing ilspycmd: {e}")
return f"❌ **Error**: {str(e)}" return _format_error(e, "installation")
@mcp.tool() @mcp.tool()
@ -475,7 +538,7 @@ async def decompile_assembly(
await ctx.info(f"Starting decompilation of assembly: {assembly_path}") await ctx.info(f"Starting decompilation of assembly: {assembly_path}")
try: try:
wrapper = get_wrapper() wrapper = get_wrapper(ctx)
# Use simplified request object (no complex pydantic validation needed) # Use simplified request object (no complex pydantic validation needed)
from .models import DecompileRequest from .models import DecompileRequest
@ -509,7 +572,7 @@ async def decompile_assembly(
except Exception as e: except Exception as e:
logger.error(f"Decompilation error: {e}") logger.error(f"Decompilation error: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -546,7 +609,7 @@ async def list_types(
await ctx.info(f"Listing types in assembly: {assembly_path}") await ctx.info(f"Listing types in assembly: {assembly_path}")
try: try:
wrapper = get_wrapper() wrapper = get_wrapper(ctx)
# Default to list only classes # Default to list only classes
if entity_types is None: if entity_types is None:
@ -592,7 +655,7 @@ async def list_types(
except Exception as e: except Exception as e:
logger.error(f"Error listing types: {e}") logger.error(f"Error listing types: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -626,7 +689,7 @@ async def generate_diagrammer(
await ctx.info(f"Generating assembly diagram: {assembly_path}") await ctx.info(f"Generating assembly diagram: {assembly_path}")
try: try:
wrapper = get_wrapper() wrapper = get_wrapper(ctx)
from .models import GenerateDiagrammerRequest from .models import GenerateDiagrammerRequest
@ -646,7 +709,7 @@ async def generate_diagrammer(
except Exception as e: except Exception as e:
logger.error(f"Error generating diagram: {e}") logger.error(f"Error generating diagram: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -669,7 +732,7 @@ async def get_assembly_info(assembly_path: str, ctx: Context | None = None) -> s
await ctx.info(f"Getting assembly info: {assembly_path}") await ctx.info(f"Getting assembly info: {assembly_path}")
try: try:
wrapper = get_wrapper() wrapper = get_wrapper(ctx)
from .models import AssemblyInfoRequest from .models import AssemblyInfoRequest
@ -693,7 +756,7 @@ async def get_assembly_info(assembly_path: str, ctx: Context | None = None) -> s
except Exception as e: except Exception as e:
logger.error(f"Error getting assembly info: {e}") logger.error(f"Error getting assembly info: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -730,7 +793,7 @@ async def search_types(
await ctx.info(f"Searching for types matching '{pattern}' in: {assembly_path}") await ctx.info(f"Searching for types matching '{pattern}' in: {assembly_path}")
try: try:
wrapper = get_wrapper() wrapper = get_wrapper(ctx)
# Default to search all entity types # Default to search all entity types
if entity_types is None: if entity_types is None:
@ -817,7 +880,7 @@ async def search_types(
except Exception as e: except Exception as e:
logger.error(f"Error searching types: {e}") logger.error(f"Error searching types: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -851,7 +914,7 @@ async def search_strings(
await ctx.info(f"Searching for strings matching '{pattern}' in: {assembly_path}") await ctx.info(f"Searching for strings matching '{pattern}' in: {assembly_path}")
try: try:
wrapper = get_wrapper() wrapper = get_wrapper(ctx)
# Decompile to IL to find string literals (ldstr instructions) # Decompile to IL to find string literals (ldstr instructions)
from .models import DecompileRequest from .models import DecompileRequest
@ -955,7 +1018,7 @@ async def search_strings(
except Exception as e: except Exception as e:
logger.error(f"Error searching strings: {e}") logger.error(f"Error searching strings: {e}")
return f"Error: {str(e)}" return _format_error(e)
# ============================================================================ # ============================================================================
@ -1077,10 +1140,10 @@ async def search_methods(
return content return content
except FileNotFoundError as e: except FileNotFoundError as e:
return f"Error: {e}" return _format_error(e)
except Exception as e: except Exception as e:
logger.error(f"Error searching methods: {e}") logger.error(f"Error searching methods: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -1192,10 +1255,10 @@ async def search_fields(
return content return content
except FileNotFoundError as e: except FileNotFoundError as e:
return f"Error: {e}" return _format_error(e)
except Exception as e: except Exception as e:
logger.error(f"Error searching fields: {e}") logger.error(f"Error searching fields: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -1291,10 +1354,10 @@ async def search_properties(
return content return content
except FileNotFoundError as e: except FileNotFoundError as e:
return f"Error: {e}" return _format_error(e)
except Exception as e: except Exception as e:
logger.error(f"Error searching properties: {e}") logger.error(f"Error searching properties: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -1352,10 +1415,10 @@ async def list_events(
return content return content
except FileNotFoundError as e: except FileNotFoundError as e:
return f"Error: {e}" return _format_error(e)
except Exception as e: except Exception as e:
logger.error(f"Error listing events: {e}") logger.error(f"Error listing events: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -1396,10 +1459,10 @@ async def list_resources(
return content return content
except FileNotFoundError as e: except FileNotFoundError as e:
return f"Error: {e}" return _format_error(e)
except Exception as e: except Exception as e:
logger.error(f"Error listing resources: {e}") logger.error(f"Error listing resources: {e}")
return f"Error: {str(e)}" return _format_error(e)
@mcp.tool() @mcp.tool()
@ -1459,10 +1522,10 @@ async def get_metadata_summary(
return content return content
except FileNotFoundError as e: except FileNotFoundError as e:
return f"Error: {e}" return _format_error(e)
except Exception as e: except Exception as e:
logger.error(f"Error getting metadata summary: {e}") logger.error(f"Error getting metadata summary: {e}")
return f"Error: {str(e)}" return _format_error(e)
# FastMCP automatically handles prompts # FastMCP automatically handles prompts