- Add bypass_path_validation fixture for tests using mock paths - Update find_ilspycmd_path references to use mcilspy.utils - Fix wrapper fixture patches in timeout tests - Update assertions for new output formats (pagination, etc.) - Mark all taskmaster domains as merged in status.json All 165 tests passing.
262 lines
9.9 KiB
Python
262 lines
9.9 KiB
Python
"""Tests for timeout behavior.
|
|
|
|
Verifies that the 5-minute timeout in ILSpyWrapper works correctly
|
|
and that hanging processes are properly killed.
|
|
"""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from mcilspy.ilspy_wrapper import ILSpyWrapper
|
|
from mcilspy.models import DecompileRequest, LanguageVersion, ListTypesRequest
|
|
|
|
|
|
class TestTimeoutBehavior:
    """Tests for the 5-minute timeout in _run_command."""

    @pytest.fixture
    def wrapper(self):
        """Create a wrapper with mocked ilspycmd path."""
        with patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd"):
            return ILSpyWrapper()

    @staticmethod
    def _make_hanging_process():
        """Build a mock subprocess whose communicate() raises asyncio.TimeoutError.

        ``kill`` is a plain ``MagicMock`` and ``wait`` an ``AsyncMock`` so the
        tests can assert on how the process was torn down afterwards.
        """
        process = MagicMock()
        process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
        process.kill = MagicMock()
        process.wait = AsyncMock()
        return process

    @pytest.mark.asyncio
    async def test_timeout_returns_error_message(self, wrapper):
        """Test that timeout produces appropriate error message."""
        mock_process = self._make_hanging_process()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            return_code, _, stderr = await wrapper._run_command(["test", "args"])

        assert return_code == -1
        assert "timed out" in stderr.lower()
        assert "5 minutes" in stderr
        mock_process.kill.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_killed_on_timeout(self, wrapper):
        """Test that the process is killed when timeout occurs."""
        mock_process = self._make_hanging_process()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            await wrapper._run_command(["test"])

        # Verify kill was called
        mock_process.kill.assert_called_once()
        # Verify we waited for the process to clean up
        mock_process.wait.assert_called_once()

    @pytest.mark.asyncio
    async def test_timeout_in_decompile(self, wrapper, test_assembly_path):
        """Test timeout behavior during decompile operation."""
        mock_process = self._make_hanging_process()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            request = DecompileRequest(
                assembly_path=test_assembly_path,
                language_version=LanguageVersion.LATEST,
            )
            response = await wrapper.decompile(request)

        assert response.success is False
        assert "timed out" in response.error_message.lower()

    @pytest.mark.asyncio
    async def test_timeout_in_list_types(self, wrapper, test_assembly_path):
        """Test timeout behavior during list_types operation."""
        mock_process = self._make_hanging_process()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            request = ListTypesRequest(assembly_path=test_assembly_path)
            response = await wrapper.list_types(request)

        assert response.success is False
        assert "timed out" in response.error_message.lower()

    @pytest.mark.asyncio
    async def test_timeout_value_is_5_minutes(self, wrapper):
        """Verify the timeout value is 300 seconds (5 minutes)."""
        mock_process = MagicMock()
        mock_process.communicate = AsyncMock(return_value=(b"output", b""))
        mock_process.returncode = 0

        with (
            patch("asyncio.create_subprocess_exec", return_value=mock_process),
            patch("asyncio.wait_for") as mock_wait_for,
        ):
            # Set up the mock to return the communicate result
            mock_wait_for.return_value = (b"output", b"")

            await wrapper._run_command(["test"])

        # Verify wait_for was called with 300 second timeout
        mock_wait_for.assert_called_once()
        args, kwargs = mock_wait_for.call_args

        # wait_for is mocked, so any coroutine handed to it (presumably the
        # mocked communicate() call) was never awaited; close it to avoid a
        # "coroutine was never awaited" RuntimeWarning.
        for candidate in args:
            if asyncio.iscoroutine(candidate):
                candidate.close()

        # Accept the timeout either by keyword or as the second positional
        # argument, without indexing past the end of args when it is absent.
        if "timeout" in kwargs:
            timeout = kwargs["timeout"]
        elif len(args) > 1:
            timeout = args[1]
        else:
            timeout = None
        assert timeout == 300.0, f"expected a 300 second timeout, got {timeout!r}"
|
|
|
|
|
|
class TestNormalOperationWithTimeout:
    """Checks that commands finishing promptly are reported as successes."""

    @pytest.fixture
    def wrapper(self):
        """Provide an ILSpyWrapper built while the ilspycmd lookup is stubbed."""
        stub_lookup = patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd")
        with stub_lookup:
            instance = ILSpyWrapper()
        return instance

    @pytest.mark.asyncio
    async def test_fast_operation_completes(self, wrapper):
        """A process that returns at once yields exit code 0 and its stdout text."""
        finished = MagicMock(returncode=0)
        finished.communicate = AsyncMock(return_value=(b"success output", b""))

        with patch.object(asyncio, "create_subprocess_exec", return_value=finished):
            outcome = await wrapper._run_command(["test"])

        code, out, err = outcome
        assert (code, out, err) == (0, "success output", "")

    @pytest.mark.asyncio
    async def test_operation_with_stderr_completes(self, wrapper):
        """Text on stderr is passed through and does not change the exit code."""
        streams = (b"output", b"warning message")
        finished = MagicMock(returncode=0)
        finished.communicate = AsyncMock(return_value=streams)

        with patch.object(asyncio, "create_subprocess_exec", return_value=finished):
            outcome = await wrapper._run_command(["test"])

        code, out, err = outcome
        assert (code, out, err) == (0, "output", "warning message")
|
|
|
|
|
|
class TestTimeoutWithAsyncioWaitFor:
    """Tests verifying asyncio.wait_for is used correctly.

    These are plain synchronous tests: they only inspect the source text of
    ``_run_command`` and never await anything, so no event loop is needed.
    """

    @pytest.fixture
    def wrapper(self):
        """Create a wrapper with mocked ilspycmd path."""
        with patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd"):
            return ILSpyWrapper()

    def test_wait_for_is_used(self, wrapper):
        """Verify that asyncio.wait_for is used for timeout."""
        import inspect

        source = inspect.getsource(wrapper._run_command)
        # "wait_for" also matches the qualified "asyncio.wait_for" spelling,
        # so a single substring check covers both forms.
        assert "wait_for" in source

    def test_timeout_value_in_source(self, wrapper):
        """Verify timeout is configured via constants."""
        import inspect

        source = inspect.getsource(wrapper._run_command)
        # A case-insensitive "timeout" match covers a DECOMPILE_TIMEOUT_SECONDS
        # constant as well as any other timeout reference.
        assert "timeout" in source.lower()
|
|
|
|
|
|
class TestTimeoutCleanup:
    """Tests for proper cleanup after timeout."""

    @pytest.fixture
    def wrapper(self):
        """Create a wrapper with mocked ilspycmd path."""
        with patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd"):
            return ILSpyWrapper()

    @pytest.mark.asyncio
    async def test_no_zombie_process_after_timeout(self, wrapper):
        """Verify process is properly cleaned up after timeout."""
        mock_process = MagicMock()
        mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
        mock_process.kill = MagicMock()
        mock_process.wait = AsyncMock()
        mock_process.returncode = None

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            await wrapper._run_command(["test"])

        # kill() followed by wait() ensures no zombie
        mock_process.kill.assert_called_once()
        mock_process.wait.assert_called_once()

    @pytest.mark.asyncio
    async def test_temp_files_cleaned_after_timeout(self, wrapper, test_assembly_path, tmp_path):
        """Verify temporary files are cleaned up after timeout.

        The shared system temp directory is written to by unrelated processes,
        so counting its entries is not deterministic. Instead, ``tempfile`` is
        pointed at a private, initially empty directory for the duration of
        the call, and that directory must be empty again afterwards.
        """
        import tempfile

        mock_process = MagicMock()
        mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
        mock_process.kill = MagicMock()
        mock_process.wait = AsyncMock()

        # Use a dedicated subdirectory so anything other fixtures may have
        # placed under tmp_path is not mistaken for a leak.
        scratch_dir = tmp_path / "scratch"
        scratch_dir.mkdir()

        # NOTE(review): assumes the wrapper creates its temp files through the
        # tempfile module's default directory -- confirm against ILSpyWrapper.
        with (
            patch.object(tempfile, "tempdir", str(scratch_dir)),
            patch("asyncio.create_subprocess_exec", return_value=mock_process),
        ):
            request = DecompileRequest(
                assembly_path=test_assembly_path,
            )
            await wrapper.decompile(request)

        # Temp directory should be cleaned up
        leftovers = sorted(entry.name for entry in scratch_dir.iterdir())
        assert leftovers == [], f"temporary entries left behind after timeout: {leftovers}"
|
|
|
|
|
|
class TestExceptionHandling:
    """Checks how _run_command reports failures to start the subprocess."""

    @pytest.fixture
    def wrapper(self):
        """Provide an ILSpyWrapper built while the ilspycmd lookup is stubbed."""
        stub_lookup = patch("mcilspy.utils.find_ilspycmd_path", return_value="/mock/ilspycmd")
        with stub_lookup:
            instance = ILSpyWrapper()
        return instance

    @pytest.mark.asyncio
    async def test_general_exception_handled(self, wrapper):
        """An OSError from process creation becomes a -1 result carrying its message."""
        spawn_failure = OSError("Cannot execute")

        with patch.object(asyncio, "create_subprocess_exec", side_effect=spawn_failure):
            code, out, err = await wrapper._run_command(["test"])

        assert code == -1
        assert out == ""
        assert "Cannot execute" in err

    @pytest.mark.asyncio
    async def test_permission_error_handled(self, wrapper):
        """A PermissionError from process creation is reported, not raised."""
        spawn_failure = PermissionError("Access denied")

        with patch.object(asyncio, "create_subprocess_exec", side_effect=spawn_failure):
            code, _out, err = await wrapper._run_command(["test"])

        assert code == -1
        assert "Access denied" in err
|