mcilspy/tests/test_timeout.py
Ryan Malloy 3c21b9d640 test: fix test compatibility with security and architecture changes
- Add bypass_path_validation fixture for tests using mock paths
- Update find_ilspycmd_path references to use mcilspy.utils
- Fix wrapper fixture patches in timeout tests
- Update assertions for new output formats (pagination, etc.)
- Mark all taskmaster domains as merged in status.json

All 165 tests passing.
2026-02-08 11:47:14 -07:00

262 lines
9.9 KiB
Python

"""Tests for timeout behavior.
Verifies that the 5-minute timeout in ILSpyWrapper works correctly
and that hanging processes are properly killed.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mcilspy.ilspy_wrapper import ILSpyWrapper
from mcilspy.models import DecompileRequest, LanguageVersion, ListTypesRequest
class TestTimeoutBehavior:
"""Tests for the 5-minute timeout in _run_command."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_timeout_returns_error_message(self, wrapper):
"""Test that timeout produces appropriate error message."""
# Create a mock process that never completes
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
return_code, stdout, stderr = await wrapper._run_command(["test", "args"])
assert return_code == -1
assert "timed out" in stderr.lower()
assert "5 minutes" in stderr
mock_process.kill.assert_called_once()
@pytest.mark.asyncio
async def test_process_killed_on_timeout(self, wrapper):
"""Test that the process is killed when timeout occurs."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await wrapper._run_command(["test"])
# Verify kill was called
mock_process.kill.assert_called_once()
# Verify we waited for the process to clean up
mock_process.wait.assert_called_once()
@pytest.mark.asyncio
async def test_timeout_in_decompile(self, wrapper, test_assembly_path):
"""Test timeout behavior during decompile operation."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
request = DecompileRequest(
assembly_path=test_assembly_path,
language_version=LanguageVersion.LATEST,
)
response = await wrapper.decompile(request)
assert response.success is False
assert "timed out" in response.error_message.lower()
@pytest.mark.asyncio
async def test_timeout_in_list_types(self, wrapper, test_assembly_path):
"""Test timeout behavior during list_types operation."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
request = ListTypesRequest(assembly_path=test_assembly_path)
response = await wrapper.list_types(request)
assert response.success is False
assert "timed out" in response.error_message.lower()
@pytest.mark.asyncio
async def test_timeout_value_is_5_minutes(self, wrapper):
"""Verify the timeout value is 300 seconds (5 minutes)."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(return_value=(b"output", b""))
mock_process.returncode = 0
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
patch("asyncio.wait_for") as mock_wait_for,
):
# Set up the mock to return the communicate result
mock_wait_for.return_value = (b"output", b"")
await wrapper._run_command(["test"])
# Verify wait_for was called with 300 second timeout
mock_wait_for.assert_called_once()
args, kwargs = mock_wait_for.call_args
assert kwargs.get("timeout") == 300.0 or args[1] == 300.0
class TestNormalOperationWithTimeout:
"""Tests that normal operations complete successfully within timeout."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_fast_operation_completes(self, wrapper):
"""Test that fast operations complete normally."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(return_value=(b"success output", b""))
mock_process.returncode = 0
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
return_code, stdout, stderr = await wrapper._run_command(["test"])
assert return_code == 0
assert stdout == "success output"
assert stderr == ""
@pytest.mark.asyncio
async def test_operation_with_stderr_completes(self, wrapper):
"""Test that operations with stderr output complete normally."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(
return_value=(b"output", b"warning message")
)
mock_process.returncode = 0
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
return_code, stdout, stderr = await wrapper._run_command(["test"])
assert return_code == 0
assert stdout == "output"
assert stderr == "warning message"
class TestTimeoutWithAsyncioWaitFor:
"""Tests verifying asyncio.wait_for is used correctly."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_wait_for_is_used(self, wrapper):
"""Verify that asyncio.wait_for is used for timeout."""
# Read the source code and verify wait_for is used
import inspect
source = inspect.getsource(wrapper._run_command)
assert "asyncio.wait_for" in source or "wait_for" in source
@pytest.mark.asyncio
async def test_timeout_value_in_source(self, wrapper):
"""Verify timeout is configured via constants."""
import inspect
source = inspect.getsource(wrapper._run_command)
# Should use DECOMPILE_TIMEOUT_SECONDS constant or have timeout reference
assert "DECOMPILE_TIMEOUT_SECONDS" in source or "timeout" in source.lower()
class TestTimeoutCleanup:
"""Tests for proper cleanup after timeout."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_no_zombie_process_after_timeout(self, wrapper):
"""Verify process is properly cleaned up after timeout."""
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
mock_process.returncode = None
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
await wrapper._run_command(["test"])
# kill() followed by wait() ensures no zombie
mock_process.kill.assert_called_once()
mock_process.wait.assert_called_once()
@pytest.mark.asyncio
async def test_temp_files_cleaned_after_timeout(self, wrapper, test_assembly_path):
"""Verify temporary files are cleaned up after timeout."""
import os
import tempfile
mock_process = MagicMock()
mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
mock_process.kill = MagicMock()
mock_process.wait = AsyncMock()
initial_temp_count = len(os.listdir(tempfile.gettempdir()))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
request = DecompileRequest(
assembly_path=test_assembly_path,
)
await wrapper.decompile(request)
# Temp directory should be cleaned up
final_temp_count = len(os.listdir(tempfile.gettempdir()))
# Should not have more temp files (may have same or fewer)
assert final_temp_count <= initial_temp_count + 1 # Allow small margin
class TestExceptionHandling:
"""Tests for exception handling in _run_command."""
@pytest.fixture
def wrapper(self):
"""Create a wrapper with mocked ilspycmd path."""
with patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd"):
return ILSpyWrapper()
@pytest.mark.asyncio
async def test_general_exception_handled(self, wrapper):
"""Test that general exceptions are caught and returned."""
with patch(
"asyncio.create_subprocess_exec",
side_effect=OSError("Cannot execute"),
):
return_code, stdout, stderr = await wrapper._run_command(["test"])
assert return_code == -1
assert stdout == ""
assert "Cannot execute" in stderr
@pytest.mark.asyncio
async def test_permission_error_handled(self, wrapper):
"""Test that permission errors are handled gracefully."""
with patch(
"asyncio.create_subprocess_exec",
side_effect=PermissionError("Access denied"),
):
return_code, stdout, stderr = await wrapper._run_command(["test"])
assert return_code == -1
assert "Access denied" in stderr