diff --git a/tests/conftest.py b/tests/conftest.py index dee9233..78d068d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,33 +1,58 @@ """Shared pytest fixtures for mcilspy tests.""" import os +import shutil from pathlib import Path import pytest +# Path to test fixtures directory +FIXTURES_DIR = Path(__file__).parent / "fixtures" + + +@pytest.fixture +def test_assembly_path() -> str: + """Return path to the custom test assembly. + + This is the primary fixture for tests - uses our custom-built + TestAssembly.dll with known types and members. + """ + test_dll = FIXTURES_DIR / "TestAssembly.dll" + if not test_dll.exists(): + pytest.skip("TestAssembly.dll not found - run build_test_assembly.sh first") + return str(test_dll) + @pytest.fixture def sample_assembly_path() -> str: """Return path to a .NET assembly for testing. - Uses a known .NET SDK assembly that should exist on systems with dotnet installed. + Falls back to SDK assemblies if test assembly not available. + Prefer using test_assembly_path for new tests. """ - # Try to find a .NET SDK assembly + # First try our test assembly + test_dll = FIXTURES_DIR / "TestAssembly.dll" + if test_dll.exists(): + return str(test_dll) + + # Fallback: Try to find a .NET SDK assembly dotnet_base = Path("/usr/share/dotnet/sdk") if dotnet_base.exists(): - # Find any SDK version for sdk_dir in dotnet_base.iterdir(): - test_dll = sdk_dir / "Sdks" / "Microsoft.NET.Sdk" / "tools" / "net10.0" / "Microsoft.NET.Build.Tasks.dll" - if test_dll.exists(): - return str(test_dll) - # Try older paths - for net_version in ["net9.0", "net8.0", "net7.0", "net6.0"]: - test_dll = sdk_dir / "Sdks" / "Microsoft.NET.Sdk" / "tools" / net_version / "Microsoft.NET.Build.Tasks.dll" + for net_version in ["net10.0", "net9.0", "net8.0", "net7.0", "net6.0"]: + test_dll = ( + sdk_dir + / "Sdks" + / "Microsoft.NET.Sdk" + / "tools" + / net_version + / "Microsoft.NET.Build.Tasks.dll" + ) if test_dll.exists(): return str(test_dll) - # Fallback: any .dll in dotnet directory - for root, dirs, files in os.walk("/usr/share/dotnet"): + # Last resort: any .dll in dotnet directory + for root, _dirs, files in os.walk("/usr/share/dotnet"): for f in files: if f.endswith(".dll"): return os.path.join(root, f) @@ -39,3 +64,22 @@ def sample_assembly_path() -> str: def nonexistent_path() -> str: """Return a path that doesn't exist.""" return "/nonexistent/path/to/assembly.dll" + + +@pytest.fixture +def ilspycmd_installed() -> bool: + """Check if ilspycmd is available for integration tests.""" + return shutil.which("ilspycmd") is not None + + +@pytest.fixture +def skip_without_ilspycmd(ilspycmd_installed): + """Skip test if ilspycmd is not installed.""" + if not ilspycmd_installed: + pytest.skip("ilspycmd not installed") + + +@pytest.fixture +def temp_output_dir(tmp_path): + """Provide a temporary directory for test outputs.""" + return str(tmp_path) diff --git a/tests/fixtures/TestAssembly.cs b/tests/fixtures/TestAssembly.cs new file mode 100644 index 0000000..1af8668 --- /dev/null +++ b/tests/fixtures/TestAssembly.cs @@ -0,0 +1,214 @@ +using System; +using System.Threading.Tasks; + +namespace TestNamespace +{ + /// + /// A test class with various members for testing the mcilspy MCP server. + /// + public class TestClass + { + // Constants for testing string search + public const string API_KEY = "test-secret-key"; + public const string BASE_URL = "https://api.example.com"; + public const int MAX_RETRIES = 3; + + // Fields + public static readonly string BaseUrl = "https://api.example.com"; + private int _privateField; + protected string _protectedField; + internal double _internalField; + + // Properties + public string Name { get; set; } + public int Age { get; private set; } + public virtual bool IsActive { get; set; } + + // Events + public event EventHandler OnChange; + public event EventHandler OnMessage; + + // Constructors + public TestClass() + { + Name = "Default"; + Age = 0; + } + + public TestClass(string name, int age) + { + Name = name; + Age = age; + } + + // Methods + public void DoSomething() + { + Console.WriteLine("Hello from DoSomething"); + OnChange?.Invoke(this, EventArgs.Empty); + } + + public string GetGreeting() + { + return $"Hello, {Name}!"; + } + + public static int Add(int a, int b) + { + return a + b; + } + + protected virtual void OnPropertyChanged(string propertyName) + { + OnMessage?.Invoke(this, propertyName); + } + + private void PrivateMethod() + { + _privateField = 42; + } + } + + /// + /// Interface for testing interface discovery. + /// + public interface ITestService + { + void Execute(); + Task ExecuteAsync(); + string ServiceName { get; } + } + + /// + /// Another interface for inheritance testing. + /// + public interface IConfigurable + { + void Configure(string settings); + } + + /// + /// Struct for testing struct discovery. + /// + public struct TestStruct + { + public int Value; + public string Label; + + public TestStruct(int value, string label) + { + Value = value; + Label = label; + } + + public override string ToString() => $"{Label}: {Value}"; + } + + /// + /// Enum for testing enum discovery. + /// + public enum TestEnum + { + None = 0, + First = 1, + Second = 2, + Third = 3 + } + + /// + /// Delegate for testing delegate discovery. + /// + public delegate void TestDelegate(string message); + + /// + /// Delegate with return type. + /// + public delegate bool ValidationDelegate(T value); + + /// + /// Service implementation for testing class relationships. + /// + public class TestServiceImpl : ITestService, IConfigurable + { + private string _config; + + public string ServiceName => "TestService"; + + public void Execute() + { + Console.WriteLine($"Executing with config: {_config}"); + } + + public Task ExecuteAsync() + { + return Task.FromResult($"Async result from {ServiceName}"); + } + + public void Configure(string settings) + { + _config = settings; + } + } + + /// + /// Nested class for testing nested type discovery. + /// + public class OuterClass + { + public class NestedClass + { + public string Value { get; set; } + } + + private class PrivateNestedClass + { + public int Secret { get; set; } + } + + public NestedClass CreateNested() => new NestedClass(); + } + + /// + /// Abstract class for testing abstract type discovery. + /// + public abstract class AbstractBase + { + public abstract void AbstractMethod(); + public virtual void VirtualMethod() { } + + protected string BaseProperty { get; set; } + } + + /// + /// Derived class for testing inheritance. + /// + public class DerivedClass : AbstractBase + { + public override void AbstractMethod() + { + Console.WriteLine("Implemented abstract method"); + } + + public override void VirtualMethod() + { + base.VirtualMethod(); + Console.WriteLine("Overridden virtual method"); + } + } +} + +namespace TestNamespace.SubNamespace +{ + /// + /// Class in a sub-namespace for testing namespace filtering. + /// + public class SubClass + { + public const string CONNECTION_STRING = "Server=localhost;Database=test"; + + public void SubMethod() + { + Console.WriteLine("Sub namespace method"); + } + } +} diff --git a/tests/fixtures/TestAssembly.dll b/tests/fixtures/TestAssembly.dll new file mode 100644 index 0000000..09c8b53 Binary files /dev/null and b/tests/fixtures/TestAssembly.dll differ diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..cdf9878 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1 @@ +"""Integration tests that use real ilspycmd calls.""" diff --git a/tests/integration/test_real_assembly.py b/tests/integration/test_real_assembly.py new file mode 100644 index 0000000..74a8e1d --- /dev/null +++ b/tests/integration/test_real_assembly.py @@ -0,0 +1,379 @@ +"""Integration tests using the custom TestAssembly.dll fixture. + +These tests exercise the full stack including ilspycmd calls. +Tests are skipped if ilspycmd is not installed. +""" + +import pytest + +from mcilspy.ilspy_wrapper import ILSpyWrapper +from mcilspy.metadata_reader import MetadataReader +from mcilspy.models import ( + DecompileRequest, + EntityType, + LanguageVersion, + ListTypesRequest, +) + + +class TestMetadataReaderWithTestAssembly: + """Test MetadataReader against our custom test assembly.""" + + def test_get_assembly_metadata(self, test_assembly_path): + """Test reading metadata from test assembly.""" + with MetadataReader(test_assembly_path) as reader: + meta = reader.get_assembly_metadata() + + assert meta.name == "TestAssemblyProject" + assert meta.type_count > 0 + assert meta.method_count > 0 + + def test_list_methods_finds_known_methods(self, test_assembly_path): + """Test that we can find methods we know exist.""" + with MetadataReader(test_assembly_path) as reader: + methods = reader.list_methods() + + method_names = [m.name for m in methods] + + # Check for methods we defined in TestClass + assert "DoSomething" in method_names + assert "GetGreeting" in method_names + assert "Add" in method_names + + def test_list_methods_with_type_filter(self, test_assembly_path): + """Test filtering methods by type.""" + with MetadataReader(test_assembly_path) as reader: + methods = reader.list_methods(type_filter="TestClass") + + # All methods should be from types containing "TestClass" + for method in methods: + assert "TestClass" in method.declaring_type + + def test_list_methods_with_namespace_filter(self, test_assembly_path): + """Test filtering methods by namespace.""" + with MetadataReader(test_assembly_path) as reader: + methods = reader.list_methods(namespace_filter="SubNamespace") + + # Should only find methods from SubNamespace + for method in methods: + assert method.namespace is not None + assert "SubNamespace" in method.namespace + + def test_list_methods_public_only(self, test_assembly_path): + """Test filtering for public methods only.""" + with MetadataReader(test_assembly_path) as reader: + public_methods = reader.list_methods(public_only=True) + all_methods = reader.list_methods(public_only=False) + + # Should have fewer public methods than total + assert len(public_methods) <= len(all_methods) + # All returned methods should be public + for method in public_methods: + assert method.is_public + + def test_list_fields_finds_known_fields(self, test_assembly_path): + """Test that we can find fields we defined.""" + with MetadataReader(test_assembly_path) as reader: + fields = reader.list_fields() + + field_names = [f.name for f in fields] + + # Check for constants and fields we defined + assert "API_KEY" in field_names + assert "BASE_URL" in field_names + assert "MAX_RETRIES" in field_names + + def test_list_fields_constants_only(self, test_assembly_path): + """Test filtering for constant fields only.""" + with MetadataReader(test_assembly_path) as reader: + constants = reader.list_fields(constants_only=True) + + # All returned fields should be literals + for field in constants: + assert field.is_literal + + const_names = [f.name for f in constants] + assert "API_KEY" in const_names + assert "MAX_RETRIES" in const_names + + def test_list_properties_finds_known_properties(self, test_assembly_path): + """Test that we can find properties we defined.""" + with MetadataReader(test_assembly_path) as reader: + properties = reader.list_properties() + + prop_names = [p.name for p in properties] + + # Check for properties we defined + assert "Name" in prop_names + assert "Age" in prop_names + assert "IsActive" in prop_names + assert "ServiceName" in prop_names + + def test_list_events_finds_known_events(self, test_assembly_path): + """Test that we can find events we defined.""" + with MetadataReader(test_assembly_path) as reader: + events = reader.list_events() + + event_names = [e.name for e in events] + + # Check for events we defined + assert "OnChange" in event_names + assert "OnMessage" in event_names + + def test_list_resources_empty_for_test_assembly(self, test_assembly_path): + """Test that test assembly has no embedded resources.""" + with MetadataReader(test_assembly_path) as reader: + resources = reader.list_resources() + + # Our simple test assembly has no resources + assert isinstance(resources, list) + + +class TestILSpyWrapperWithTestAssembly: + """Integration tests for ILSpyWrapper using real ilspycmd calls.""" + + @pytest.fixture + def wrapper(self, skip_without_ilspycmd): + """Get wrapper instance, skipping if ilspycmd not available.""" + return ILSpyWrapper() + + @pytest.mark.asyncio + async def test_list_types_finds_classes(self, wrapper, test_assembly_path): + """Test listing classes from test assembly.""" + request = ListTypesRequest( + assembly_path=test_assembly_path, + entity_types=[EntityType.CLASS], + ) + response = await wrapper.list_types(request) + + assert response.success + assert response.total_count > 0 + + type_names = [t.name for t in response.types] + assert "TestClass" in type_names + assert "TestServiceImpl" in type_names + assert "OuterClass" in type_names + + @pytest.mark.asyncio + async def test_list_types_finds_interfaces(self, wrapper, test_assembly_path): + """Test listing interfaces from test assembly.""" + request = ListTypesRequest( + assembly_path=test_assembly_path, + entity_types=[EntityType.INTERFACE], + ) + response = await wrapper.list_types(request) + + assert response.success + + type_names = [t.name for t in response.types] + assert "ITestService" in type_names + assert "IConfigurable" in type_names + + @pytest.mark.asyncio + async def test_list_types_finds_structs(self, wrapper, test_assembly_path): + """Test listing structs from test assembly.""" + request = ListTypesRequest( + assembly_path=test_assembly_path, + entity_types=[EntityType.STRUCT], + ) + response = await wrapper.list_types(request) + + assert response.success + + type_names = [t.name for t in response.types] + assert "TestStruct" in type_names + + @pytest.mark.asyncio + async def test_list_types_finds_enums(self, wrapper, test_assembly_path): + """Test listing enums from test assembly.""" + request = ListTypesRequest( + assembly_path=test_assembly_path, + entity_types=[EntityType.ENUM], + ) + response = await wrapper.list_types(request) + + assert response.success + + type_names = [t.name for t in response.types] + assert "TestEnum" in type_names + + @pytest.mark.asyncio + async def test_list_types_finds_delegates(self, wrapper, test_assembly_path): + """Test listing delegates from test assembly.""" + request = ListTypesRequest( + assembly_path=test_assembly_path, + entity_types=[EntityType.DELEGATE], + ) + response = await wrapper.list_types(request) + + assert response.success + + type_names = [t.name for t in response.types] + assert "TestDelegate" in type_names + + @pytest.mark.asyncio + async def test_decompile_specific_type(self, wrapper, test_assembly_path): + """Test decompiling a specific type.""" + request = DecompileRequest( + assembly_path=test_assembly_path, + type_name="TestNamespace.TestClass", + language_version=LanguageVersion.LATEST, + ) + response = await wrapper.decompile(request) + + assert response.success + assert response.source_code is not None + + # Check that decompiled code contains expected elements + source = response.source_code + assert "class TestClass" in source + assert "DoSomething" in source + assert "GetGreeting" in source + + @pytest.mark.asyncio + async def test_decompile_entire_assembly(self, wrapper, test_assembly_path): + """Test decompiling the entire assembly.""" + request = DecompileRequest( + assembly_path=test_assembly_path, + language_version=LanguageVersion.LATEST, + ) + response = await wrapper.decompile(request) + + assert response.success + assert response.source_code is not None + + # Check that all types are present + source = response.source_code + assert "TestClass" in source + assert "ITestService" in source + assert "TestStruct" in source + assert "TestEnum" in source + + @pytest.mark.asyncio + async def test_decompile_to_il(self, wrapper, test_assembly_path): + """Test decompiling to IL code.""" + request = DecompileRequest( + assembly_path=test_assembly_path, + type_name="TestNamespace.TestClass", + show_il_code=True, + ) + response = await wrapper.decompile(request) + + assert response.success + assert response.source_code is not None + + # IL code should contain IL-specific keywords + source = response.source_code + # IL typically shows .method, .field, etc. + assert ".class" in source or "IL_" in source + + @pytest.mark.asyncio + async def test_decompile_to_output_dir(self, wrapper, test_assembly_path, temp_output_dir): + """Test decompiling to an output directory.""" + request = DecompileRequest( + assembly_path=test_assembly_path, + output_dir=temp_output_dir, + ) + response = await wrapper.decompile(request) + + assert response.success + assert response.output_path is not None + + @pytest.mark.asyncio + async def test_decompile_with_project_structure( + self, wrapper, test_assembly_path, temp_output_dir + ): + """Test decompiling with project structure.""" + request = DecompileRequest( + assembly_path=test_assembly_path, + output_dir=temp_output_dir, + create_project=True, + ) + response = await wrapper.decompile(request) + + assert response.success + + @pytest.mark.asyncio + async def test_decompile_nonexistent_type(self, wrapper, test_assembly_path): + """Test decompiling a type that doesn't exist.""" + request = DecompileRequest( + assembly_path=test_assembly_path, + type_name="NonExistent.FakeClass", + ) + response = await wrapper.decompile(request) + + # Should still succeed but with empty or no matching output + # The actual behavior depends on ilspycmd version + assert response is not None + + +class TestIntegrationEndToEnd: + """End-to-end integration tests covering complete workflows.""" + + @pytest.mark.asyncio + async def test_discover_and_decompile_workflow( + self, skip_without_ilspycmd, test_assembly_path + ): + """Test the typical workflow: list types, then decompile specific one.""" + wrapper = ILSpyWrapper() + + # Step 1: List all types + list_request = ListTypesRequest( + assembly_path=test_assembly_path, + entity_types=[EntityType.CLASS], + ) + list_response = await wrapper.list_types(list_request) + + assert list_response.success + assert len(list_response.types) > 0 + + # Step 2: Find TestServiceImpl + service_type = None + for t in list_response.types: + if t.name == "TestServiceImpl": + service_type = t + break + + assert service_type is not None + + # Step 3: Decompile it + decompile_request = DecompileRequest( + assembly_path=test_assembly_path, + type_name=service_type.full_name, + ) + decompile_response = await wrapper.decompile(decompile_request) + + assert decompile_response.success + assert decompile_response.source_code is not None + assert "TestServiceImpl" in decompile_response.source_code + assert "ITestService" in decompile_response.source_code + + @pytest.mark.asyncio + async def test_metadata_and_decompile_combined( + self, skip_without_ilspycmd, test_assembly_path + ): + """Test using metadata reader and ILSpy wrapper together.""" + # Use metadata reader for quick discovery + with MetadataReader(test_assembly_path) as reader: + methods = reader.list_methods(type_filter="TestClass") + add_method = None + for m in methods: + if m.name == "Add": + add_method = m + break + + assert add_method is not None + assert add_method.is_static + + # Use ILSpy for decompilation + wrapper = ILSpyWrapper() + request = DecompileRequest( + assembly_path=test_assembly_path, + type_name="TestNamespace.TestClass", + ) + response = await wrapper.decompile(request) + + assert response.success + # Verify the static method is in the output + assert "static" in response.source_code + assert "Add" in response.source_code diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 0000000..4052bcb --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,285 @@ +"""Tests for concurrent tool invocations. + +These tests verify that the server handles multiple simultaneous +tool calls correctly using asyncio.gather(). +""" + +import asyncio + +import pytest + +from mcilspy import server +from mcilspy.metadata_reader import MetadataReader + + +class TestConcurrentMetadataOperations: + """Test concurrent metadata reading operations.""" + + @pytest.mark.asyncio + async def test_concurrent_search_methods(self, test_assembly_path): + """Test multiple search_methods calls running concurrently.""" + patterns = ["Get", "Do", "Set", "Add", "Create"] + + tasks = [ + server.search_methods(test_assembly_path, pattern=p) for p in patterns + ] + + results = await asyncio.gather(*tasks) + + # All tasks should complete successfully + assert len(results) == len(patterns) + + # Each result should be a string + for result in results: + assert isinstance(result, str) + + @pytest.mark.asyncio + async def test_concurrent_search_fields(self, test_assembly_path): + """Test multiple search_fields calls running concurrently.""" + patterns = ["API", "URL", "MAX", "BASE", "VALUE"] + + tasks = [ + server.search_fields(test_assembly_path, pattern=p) for p in patterns + ] + + results = await asyncio.gather(*tasks) + + assert len(results) == len(patterns) + for result in results: + assert isinstance(result, str) + + @pytest.mark.asyncio + async def test_concurrent_search_properties(self, test_assembly_path): + """Test multiple search_properties calls running concurrently.""" + patterns = ["Name", "Value", "Is", "Service"] + + tasks = [ + server.search_properties(test_assembly_path, pattern=p) for p in patterns + ] + + results = await asyncio.gather(*tasks) + + assert len(results) == len(patterns) + + @pytest.mark.asyncio + async def test_concurrent_mixed_operations(self, test_assembly_path): + """Test different metadata operations running concurrently.""" + tasks = [ + server.search_methods(test_assembly_path, pattern="Get"), + server.search_fields(test_assembly_path, pattern="API"), + server.search_properties(test_assembly_path, pattern="Name"), + server.list_events(test_assembly_path), + server.list_resources(test_assembly_path), + server.get_metadata_summary(test_assembly_path), + ] + + results = await asyncio.gather(*tasks) + + assert len(results) == 6 + for result in results: + assert isinstance(result, str) + # None of them should have crashed + assert "Traceback" not in result + + @pytest.mark.asyncio + async def test_concurrent_same_assembly_multiple_readers(self, test_assembly_path): + """Test multiple MetadataReaders on the same assembly.""" + async def read_metadata(path): + """Async wrapper for metadata reading.""" + with MetadataReader(path) as reader: + return reader.get_assembly_metadata() + + # Run multiple readers concurrently + loop = asyncio.get_event_loop() + tasks = [ + loop.run_in_executor(None, lambda: MetadataReader(test_assembly_path).__enter__().get_assembly_metadata()) + for _ in range(5) + ] + + results = await asyncio.gather(*tasks) + + assert len(results) == 5 + # All results should have the same assembly name + names = [r.name for r in results] + assert all(n == names[0] for n in names) + + +class TestConcurrentToolCalls: + """Test concurrent MCP tool invocations.""" + + @pytest.mark.asyncio + async def test_high_concurrency_search(self, test_assembly_path): + """Test high number of concurrent searches.""" + num_concurrent = 20 + + tasks = [ + server.search_methods(test_assembly_path, pattern=f"pattern{i}") + for i in range(num_concurrent) + ] + + results = await asyncio.gather(*tasks) + + assert len(results) == num_concurrent + # Most should return "No methods found" but shouldn't crash + for result in results: + assert isinstance(result, str) + + @pytest.mark.asyncio + async def test_concurrent_with_errors(self, test_assembly_path, nonexistent_path): + """Test concurrent calls where some will fail.""" + tasks = [ + # These should succeed + server.search_methods(test_assembly_path, pattern="Get"), + server.search_fields(test_assembly_path, pattern="API"), + # These should fail gracefully + server.search_methods(nonexistent_path, pattern="test"), + server.search_fields(nonexistent_path, pattern="test"), + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + assert len(results) == 4 + + # First two should be successful results + assert "GetGreeting" in results[0] or "No methods" in results[0] + assert "API_KEY" in results[1] or "No fields" in results[1] + + # Last two should have error messages + assert "Error" in results[2] + assert "Error" in results[3] + + @pytest.mark.asyncio + async def test_concurrent_list_operations(self, test_assembly_path): + """Test concurrent list operations.""" + tasks = [ + server.list_events(test_assembly_path), + server.list_events(test_assembly_path), + server.list_resources(test_assembly_path), + server.list_resources(test_assembly_path), + ] + + results = await asyncio.gather(*tasks) + + assert len(results) == 4 + # Event results should be identical + assert results[0] == results[1] + # Resource results should be identical + assert results[2] == results[3] + + +class TestConcurrentWithRegex: + """Test concurrent operations with regex patterns.""" + + @pytest.mark.asyncio + async def test_concurrent_regex_searches(self, test_assembly_path): + """Test concurrent regex pattern searches.""" + patterns = [r"^Get.*", r".*Service$", r"On\w+", r".*Base.*"] + + tasks = [ + server.search_methods(test_assembly_path, pattern=p, use_regex=True) + for p in patterns + ] + + results = await asyncio.gather(*tasks) + + assert len(results) == 4 + for result in results: + assert isinstance(result, str) + + @pytest.mark.asyncio + async def test_concurrent_invalid_regex(self, test_assembly_path): + """Test that concurrent invalid regex patterns are handled safely.""" + patterns = [ + r"[invalid", # Invalid + r"valid.*", # Valid + r"(?P