"""Tests for timeout behavior.

Verifies that the 5-minute timeout in ILSpyWrapper works correctly
and that hanging processes are properly killed.
"""

import asyncio
import inspect
import os
import tempfile
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from mcilspy.ilspy_wrapper import ILSpyWrapper
from mcilspy.models import DecompileRequest, LanguageVersion, ListTypesRequest


def _make_timed_out_process():
    """Build a mock subprocess whose ``communicate()`` raises a timeout.

    ``kill`` is a synchronous mock and ``wait`` an async mock so the tests
    can assert on how the code under test cleans the process up.
    """
    process = MagicMock()
    process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
    process.kill = MagicMock()
    process.wait = AsyncMock()
    return process


def _make_finished_process(stdout, stderr=b"", returncode=0):
    """Build a mock subprocess that completes immediately.

    Args:
        stdout: Bytes returned as the first element of ``communicate()``.
        stderr: Bytes returned as the second element of ``communicate()``.
        returncode: Value exposed as ``process.returncode``.
    """
    process = MagicMock()
    process.communicate = AsyncMock(return_value=(stdout, stderr))
    process.returncode = returncode
    return process


@pytest.fixture
def wrapper():
    """Create a wrapper with mocked ilspycmd path.

    ``_find_ilspycmd`` is only patched while the wrapper is constructed;
    the patch is undone as soon as the instance is returned.
    """
    with patch.object(ILSpyWrapper, "_find_ilspycmd", return_value="/mock/ilspycmd"):
        return ILSpyWrapper()


class TestTimeoutBehavior:
    """Tests for the 5-minute timeout in _run_command."""

    @pytest.mark.asyncio
    async def test_timeout_returns_error_message(self, wrapper):
        """Test that timeout produces appropriate error message."""
        # A mock process that never completes.
        mock_process = _make_timed_out_process()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            return_code, _stdout, stderr = await wrapper._run_command(
                ["test", "args"]
            )

        assert return_code == -1
        assert "timed out" in stderr.lower()
        assert "5 minutes" in stderr
        mock_process.kill.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_killed_on_timeout(self, wrapper):
        """Test that the process is killed when timeout occurs."""
        mock_process = _make_timed_out_process()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            await wrapper._run_command(["test"])

        # Verify kill was called
        mock_process.kill.assert_called_once()
        # Verify we waited for the process to clean up
        mock_process.wait.assert_called_once()

    @pytest.mark.asyncio
    async def test_timeout_in_decompile(self, wrapper, test_assembly_path):
        """Test timeout behavior during decompile operation."""
        mock_process = _make_timed_out_process()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            request = DecompileRequest(
                assembly_path=test_assembly_path,
                language_version=LanguageVersion.LATEST,
            )
            response = await wrapper.decompile(request)

        assert response.success is False
        assert "timed out" in response.error_message.lower()

    @pytest.mark.asyncio
    async def test_timeout_in_list_types(self, wrapper, test_assembly_path):
        """Test timeout behavior during list_types operation."""
        mock_process = _make_timed_out_process()

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            request = ListTypesRequest(assembly_path=test_assembly_path)
            response = await wrapper.list_types(request)

        assert response.success is False
        assert "timed out" in response.error_message.lower()

    @pytest.mark.asyncio
    async def test_timeout_value_is_5_minutes(self, wrapper):
        """Verify the timeout value is 300 seconds (5 minutes)."""
        mock_process = _make_finished_process(b"output")

        with (
            patch("asyncio.create_subprocess_exec", return_value=mock_process),
            patch("asyncio.wait_for") as mock_wait_for,
        ):
            # Set up the mock to return the communicate result
            mock_wait_for.return_value = (b"output", b"")
            await wrapper._run_command(["test"])

        # Verify wait_for was called with 300 second timeout
        mock_wait_for.assert_called_once()
        args, kwargs = mock_wait_for.call_args

        # The patched wait_for never awaits the coroutine it was handed;
        # close it so the run does not emit a "never awaited" warning.
        pending = args[0] if args else None
        if inspect.iscoroutine(pending):
            pending.close()

        # Accept the timeout either by keyword or as the second positional
        # argument, and fail with a readable message (not an IndexError)
        # when it is missing altogether.
        if "timeout" in kwargs:
            timeout = kwargs["timeout"]
        else:
            assert len(args) >= 2, "asyncio.wait_for was called without a timeout"
            timeout = args[1]
        assert timeout == 300.0


class TestNormalOperationWithTimeout:
    """Tests that normal operations complete successfully within timeout."""

    @pytest.mark.asyncio
    async def test_fast_operation_completes(self, wrapper):
        """Test that fast operations complete normally."""
        mock_process = _make_finished_process(b"success output")

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            return_code, stdout, stderr = await wrapper._run_command(["test"])

        assert return_code == 0
        assert stdout == "success output"
        assert stderr == ""

    @pytest.mark.asyncio
    async def test_operation_with_stderr_completes(self, wrapper):
        """Test that operations with stderr output complete normally."""
        mock_process = _make_finished_process(b"output", b"warning message")

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            return_code, stdout, stderr = await wrapper._run_command(["test"])

        assert return_code == 0
        assert stdout == "output"
        assert stderr == "warning message"


class TestTimeoutWithAsyncioWaitFor:
    """Tests verifying asyncio.wait_for is used correctly."""

    @pytest.mark.asyncio
    async def test_wait_for_is_used(self, wrapper):
        """Verify that asyncio.wait_for is used for timeout."""
        # Read the source code and verify wait_for is used.  Matching the
        # bare name also covers the qualified ``asyncio.wait_for`` spelling.
        source = inspect.getsource(wrapper._run_command)
        assert "wait_for" in source

    @pytest.mark.asyncio
    async def test_timeout_value_in_source(self, wrapper):
        """Verify timeout value is 300 seconds in source."""
        source = inspect.getsource(wrapper._run_command)
        # Should have timeout=300.0 or 300 seconds comment
        assert "300" in source or "5 minute" in source.lower()


class TestTimeoutCleanup:
    """Tests for proper cleanup after timeout."""

    @pytest.mark.asyncio
    async def test_no_zombie_process_after_timeout(self, wrapper):
        """Verify process is properly cleaned up after timeout."""
        mock_process = _make_timed_out_process()
        mock_process.returncode = None

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            await wrapper._run_command(["test"])

        # kill() followed by wait() ensures no zombie
        mock_process.kill.assert_called_once()
        mock_process.wait.assert_called_once()

    @pytest.mark.asyncio
    async def test_temp_files_cleaned_after_timeout(self, wrapper, test_assembly_path):
        """Verify temporary files are cleaned up after timeout."""
        mock_process = _make_timed_out_process()

        # NOTE(review): this counts entries in the shared system temp
        # directory, so unrelated processes can change the count while the
        # test runs.  The +1 margin below only partly absorbs that noise.
        initial_temp_count = len(os.listdir(tempfile.gettempdir()))

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            request = DecompileRequest(
                assembly_path=test_assembly_path,
            )
            await wrapper.decompile(request)

        # Temp directory should be cleaned up
        final_temp_count = len(os.listdir(tempfile.gettempdir()))
        # Should not have more temp files (may have same or fewer)
        assert final_temp_count <= initial_temp_count + 1  # Allow small margin


class TestExceptionHandling:
    """Tests for exception handling in _run_command."""

    @pytest.mark.asyncio
    async def test_general_exception_handled(self, wrapper):
        """Test that general exceptions are caught and returned."""
        with patch(
            "asyncio.create_subprocess_exec",
            side_effect=OSError("Cannot execute"),
        ):
            return_code, stdout, stderr = await wrapper._run_command(["test"])

        assert return_code == -1
        assert stdout == ""
        assert "Cannot execute" in stderr

    @pytest.mark.asyncio
    async def test_permission_error_handled(self, wrapper):
        """Test that permission errors are handled gracefully."""
        with patch(
            "asyncio.create_subprocess_exec",
            side_effect=PermissionError("Access denied"),
        ):
            return_code, _stdout, stderr = await wrapper._run_command(["test"])

        assert return_code == -1
        assert "Access denied" in stderr