Merge fix/testing: test fixtures, integration tests, docstring validation

This commit is contained in:
Ryan Malloy 2026-02-08 11:41:20 -07:00
commit db95aeb491
12 changed files with 2502 additions and 14 deletions

View File

@ -4,8 +4,8 @@
"domains": { "domains": {
"security": { "status": "merged", "branch": "fix/security", "priority": 1 }, "security": { "status": "merged", "branch": "fix/security", "priority": 1 },
"architecture": { "status": "merged", "branch": "fix/architecture", "priority": 2 }, "architecture": { "status": "merged", "branch": "fix/architecture", "priority": 2 },
"performance": { "status": "merging", "branch": "fix/performance", "priority": 3 }, "performance": { "status": "merged", "branch": "fix/performance", "priority": 3 },
"testing": { "status": "ready", "branch": "fix/testing", "priority": 4 } "testing": { "status": "merging", "branch": "fix/testing", "priority": 4 }
}, },
"merge_order": ["security", "architecture", "performance", "testing"] "merge_order": ["security", "architecture", "performance", "testing"]
} }

View File

@ -1,33 +1,58 @@
"""Shared pytest fixtures for mcilspy tests.""" """Shared pytest fixtures for mcilspy tests."""
import os import os
import shutil
from pathlib import Path from pathlib import Path
import pytest import pytest
# Path to test fixtures directory
FIXTURES_DIR = Path(__file__).parent / "fixtures"
@pytest.fixture
def test_assembly_path() -> str:
"""Return path to the custom test assembly.
This is the primary fixture for tests - uses our custom-built
TestAssembly.dll with known types and members.
"""
test_dll = FIXTURES_DIR / "TestAssembly.dll"
if not test_dll.exists():
pytest.skip("TestAssembly.dll not found - run build_test_assembly.sh first")
return str(test_dll)
@pytest.fixture @pytest.fixture
def sample_assembly_path() -> str: def sample_assembly_path() -> str:
"""Return path to a .NET assembly for testing. """Return path to a .NET assembly for testing.
Uses a known .NET SDK assembly that should exist on systems with dotnet installed. Falls back to SDK assemblies if test assembly not available.
Prefer using test_assembly_path for new tests.
""" """
# Try to find a .NET SDK assembly # First try our test assembly
test_dll = FIXTURES_DIR / "TestAssembly.dll"
if test_dll.exists():
return str(test_dll)
# Fallback: Try to find a .NET SDK assembly
dotnet_base = Path("/usr/share/dotnet/sdk") dotnet_base = Path("/usr/share/dotnet/sdk")
if dotnet_base.exists(): if dotnet_base.exists():
# Find any SDK version
for sdk_dir in dotnet_base.iterdir(): for sdk_dir in dotnet_base.iterdir():
test_dll = sdk_dir / "Sdks" / "Microsoft.NET.Sdk" / "tools" / "net10.0" / "Microsoft.NET.Build.Tasks.dll" for net_version in ["net10.0", "net9.0", "net8.0", "net7.0", "net6.0"]:
if test_dll.exists(): test_dll = (
return str(test_dll) sdk_dir
# Try older paths / "Sdks"
for net_version in ["net9.0", "net8.0", "net7.0", "net6.0"]: / "Microsoft.NET.Sdk"
test_dll = sdk_dir / "Sdks" / "Microsoft.NET.Sdk" / "tools" / net_version / "Microsoft.NET.Build.Tasks.dll" / "tools"
/ net_version
/ "Microsoft.NET.Build.Tasks.dll"
)
if test_dll.exists(): if test_dll.exists():
return str(test_dll) return str(test_dll)
# Fallback: any .dll in dotnet directory # Last resort: any .dll in dotnet directory
for root, dirs, files in os.walk("/usr/share/dotnet"): for root, _dirs, files in os.walk("/usr/share/dotnet"):
for f in files: for f in files:
if f.endswith(".dll"): if f.endswith(".dll"):
return os.path.join(root, f) return os.path.join(root, f)
@ -39,3 +64,22 @@ def sample_assembly_path() -> str:
def nonexistent_path() -> str: def nonexistent_path() -> str:
"""Return a path that doesn't exist.""" """Return a path that doesn't exist."""
return "/nonexistent/path/to/assembly.dll" return "/nonexistent/path/to/assembly.dll"
@pytest.fixture
def ilspycmd_installed() -> bool:
"""Check if ilspycmd is available for integration tests."""
return shutil.which("ilspycmd") is not None
@pytest.fixture
def skip_without_ilspycmd(ilspycmd_installed):
"""Skip test if ilspycmd is not installed."""
if not ilspycmd_installed:
pytest.skip("ilspycmd not installed")
@pytest.fixture
def temp_output_dir(tmp_path):
"""Provide a temporary directory for test outputs."""
return str(tmp_path)

214
tests/fixtures/TestAssembly.cs vendored Normal file
View File

@ -0,0 +1,214 @@
using System;
using System.Threading.Tasks;
namespace TestNamespace
{
/// <summary>
/// A test class with various members for testing the mcilspy MCP server.
/// </summary>
public class TestClass
{
// Constants for testing string search
public const string API_KEY = "test-secret-key";
public const string BASE_URL = "https://api.example.com";
public const int MAX_RETRIES = 3;
// Fields
public static readonly string BaseUrl = "https://api.example.com";
private int _privateField;
protected string _protectedField;
internal double _internalField;
// Properties
public string Name { get; set; }
public int Age { get; private set; }
public virtual bool IsActive { get; set; }
// Events
public event EventHandler OnChange;
public event EventHandler<string> OnMessage;
// Constructors
public TestClass()
{
Name = "Default";
Age = 0;
}
public TestClass(string name, int age)
{
Name = name;
Age = age;
}
// Methods
public void DoSomething()
{
Console.WriteLine("Hello from DoSomething");
OnChange?.Invoke(this, EventArgs.Empty);
}
public string GetGreeting()
{
return $"Hello, {Name}!";
}
public static int Add(int a, int b)
{
return a + b;
}
protected virtual void OnPropertyChanged(string propertyName)
{
OnMessage?.Invoke(this, propertyName);
}
private void PrivateMethod()
{
_privateField = 42;
}
}
/// <summary>
/// Interface for testing interface discovery.
/// </summary>
public interface ITestService
{
void Execute();
Task<string> ExecuteAsync();
string ServiceName { get; }
}
/// <summary>
/// Another interface for inheritance testing.
/// </summary>
public interface IConfigurable
{
void Configure(string settings);
}
/// <summary>
/// Struct for testing struct discovery.
/// </summary>
public struct TestStruct
{
public int Value;
public string Label;
public TestStruct(int value, string label)
{
Value = value;
Label = label;
}
public override string ToString() => $"{Label}: {Value}";
}
/// <summary>
/// Enum for testing enum discovery.
/// </summary>
public enum TestEnum
{
None = 0,
First = 1,
Second = 2,
Third = 3
}
/// <summary>
/// Delegate for testing delegate discovery.
/// </summary>
public delegate void TestDelegate(string message);
/// <summary>
/// Delegate with return type.
/// </summary>
public delegate bool ValidationDelegate<T>(T value);
/// <summary>
/// Service implementation for testing class relationships.
/// </summary>
public class TestServiceImpl : ITestService, IConfigurable
{
private string _config;
public string ServiceName => "TestService";
public void Execute()
{
Console.WriteLine($"Executing with config: {_config}");
}
public Task<string> ExecuteAsync()
{
return Task.FromResult($"Async result from {ServiceName}");
}
public void Configure(string settings)
{
_config = settings;
}
}
/// <summary>
/// Nested class for testing nested type discovery.
/// </summary>
public class OuterClass
{
public class NestedClass
{
public string Value { get; set; }
}
private class PrivateNestedClass
{
public int Secret { get; set; }
}
public NestedClass CreateNested() => new NestedClass();
}
/// <summary>
/// Abstract class for testing abstract type discovery.
/// </summary>
public abstract class AbstractBase
{
public abstract void AbstractMethod();
public virtual void VirtualMethod() { }
protected string BaseProperty { get; set; }
}
/// <summary>
/// Derived class for testing inheritance.
/// </summary>
public class DerivedClass : AbstractBase
{
public override void AbstractMethod()
{
Console.WriteLine("Implemented abstract method");
}
public override void VirtualMethod()
{
base.VirtualMethod();
Console.WriteLine("Overridden virtual method");
}
}
}
namespace TestNamespace.SubNamespace
{
/// <summary>
/// Class in a sub-namespace for testing namespace filtering.
/// </summary>
public class SubClass
{
public const string CONNECTION_STRING = "Server=localhost;Database=test";
public void SubMethod()
{
Console.WriteLine("Sub namespace method");
}
}
}

BIN
tests/fixtures/TestAssembly.dll vendored Normal file

Binary file not shown.

View File

@ -0,0 +1 @@
"""Integration tests that use real ilspycmd calls."""

View File

@ -0,0 +1,379 @@
"""Integration tests using the custom TestAssembly.dll fixture.
These tests exercise the full stack including ilspycmd calls.
Tests are skipped if ilspycmd is not installed.
"""
import pytest
from mcilspy.ilspy_wrapper import ILSpyWrapper
from mcilspy.metadata_reader import MetadataReader
from mcilspy.models import (
DecompileRequest,
EntityType,
LanguageVersion,
ListTypesRequest,
)
class TestMetadataReaderWithTestAssembly:
"""Test MetadataReader against our custom test assembly."""
def test_get_assembly_metadata(self, test_assembly_path):
"""Test reading metadata from test assembly."""
with MetadataReader(test_assembly_path) as reader:
meta = reader.get_assembly_metadata()
assert meta.name == "TestAssemblyProject"
assert meta.type_count > 0
assert meta.method_count > 0
def test_list_methods_finds_known_methods(self, test_assembly_path):
"""Test that we can find methods we know exist."""
with MetadataReader(test_assembly_path) as reader:
methods = reader.list_methods()
method_names = [m.name for m in methods]
# Check for methods we defined in TestClass
assert "DoSomething" in method_names
assert "GetGreeting" in method_names
assert "Add" in method_names
def test_list_methods_with_type_filter(self, test_assembly_path):
"""Test filtering methods by type."""
with MetadataReader(test_assembly_path) as reader:
methods = reader.list_methods(type_filter="TestClass")
# All methods should be from types containing "TestClass"
for method in methods:
assert "TestClass" in method.declaring_type
def test_list_methods_with_namespace_filter(self, test_assembly_path):
"""Test filtering methods by namespace."""
with MetadataReader(test_assembly_path) as reader:
methods = reader.list_methods(namespace_filter="SubNamespace")
# Should only find methods from SubNamespace
for method in methods:
assert method.namespace is not None
assert "SubNamespace" in method.namespace
def test_list_methods_public_only(self, test_assembly_path):
"""Test filtering for public methods only."""
with MetadataReader(test_assembly_path) as reader:
public_methods = reader.list_methods(public_only=True)
all_methods = reader.list_methods(public_only=False)
# Should have fewer public methods than total
assert len(public_methods) <= len(all_methods)
# All returned methods should be public
for method in public_methods:
assert method.is_public
def test_list_fields_finds_known_fields(self, test_assembly_path):
"""Test that we can find fields we defined."""
with MetadataReader(test_assembly_path) as reader:
fields = reader.list_fields()
field_names = [f.name for f in fields]
# Check for constants and fields we defined
assert "API_KEY" in field_names
assert "BASE_URL" in field_names
assert "MAX_RETRIES" in field_names
def test_list_fields_constants_only(self, test_assembly_path):
"""Test filtering for constant fields only."""
with MetadataReader(test_assembly_path) as reader:
constants = reader.list_fields(constants_only=True)
# All returned fields should be literals
for field in constants:
assert field.is_literal
const_names = [f.name for f in constants]
assert "API_KEY" in const_names
assert "MAX_RETRIES" in const_names
def test_list_properties_finds_known_properties(self, test_assembly_path):
"""Test that we can find properties we defined."""
with MetadataReader(test_assembly_path) as reader:
properties = reader.list_properties()
prop_names = [p.name for p in properties]
# Check for properties we defined
assert "Name" in prop_names
assert "Age" in prop_names
assert "IsActive" in prop_names
assert "ServiceName" in prop_names
def test_list_events_finds_known_events(self, test_assembly_path):
"""Test that we can find events we defined."""
with MetadataReader(test_assembly_path) as reader:
events = reader.list_events()
event_names = [e.name for e in events]
# Check for events we defined
assert "OnChange" in event_names
assert "OnMessage" in event_names
def test_list_resources_empty_for_test_assembly(self, test_assembly_path):
"""Test that test assembly has no embedded resources."""
with MetadataReader(test_assembly_path) as reader:
resources = reader.list_resources()
# Our simple test assembly has no resources
assert isinstance(resources, list)
class TestILSpyWrapperWithTestAssembly:
"""Integration tests for ILSpyWrapper using real ilspycmd calls."""
@pytest.fixture
def wrapper(self, skip_without_ilspycmd):
"""Get wrapper instance, skipping if ilspycmd not available."""
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_list_types_finds_classes(self, wrapper, test_assembly_path):
"""Test listing classes from test assembly."""
request = ListTypesRequest(
assembly_path=test_assembly_path,
entity_types=[EntityType.CLASS],
)
response = await wrapper.list_types(request)
assert response.success
assert response.total_count > 0
type_names = [t.name for t in response.types]
assert "TestClass" in type_names
assert "TestServiceImpl" in type_names
assert "OuterClass" in type_names
@pytest.mark.asyncio
async def test_list_types_finds_interfaces(self, wrapper, test_assembly_path):
"""Test listing interfaces from test assembly."""
request = ListTypesRequest(
assembly_path=test_assembly_path,
entity_types=[EntityType.INTERFACE],
)
response = await wrapper.list_types(request)
assert response.success
type_names = [t.name for t in response.types]
assert "ITestService" in type_names
assert "IConfigurable" in type_names
@pytest.mark.asyncio
async def test_list_types_finds_structs(self, wrapper, test_assembly_path):
"""Test listing structs from test assembly."""
request = ListTypesRequest(
assembly_path=test_assembly_path,
entity_types=[EntityType.STRUCT],
)
response = await wrapper.list_types(request)
assert response.success
type_names = [t.name for t in response.types]
assert "TestStruct" in type_names
@pytest.mark.asyncio
async def test_list_types_finds_enums(self, wrapper, test_assembly_path):
"""Test listing enums from test assembly."""
request = ListTypesRequest(
assembly_path=test_assembly_path,
entity_types=[EntityType.ENUM],
)
response = await wrapper.list_types(request)
assert response.success
type_names = [t.name for t in response.types]
assert "TestEnum" in type_names
@pytest.mark.asyncio
async def test_list_types_finds_delegates(self, wrapper, test_assembly_path):
"""Test listing delegates from test assembly."""
request = ListTypesRequest(
assembly_path=test_assembly_path,
entity_types=[EntityType.DELEGATE],
)
response = await wrapper.list_types(request)
assert response.success
type_names = [t.name for t in response.types]
assert "TestDelegate" in type_names
@pytest.mark.asyncio
async def test_decompile_specific_type(self, wrapper, test_assembly_path):
"""Test decompiling a specific type."""
request = DecompileRequest(
assembly_path=test_assembly_path,
type_name="TestNamespace.TestClass",
language_version=LanguageVersion.LATEST,
)
response = await wrapper.decompile(request)
assert response.success
assert response.source_code is not None
# Check that decompiled code contains expected elements
source = response.source_code
assert "class TestClass" in source
assert "DoSomething" in source
assert "GetGreeting" in source
@pytest.mark.asyncio
async def test_decompile_entire_assembly(self, wrapper, test_assembly_path):
"""Test decompiling the entire assembly."""
request = DecompileRequest(
assembly_path=test_assembly_path,
language_version=LanguageVersion.LATEST,
)
response = await wrapper.decompile(request)
assert response.success
assert response.source_code is not None
# Check that all types are present
source = response.source_code
assert "TestClass" in source
assert "ITestService" in source
assert "TestStruct" in source
assert "TestEnum" in source
@pytest.mark.asyncio
async def test_decompile_to_il(self, wrapper, test_assembly_path):
"""Test decompiling to IL code."""
request = DecompileRequest(
assembly_path=test_assembly_path,
type_name="TestNamespace.TestClass",
show_il_code=True,
)
response = await wrapper.decompile(request)
assert response.success
assert response.source_code is not None
# IL code should contain IL-specific keywords
source = response.source_code
# IL typically shows .method, .field, etc.
assert ".class" in source or "IL_" in source
@pytest.mark.asyncio
async def test_decompile_to_output_dir(self, wrapper, test_assembly_path, temp_output_dir):
"""Test decompiling to an output directory."""
request = DecompileRequest(
assembly_path=test_assembly_path,
output_dir=temp_output_dir,
)
response = await wrapper.decompile(request)
assert response.success
assert response.output_path is not None
@pytest.mark.asyncio
async def test_decompile_with_project_structure(
self, wrapper, test_assembly_path, temp_output_dir
):
"""Test decompiling with project structure."""
request = DecompileRequest(
assembly_path=test_assembly_path,
output_dir=temp_output_dir,
create_project=True,
)
response = await wrapper.decompile(request)
assert response.success
@pytest.mark.asyncio
async def test_decompile_nonexistent_type(self, wrapper, test_assembly_path):
"""Test decompiling a type that doesn't exist."""
request = DecompileRequest(
assembly_path=test_assembly_path,
type_name="NonExistent.FakeClass",
)
response = await wrapper.decompile(request)
# Should still succeed but with empty or no matching output
# The actual behavior depends on ilspycmd version
assert response is not None
class TestIntegrationEndToEnd:
"""End-to-end integration tests covering complete workflows."""
@pytest.mark.asyncio
async def test_discover_and_decompile_workflow(
self, skip_without_ilspycmd, test_assembly_path
):
"""Test the typical workflow: list types, then decompile specific one."""
wrapper = ILSpyWrapper()
# Step 1: List all types
list_request = ListTypesRequest(
assembly_path=test_assembly_path,
entity_types=[EntityType.CLASS],
)
list_response = await wrapper.list_types(list_request)
assert list_response.success
assert len(list_response.types) > 0
# Step 2: Find TestServiceImpl
service_type = None
for t in list_response.types:
if t.name == "TestServiceImpl":
service_type = t
break
assert service_type is not None
# Step 3: Decompile it
decompile_request = DecompileRequest(
assembly_path=test_assembly_path,
type_name=service_type.full_name,
)
decompile_response = await wrapper.decompile(decompile_request)
assert decompile_response.success
assert decompile_response.source_code is not None
assert "TestServiceImpl" in decompile_response.source_code
assert "ITestService" in decompile_response.source_code
@pytest.mark.asyncio
async def test_metadata_and_decompile_combined(
self, skip_without_ilspycmd, test_assembly_path
):
"""Test using metadata reader and ILSpy wrapper together."""
# Use metadata reader for quick discovery
with MetadataReader(test_assembly_path) as reader:
methods = reader.list_methods(type_filter="TestClass")
add_method = None
for m in methods:
if m.name == "Add":
add_method = m
break
assert add_method is not None
assert add_method.is_static
# Use ILSpy for decompilation
wrapper = ILSpyWrapper()
request = DecompileRequest(
assembly_path=test_assembly_path,
type_name="TestNamespace.TestClass",
)
response = await wrapper.decompile(request)
assert response.success
# Verify the static method is in the output
assert "static" in response.source_code
assert "Add" in response.source_code

285
tests/test_concurrency.py Normal file
View File

@ -0,0 +1,285 @@
"""Tests for concurrent tool invocations.
These tests verify that the server handles multiple simultaneous
tool calls correctly using asyncio.gather().
"""
import asyncio
import pytest
from mcilspy import server
from mcilspy.metadata_reader import MetadataReader
class TestConcurrentMetadataOperations:
"""Test concurrent metadata reading operations."""
@pytest.mark.asyncio
async def test_concurrent_search_methods(self, test_assembly_path):
"""Test multiple search_methods calls running concurrently."""
patterns = ["Get", "Do", "Set", "Add", "Create"]
tasks = [
server.search_methods(test_assembly_path, pattern=p) for p in patterns
]
results = await asyncio.gather(*tasks)
# All tasks should complete successfully
assert len(results) == len(patterns)
# Each result should be a string
for result in results:
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_search_fields(self, test_assembly_path):
"""Test multiple search_fields calls running concurrently."""
patterns = ["API", "URL", "MAX", "BASE", "VALUE"]
tasks = [
server.search_fields(test_assembly_path, pattern=p) for p in patterns
]
results = await asyncio.gather(*tasks)
assert len(results) == len(patterns)
for result in results:
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_search_properties(self, test_assembly_path):
"""Test multiple search_properties calls running concurrently."""
patterns = ["Name", "Value", "Is", "Service"]
tasks = [
server.search_properties(test_assembly_path, pattern=p) for p in patterns
]
results = await asyncio.gather(*tasks)
assert len(results) == len(patterns)
@pytest.mark.asyncio
async def test_concurrent_mixed_operations(self, test_assembly_path):
"""Test different metadata operations running concurrently."""
tasks = [
server.search_methods(test_assembly_path, pattern="Get"),
server.search_fields(test_assembly_path, pattern="API"),
server.search_properties(test_assembly_path, pattern="Name"),
server.list_events(test_assembly_path),
server.list_resources(test_assembly_path),
server.get_metadata_summary(test_assembly_path),
]
results = await asyncio.gather(*tasks)
assert len(results) == 6
for result in results:
assert isinstance(result, str)
# None of them should have crashed
assert "Traceback" not in result
@pytest.mark.asyncio
async def test_concurrent_same_assembly_multiple_readers(self, test_assembly_path):
"""Test multiple MetadataReaders on the same assembly."""
async def read_metadata(path):
"""Async wrapper for metadata reading."""
with MetadataReader(path) as reader:
return reader.get_assembly_metadata()
# Run multiple readers concurrently
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(None, lambda: MetadataReader(test_assembly_path).__enter__().get_assembly_metadata())
for _ in range(5)
]
results = await asyncio.gather(*tasks)
assert len(results) == 5
# All results should have the same assembly name
names = [r.name for r in results]
assert all(n == names[0] for n in names)
class TestConcurrentToolCalls:
"""Test concurrent MCP tool invocations."""
@pytest.mark.asyncio
async def test_high_concurrency_search(self, test_assembly_path):
"""Test high number of concurrent searches."""
num_concurrent = 20
tasks = [
server.search_methods(test_assembly_path, pattern=f"pattern{i}")
for i in range(num_concurrent)
]
results = await asyncio.gather(*tasks)
assert len(results) == num_concurrent
# Most should return "No methods found" but shouldn't crash
for result in results:
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_with_errors(self, test_assembly_path, nonexistent_path):
"""Test concurrent calls where some will fail."""
tasks = [
# These should succeed
server.search_methods(test_assembly_path, pattern="Get"),
server.search_fields(test_assembly_path, pattern="API"),
# These should fail gracefully
server.search_methods(nonexistent_path, pattern="test"),
server.search_fields(nonexistent_path, pattern="test"),
]
results = await asyncio.gather(*tasks, return_exceptions=True)
assert len(results) == 4
# First two should be successful results
assert "GetGreeting" in results[0] or "No methods" in results[0]
assert "API_KEY" in results[1] or "No fields" in results[1]
# Last two should have error messages
assert "Error" in results[2]
assert "Error" in results[3]
@pytest.mark.asyncio
async def test_concurrent_list_operations(self, test_assembly_path):
"""Test concurrent list operations."""
tasks = [
server.list_events(test_assembly_path),
server.list_events(test_assembly_path),
server.list_resources(test_assembly_path),
server.list_resources(test_assembly_path),
]
results = await asyncio.gather(*tasks)
assert len(results) == 4
# Event results should be identical
assert results[0] == results[1]
# Resource results should be identical
assert results[2] == results[3]
class TestConcurrentWithRegex:
"""Test concurrent operations with regex patterns."""
@pytest.mark.asyncio
async def test_concurrent_regex_searches(self, test_assembly_path):
"""Test concurrent regex pattern searches."""
patterns = [r"^Get.*", r".*Service$", r"On\w+", r".*Base.*"]
tasks = [
server.search_methods(test_assembly_path, pattern=p, use_regex=True)
for p in patterns
]
results = await asyncio.gather(*tasks)
assert len(results) == 4
for result in results:
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_invalid_regex(self, test_assembly_path):
"""Test that concurrent invalid regex patterns are handled safely."""
patterns = [
r"[invalid", # Invalid
r"valid.*", # Valid
r"(?P<broken", # Invalid
r"also.*valid", # Valid
]
tasks = [
server.search_methods(test_assembly_path, pattern=p, use_regex=True)
for p in patterns
]
results = await asyncio.gather(*tasks)
assert len(results) == 4
# Invalid patterns should return error messages
assert "Invalid regex" in results[0]
assert "Invalid regex" in results[2]
# Valid patterns should return results (even if empty)
assert "Invalid regex" not in results[1]
assert "Invalid regex" not in results[3]
class TestConcurrentNamespaceFiltering:
"""Test concurrent operations with namespace filtering."""
@pytest.mark.asyncio
async def test_concurrent_namespace_filtered_searches(self, test_assembly_path):
"""Test concurrent searches with different namespace filters."""
namespace_filters = [
"TestNamespace",
"SubNamespace",
"NonExistent",
None, # No filter
]
tasks = [
server.search_methods(
test_assembly_path, pattern="", namespace_filter=ns
)
for ns in namespace_filters
]
results = await asyncio.gather(*tasks)
assert len(results) == 4
class TestConcurrencyIsolation:
"""Test that concurrent operations don't interfere with each other."""
@pytest.mark.asyncio
async def test_results_not_mixed(self, test_assembly_path):
"""Verify that concurrent operations return their own results."""
# Use very specific patterns that should match different things
async def search_and_verify(pattern, expected):
result = await server.search_methods(test_assembly_path, pattern=pattern)
if expected:
assert expected in result, f"Expected '{expected}' in result for pattern '{pattern}'"
return result
tasks = [
search_and_verify("DoSomething", "DoSomething"),
search_and_verify("GetGreeting", "GetGreeting"),
search_and_verify("Add", "Add"),
]
results = await asyncio.gather(*tasks)
# Verify each result contains only its expected match
assert "DoSomething" in results[0]
assert "GetGreeting" in results[1]
assert "Add" in results[2]
# Results shouldn't be mixed up
assert results[0] != results[1]
assert results[1] != results[2]
@pytest.mark.asyncio
async def test_concurrent_metadata_summary_consistent(self, test_assembly_path):
"""Verify concurrent metadata summary calls return consistent results."""
num_concurrent = 10
tasks = [
server.get_metadata_summary(test_assembly_path)
for _ in range(num_concurrent)
]
results = await asyncio.gather(*tasks)
# All results should be identical
first_result = results[0]
for result in results[1:]:
assert result == first_result, "Concurrent metadata calls returned different results"

301
tests/test_docstrings.py Normal file
View File

@ -0,0 +1,301 @@
"""Tests for docstring coverage.
Verifies that all public functions and classes have docstrings.
Uses AST to introspect the source code.
"""
import inspect
import pytest
import mcilspy.ilspy_wrapper as wrapper_module
import mcilspy.metadata_reader as reader_module
import mcilspy.models as models_module
# Import the modules we want to check
import mcilspy.server as server_module
def get_public_functions_and_classes(module):
"""Get all public functions and classes from a module.
Returns a list of (name, obj, has_docstring) tuples.
"""
results = []
for name in dir(module):
if name.startswith("_"):
continue
obj = getattr(module, name)
# Check if it's a function or class defined in this module
if not (inspect.isfunction(obj) or inspect.isclass(obj)):
continue
# Skip imported items
if hasattr(obj, "__module__") and obj.__module__ != module.__name__:
continue
has_docstring = bool(inspect.getdoc(obj))
results.append((name, obj, has_docstring))
return results
def get_public_methods(cls):
"""Get all public methods from a class.
Returns a list of (name, method, has_docstring) tuples.
"""
results = []
for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
if name.startswith("_") and not name.startswith("__"):
continue
if name.startswith("__") and name != "__init__":
continue
has_docstring = bool(inspect.getdoc(method))
results.append((name, method, has_docstring))
return results
class TestServerModuleDocstrings:
"""Tests for server.py docstring coverage."""
def test_all_mcp_tools_have_docstrings(self):
"""Verify all @mcp.tool() decorated functions have docstrings."""
# Find all functions decorated with @mcp.tool()
# These are the public API and MUST have docstrings
tools = [
server_module.check_ilspy_installation,
server_module.install_ilspy,
server_module.decompile_assembly,
server_module.list_types,
server_module.generate_diagrammer,
server_module.get_assembly_info,
server_module.search_types,
server_module.search_strings,
server_module.search_methods,
server_module.search_fields,
server_module.search_properties,
server_module.list_events,
server_module.list_resources,
server_module.get_metadata_summary,
]
missing_docstrings = []
for tool in tools:
docstring = inspect.getdoc(tool)
if not docstring:
missing_docstrings.append(tool.__name__)
assert not missing_docstrings, f"Tools missing docstrings: {missing_docstrings}"
def test_tool_docstrings_have_args_section(self):
"""Verify tool docstrings document their arguments."""
# Tools with parameters should have Args: section
tools_with_params = [
server_module.decompile_assembly,
server_module.list_types,
server_module.generate_diagrammer,
server_module.get_assembly_info,
server_module.search_types,
server_module.search_strings,
server_module.search_methods,
server_module.search_fields,
server_module.search_properties,
server_module.list_events,
server_module.list_resources,
server_module.get_metadata_summary,
]
missing_args = []
for tool in tools_with_params:
docstring = inspect.getdoc(tool)
sig = inspect.signature(tool)
# Get non-ctx parameters
params = [
p
for p in sig.parameters.values()
if p.name != "ctx" and p.name != "self"
]
if params and docstring and "Args:" not in docstring:
missing_args.append(tool.__name__)
assert not missing_args, f"Tools missing Args section: {missing_args}"
def test_helper_functions_have_docstrings(self):
"""Verify helper functions have docstrings."""
helpers = [
server_module.get_wrapper,
server_module._format_error,
server_module._find_ilspycmd_path,
server_module._check_dotnet_tools,
server_module._detect_platform,
server_module._try_install_dotnet_sdk,
]
missing_docstrings = []
for helper in helpers:
docstring = inspect.getdoc(helper)
if not docstring:
missing_docstrings.append(helper.__name__)
assert not missing_docstrings, f"Helpers missing docstrings: {missing_docstrings}"
class TestWrapperModuleDocstrings:
"""Tests for ilspy_wrapper.py docstring coverage."""
def test_wrapper_class_has_docstring(self):
"""Verify ILSpyWrapper class has a docstring."""
docstring = inspect.getdoc(wrapper_module.ILSpyWrapper)
assert docstring, "ILSpyWrapper class should have a docstring"
def test_wrapper_public_methods_have_docstrings(self):
"""Verify ILSpyWrapper public methods have docstrings."""
methods_to_check = [
"decompile",
"list_types",
"generate_diagrammer",
"get_assembly_info",
]
missing_docstrings = []
for method_name in methods_to_check:
method = getattr(wrapper_module.ILSpyWrapper, method_name, None)
if method:
docstring = inspect.getdoc(method)
if not docstring:
missing_docstrings.append(method_name)
assert not missing_docstrings, (
f"ILSpyWrapper methods missing docstrings: {missing_docstrings}"
)
class TestMetadataReaderDocstrings:
"""Tests for metadata_reader.py docstring coverage."""
def test_reader_class_has_docstring(self):
"""Verify MetadataReader class has a docstring."""
docstring = inspect.getdoc(reader_module.MetadataReader)
assert docstring, "MetadataReader class should have a docstring"
def test_reader_public_methods_have_docstrings(self):
"""Verify MetadataReader public methods have docstrings."""
methods_to_check = [
"get_assembly_metadata",
"list_methods",
"list_fields",
"list_properties",
"list_events",
"list_resources",
]
missing_docstrings = []
for method_name in methods_to_check:
method = getattr(reader_module.MetadataReader, method_name, None)
if method:
docstring = inspect.getdoc(method)
if not docstring:
missing_docstrings.append(method_name)
assert not missing_docstrings, (
f"MetadataReader methods missing docstrings: {missing_docstrings}"
)
class TestModelsDocstrings:
"""Tests for models.py docstring coverage."""
def test_pydantic_models_have_docstrings(self):
"""Verify Pydantic model classes have docstrings."""
models_to_check = [
models_module.DecompileRequest,
models_module.DecompileResponse,
models_module.ListTypesRequest,
models_module.ListTypesResponse,
models_module.TypeInfo,
models_module.AssemblyInfo,
]
missing_docstrings = []
for model in models_to_check:
docstring = inspect.getdoc(model)
if not docstring:
missing_docstrings.append(model.__name__)
# Just check that most have docstrings - Pydantic models are self-documenting
# through their field names
assert len(missing_docstrings) <= 2, (
f"Too many models missing docstrings: {missing_docstrings}"
)
class TestModuleDocstrings:
"""Tests for module-level docstrings."""
def test_all_modules_have_docstrings(self):
"""Verify all mcilspy modules have module-level docstrings."""
modules = [
server_module,
wrapper_module,
reader_module,
models_module,
]
missing_docstrings = []
for module in modules:
if not module.__doc__:
missing_docstrings.append(module.__name__)
# Just warn, don't fail - module docstrings are nice but not critical
if missing_docstrings:
pytest.skip(f"Modules missing docstrings (non-critical): {missing_docstrings}")
class TestDocstringQuality:
"""Tests for docstring quality (not just presence)."""
def test_tool_docstrings_not_empty(self):
"""Verify tool docstrings have meaningful content."""
tools = [
server_module.decompile_assembly,
server_module.list_types,
server_module.search_methods,
]
short_docstrings = []
for tool in tools:
docstring = inspect.getdoc(tool)
if docstring and len(docstring) < 50:
short_docstrings.append(f"{tool.__name__}: {len(docstring)} chars")
assert not short_docstrings, (
f"Tools have too-short docstrings: {short_docstrings}"
)
def test_docstrings_describe_purpose(self):
"""Verify key tool docstrings describe what the tool does."""
key_words = {
server_module.decompile_assembly: ["decompile", "assembly", "C#"],
server_module.list_types: ["types", "list", "class"],
server_module.search_methods: ["search", "method"],
}
missing_keywords = []
for tool, keywords in key_words.items():
docstring = inspect.getdoc(tool).lower() if inspect.getdoc(tool) else ""
for keyword in keywords:
if keyword.lower() not in docstring:
missing_keywords.append(f"{tool.__name__} missing '{keyword}'")
assert not missing_keywords, (
f"Docstrings missing expected keywords: {missing_keywords}"
)

423
tests/test_error_paths.py Normal file
View File

@ -0,0 +1,423 @@
"""Tests for error handling paths.
These tests verify that the server handles various error conditions gracefully:
- Invalid regex patterns
- ilspycmd not found scenarios
- Invalid language versions
- File not found errors
- Invalid assembly files
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mcilspy import server
from mcilspy.ilspy_wrapper import ILSpyWrapper
from mcilspy.metadata_reader import MetadataReader
from mcilspy.models import EntityType
class TestInvalidRegexPatterns:
"""Tests for invalid regex pattern handling."""
@pytest.mark.asyncio
async def test_search_types_invalid_regex(self, test_assembly_path):
"""Test search_types with invalid regex pattern."""
# Use an invalid regex pattern
result = await server.search_types(
test_assembly_path,
pattern="[invalid(regex",
use_regex=True,
)
assert "Invalid regex pattern" in result
@pytest.mark.asyncio
async def test_search_methods_invalid_regex(self, test_assembly_path):
"""Test search_methods with invalid regex pattern."""
result = await server.search_methods(
test_assembly_path,
pattern="[unclosed",
use_regex=True,
)
assert "Invalid regex pattern" in result
@pytest.mark.asyncio
async def test_search_fields_invalid_regex(self, test_assembly_path):
"""Test search_fields with invalid regex pattern."""
result = await server.search_fields(
test_assembly_path,
pattern="*invalid*",
use_regex=True,
)
assert "Invalid regex pattern" in result
@pytest.mark.asyncio
async def test_search_properties_invalid_regex(self, test_assembly_path):
"""Test search_properties with invalid regex pattern."""
result = await server.search_properties(
test_assembly_path,
pattern="(?P<broken",
use_regex=True,
)
assert "Invalid regex pattern" in result
@pytest.mark.asyncio
async def test_search_strings_invalid_regex(self):
"""Test search_strings with invalid regex pattern."""
# Mock the wrapper to avoid needing ilspycmd
from mcilspy.models import DecompileResponse
mock_response = DecompileResponse(
success=True,
assembly_name="Test",
source_code="public class Test { string s = \"hello\"; }",
)
mock_wrapper = MagicMock()
mock_wrapper.decompile = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.search_strings(
"/path/to/test.dll",
pattern="[broken",
use_regex=True,
)
assert "Invalid regex pattern" in result
class TestIlspyCmdNotFound:
"""Tests for scenarios where ilspycmd is not installed."""
def test_wrapper_init_raises_when_not_found(self):
"""Test that ILSpyWrapper raises RuntimeError when ilspycmd not found."""
with (
patch("shutil.which", return_value=None),
patch("os.path.isfile", return_value=False),
pytest.raises(RuntimeError) as exc_info,
):
ILSpyWrapper()
assert "ILSpyCmd not found" in str(exc_info.value)
@pytest.mark.asyncio
async def test_decompile_when_not_installed(self):
"""Test decompile_assembly when ilspycmd is not installed."""
with patch.object(
server, "get_wrapper", side_effect=RuntimeError("ILSpyCmd not found")
):
result = await server.decompile_assembly("/path/to/test.dll")
assert "Error" in result
assert "ILSpyCmd not found" in result
@pytest.mark.asyncio
async def test_list_types_when_not_installed(self):
"""Test list_types when ilspycmd is not installed."""
with patch.object(
server, "get_wrapper", side_effect=RuntimeError("ILSpyCmd not found")
):
result = await server.list_types("/path/to/test.dll")
assert "Error" in result
assert "ILSpyCmd not found" in result
@pytest.mark.asyncio
async def test_generate_diagrammer_when_not_installed(self):
"""Test generate_diagrammer when ilspycmd is not installed."""
with patch.object(
server, "get_wrapper", side_effect=RuntimeError("ILSpyCmd not found")
):
result = await server.generate_diagrammer("/path/to/test.dll")
assert "Error" in result
@pytest.mark.asyncio
async def test_get_assembly_info_when_not_installed(self):
"""Test get_assembly_info when ilspycmd is not installed."""
with patch.object(
server, "get_wrapper", side_effect=RuntimeError("ILSpyCmd not found")
):
result = await server.get_assembly_info("/path/to/test.dll")
assert "Error" in result
class TestInvalidLanguageVersion:
"""Tests for invalid language version handling."""
@pytest.mark.asyncio
async def test_decompile_with_invalid_language_version(self):
"""Test decompile_assembly with invalid language version."""
# The LanguageVersion enum should raise ValueError for invalid versions
result = await server.decompile_assembly(
"/path/to/test.dll",
language_version="CSharp99", # Invalid version
)
# Should return an error about the invalid language version
assert "Error" in result
class TestFileNotFoundErrors:
"""Tests for file not found error handling."""
@pytest.mark.asyncio
async def test_search_methods_file_not_found(self, nonexistent_path):
"""Test search_methods with nonexistent file."""
result = await server.search_methods(nonexistent_path, pattern="test")
assert "Error" in result
@pytest.mark.asyncio
async def test_search_fields_file_not_found(self, nonexistent_path):
"""Test search_fields with nonexistent file."""
result = await server.search_fields(nonexistent_path, pattern="test")
assert "Error" in result
@pytest.mark.asyncio
async def test_search_properties_file_not_found(self, nonexistent_path):
"""Test search_properties with nonexistent file."""
result = await server.search_properties(nonexistent_path, pattern="test")
assert "Error" in result
@pytest.mark.asyncio
async def test_list_events_file_not_found(self, nonexistent_path):
"""Test list_events with nonexistent file."""
result = await server.list_events(nonexistent_path)
assert "Error" in result
@pytest.mark.asyncio
async def test_list_resources_file_not_found(self, nonexistent_path):
"""Test list_resources with nonexistent file."""
result = await server.list_resources(nonexistent_path)
assert "Error" in result
@pytest.mark.asyncio
async def test_get_metadata_summary_file_not_found(self, nonexistent_path):
"""Test get_metadata_summary with nonexistent file."""
result = await server.get_metadata_summary(nonexistent_path)
assert "Error" in result
def test_metadata_reader_file_not_found(self, nonexistent_path):
"""Test MetadataReader with nonexistent file."""
with pytest.raises(FileNotFoundError):
MetadataReader(nonexistent_path)
class TestInvalidAssemblyFiles:
"""Tests for handling invalid assembly files."""
@pytest.fixture
def invalid_assembly_path(self, tmp_path):
"""Create a file that is not a valid .NET assembly."""
invalid_file = tmp_path / "invalid.dll"
invalid_file.write_text("This is not a valid PE file")
return str(invalid_file)
def test_metadata_reader_invalid_assembly(self, invalid_assembly_path):
"""Test MetadataReader with an invalid assembly file."""
# dnfile may silently fail or raise on invalid assemblies
# Either outcome is acceptable - the key is it doesn't crash
try:
with MetadataReader(invalid_assembly_path) as reader:
# If it opens, trying to read should fail or return empty
reader.get_assembly_metadata()
# If we get here, that's OK - just shouldn't crash
assert True
except Exception:
# An exception is also acceptable for invalid PE files
assert True
@pytest.mark.asyncio
async def test_search_methods_invalid_assembly(self, invalid_assembly_path):
"""Test search_methods with invalid assembly."""
result = await server.search_methods(invalid_assembly_path, pattern="test")
assert "Error" in result
@pytest.mark.asyncio
async def test_get_metadata_summary_invalid_assembly(self, invalid_assembly_path):
"""Test get_metadata_summary with invalid assembly."""
result = await server.get_metadata_summary(invalid_assembly_path)
assert "Error" in result
class TestEntityTypeValidation:
"""Tests for EntityType enum validation."""
def test_invalid_entity_type_string(self):
"""Test EntityType.from_string with invalid type name."""
with pytest.raises(ValueError) as exc_info:
EntityType.from_string("invalid_type")
assert "Invalid entity type" in str(exc_info.value)
def test_invalid_entity_type_single_letter(self):
"""Test EntityType.from_string with invalid single letter."""
with pytest.raises(ValueError) as exc_info:
EntityType.from_string("x")
assert "Invalid entity type" in str(exc_info.value)
@pytest.mark.asyncio
async def test_list_types_with_invalid_entity_type(self):
"""Test list_types with invalid entity type in list."""
# The server should skip invalid entity types with a warning
from mcilspy.models import ListTypesResponse
mock_response = ListTypesResponse(success=True, types=[], total_count=0)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
# Should not raise, but should skip invalid types
result = await server.list_types(
"/path/to/test.dll",
entity_types=["class", "invalid", "interface"],
)
# Should still work, just skipping the invalid type
assert isinstance(result, str)
class TestContextInfoFailure:
"""Tests for handling ctx.info() failures."""
@pytest.mark.asyncio
async def test_decompile_with_failing_context(self):
"""Test decompile_assembly when ctx.info() fails."""
from mcilspy.models import DecompileResponse
mock_response = DecompileResponse(
success=True,
assembly_name="Test",
source_code="class Test { }",
)
mock_wrapper = MagicMock()
mock_wrapper.decompile = AsyncMock(return_value=mock_response)
# Create a mock context that fails on info()
mock_ctx = MagicMock()
mock_ctx.info = AsyncMock(side_effect=Exception("Context info failed"))
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
# The function should handle the failing ctx.info gracefully
# Note: Currently ctx is optional and None by default
result = await server.decompile_assembly("/path/to/test.dll", ctx=None)
# Should still succeed since ctx is optional
assert "Test" in result
class TestEmptyResults:
"""Tests for handling empty result sets."""
@pytest.mark.asyncio
async def test_search_methods_empty_assembly(self, test_assembly_path):
"""Test search with pattern that matches nothing."""
result = await server.search_methods(
test_assembly_path, pattern="ZZZZNONEXISTENT"
)
assert "No methods found" in result
@pytest.mark.asyncio
async def test_search_fields_no_matches(self, test_assembly_path):
"""Test field search with no matches."""
result = await server.search_fields(
test_assembly_path, pattern="NONEXISTENT_FIELD_12345"
)
assert "No fields found" in result
@pytest.mark.asyncio
async def test_search_properties_no_matches(self, test_assembly_path):
"""Test property search with no matches."""
result = await server.search_properties(
test_assembly_path, pattern="NONEXISTENT_PROPERTY"
)
assert "No properties found" in result
class TestInstallIlspy:
"""Tests for install_ilspy tool error paths."""
@pytest.mark.asyncio
async def test_install_without_dotnet(self):
"""Test install_ilspy when dotnet is not available."""
mock_status = {
"dotnet_available": False,
"dotnet_version": None,
"ilspycmd_available": False,
"ilspycmd_version": None,
"ilspycmd_path": None,
}
with (
patch.object(server, "_check_dotnet_tools", return_value=mock_status),
patch.object(
server,
"_detect_platform",
return_value={
"system": "linux",
"distro": "arch",
"package_manager": "pacman",
"install_command": "sudo pacman -S dotnet-sdk",
},
),
):
result = await server.install_ilspy()
assert "dotnet CLI is not installed" in result
@pytest.mark.asyncio
async def test_install_already_installed(self):
"""Test install_ilspy when already installed."""
mock_status = {
"dotnet_available": True,
"dotnet_version": "8.0.100",
"ilspycmd_available": True,
"ilspycmd_version": "8.2.0",
"ilspycmd_path": "/home/user/.dotnet/tools/ilspycmd",
}
with patch.object(server, "_check_dotnet_tools", return_value=mock_status):
result = await server.install_ilspy()
assert "already installed" in result
@pytest.mark.asyncio
async def test_install_fails(self):
"""Test install_ilspy when installation fails."""
mock_status_before = {
"dotnet_available": True,
"dotnet_version": "8.0.100",
"ilspycmd_available": False,
"ilspycmd_version": None,
"ilspycmd_path": None,
}
# Mock subprocess to simulate installation failure
mock_proc = MagicMock()
mock_proc.returncode = 1
mock_proc.communicate = AsyncMock(return_value=(b"", b"Installation failed"))
with (
patch.object(server, "_check_dotnet_tools", return_value=mock_status_before),
patch("asyncio.create_subprocess_exec", return_value=mock_proc),
):
result = await server.install_ilspy()
assert "Installation failed" in result or "failed" in result.lower()

View File

@ -3,9 +3,9 @@
import pytest import pytest
from mcilspy.models import ( from mcilspy.models import (
DecompileRequest,
EntityType, EntityType,
LanguageVersion, LanguageVersion,
DecompileRequest,
ListTypesRequest, ListTypesRequest,
TypeInfo, TypeInfo,
) )

580
tests/test_server_tools.py Normal file
View File

@ -0,0 +1,580 @@
"""Tests for MCP server tool functions.
These tests exercise the @mcp.tool() decorated functions in server.py.
We mock the ILSpyWrapper to test the tool logic independently of ilspycmd.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mcilspy import server
from mcilspy.models import (
AssemblyInfo,
DecompileResponse,
ListTypesResponse,
TypeInfo,
)
class TestCheckIlspyInstallation:
"""Tests for check_ilspy_installation tool."""
@pytest.mark.asyncio
async def test_both_installed(self):
"""Test when both dotnet and ilspycmd are installed."""
mock_status = {
"dotnet_available": True,
"dotnet_version": "8.0.100",
"ilspycmd_available": True,
"ilspycmd_version": "8.2.0",
"ilspycmd_path": "/home/user/.dotnet/tools/ilspycmd",
}
with patch.object(server, "_check_dotnet_tools", return_value=mock_status):
result = await server.check_ilspy_installation()
assert "dotnet CLI" in result
assert "8.0.100" in result
assert "ilspycmd" in result
assert "8.2.0" in result
assert "ready to use" in result
@pytest.mark.asyncio
async def test_dotnet_not_installed(self):
"""Test when dotnet is not installed."""
mock_status = {
"dotnet_available": False,
"dotnet_version": None,
"ilspycmd_available": False,
"ilspycmd_version": None,
"ilspycmd_path": None,
}
with patch.object(server, "_check_dotnet_tools", return_value=mock_status):
result = await server.check_ilspy_installation()
assert "Not found" in result
assert "dotnet.microsoft.com" in result
@pytest.mark.asyncio
async def test_ilspycmd_not_installed(self):
"""Test when dotnet is installed but ilspycmd is not."""
mock_status = {
"dotnet_available": True,
"dotnet_version": "8.0.100",
"ilspycmd_available": False,
"ilspycmd_version": None,
"ilspycmd_path": None,
}
with patch.object(server, "_check_dotnet_tools", return_value=mock_status):
result = await server.check_ilspy_installation()
assert "ilspycmd" in result
assert "Not installed" in result
assert "install_ilspy" in result.lower() or "dotnet tool install" in result
class TestDecompileAssembly:
"""Tests for decompile_assembly tool."""
@pytest.mark.asyncio
async def test_successful_decompile(self):
"""Test successful decompilation returns formatted output."""
mock_response = DecompileResponse(
success=True,
assembly_name="TestAssembly",
type_name="MyClass",
source_code="public class MyClass { }",
)
mock_wrapper = MagicMock()
mock_wrapper.decompile = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.decompile_assembly("/path/to/test.dll")
assert "Decompilation result" in result
assert "TestAssembly" in result
assert "public class MyClass" in result
assert "```csharp" in result
@pytest.mark.asyncio
async def test_decompile_with_output_dir(self):
"""Test decompilation to output directory."""
mock_response = DecompileResponse(
success=True,
assembly_name="TestAssembly",
output_path="/tmp/output",
)
mock_wrapper = MagicMock()
mock_wrapper.decompile = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.decompile_assembly(
"/path/to/test.dll", output_dir="/tmp/output"
)
assert "successful" in result.lower()
assert "/tmp/output" in result
@pytest.mark.asyncio
async def test_decompile_failure(self):
"""Test failed decompilation returns error message."""
mock_response = DecompileResponse(
success=False,
assembly_name="TestAssembly",
error_message="Assembly not found",
)
mock_wrapper = MagicMock()
mock_wrapper.decompile = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.decompile_assembly("/path/to/nonexistent.dll")
assert "failed" in result.lower()
assert "Assembly not found" in result
@pytest.mark.asyncio
async def test_decompile_with_type_name(self):
"""Test decompiling a specific type."""
mock_response = DecompileResponse(
success=True,
assembly_name="TestAssembly",
type_name="MyNamespace.MyClass",
source_code="namespace MyNamespace { public class MyClass { } }",
)
mock_wrapper = MagicMock()
mock_wrapper.decompile = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.decompile_assembly(
"/path/to/test.dll", type_name="MyNamespace.MyClass"
)
assert "MyNamespace.MyClass" in result
@pytest.mark.asyncio
async def test_decompile_exception_handling(self):
"""Test that exceptions are handled gracefully."""
mock_wrapper = MagicMock()
mock_wrapper.decompile = AsyncMock(side_effect=RuntimeError("Test error"))
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.decompile_assembly("/path/to/test.dll")
assert "Error" in result
assert "Test error" in result
class TestListTypes:
"""Tests for list_types tool."""
@pytest.mark.asyncio
async def test_list_types_success(self):
"""Test successful type listing."""
mock_types = [
TypeInfo(name="ClassA", full_name="NS.ClassA", kind="Class", namespace="NS"),
TypeInfo(name="ClassB", full_name="NS.ClassB", kind="Class", namespace="NS"),
TypeInfo(name="IService", full_name="NS.IService", kind="Interface", namespace="NS"),
]
mock_response = ListTypesResponse(
success=True,
types=mock_types,
total_count=3,
)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.list_types("/path/to/test.dll")
assert "Types in" in result
assert "Found 3 types" in result
assert "ClassA" in result
assert "ClassB" in result
assert "IService" in result
@pytest.mark.asyncio
async def test_list_types_grouped_by_namespace(self):
"""Test that types are grouped by namespace."""
mock_types = [
TypeInfo(name="ClassA", full_name="NS1.ClassA", kind="Class", namespace="NS1"),
TypeInfo(name="ClassB", full_name="NS2.ClassB", kind="Class", namespace="NS2"),
]
mock_response = ListTypesResponse(
success=True,
types=mock_types,
total_count=2,
)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.list_types("/path/to/test.dll")
# Should have namespace headers
assert "## NS1" in result
assert "## NS2" in result
@pytest.mark.asyncio
async def test_list_types_with_entity_types(self):
"""Test listing specific entity types."""
mock_response = ListTypesResponse(
success=True,
types=[TypeInfo(name="IService", full_name="NS.IService", kind="Interface", namespace="NS")],
total_count=1,
)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.list_types(
"/path/to/test.dll", entity_types=["interface"]
)
assert "IService" in result
mock_wrapper.list_types.assert_called_once()
@pytest.mark.asyncio
async def test_list_types_no_types_found(self):
"""Test when no types are found."""
mock_response = ListTypesResponse(
success=True,
types=[],
total_count=0,
error_message="No types found in assembly",
)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.list_types("/path/to/test.dll")
assert "No types found" in result
class TestSearchTypes:
"""Tests for search_types tool."""
@pytest.mark.asyncio
async def test_search_types_finds_matches(self):
"""Test searching types by pattern."""
mock_types = [
TypeInfo(name="UserService", full_name="NS.UserService", kind="Class", namespace="NS"),
TypeInfo(name="OrderService", full_name="NS.OrderService", kind="Class", namespace="NS"),
TypeInfo(name="Helper", full_name="NS.Helper", kind="Class", namespace="NS"),
]
mock_response = ListTypesResponse(
success=True,
types=mock_types,
total_count=3,
)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.search_types("/path/to/test.dll", pattern="Service")
assert "Search Results" in result
assert "UserService" in result
assert "OrderService" in result
assert "Helper" not in result
@pytest.mark.asyncio
async def test_search_types_case_insensitive(self):
"""Test case-insensitive search (default)."""
mock_types = [
TypeInfo(name="SERVICE", full_name="NS.SERVICE", kind="Class", namespace="NS"),
]
mock_response = ListTypesResponse(success=True, types=mock_types, total_count=1)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.search_types(
"/path/to/test.dll", pattern="service", case_sensitive=False
)
assert "SERVICE" in result
@pytest.mark.asyncio
async def test_search_types_with_namespace_filter(self):
"""Test searching with namespace filter."""
mock_types = [
TypeInfo(name="ClassA", full_name="App.Services.ClassA", kind="Class", namespace="App.Services"),
TypeInfo(name="ClassB", full_name="App.Models.ClassB", kind="Class", namespace="App.Models"),
]
mock_response = ListTypesResponse(success=True, types=mock_types, total_count=2)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.search_types(
"/path/to/test.dll", pattern="Class", namespace_filter="Services"
)
assert "ClassA" in result
assert "ClassB" not in result
@pytest.mark.asyncio
async def test_search_types_no_matches(self):
"""Test when no types match the pattern."""
mock_types = [
TypeInfo(name="Helper", full_name="NS.Helper", kind="Class", namespace="NS"),
]
mock_response = ListTypesResponse(success=True, types=mock_types, total_count=1)
mock_wrapper = MagicMock()
mock_wrapper.list_types = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.search_types("/path/to/test.dll", pattern="Service")
assert "No types found" in result
class TestSearchMethods:
"""Tests for search_methods tool (using metadata reader)."""
@pytest.mark.asyncio
async def test_search_methods_uses_metadata_reader(self, test_assembly_path):
"""Test that search_methods uses MetadataReader directly."""
result = await server.search_methods(test_assembly_path, pattern="Do")
assert "DoSomething" in result
@pytest.mark.asyncio
async def test_search_methods_filters_by_pattern(self, test_assembly_path):
"""Test method name pattern filtering."""
result = await server.search_methods(test_assembly_path, pattern="Get")
assert "GetGreeting" in result
assert "DoSomething" not in result
@pytest.mark.asyncio
async def test_search_methods_public_only(self, test_assembly_path):
"""Test filtering for public methods."""
result = await server.search_methods(
test_assembly_path, pattern="Method", public_only=True
)
# Should find public methods
assert "Method" in result or "No methods found" in result
class TestSearchFields:
"""Tests for search_fields tool."""
@pytest.mark.asyncio
async def test_search_fields_finds_constants(self, test_assembly_path):
"""Test finding constant fields."""
result = await server.search_fields(test_assembly_path, pattern="API")
assert "API_KEY" in result
@pytest.mark.asyncio
async def test_search_fields_constants_only(self, test_assembly_path):
"""Test filtering for constants only."""
result = await server.search_fields(
test_assembly_path, pattern="", constants_only=True
)
assert "const" in result
class TestSearchProperties:
"""Tests for search_properties tool."""
@pytest.mark.asyncio
async def test_search_properties(self, test_assembly_path):
"""Test searching for properties."""
result = await server.search_properties(test_assembly_path, pattern="Name")
assert "Name" in result
class TestListEvents:
"""Tests for list_events tool."""
@pytest.mark.asyncio
async def test_list_events(self, test_assembly_path):
"""Test listing events from assembly."""
result = await server.list_events(test_assembly_path)
assert "OnChange" in result or "No events" in result
class TestListResources:
"""Tests for list_resources tool."""
@pytest.mark.asyncio
async def test_list_resources_empty(self, test_assembly_path):
"""Test listing resources (test assembly has none)."""
result = await server.list_resources(test_assembly_path)
assert "No embedded resources" in result or "Embedded Resources" in result
class TestGetMetadataSummary:
"""Tests for get_metadata_summary tool."""
@pytest.mark.asyncio
async def test_get_metadata_summary(self, test_assembly_path):
"""Test getting metadata summary."""
result = await server.get_metadata_summary(test_assembly_path)
assert "Assembly Metadata Summary" in result
assert "Name" in result
assert "Version" in result
assert "Statistics" in result
assert "Types" in result
assert "Methods" in result
class TestGetAssemblyInfo:
"""Tests for get_assembly_info tool."""
@pytest.mark.asyncio
async def test_get_assembly_info_success(self):
"""Test getting assembly info successfully."""
mock_info = AssemblyInfo(
name="TestAssembly",
full_name="TestAssembly, Version=1.0.0.0",
location="/path/to/test.dll",
version="1.0.0.0",
target_framework=".NETStandard,Version=v2.0",
is_signed=False,
has_debug_info=False,
)
mock_wrapper = MagicMock()
mock_wrapper.get_assembly_info = AsyncMock(return_value=mock_info)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.get_assembly_info("/path/to/test.dll")
assert "Assembly Information" in result
assert "TestAssembly" in result
assert "1.0.0.0" in result
@pytest.mark.asyncio
async def test_get_assembly_info_exception(self):
"""Test handling of exceptions in get_assembly_info."""
mock_wrapper = MagicMock()
mock_wrapper.get_assembly_info = AsyncMock(
side_effect=FileNotFoundError("File not found")
)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.get_assembly_info("/nonexistent/file.dll")
assert "Error" in result
class TestGenerateDiagrammer:
"""Tests for generate_diagrammer tool."""
@pytest.mark.asyncio
async def test_generate_diagrammer_success(self):
"""Test successful diagram generation."""
mock_response = {
"success": True,
"output_directory": "/tmp/diagrammer",
}
mock_wrapper = MagicMock()
mock_wrapper.generate_diagrammer = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.generate_diagrammer("/path/to/test.dll")
assert "successfully" in result.lower()
assert "/tmp/diagrammer" in result
@pytest.mark.asyncio
async def test_generate_diagrammer_failure(self):
"""Test failed diagram generation."""
mock_response = {
"success": False,
"error_message": "Failed to generate diagram",
}
mock_wrapper = MagicMock()
mock_wrapper.generate_diagrammer = AsyncMock(return_value=mock_response)
with patch.object(server, "get_wrapper", return_value=mock_wrapper):
result = await server.generate_diagrammer("/path/to/test.dll")
assert "Failed" in result
class TestHelperFunctions:
"""Tests for helper functions in server.py."""
def test_format_error_with_context(self):
"""Test _format_error with context."""
error = ValueError("test error")
result = server._format_error(error, "testing")
assert "Error" in result
assert "testing" in result
assert "test error" in result
def test_format_error_without_context(self):
"""Test _format_error without context."""
error = RuntimeError("something went wrong")
result = server._format_error(error)
assert "Error" in result
assert "something went wrong" in result
def test_find_ilspycmd_path_not_installed(self):
"""Test _find_ilspycmd_path when not installed."""
with (
patch("shutil.which", return_value=None),
patch("os.path.isfile", return_value=False),
):
result = server._find_ilspycmd_path()
assert result is None
def test_find_ilspycmd_path_in_path(self):
"""Test _find_ilspycmd_path when in PATH."""
with patch("shutil.which", return_value="/usr/local/bin/ilspycmd"):
result = server._find_ilspycmd_path()
assert result == "/usr/local/bin/ilspycmd"
def test_detect_platform_linux(self):
"""Test platform detection on Linux."""
with (
patch("platform.system", return_value="Linux"),
patch("builtins.open", MagicMock()),
patch("shutil.which", return_value="/usr/bin/pacman"),
):
result = server._detect_platform()
assert result["system"] == "linux"
assert result["package_manager"] is not None
def test_detect_platform_windows(self):
"""Test platform detection on Windows."""
with (
patch("platform.system", return_value="Windows"),
patch("shutil.which", return_value=None),
):
result = server._detect_platform()
assert result["system"] == "windows"

261
tests/test_timeout.py Normal file
View File

@ -0,0 +1,261 @@
"""Tests for timeout behavior.
Verifies that the 5-minute timeout in ILSpyWrapper works correctly
and that hanging processes are properly killed.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mcilspy.ilspy_wrapper import ILSpyWrapper
from mcilspy.models import DecompileRequest, LanguageVersion, ListTypesRequest
class TestTimeoutBehavior:
"""Tests for the 5-minute timeout in _run_command."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch.object(ILSpyWrapper, "_find_ilspycmd", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_timeout_returns_error_message(self, wrapper):
"""Test that timeout produces appropriate error message."""
# Create a mock process that never completes
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
return_code, stdout, stderr = await wrapper._run_command(["test", "args"])
assert return_code == -1
assert "timed out" in stderr.lower()
assert "5 minutes" in stderr
mock_process.kill.assert_called_once()
@pytest.mark.asyncio
async def test_process_killed_on_timeout(self, wrapper):
"""Test that the process is killed when timeout occurs."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await wrapper._run_command(["test"])
# Verify kill was called
mock_process.kill.assert_called_once()
# Verify we waited for the process to clean up
mock_process.wait.assert_called_once()
@pytest.mark.asyncio
async def test_timeout_in_decompile(self, wrapper, test_assembly_path):
"""Test timeout behavior during decompile operation."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
request = DecompileRequest(
assembly_path=test_assembly_path,
language_version=LanguageVersion.LATEST,
)
response = await wrapper.decompile(request)
assert response.success is False
assert "timed out" in response.error_message.lower()
@pytest.mark.asyncio
async def test_timeout_in_list_types(self, wrapper, test_assembly_path):
"""Test timeout behavior during list_types operation."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
request = ListTypesRequest(assembly_path=test_assembly_path)
response = await wrapper.list_types(request)
assert response.success is False
assert "timed out" in response.error_message.lower()
@pytest.mark.asyncio
async def test_timeout_value_is_5_minutes(self, wrapper):
"""Verify the timeout value is 300 seconds (5 minutes)."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(return_value=(b"output", b""))
mock_process.returncode = 0
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
patch("asyncio.wait_for") as mock_wait_for,
):
# Set up the mock to return the communicate result
mock_wait_for.return_value = (b"output", b"")
await wrapper._run_command(["test"])
# Verify wait_for was called with 300 second timeout
mock_wait_for.assert_called_once()
args, kwargs = mock_wait_for.call_args
assert kwargs.get("timeout") == 300.0 or args[1] == 300.0
class TestNormalOperationWithTimeout:
"""Tests that normal operations complete successfully within timeout."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch.object(ILSpyWrapper, "_find_ilspycmd", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_fast_operation_completes(self, wrapper):
"""Test that fast operations complete normally."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(return_value=(b"success output", b""))
mock_process.returncode = 0
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
return_code, stdout, stderr = await wrapper._run_command(["test"])
assert return_code == 0
assert stdout == "success output"
assert stderr == ""
@pytest.mark.asyncio
async def test_operation_with_stderr_completes(self, wrapper):
"""Test that operations with stderr output complete normally."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(
return_value=(b"output", b"warning message")
)
mock_process.returncode = 0
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
return_code, stdout, stderr = await wrapper._run_command(["test"])
assert return_code == 0
assert stdout == "output"
assert stderr == "warning message"
class TestTimeoutWithAsyncioWaitFor:
"""Tests verifying asyncio.wait_for is used correctly."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch.object(ILSpyWrapper, "_find_ilspycmd", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_wait_for_is_used(self, wrapper):
"""Verify that asyncio.wait_for is used for timeout."""
# Read the source code and verify wait_for is used
import inspect
source = inspect.getsource(wrapper._run_command)
assert "asyncio.wait_for" in source or "wait_for" in source
@pytest.mark.asyncio
async def test_timeout_value_in_source(self, wrapper):
"""Verify timeout value is 300 seconds in source."""
import inspect
source = inspect.getsource(wrapper._run_command)
# Should have timeout=300.0 or 300 seconds comment
assert "300" in source or "5 minute" in source.lower()
class TestTimeoutCleanup:
"""Tests for proper cleanup after timeout."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch.object(ILSpyWrapper, "_find_ilspycmd", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_no_zombie_process_after_timeout(self, wrapper):
"""Verify process is properly cleaned up after timeout."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
mock_process.returncode = None
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await wrapper._run_command(["test"])
# kill() followed by wait() ensures no zombie
mock_process.kill.assert_called_once()
mock_process.wait.assert_called_once()
@pytest.mark.asyncio
async def test_temp_files_cleaned_after_timeout(self, wrapper, test_assembly_path):
"""Verify temporary files are cleaned up after timeout."""
import os
import tempfile
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
initial_temp_count = len(os.listdir(tempfile.gettempdir()))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
request = DecompileRequest(
assembly_path=test_assembly_path,
)
await wrapper.decompile(request)
# Temp directory should be cleaned up
final_temp_count = len(os.listdir(tempfile.gettempdir()))
# Should not have more temp files (may have same or fewer)
assert final_temp_count <= initial_temp_count + 1 # Allow small margin
class TestExceptionHandling:
"""Tests for exception handling in _run_command."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch.object(ILSpyWrapper, "_find_ilspycmd", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_general_exception_handled(self, wrapper):
"""Test that general exceptions are caught and returned."""
with patch(
"asyncio.create_subprocess_exec",
side_effect=OSError("Cannot execute"),
):
return_code, stdout, stderr = await wrapper._run_command(["test"])
assert return_code == -1
assert stdout == ""
assert "Cannot execute" in stderr
@pytest.mark.asyncio
async def test_permission_error_handled(self, wrapper):
"""Test that permission errors are handled gracefully."""
with patch(
"asyncio.create_subprocess_exec",
side_effect=PermissionError("Access denied"),
):
return_code, stdout, stderr = await wrapper._run_command(["test"])
assert return_code == -1
assert "Access denied" in stderr