From 157d671d281a0b10f8b9aadb12db09eefcdf425c Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Thu, 5 Feb 2026 12:33:19 -0700 Subject: [PATCH] refactor: address major code review findings - Replace global mutable state with FastMCP lifespan pattern - Add AppState dataclass for dependency management - Use lifespan context for ILSpyWrapper caching - get_wrapper() now accepts optional Context parameter - Improve type parsing robustness in ilspy_wrapper - Compile regex pattern for better performance - Add _split_type_name() to handle nested types (Outer+Nested) - Add logging for unparsed lines (helps debug edge cases) - Standardize error handling across all tools - Add _format_error() helper for consistent formatting - All tools now return "**Error**: message" format --- src/mcilspy/ilspy_wrapper.py | 75 +++++++++++++++++--- src/mcilspy/server.py | 133 ++++++++++++++++++++++++++--------- 2 files changed, 162 insertions(+), 46 deletions(-) diff --git a/src/mcilspy/ilspy_wrapper.py b/src/mcilspy/ilspy_wrapper.py index d29d224..be10f8a 100644 --- a/src/mcilspy/ilspy_wrapper.py +++ b/src/mcilspy/ilspy_wrapper.py @@ -255,6 +255,10 @@ class ILSpyWrapper: except Exception as e: return ListTypesResponse(success=False, error_message=str(e)) + # Compiled regex for parsing ilspycmd list output + # Format: "TypeKind: FullTypeName" (e.g., "Class: MyNamespace.MyClass") + _TYPE_LINE_PATTERN = re.compile(r"^(\w+):\s*(.+)$") + def _parse_types_output(self, output: str) -> list[TypeInfo]: """Parse the output from list types command. @@ -263,36 +267,85 @@ class ILSpyWrapper: Returns: List of TypeInfo objects + + Note: + ilspycmd outputs types in format "TypeKind: FullTypeName" + Examples: + Class: MyNamespace.MyClass + Interface: MyNamespace.IService + Struct: MyNamespace.MyStruct + Class: MyNamespace.Outer+Nested (nested types) """ types = [] lines = output.strip().split("\n") + unparsed_count = 0 for line in lines: line = line.strip() if not line: continue - # Parse the line format: "TypeKind: FullTypeName" - match = re.match(r"^(\w+):\s*(.+)$", line) + match = self._TYPE_LINE_PATTERN.match(line) if match: kind = match.group(1) - full_name = match.group(2) + full_name = match.group(2).strip() - # Extract namespace and name - parts = full_name.split(".") - if len(parts) > 1: - namespace = ".".join(parts[:-1]) - name = parts[-1] - else: - namespace = None - name = full_name + # Extract namespace and name, handling nested types (+ separator) + name, namespace = self._split_type_name(full_name) types.append( TypeInfo(name=name, full_name=full_name, kind=kind, namespace=namespace) ) + else: + # Log unexpected lines (but don't fail - ilspycmd may output warnings/info) + unparsed_count += 1 + if unparsed_count <= 3: # Avoid log spam + logger.debug(f"Skipping unparsed line from ilspycmd: {line[:100]}") + + if unparsed_count > 3: + logger.debug(f"Skipped {unparsed_count} unparsed lines total") return types + @staticmethod + def _split_type_name(full_name: str) -> tuple[str, str | None]: + """Split a full type name into (name, namespace). + + Handles: + - Simple types: "MyClass" -> ("MyClass", None) + - Namespaced types: "MyNamespace.MyClass" -> ("MyClass", "MyNamespace") + - Nested types: "MyNamespace.Outer+Nested" -> ("Outer+Nested", "MyNamespace") + - Deep nesting: "NS.Outer+Mid+Inner" -> ("Outer+Mid+Inner", "NS") + + Args: + full_name: Fully qualified type name + + Returns: + Tuple of (type_name, namespace_or_none) + """ + # Find the last dot that's not inside a nested type (before any +) + # For "NS.Outer+Nested", we want to split at the dot, not after + + plus_idx = full_name.find("+") + if plus_idx != -1: + # Has nested types - only look for namespace separator before the + + prefix = full_name[:plus_idx] + suffix = full_name[plus_idx:] # includes the + + dot_idx = prefix.rfind(".") + if dot_idx != -1: + namespace = prefix[:dot_idx] + name = prefix[dot_idx + 1:] + suffix + return name, namespace + else: + # No namespace, just nested types + return full_name, None + else: + # No nested types - simple split on last dot + dot_idx = full_name.rfind(".") + if dot_idx != -1: + return full_name[dot_idx + 1:], full_name[:dot_idx] + else: + return full_name, None + async def generate_diagrammer(self, request: GenerateDiagrammerRequest) -> dict[str, Any]: """Generate HTML diagrammer for an assembly. diff --git a/src/mcilspy/server.py b/src/mcilspy/server.py index d477f10..af5a31a 100644 --- a/src/mcilspy/server.py +++ b/src/mcilspy/server.py @@ -3,6 +3,8 @@ import logging import os import platform import shutil +from contextlib import asynccontextmanager +from dataclasses import dataclass from mcp.server.fastmcp import Context, FastMCP @@ -17,19 +19,80 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -# Create FastMCP server -mcp = FastMCP("mcilspy") -# Global ILSpy wrapper -ilspy_wrapper: ILSpyWrapper | None = None +@dataclass +class AppState: + """Application state shared across tools via lifespan context.""" + + wrapper: ILSpyWrapper | None = None -def get_wrapper() -> ILSpyWrapper: - """Get ILSpy wrapper instance""" - global ilspy_wrapper - if ilspy_wrapper is None: - ilspy_wrapper = ILSpyWrapper() - return ilspy_wrapper +@asynccontextmanager +async def app_lifespan(server: FastMCP): + """Initialize application state on startup, cleanup on shutdown. + + The ILSpyWrapper is lazily initialized on first use to avoid + failing startup if ilspycmd isn't installed (dnfile-based tools + still work without it). + """ + state = AppState() + logger.info("mcilspy server starting up") + try: + yield {"app_state": state} + finally: + logger.info("mcilspy server shutting down") + + +# Create FastMCP server with lifespan +mcp = FastMCP("mcilspy", lifespan=app_lifespan) + + +def get_wrapper(ctx: Context | None = None) -> ILSpyWrapper: + """Get ILSpy wrapper instance from context or create new one. + + Args: + ctx: Optional FastMCP context. If provided, uses lifespan state + for caching. If None, creates a new instance (for backwards + compatibility with non-tool callers). + + Returns: + ILSpyWrapper instance + + Raises: + RuntimeError: If ilspycmd is not installed + """ + if ctx is not None: + # Use lifespan state for caching (access via request_context) + try: + lifespan_ctx = ctx.request_context.lifespan_context + if lifespan_ctx is not None: + state: AppState = lifespan_ctx.get("app_state") + if state is not None: + if state.wrapper is None: + state.wrapper = ILSpyWrapper() + return state.wrapper + except (AttributeError, ValueError): + # Context not fully initialized, fall through to create new instance + pass + + # Fallback: create new instance (no caching) + return ILSpyWrapper() + + +def _format_error(error: Exception, context: str = "") -> str: + """Format an error message consistently across all tools. + + Args: + error: The exception that occurred + context: Optional context string (e.g., "listing types") + + Returns: + Formatted error string for MCP response + """ + error_msg = str(error) + if context: + return f"**Error** ({context}): {error_msg}" + return f"**Error**: {error_msg}" async def _check_dotnet_tools() -> dict: @@ -431,7 +494,7 @@ async def install_ilspy( except Exception as e: logger.error(f"Error installing ilspycmd: {e}") - return f"❌ **Error**: {str(e)}" + return _format_error(e, "installation") @mcp.tool() @@ -475,7 +538,7 @@ async def decompile_assembly( await ctx.info(f"Starting decompilation of assembly: {assembly_path}") try: - wrapper = get_wrapper() + wrapper = get_wrapper(ctx) # Use simplified request object (no complex pydantic validation needed) from .models import DecompileRequest @@ -509,7 +572,7 @@ async def decompile_assembly( except Exception as e: logger.error(f"Decompilation error: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -546,7 +609,7 @@ async def list_types( await ctx.info(f"Listing types in assembly: {assembly_path}") try: - wrapper = get_wrapper() + wrapper = get_wrapper(ctx) # Default to list only classes if entity_types is None: @@ -592,7 +655,7 @@ async def list_types( except Exception as e: logger.error(f"Error listing types: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -626,7 +689,7 @@ async def generate_diagrammer( await ctx.info(f"Generating assembly diagram: {assembly_path}") try: - wrapper = get_wrapper() + wrapper = get_wrapper(ctx) from .models import GenerateDiagrammerRequest @@ -646,7 +709,7 @@ async def generate_diagrammer( except Exception as e: logger.error(f"Error generating diagram: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -669,7 +732,7 @@ async def get_assembly_info(assembly_path: str, ctx: Context | None = None) -> s await ctx.info(f"Getting assembly info: {assembly_path}") try: - wrapper = get_wrapper() + wrapper = get_wrapper(ctx) from .models import AssemblyInfoRequest @@ -693,7 +756,7 @@ async def get_assembly_info(assembly_path: str, ctx: Context | None = None) -> s except Exception as e: logger.error(f"Error getting assembly info: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -730,7 +793,7 @@ async def search_types( await ctx.info(f"Searching for types matching '{pattern}' in: {assembly_path}") try: - wrapper = get_wrapper() + wrapper = get_wrapper(ctx) # Default to search all entity types if entity_types is None: @@ -817,7 +880,7 @@ async def search_types( except Exception as e: logger.error(f"Error searching types: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -851,7 +914,7 @@ async def search_strings( await ctx.info(f"Searching for strings matching '{pattern}' in: {assembly_path}") try: - wrapper = get_wrapper() + wrapper = get_wrapper(ctx) # Decompile to IL to find string literals (ldstr instructions) from .models import DecompileRequest @@ -955,7 +1018,7 @@ async def search_strings( except Exception as e: logger.error(f"Error searching strings: {e}") - return f"Error: {str(e)}" + return _format_error(e) # ============================================================================ @@ -1077,10 +1140,10 @@ async def search_methods( return content except FileNotFoundError as e: - return f"Error: {e}" + return _format_error(e) except Exception as e: logger.error(f"Error searching methods: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -1192,10 +1255,10 @@ async def search_fields( return content except FileNotFoundError as e: - return f"Error: {e}" + return _format_error(e) except Exception as e: logger.error(f"Error searching fields: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -1291,10 +1354,10 @@ async def search_properties( return content except FileNotFoundError as e: - return f"Error: {e}" + return _format_error(e) except Exception as e: logger.error(f"Error searching properties: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -1352,10 +1415,10 @@ async def list_events( return content except FileNotFoundError as e: - return f"Error: {e}" + return _format_error(e) except Exception as e: logger.error(f"Error listing events: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -1396,10 +1459,10 @@ async def list_resources( return content except FileNotFoundError as e: - return f"Error: {e}" + return _format_error(e) except Exception as e: logger.error(f"Error listing resources: {e}") - return f"Error: {str(e)}" + return _format_error(e) @mcp.tool() @@ -1459,10 +1522,10 @@ async def get_metadata_summary( return content except FileNotFoundError as e: - return f"Error: {e}" + return _format_error(e) except Exception as e: logger.error(f"Error getting metadata summary: {e}") - return f"Error: {str(e)}" + return _format_error(e) # FastMCP automatically handles prompts