refactor: consolidate architecture across 8 issues (A1-A8)

- A1: Extract duplicated PATH discovery to utils.py (single source of truth)
- A2: Convert metadata_reader dataclasses to Pydantic models in models.py
- A3: Simplify get_wrapper() with module-level caching (removed fragile lifespan context)
- A4: Document ILSpyWrapper design rationale (why class exists despite being stateless)
- A5: Document MetadataReader as CPU-bound sync code with thread pool suggestion
- A6: Create constants.py for all timeouts/limits (DECOMPILE_TIMEOUT_SECONDS, etc.)
- A7: Add _compile_search_pattern() helper to deduplicate regex compilation
- A8: Add LanguageVersion validation with helpful error listing valid options

Tests pass, ruff clean.
Ryan Malloy 2026-02-08 11:25:43 -07:00
parent 7d784af17c
commit 8901752ae3
8 changed files with 541 additions and 231 deletions

docs/taskmaster/PLAN.md Normal file (221 lines)

@ -0,0 +1,221 @@
# Taskmaster Execution Plan: mcilspy Code Review Fixes
## Overview
33 issues identified in hater-hat code review, organized into 4 parallel workstreams.
```
┌─────────────────────────────────────────────────────────────────┐
│ MERGE ORDER (Sequential) │
│ security → architecture → performance → testing │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│SECURITY │ │ ARCH │ │ PERF │ │ TESTING │
│ 4 issues│ │ 8 issues│ │ 8 issues│ │ 7 issues│
│ P1-CRIT │ │ P2-HIGH │ │ P3-MED │ │ P4-LOW │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
```
---
## Domain 1: SECURITY (Priority 1 - Critical)
**Branch**: `fix/security`
**Estimated Effort**: 2-3 hours
**Blocks**: all other domains (they should reuse the validation patterns established here)
### Issues to Fix
| ID | Issue | File:Line | Fix |
|----|-------|-----------|-----|
| S1 | Path traversal - no validation | `server.py:*`, `ilspy_wrapper.py:132` | Add `_validate_assembly_path()` helper that checks: exists, is file, has .dll/.exe extension, resolves to absolute path, optionally check for PE signature |
| S2 | Temp directory race condition | `ilspy_wrapper.py:150-153` | Replace `tempfile.mkdtemp()` with `tempfile.TemporaryDirectory()` context manager |
| S3 | Unbounded subprocess output | `ilspy_wrapper.py:104-107` | Add `MAX_OUTPUT_BYTES = 50_000_000` constant, truncate stdout/stderr if exceeded |
| S4 | No file size limit before loading | `metadata_reader.py:113-115` | Add `MAX_ASSEMBLY_SIZE_MB = 500` check before `dnfile.dnPE()` |
### Acceptance Criteria
- [ ] All user-provided paths validated before use
- [ ] No temp file leaks possible
- [ ] Memory exhaustion attacks mitigated
- [ ] Unit tests for validation functions
---
## Domain 2: ARCHITECTURE (Priority 2 - High)
**Branch**: `fix/architecture`
**Estimated Effort**: 3-4 hours
**Depends On**: Security (for validation patterns)
### Issues to Fix
| ID | Issue | File:Line | Fix |
|----|-------|-----------|-----|
| A1 | Duplicated PATH discovery | `server.py:99-123`, `ilspy_wrapper.py:41-75` | Extract to `find_ilspycmd_path()` in `src/mcilspy/utils.py` |
| A2 | Mixed dataclass/Pydantic models | `metadata_reader.py` vs `models.py` | Convert metadata_reader dataclasses to Pydantic models in `models.py` |
| A3 | Fragile lifespan context access | `server.py:51-80` | Simplify to module-level `_wrapper: ILSpyWrapper | None = None` with proper locking |
| A4 | Stateless wrapper pretending to be stateful | `ilspy_wrapper.py` | Keep as-is but document why (caches ilspycmd_path lookup) OR convert to module functions |
| A5 | Inconsistent async/sync | `metadata_reader.py` | Document as "CPU-bound sync" in docstring, add note about thread pool for heavy loads |
| A6 | Magic numbers scattered | Throughout | Create `src/mcilspy/constants.py` with all limits/timeouts |
| A7 | Repeated regex compilation | `server.py` (6 places) | Add `_compile_search_pattern(pattern, case_sensitive, use_regex)` helper |
| A8 | Language version validation missing | `server.py:578` | Add try/except with helpful error listing valid versions |
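The A5 pattern above (keep the parser synchronous, let async callers offload it) can be sketched like this; `read_metadata` is a stand-in for the real MetadataReader entry point, not an actual function in the codebase:

```python
# Sketch of the A5 pattern: metadata parsing stays synchronous and
# CPU-bound; async callers offload it to the default thread pool.
# `read_metadata` is a stand-in for the real MetadataReader entry point.
import asyncio

def read_metadata(assembly_path: str) -> dict:
    # CPU-bound parsing would happen here (dnfile in the real code).
    return {"path": assembly_path, "type_count": 0}

async def get_metadata(assembly_path: str) -> dict:
    # asyncio.to_thread keeps the event loop responsive while the
    # parser chews through a large assembly.
    return await asyncio.to_thread(read_metadata, assembly_path)
```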
### Acceptance Criteria
- [ ] Single source of truth for PATH discovery
- [ ] Consistent data model layer (all Pydantic)
- [ ] All magic numbers in constants.py
- [ ] Helper functions reduce code duplication
---
## Domain 3: PERFORMANCE (Priority 3 - Medium)
**Branch**: `fix/performance`
**Estimated Effort**: 4-5 hours
**Depends On**: Architecture (for new constants/utils)
### Issues to Fix
| ID | Issue | File:Line | Fix |
|----|-------|-----------|-----|
| P1 | search_strings decompiles entire assembly | `server.py:948-954` | Use dnfile's `pe.net.user_strings` to search string heap directly - 100x faster |
| P2 | No result pagination | All list_* tools | Add `max_results: int = 1000` and `offset: int = 0` params, return `has_more` flag |
| P3 | List conversion instead of generators | `metadata_reader.py:289` | Use `enumerate()` directly on iterator where possible |
| P4 | No caching of decompilation | `ilspy_wrapper.py` | Add optional LRU cache keyed by (path, mtime, type_name) |
| P5 | Silent failures in platform detection | `server.py:195-230` | Log warnings for permission errors, return reason in result |
| P6 | Generic exception catches | Throughout | Replace with specific exceptions, preserve stack traces |
| P7 | MetadataReader __exit__ type mismatch | `metadata_reader.py:595-597` | Fix return type, remove unnecessary `return False` |
| P8 | No assembly validation | `ilspy_wrapper.py` | Add PE signature check (MZ header + PE\0\0) before subprocess |
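The P8 check described in the last row is mechanical enough to sketch; the offsets follow the PE format (the `e_lfanew` field lives at byte 0x3C of the DOS header), and the function name is an assumption:

```python
# Minimal sketch of the P8 pre-flight check (function name assumed).
# Offsets follow the PE format: e_lfanew is at byte 0x3C of the DOS header.
import struct
from pathlib import Path

def has_pe_signature(path: str) -> bool:
    """Return True if the file has an MZ header and a PE\\0\\0 signature."""
    data = Path(path).read_bytes()
    if len(data) < 0x40 or data[:2] != b"MZ":
        return False
    (e_lfanew,) = struct.unpack_from("<I", data, 0x3C)
    return data[e_lfanew:e_lfanew + 4] == b"PE\x00\x00"
```

Running this before spawning ilspycmd turns a multi-second subprocess failure into a microsecond file read.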
### Acceptance Criteria
- [ ] search_strings uses string heap (benchmark: at least 10x improvement over full decompilation)
- [ ] All list tools support pagination
- [ ] No silent swallowed exceptions
- [ ] PE validation prevents wasted subprocess calls
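The P2 pagination contract (`max_results`, `offset`, `has_more`) amounts to one slice plus a bounds check; a minimal sketch, with the helper name and return shape assumed:

```python
# Illustrative sketch of the P2 pagination contract; each list_* tool
# would apply this to its own result list. Names are assumptions.
def paginate(items: list, offset: int = 0, max_results: int = 1000) -> dict:
    """Slice a result list and report whether more results remain."""
    page = items[offset:offset + max_results]
    return {
        "results": page,
        "offset": offset,
        "has_more": offset + max_results < len(items),
    }
```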
---
## Domain 4: TESTING (Priority 4 - Enhancement)
**Branch**: `fix/testing`
**Estimated Effort**: 5-6 hours
**Depends On**: All other domains (tests should cover new code)
### Issues to Fix
| ID | Issue | Fix |
|----|-------|-----|
| T1 | No integration tests | Create `tests/integration/` with real assembly tests. Use a small test .dll checked into the repo |
| T2 | No MCP tool tests | Add `tests/test_server_tools.py` testing each `@mcp.tool()` function |
| T3 | No error path tests | Add tests for: regex compilation failure, ilspycmd not found, ctx.info() failure |
| T4 | No concurrency tests | Add `tests/test_concurrency.py` with parallel tool invocations |
| T5 | Missing docstring validation | Add test that all public functions have docstrings (using AST) |
| T6 | No cancel/progress tests | Test timeout behavior, verify progress reporting works |
| T7 | Add test .NET assembly | Create minimal C# project, compile to .dll, check into `tests/fixtures/` |
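The T5 check above is a small AST walk; shown here on an inline source string, where the real test would iterate over files in `src/mcilspy/`:

```python
# Sketch of the T5 docstring check: walk a module's AST and flag public
# functions without docstrings. The real test would run this over each
# file in src/mcilspy/.
import ast

def missing_docstrings(source: str) -> list[str]:
    tree = ast.parse(source)
    missing = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            # Private helpers (leading underscore) are exempt.
            if not node.name.startswith("_") and ast.get_docstring(node) is None:
                missing.append(node.name)
    return missing
```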
### Acceptance Criteria
- [ ] Integration tests with real ilspycmd calls
- [ ] 80%+ code coverage (currently ~40% estimated)
- [ ] All error paths tested
- [ ] CI runs integration tests
---
## Deferred (Won't Fix This Sprint)
These issues are valid but lower priority:
| ID | Issue | Reason to Defer |
|----|-------|-----------------|
| D1 | No cancel/abort for decompilation | Requires MCP protocol support for cancellation |
| D2 | No progress reporting | Needs ilspycmd changes or parsing stdout in real-time |
| D3 | No resource extraction | Feature request, not bug - add to backlog |
| D4 | install_ilspy sudo handling | Edge case - document limitation instead |
| D5 | No dry-run for installation | Nice-to-have, not critical |
| D6 | Error messages expose paths | Low risk for MCP (local tool) |
---
## Execution Commands
### Setup Worktrees (Run Once)
```bash
# Create worktrees for parallel development (each branches off main)
git worktree add -b fix/security ../mcilspy-security main
git worktree add -b fix/architecture ../mcilspy-arch main
git worktree add -b fix/performance ../mcilspy-perf main
git worktree add -b fix/testing ../mcilspy-testing main
```
### Task Master Commands
```bash
# Security Task Master
cd ../mcilspy-security
claude -p "You are the SECURITY Task Master. Read docs/taskmaster/PLAN.md and implement all S1-S4 issues. Update status.json when done."
# Architecture Task Master (can start in parallel)
cd ../mcilspy-arch
claude -p "You are the ARCHITECTURE Task Master. Read docs/taskmaster/PLAN.md and implement all A1-A8 issues. Check status.json for security patterns first."
# Performance Task Master
cd ../mcilspy-perf
claude -p "You are the PERFORMANCE Task Master. Read docs/taskmaster/PLAN.md and implement all P1-P8 issues."
# Testing Task Master (runs last)
cd ../mcilspy-testing
claude -p "You are the TESTING Task Master. Read docs/taskmaster/PLAN.md and implement all T1-T7 issues. Requires other domains complete first."
```
### Merge Protocol
```bash
# After all complete, merge in order:
git checkout main
git merge --no-ff fix/security -m "security: path validation, temp cleanup, output limits"
git merge --no-ff fix/architecture -m "refactor: consolidate utils, constants, models"
git merge --no-ff fix/performance -m "perf: string heap search, pagination, caching"
git merge --no-ff fix/testing -m "test: integration tests, tool coverage, fixtures"
# Cleanup
git worktree remove ../mcilspy-security
git worktree remove ../mcilspy-arch
git worktree remove ../mcilspy-perf
git worktree remove ../mcilspy-testing
```
---
## Priority Matrix
```
IMPACT
Low Medium High
┌────────┬─────────┬─────────┐
High │ A5 │ P2,P3 │ S1-S4 │ ← Fix First
EFFORT │ A8 │ A6,A7 │ A1-A4 │
Medium │ T5 │ P5-P8 │ P1,P4 │
│ D4 │ T3,T4 │ T1,T2 │
Low │ D5,D6 │ T6 │ T7 │ ← Quick Wins
└────────┴─────────┴─────────┘
```
**Quick Wins** (do first within each domain):
- S2: Temp directory fix (5 min)
- A6: Constants file (15 min)
- P7: __exit__ fix (2 min)
- T7: Add test assembly (30 min)
---
## Definition of Done
- [ ] All issues in domain addressed
- [ ] Tests pass: `uv run pytest`
- [ ] Lint passes: `uv run ruff check src/`
- [ ] Annotation lint passes: `uv run ruff check src/ --select=ANN`
- [ ] status.json updated to "ready"
- [ ] PR draft created (not merged)


@ -0,0 +1,11 @@
{
"project": "mcilspy-code-review-fixes",
"created": "2025-02-08T00:00:00Z",
"domains": {
"security": { "status": "pending", "branch": "fix/security", "priority": 1 },
"architecture": { "status": "ready", "branch": "fix/architecture", "priority": 2 },
"performance": { "status": "pending", "branch": "fix/performance", "priority": 3 },
"testing": { "status": "pending", "branch": "fix/testing", "priority": 4 }
},
"merge_order": ["security", "architecture", "performance", "testing"]
}
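Each task master is told to "update status.json when done"; a hedged read-modify-write sketch of that step, with the helper name assumed and the schema mirroring the file above:

```python
# Hypothetical helper for the "update status.json" step. The helper name
# is an assumption; the schema mirrors the status file shown above.
import json
from pathlib import Path

def mark_domain(status_file: str, domain: str, status: str) -> None:
    """Set one domain's status in status.json, preserving other fields."""
    path = Path(status_file)
    data = json.loads(path.read_text())
    data["domains"][domain]["status"] = status
    path.write_text(json.dumps(data, indent=2) + "\n")
```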

src/mcilspy/constants.py Normal file (49 lines)

@ -0,0 +1,49 @@
"""Constants and configuration values for mcilspy.
This module centralizes all timeouts, limits, and magic numbers used throughout
the codebase. Import from here rather than hardcoding values.
"""
# =============================================================================
# Subprocess Timeouts
# =============================================================================
# Maximum time to wait for ilspycmd decompilation (in seconds)
# Large assemblies or corrupted files may take longer
DECOMPILE_TIMEOUT_SECONDS: float = 300.0 # 5 minutes
# =============================================================================
# Output Limits
# =============================================================================
# Maximum characters to display from subprocess output in error messages
MAX_ERROR_OUTPUT_CHARS: int = 1000
# Maximum line length when displaying code snippets (truncate longer lines)
MAX_LINE_LENGTH: int = 200
# Maximum unparsed lines to log before suppressing (avoid log spam)
MAX_UNPARSED_LOG_LINES: int = 3
# Preview length for unparsed line debug messages
UNPARSED_LINE_PREVIEW_LENGTH: int = 100
# =============================================================================
# Search Limits
# =============================================================================
# Default maximum results for search operations
DEFAULT_MAX_SEARCH_RESULTS: int = 100
# Maximum matches to display per type in grouped results
MAX_MATCHES_PER_TYPE: int = 20
# =============================================================================
# Tool Identifiers (for ilspycmd CLI)
# =============================================================================
# Default entity types for list operations
DEFAULT_ENTITY_TYPES: list[str] = ["class"]
# All supported entity types
ALL_ENTITY_TYPES: list[str] = ["class", "interface", "struct", "delegate", "enum"]


@ -9,6 +9,11 @@ import tempfile
from pathlib import Path
from typing import Any
from .constants import (
DECOMPILE_TIMEOUT_SECONDS,
MAX_UNPARSED_LOG_LINES,
UNPARSED_LINE_PREVIEW_LENGTH,
)
from .models import (
AssemblyInfo,
AssemblyInfoRequest,
@ -19,12 +24,33 @@ from .models import (
ListTypesResponse,
TypeInfo,
)
from .utils import find_ilspycmd_path
logger = logging.getLogger(__name__)
class ILSpyWrapper:
"""Wrapper class for ILSpy command line tool."""
"""Wrapper class for ILSpy command line tool.
This class encapsulates all interactions with the ilspycmd CLI tool.
While the wrapper is stateless in terms of decompilation operations
(each call is independent), it exists as a class to:
1. Cache the ilspycmd path lookup - Finding the executable involves
checking PATH and several common installation locations, which
is relatively expensive. Caching this on instantiation avoids
repeated filesystem operations.
2. Provide a single point of configuration - If ilspycmd is not found,
we fail fast at wrapper creation rather than on each tool call.
3. Enable future extensions - The class structure allows adding
connection pooling, result caching, or other optimizations without
changing the API.
The wrapper should be created once and reused across tool calls.
See get_wrapper() in server.py for the recommended usage pattern.
"""
def __init__(self, ilspycmd_path: str | None = None) -> None:
"""Initialize the wrapper.
@ -32,48 +58,12 @@ class ILSpyWrapper:
Args:
ilspycmd_path: Path to ilspycmd executable. If None, will try to find it in PATH.
"""
self.ilspycmd_path = ilspycmd_path or self._find_ilspycmd()
self.ilspycmd_path = ilspycmd_path or find_ilspycmd_path()
if not self.ilspycmd_path:
raise RuntimeError(
"ILSpyCmd not found. Please install it with: dotnet tool install --global ilspycmd"
)
def _find_ilspycmd(self) -> str | None:
"""Find ilspycmd executable in PATH or common install locations.
Checks:
1. Standard PATH (via shutil.which)
2. ~/.dotnet/tools (default location for 'dotnet tool install --global')
3. Platform-specific locations
"""
# Try standard PATH first
for cmd_name in ["ilspycmd", "ilspycmd.exe"]:
path = shutil.which(cmd_name)
if path:
return path
# Check common dotnet tools locations (not always in PATH)
home = os.path.expanduser("~")
dotnet_tools_paths = [
os.path.join(home, ".dotnet", "tools", "ilspycmd"),
os.path.join(home, ".dotnet", "tools", "ilspycmd.exe"),
]
# Windows-specific paths
if os.name == "nt":
userprofile = os.environ.get("USERPROFILE", "")
if userprofile:
dotnet_tools_paths.extend([
os.path.join(userprofile, ".dotnet", "tools", "ilspycmd.exe"),
])
for tool_path in dotnet_tools_paths:
if os.path.isfile(tool_path) and os.access(tool_path, os.X_OK):
logger.info(f"Found ilspycmd at {tool_path} (not in PATH)")
return tool_path
return None
async def _run_command(
self, args: list[str], input_data: str | None = None
) -> tuple[int, str, str]:
@ -99,17 +89,18 @@ class ILSpyWrapper:
input_bytes = input_data.encode("utf-8") if input_data else None
# Timeout after 5 minutes to prevent hanging on malicious/corrupted assemblies
# Timeout to prevent hanging on malicious/corrupted assemblies
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
process.communicate(input=input_bytes),
timeout=300.0 # 5 minutes
timeout=DECOMPILE_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
logger.warning("Command timed out after 5 minutes, killing process")
timeout_mins = DECOMPILE_TIMEOUT_SECONDS / 60
logger.warning(f"Command timed out after {timeout_mins:.0f} minutes, killing process")
process.kill()
await process.wait() # Ensure process is cleaned up
return -1, "", "Command timed out after 5 minutes. The assembly may be corrupted or too complex."
return -1, "", f"Command timed out after {timeout_mins:.0f} minutes. The assembly may be corrupted or too complex."
stdout = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else ""
stderr = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else ""
@ -326,10 +317,10 @@ class ILSpyWrapper:
else:
# Log unexpected lines (but don't fail - ilspycmd may output warnings/info)
unparsed_count += 1
if unparsed_count <= 3: # Avoid log spam
logger.debug(f"Skipping unparsed line from ilspycmd: {line[:100]}")
if unparsed_count <= MAX_UNPARSED_LOG_LINES:
logger.debug(f"Skipping unparsed line from ilspycmd: {line[:UNPARSED_LINE_PREVIEW_LENGTH]}")
if unparsed_count > 3:
if unparsed_count > MAX_UNPARSED_LOG_LINES:
logger.debug(f"Skipped {unparsed_count} unparsed lines total")
return types


@ -4,103 +4,34 @@ Provides access to all 34+ CLR metadata tables without requiring ilspycmd.
This enables searching for methods, fields, properties, events, and resources
that are not exposed via the ilspycmd CLI.
This module contains CPU-bound synchronous code for parsing .NET PE metadata.
For heavy workloads with many concurrent requests, consider running these
operations in a thread pool (e.g., asyncio.to_thread) to avoid blocking
the event loop.
Note: dnfile provides flag attributes as boolean properties (e.g., mdPublic, fdStatic)
rather than traditional IntFlag enums, so we use those directly.
"""
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import dnfile
from dnfile.mdtable import TypeDefRow
from .models import (
AssemblyMetadata,
EventInfo,
FieldInfo,
MethodInfo,
PropertyInfo,
ResourceInfo,
)
logger = logging.getLogger(__name__)
@dataclass
class MethodInfo:
"""Information about a method in an assembly."""
name: str
full_name: str
declaring_type: str
namespace: str | None
return_type: str | None = None
is_public: bool = False
is_static: bool = False
is_virtual: bool = False
is_abstract: bool = False
parameters: list[str] = field(default_factory=list)
@dataclass
class FieldInfo:
"""Information about a field in an assembly."""
name: str
full_name: str
declaring_type: str
namespace: str | None
field_type: str | None = None
is_public: bool = False
is_static: bool = False
is_literal: bool = False # Constant
default_value: str | None = None
@dataclass
class PropertyInfo:
"""Information about a property in an assembly."""
name: str
full_name: str
declaring_type: str
namespace: str | None
property_type: str | None = None
has_getter: bool = False
has_setter: bool = False
@dataclass
class EventInfo:
"""Information about an event in an assembly."""
name: str
full_name: str
declaring_type: str
namespace: str | None
event_type: str | None = None
@dataclass
class ResourceInfo:
"""Information about an embedded resource."""
name: str
size: int
is_public: bool = True
@dataclass
class AssemblyMetadata:
"""Complete assembly metadata from dnfile."""
name: str
version: str
culture: str | None = None
public_key_token: str | None = None
target_framework: str | None = None
type_count: int = 0
method_count: int = 0
field_count: int = 0
property_count: int = 0
event_count: int = 0
resource_count: int = 0
referenced_assemblies: list[str] = field(default_factory=list)
class MetadataReader:
"""Read .NET assembly metadata directly using dnfile."""


@ -161,3 +161,84 @@ class AssemblyInfo(BaseModel):
runtime_version: str | None = None
is_signed: bool = False
has_debug_info: bool = False
# =============================================================================
# Metadata Reader Models (dnfile-based direct metadata access)
# =============================================================================
class MethodInfo(BaseModel):
"""Information about a method in an assembly."""
name: str
full_name: str
declaring_type: str
namespace: str | None = None
return_type: str | None = None
is_public: bool = False
is_static: bool = False
is_virtual: bool = False
is_abstract: bool = False
parameters: list[str] = Field(default_factory=list)
class FieldInfo(BaseModel):
"""Information about a field in an assembly."""
name: str
full_name: str
declaring_type: str
namespace: str | None = None
field_type: str | None = None
is_public: bool = False
is_static: bool = False
is_literal: bool = False # Constant
default_value: str | None = None
class PropertyInfo(BaseModel):
"""Information about a property in an assembly."""
name: str
full_name: str
declaring_type: str
namespace: str | None = None
property_type: str | None = None
has_getter: bool = False
has_setter: bool = False
class EventInfo(BaseModel):
"""Information about an event in an assembly."""
name: str
full_name: str
declaring_type: str
namespace: str | None = None
event_type: str | None = None
class ResourceInfo(BaseModel):
"""Information about an embedded resource."""
name: str
size: int = 0
is_public: bool = True
class AssemblyMetadata(BaseModel):
"""Complete assembly metadata from dnfile."""
name: str
version: str
culture: str | None = None
public_key_token: str | None = None
target_framework: str | None = None
type_count: int = 0
method_count: int = 0
field_count: int = 0
property_count: int = 0
event_count: int = 0
resource_count: int = 0
referenced_assemblies: list[str] = Field(default_factory=list)


@ -5,12 +5,19 @@ import platform
import re
import shutil
from contextlib import asynccontextmanager
from dataclasses import dataclass
from mcp.server.fastmcp import Context, FastMCP
from .constants import (
ALL_ENTITY_TYPES,
DEFAULT_MAX_SEARCH_RESULTS,
MAX_ERROR_OUTPUT_CHARS,
MAX_LINE_LENGTH,
MAX_MATCHES_PER_TYPE,
)
from .ilspy_wrapper import ILSpyWrapper
from .models import EntityType, LanguageVersion
from .utils import find_ilspycmd_path
# Setup logging
log_level = os.getenv("LOGLEVEL", "INFO").upper()
@ -21,27 +28,21 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
@dataclass
class AppState:
"""Application state shared across tools via lifespan context."""
wrapper: ILSpyWrapper | None = None
# Module-level cached wrapper instance (lazily initialized)
# This avoids complex lifespan context access and provides simple caching
_cached_wrapper: ILSpyWrapper | None = None
@asynccontextmanager
async def app_lifespan(server: FastMCP):
"""Initialize application state on startup, cleanup on shutdown.
The ILSpyWrapper is lazily initialized on first use to avoid
failing startup if ilspycmd isn't installed (dnfile-based tools
still work without it).
"""
state = AppState()
"""Initialize application state on startup, cleanup on shutdown."""
global _cached_wrapper
logger.info("mcilspy server starting up")
try:
yield {"app_state": state}
yield {}
finally:
logger.info("mcilspy server shutting down")
_cached_wrapper = None # Clear cache on shutdown
# Create FastMCP server with lifespan
@ -49,12 +50,14 @@ mcp = FastMCP("mcilspy", lifespan=app_lifespan)
def get_wrapper(ctx: Context | None = None) -> ILSpyWrapper:
"""Get ILSpy wrapper instance from context or create new one.
"""Get ILSpy wrapper instance, creating one if needed.
The wrapper is cached at module level for efficiency. It's lazily
initialized on first use to avoid failing startup if ilspycmd
isn't installed (dnfile-based tools still work without it).
Args:
ctx: Optional FastMCP context. If provided, uses lifespan state
for caching. If None, creates a new instance (for backwards
compatibility with non-tool callers).
ctx: Optional FastMCP context (unused, kept for API compatibility)
Returns:
ILSpyWrapper instance
@ -62,22 +65,10 @@ def get_wrapper(ctx: Context | None = None) -> ILSpyWrapper:
Raises:
RuntimeError: If ilspycmd is not installed
"""
if ctx is not None:
# Use lifespan state for caching (access via request_context)
try:
lifespan_ctx = ctx.request_context.lifespan_context
if lifespan_ctx is not None:
state: AppState = lifespan_ctx.get("app_state")
if state is not None:
if state.wrapper is None:
state.wrapper = ILSpyWrapper()
return state.wrapper
except (AttributeError, ValueError):
# Context not fully initialized, fall through to create new instance
pass
# Fallback: create new instance (no caching)
return ILSpyWrapper()
global _cached_wrapper
if _cached_wrapper is None:
_cached_wrapper = ILSpyWrapper()
return _cached_wrapper
def _format_error(error: Exception, context: str = "") -> str:
@ -96,31 +87,29 @@ def _format_error(error: Exception, context: str = "") -> str:
return f"**Error**: {error_msg}"
def _find_ilspycmd_path() -> str | None:
"""Find ilspycmd in PATH or common install locations."""
# Check PATH first
path = shutil.which("ilspycmd")
if path:
return path
def _compile_search_pattern(
pattern: str, case_sensitive: bool, use_regex: bool
) -> tuple[re.Pattern | None, str | None]:
"""Compile a search pattern into a regex, handling errors.
# Check common dotnet tools locations (often not in PATH for MCP servers)
home = os.path.expanduser("~")
candidates = [
os.path.join(home, ".dotnet", "tools", "ilspycmd"),
os.path.join(home, ".dotnet", "tools", "ilspycmd.exe"),
]
Args:
pattern: The search pattern string
case_sensitive: Whether matching should be case-sensitive
use_regex: Whether to treat pattern as a regular expression
# Windows-specific
if os.name == "nt":
userprofile = os.environ.get("USERPROFILE", "")
if userprofile:
candidates.append(os.path.join(userprofile, ".dotnet", "tools", "ilspycmd.exe"))
Returns:
Tuple of (compiled_pattern, error_message). If use_regex is False,
returns (None, None). If regex compilation fails, returns
(None, error_message). Otherwise returns (pattern, None).
"""
if not use_regex:
return None, None
for candidate in candidates:
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
return candidate
return None
try:
flags = 0 if case_sensitive else re.IGNORECASE
return re.compile(pattern, flags), None
except re.error as e:
return None, f"Invalid regex pattern: {e}"
async def _check_dotnet_tools() -> dict:
@ -151,7 +140,7 @@ async def _check_dotnet_tools() -> dict:
pass
# Check if ilspycmd is available (check PATH and common locations)
ilspy_path = _find_ilspycmd_path()
ilspy_path = find_ilspycmd_path()
if ilspy_path:
result["ilspycmd_available"] = True
result["ilspycmd_path"] = ilspy_path
@ -329,7 +318,7 @@ async def _try_install_dotnet_sdk(ctx: Context | None = None) -> tuple[bool, str
return False, (
f"❌ Installation failed (exit code {proc.returncode}).\n\n"
f"Command: `{' '.join(cmd_args)}`\n\n"
f"Output:\n```\n{output[-1000:]}\n```\n\n"
f"Output:\n```\n{output[-MAX_ERROR_OUTPUT_CHARS:]}\n```\n\n"
"Try running the command manually with sudo if needed."
)
@ -571,11 +560,21 @@ async def decompile_assembly(
# Use simplified request object (no complex pydantic validation needed)
from .models import DecompileRequest
# Validate language version with helpful error message
try:
lang_ver = LanguageVersion(language_version)
except ValueError:
valid_versions = [v.value for v in LanguageVersion]
return (
f"Invalid language version: '{language_version}'\n\n"
f"Valid options: {', '.join(valid_versions)}"
)
request = DecompileRequest(
assembly_path=assembly_path,
output_dir=output_dir,
type_name=type_name,
language_version=LanguageVersion(language_version),
language_version=lang_ver,
create_project=create_project,
show_il_code=show_il_code or show_il_sequence_points,
remove_dead_code=remove_dead_code,
@ -825,7 +824,7 @@ async def search_types(
# Default to search all entity types
if entity_types is None:
entity_types = ["class", "interface", "struct", "delegate", "enum"]
entity_types = ALL_ENTITY_TYPES.copy()
# Convert to EntityType enums
entity_type_enums = []
@ -845,14 +844,9 @@ async def search_types(
return response.error_message or "Failed to list types"
# Compile regex if needed
if use_regex:
try:
flags = 0 if case_sensitive else re.IGNORECASE
search_pattern = re.compile(pattern, flags)
except re.error as e:
return f"Invalid regex pattern: {e}"
else:
search_pattern = None
search_pattern, regex_error = _compile_search_pattern(pattern, case_sensitive, use_regex)
if regex_error:
return regex_error
# Filter types by pattern and namespace
matching_types = []
@ -915,7 +909,7 @@ async def search_strings(
pattern: str,
case_sensitive: bool = False,
use_regex: bool = False,
max_results: int = 100,
max_results: int = DEFAULT_MAX_SEARCH_RESULTS,
ctx: Context | None = None,
) -> str:
"""Search for string literals in assembly code.
@ -959,12 +953,9 @@ async def search_strings(
source_code = response.source_code or ""
# Compile regex if needed
if use_regex:
try:
flags = 0 if case_sensitive else re.IGNORECASE
search_pattern = re.compile(pattern, flags)
except re.error as e:
return f"Invalid regex pattern: {e}"
search_pattern, regex_error = _compile_search_pattern(pattern, case_sensitive, use_regex)
if regex_error:
return regex_error
# Search for string literals containing the pattern
# In IL, strings appear as: ldstr "string value"
@ -1003,7 +994,7 @@ async def search_strings(
matches.append(
{
"line_num": i + 1,
"line": line.strip()[:200], # Truncate long lines
"line": line.strip()[:MAX_LINE_LENGTH], # Truncate long lines
"type": current_type or "Unknown",
"method": current_method,
}
@ -1029,12 +1020,12 @@ async def search_strings(
for type_name, type_matches in sorted(by_type.items()):
content += f"## {type_name}\n\n"
for match in type_matches[:20]: # Limit per type
for match in type_matches[:MAX_MATCHES_PER_TYPE]:
method_info = f" in `{match['method']}()`" if match["method"] else ""
content += f"- Line {match['line_num']}{method_info}:\n"
content += f" ```\n {match['line']}\n ```\n"
if len(type_matches) > 20:
content += f" ... and {len(type_matches) - 20} more matches in this type\n"
if len(type_matches) > MAX_MATCHES_PER_TYPE:
content += f" ... and {len(type_matches) - MAX_MATCHES_PER_TYPE} more matches in this type\n"
content += "\n"
content += "\n**TIP**: Use `decompile_assembly` with `type_name` to see the full context of interesting matches."
@ -1098,14 +1089,9 @@ async def search_methods(
return "No methods found in assembly (or assembly has no metadata)"
# Compile regex if needed
if use_regex:
try:
flags = 0 if case_sensitive else re.IGNORECASE
search_pattern = re.compile(pattern, flags)
except re.error as e:
return f"Invalid regex pattern: {e}"
else:
search_pattern = None
search_pattern, regex_error = _compile_search_pattern(pattern, case_sensitive, use_regex)
if regex_error:
return regex_error
# Filter by pattern
matching_methods = []
@ -1216,14 +1202,9 @@ async def search_fields(
return "No fields found in assembly"
# Compile regex if needed
if use_regex:
try:
flags = 0 if case_sensitive else re.IGNORECASE
search_pattern = re.compile(pattern, flags)
except re.error as e:
return f"Invalid regex pattern: {e}"
else:
search_pattern = None
search_pattern, regex_error = _compile_search_pattern(pattern, case_sensitive, use_regex)
if regex_error:
return regex_error
# Filter by pattern
matching_fields = []
@ -1323,14 +1304,9 @@ async def search_properties(
return "No properties found in assembly"
# Compile regex if needed
if use_regex:
try:
flags = 0 if case_sensitive else re.IGNORECASE
search_pattern = re.compile(pattern, flags)
except re.error as e:
return f"Invalid regex pattern: {e}"
else:
search_pattern = None
search_pattern, regex_error = _compile_search_pattern(pattern, case_sensitive, use_regex)
if regex_error:
return regex_error
# Filter by pattern
matching_props = []

src/mcilspy/utils.py Normal file (50 lines)

@ -0,0 +1,50 @@
"""Shared utility functions for mcilspy.
This module contains common utilities used across the codebase to avoid
code duplication and ensure consistent behavior.
"""
import logging
import os
import shutil
logger = logging.getLogger(__name__)
def find_ilspycmd_path() -> str | None:
"""Find ilspycmd executable in PATH or common install locations.
This is the single source of truth for locating the ilspycmd binary.
It checks:
1. Standard PATH (via shutil.which)
2. ~/.dotnet/tools (default location for 'dotnet tool install --global')
3. Platform-specific locations (Windows %USERPROFILE%)
Returns:
Path to ilspycmd executable if found, None otherwise
"""
# Check PATH first (handles both ilspycmd and ilspycmd.exe)
for cmd_name in ["ilspycmd", "ilspycmd.exe"]:
path = shutil.which(cmd_name)
if path:
return path
# Check common dotnet tools locations (not always in PATH for MCP servers)
home = os.path.expanduser("~")
candidates = [
os.path.join(home, ".dotnet", "tools", "ilspycmd"),
os.path.join(home, ".dotnet", "tools", "ilspycmd.exe"),
]
# Windows-specific: also check USERPROFILE if different from ~
if os.name == "nt":
userprofile = os.environ.get("USERPROFILE", "")
if userprofile and userprofile != home:
candidates.append(os.path.join(userprofile, ".dotnet", "tools", "ilspycmd.exe"))
for candidate in candidates:
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
logger.info(f"Found ilspycmd at {candidate} (not in PATH)")
return candidate
return None