mcilspy/tests/test_concurrency.py
Ryan Malloy 70c4a4a39a test: comprehensive test suite for mcilspy MCP server
Add complete test coverage for the mcilspy package:

- T7: Create TestAssembly.dll fixture with known types/members
- T1: Integration tests using real assembly (metadata reader + ILSpy wrapper)
- T2: MCP tool tests with mocked wrapper for each @mcp.tool()
- T3: Error path tests for regex, file not found, invalid assemblies
- T4: Concurrency tests with asyncio.gather() for parallel operations
- T5: Docstring coverage tests using AST introspection
- T6: Timeout behavior tests for 5-minute subprocess timeout

Test summary:
- 147 tests passing
- 14 skipped (ilspycmd-dependent integration tests)
- 73% code coverage
- All ruff linting checks pass
2026-02-08 11:40:57 -07:00

286 lines
9.6 KiB
Python

"""Tests for concurrent tool invocations.
These tests verify that the server handles multiple simultaneous
tool calls correctly using asyncio.gather().
"""
import asyncio
import pytest
from mcilspy import server
from mcilspy.metadata_reader import MetadataReader
class TestConcurrentMetadataOperations:
"""Test concurrent metadata reading operations."""
@pytest.mark.asyncio
async def test_concurrent_search_methods(self, test_assembly_path):
"""Test multiple search_methods calls running concurrently."""
patterns = ["Get", "Do", "Set", "Add", "Create"]
tasks = [
server.search_methods(test_assembly_path, pattern=p) for p in patterns
]
results = await asyncio.gather(*tasks)
# All tasks should complete successfully
assert len(results) == len(patterns)
# Each result should be a string
for result in results:
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_search_fields(self, test_assembly_path):
"""Test multiple search_fields calls running concurrently."""
patterns = ["API", "URL", "MAX", "BASE", "VALUE"]
tasks = [
server.search_fields(test_assembly_path, pattern=p) for p in patterns
]
results = await asyncio.gather(*tasks)
assert len(results) == len(patterns)
for result in results:
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_search_properties(self, test_assembly_path):
"""Test multiple search_properties calls running concurrently."""
patterns = ["Name", "Value", "Is", "Service"]
tasks = [
server.search_properties(test_assembly_path, pattern=p) for p in patterns
]
results = await asyncio.gather(*tasks)
assert len(results) == len(patterns)
@pytest.mark.asyncio
async def test_concurrent_mixed_operations(self, test_assembly_path):
"""Test different metadata operations running concurrently."""
tasks = [
server.search_methods(test_assembly_path, pattern="Get"),
server.search_fields(test_assembly_path, pattern="API"),
server.search_properties(test_assembly_path, pattern="Name"),
server.list_events(test_assembly_path),
server.list_resources(test_assembly_path),
server.get_metadata_summary(test_assembly_path),
]
results = await asyncio.gather(*tasks)
assert len(results) == 6
for result in results:
assert isinstance(result, str)
# None of them should have crashed
assert "Traceback" not in result
@pytest.mark.asyncio
async def test_concurrent_same_assembly_multiple_readers(self, test_assembly_path):
"""Test multiple MetadataReaders on the same assembly."""
async def read_metadata(path):
"""Async wrapper for metadata reading."""
with MetadataReader(path) as reader:
return reader.get_assembly_metadata()
# Run multiple readers concurrently
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(None, lambda: MetadataReader(test_assembly_path).__enter__().get_assembly_metadata())
for _ in range(5)
]
results = await asyncio.gather(*tasks)
assert len(results) == 5
# All results should have the same assembly name
names = [r.name for r in results]
assert all(n == names[0] for n in names)
class TestConcurrentToolCalls:
"""Test concurrent MCP tool invocations."""
@pytest.mark.asyncio
async def test_high_concurrency_search(self, test_assembly_path):
"""Test high number of concurrent searches."""
num_concurrent = 20
tasks = [
server.search_methods(test_assembly_path, pattern=f"pattern{i}")
for i in range(num_concurrent)
]
results = await asyncio.gather(*tasks)
assert len(results) == num_concurrent
# Most should return "No methods found" but shouldn't crash
for result in results:
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_with_errors(self, test_assembly_path, nonexistent_path):
"""Test concurrent calls where some will fail."""
tasks = [
# These should succeed
server.search_methods(test_assembly_path, pattern="Get"),
server.search_fields(test_assembly_path, pattern="API"),
# These should fail gracefully
server.search_methods(nonexistent_path, pattern="test"),
server.search_fields(nonexistent_path, pattern="test"),
]
results = await asyncio.gather(*tasks, return_exceptions=True)
assert len(results) == 4
# First two should be successful results
assert "GetGreeting" in results[0] or "No methods" in results[0]
assert "API_KEY" in results[1] or "No fields" in results[1]
# Last two should have error messages
assert "Error" in results[2]
assert "Error" in results[3]
@pytest.mark.asyncio
async def test_concurrent_list_operations(self, test_assembly_path):
"""Test concurrent list operations."""
tasks = [
server.list_events(test_assembly_path),
server.list_events(test_assembly_path),
server.list_resources(test_assembly_path),
server.list_resources(test_assembly_path),
]
results = await asyncio.gather(*tasks)
assert len(results) == 4
# Event results should be identical
assert results[0] == results[1]
# Resource results should be identical
assert results[2] == results[3]
class TestConcurrentWithRegex:
"""Test concurrent operations with regex patterns."""
@pytest.mark.asyncio
async def test_concurrent_regex_searches(self, test_assembly_path):
"""Test concurrent regex pattern searches."""
patterns = [r"^Get.*", r".*Service$", r"On\w+", r".*Base.*"]
tasks = [
server.search_methods(test_assembly_path, pattern=p, use_regex=True)
for p in patterns
]
results = await asyncio.gather(*tasks)
assert len(results) == 4
for result in results:
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_invalid_regex(self, test_assembly_path):
"""Test that concurrent invalid regex patterns are handled safely."""
patterns = [
r"[invalid", # Invalid
r"valid.*", # Valid
r"(?P<broken", # Invalid
r"also.*valid", # Valid
]
tasks = [
server.search_methods(test_assembly_path, pattern=p, use_regex=True)
for p in patterns
]
results = await asyncio.gather(*tasks)
assert len(results) == 4
# Invalid patterns should return error messages
assert "Invalid regex" in results[0]
assert "Invalid regex" in results[2]
# Valid patterns should return results (even if empty)
assert "Invalid regex" not in results[1]
assert "Invalid regex" not in results[3]
class TestConcurrentNamespaceFiltering:
"""Test concurrent operations with namespace filtering."""
@pytest.mark.asyncio
async def test_concurrent_namespace_filtered_searches(self, test_assembly_path):
"""Test concurrent searches with different namespace filters."""
namespace_filters = [
"TestNamespace",
"SubNamespace",
"NonExistent",
None, # No filter
]
tasks = [
server.search_methods(
test_assembly_path, pattern="", namespace_filter=ns
)
for ns in namespace_filters
]
results = await asyncio.gather(*tasks)
assert len(results) == 4
class TestConcurrencyIsolation:
"""Test that concurrent operations don't interfere with each other."""
@pytest.mark.asyncio
async def test_results_not_mixed(self, test_assembly_path):
"""Verify that concurrent operations return their own results."""
# Use very specific patterns that should match different things
async def search_and_verify(pattern, expected):
result = await server.search_methods(test_assembly_path, pattern=pattern)
if expected:
assert expected in result, f"Expected '{expected}' in result for pattern '{pattern}'"
return result
tasks = [
search_and_verify("DoSomething", "DoSomething"),
search_and_verify("GetGreeting", "GetGreeting"),
search_and_verify("Add", "Add"),
]
results = await asyncio.gather(*tasks)
# Verify each result contains only its expected match
assert "DoSomething" in results[0]
assert "GetGreeting" in results[1]
assert "Add" in results[2]
# Results shouldn't be mixed up
assert results[0] != results[1]
assert results[1] != results[2]
@pytest.mark.asyncio
async def test_concurrent_metadata_summary_consistent(self, test_assembly_path):
"""Verify concurrent metadata summary calls return consistent results."""
num_concurrent = 10
tasks = [
server.get_metadata_summary(test_assembly_path)
for _ in range(num_concurrent)
]
results = await asyncio.gather(*tasks)
# All results should be identical
first_result = results[0]
for result in results[1:]:
assert result == first_result, "Concurrent metadata calls returned different results"