""" Test ESP-IDF Integration component Tests parsers, validators, subprocess runners, tools.json caching, tool/resource/prompt registration, and the TARGET_ARCH constant. """ import asyncio import json import os import textwrap from unittest.mock import AsyncMock, MagicMock, patch import pytest from mcesptool.components.idf_integration import ( TARGET_ARCH, IDFIntegration, _parse_export_vars, _parse_tools_check, _parse_tools_list, _validate_target, _validate_tool_names, ) from mcesptool.config import ESPToolServerConfig # ------------------------------------------------------------------ # # Fixtures # ------------------------------------------------------------------ # @pytest.fixture def mock_app(): """Mock FastMCP app that captures registered tools, resources, and prompts.""" app = MagicMock() registered_tools = {} registered_resources = {} registered_prompts = {} def tool_decorator(name): def decorator(func): registered_tools[name] = func return func return decorator def resource_decorator(uri): def decorator(func): registered_resources[uri] = func return func return decorator def prompt_decorator(name): def decorator(func): registered_prompts[name] = func return func return decorator app.tool = tool_decorator app.resource = resource_decorator app.prompt = prompt_decorator app._registered_tools = registered_tools app._registered_resources = registered_resources app._registered_prompts = registered_prompts return app @pytest.fixture def config(tmp_path): """Config with a fake ESP-IDF path rooted in tmp_path.""" idf_dir = tmp_path / "esp-idf" tools_dir = idf_dir / "tools" tools_dir.mkdir(parents=True) (tools_dir / "idf_tools.py").write_text("# stub") (tools_dir / "idf.py").write_text("# stub") with patch.object(ESPToolServerConfig, "__post_init__"): cfg = ESPToolServerConfig() cfg.esp_idf_path = idf_dir return cfg @pytest.fixture def integration(mock_app, config): """IDFIntegration instance wired to mock_app and config.""" return IDFIntegration(mock_app, config) @pytest.fixture def mock_context(): ctx = MagicMock() ctx.info = AsyncMock() ctx.log = AsyncMock() ctx.progress = AsyncMock() return ctx # ------------------------------------------------------------------ # # 1. Parser unit tests (pure functions) # ------------------------------------------------------------------ # class TestParseToolsList: """Tests for _parse_tools_list parser.""" SAMPLE_OUTPUT = textwrap.dedent("""\ * xtensa-esp-elf-gdb - Version 14.2_20240403 - xtensa-esp-elf-gdb-14.2_20240403-x86_64-linux-gnu.tar.gz (installed) - Version 12.1_20231023 - xtensa-esp-elf-gdb-12.1_20231023-x86_64-linux-gnu.tar.gz * riscv32-esp-elf - Version 14.2.0_20241119 - riscv32-esp-elf-14.2.0_20241119-x86_64-linux-gnu.tar.gz (installed) - riscv32-esp-elf-14.2.0_20241119-aarch64-linux-gnu.tar.gz * cmake - Version 3.24.0 - cmake-3.24.0-linux-x86_64.tar.gz """) def test_extracts_tool_names(self): tools = _parse_tools_list(self.SAMPLE_OUTPUT) names = [t["name"] for t in tools] assert names == ["xtensa-esp-elf-gdb", "riscv32-esp-elf", "cmake"] def test_extracts_versions(self): tools = _parse_tools_list(self.SAMPLE_OUTPUT) gdb_tool = tools[0] assert len(gdb_tool["versions"]) == 2 assert gdb_tool["versions"][0]["version"] == "14.2_20240403" assert gdb_tool["versions"][1]["version"] == "12.1_20231023" def test_installed_status(self): tools = _parse_tools_list(self.SAMPLE_OUTPUT) gdb_tool = tools[0] assert gdb_tool["versions"][0]["installed"] is True assert gdb_tool["versions"][1]["installed"] is False def test_archives_extracted(self): tools = _parse_tools_list(self.SAMPLE_OUTPUT) riscv_tool = tools[1] archives = riscv_tool["versions"][0]["archives"] assert len(archives) == 2 assert archives[0]["installed"] is True assert archives[1]["installed"] is False def test_archive_filenames(self): tools = _parse_tools_list(self.SAMPLE_OUTPUT) riscv_tool = tools[1] archive_files = [a["file"] for a in riscv_tool["versions"][0]["archives"]] assert "riscv32-esp-elf-14.2.0_20241119-x86_64-linux-gnu.tar.gz" in archive_files assert "riscv32-esp-elf-14.2.0_20241119-aarch64-linux-gnu.tar.gz" in archive_files def test_empty_output(self): assert _parse_tools_list("") == [] def test_output_with_only_blank_lines(self): assert _parse_tools_list("\n\n\n") == [] def test_single_tool_no_archives(self): output = "* lonely-tool\n - Version 1.0\n" tools = _parse_tools_list(output) assert len(tools) == 1 assert tools[0]["name"] == "lonely-tool" assert tools[0]["versions"][0]["archives"] == [] class TestParseToolsCheck: """Tests for _parse_tools_check parser.""" SAMPLE_OUTPUT = textwrap.dedent("""\ xtensa-esp-elf 14.2.0_20241119: found riscv32-esp-elf 14.2.0_20241119: found xtensa-esp-elf-gdb 14.2_20240403: found esp-rom-elfs 20240305: found in /home/user/.espressif cmake 3.24.0: not found ninja 1.11.1: not found """) def test_installed_tools(self): result = _parse_tools_check(self.SAMPLE_OUTPUT) assert "xtensa-esp-elf 14.2.0_20241119" in result["installed"] assert "riscv32-esp-elf 14.2.0_20241119" in result["installed"] def test_missing_tools(self): result = _parse_tools_check(self.SAMPLE_OUTPUT) assert "cmake 3.24.0" in result["missing"] assert "ninja 1.11.1" in result["missing"] def test_installed_count(self): result = _parse_tools_check(self.SAMPLE_OUTPUT) assert len(result["installed"]) == 4 def test_missing_count(self): result = _parse_tools_check(self.SAMPLE_OUTPUT) assert len(result["missing"]) == 2 def test_empty_output(self): result = _parse_tools_check("") assert result == {"installed": [], "missing": []} def test_all_found(self): output = "cmake 3.24.0: found\nninja 1.11.1: found\n" result = _parse_tools_check(output) assert len(result["installed"]) == 2 assert len(result["missing"]) == 0 def test_all_missing(self): output = "cmake 3.24.0: not found\nninja 1.11.1: not found\n" result = _parse_tools_check(output) assert len(result["installed"]) == 0 assert len(result["missing"]) == 2 class TestParseExportVars: """Tests for _parse_export_vars parser.""" def test_basic_key_value(self): output = "IDF_PATH=/opt/esp-idf\nIDF_TOOLS_PATH=/home/user/.espressif\n" result = _parse_export_vars(output) assert result["IDF_PATH"] == "/opt/esp-idf" assert result["IDF_TOOLS_PATH"] == "/home/user/.espressif" def test_skips_comments(self): output = "# This is a comment\nKEY=value\n# Another comment\n" result = _parse_export_vars(output) assert len(result) == 1 assert result["KEY"] == "value" def test_skips_blank_lines(self): output = "\n\nKEY=value\n\n\nOTHER=thing\n\n" result = _parse_export_vars(output) assert len(result) == 2 def test_path_entry_with_colons(self): output = "PATH=/opt/esp-idf/tools/bin:/home/user/.espressif/tools/bin\n" result = _parse_export_vars(output) assert result["PATH"] == "/opt/esp-idf/tools/bin:/home/user/.espressif/tools/bin" def test_empty_output(self): assert _parse_export_vars("") == {} def test_value_with_equals_sign(self): output = "CMAKE_FLAGS=-DFOO=bar\n" result = _parse_export_vars(output) assert result["CMAKE_FLAGS"] == "-DFOO=bar" def test_whitespace_stripping(self): output = " KEY = value \n" result = _parse_export_vars(output) assert result["KEY"] == "value" # ------------------------------------------------------------------ # # 2. Validation function tests # ------------------------------------------------------------------ # class TestValidateTarget: """Tests for _validate_target.""" def test_valid_targets(self): for target in TARGET_ARCH: assert _validate_target(target) == target def test_esp32(self): assert _validate_target("esp32") == "esp32" def test_esp32p4(self): assert _validate_target("esp32p4") == "esp32p4" def test_esp32c3(self): assert _validate_target("esp32c3") == "esp32c3" def test_invalid_target_raises(self): with pytest.raises(ValueError, match="Unknown target"): _validate_target("esp8266") def test_empty_string_raises(self): with pytest.raises(ValueError, match="Unknown target"): _validate_target("") def test_similar_name_raises(self): with pytest.raises(ValueError, match="Unknown target"): _validate_target("ESP32") def test_error_lists_valid_targets(self): with pytest.raises(ValueError, match="esp32") as exc_info: _validate_target("nope") # The error message should list valid targets assert "Valid targets" in str(exc_info.value) class TestValidateToolNames: """Tests for _validate_tool_names.""" def test_valid_tool_names(self): names = ["riscv32-esp-elf", "xtensa-esp-elf-gdb", "cmake", "ninja"] assert _validate_tool_names(names) == names def test_tool_name_with_dots(self): names = ["esp-rom-elfs"] assert _validate_tool_names(names) == names def test_flag_injection_double_dash(self): with pytest.raises(ValueError, match="Invalid tool name"): _validate_tool_names(["--dangerous-flag"]) def test_flag_injection_single_dash(self): with pytest.raises(ValueError, match="Invalid tool name"): _validate_tool_names(["-rf"]) def test_special_chars_semicolon(self): with pytest.raises(ValueError, match="Invalid tool name"): _validate_tool_names(["cmake; rm -rf /"]) def test_special_chars_pipe(self): with pytest.raises(ValueError, match="Invalid tool name"): _validate_tool_names(["cmake|evil"]) def test_special_chars_backtick(self): with pytest.raises(ValueError, match="Invalid tool name"): _validate_tool_names(["`whoami`"]) def test_empty_list(self): assert _validate_tool_names([]) == [] def test_mixed_valid_and_invalid(self): with pytest.raises(ValueError, match="Invalid tool name"): _validate_tool_names(["cmake", "--evil", "ninja"]) # ------------------------------------------------------------------ # # 3. Component tests (mock subprocess) # ------------------------------------------------------------------ # class TestToolRegistration: """Verify all tools, resources, and prompts are registered.""" def test_tier1_tools_registered(self, mock_app, config): IDFIntegration(mock_app, config) tools = mock_app._registered_tools assert "idf_tools_list" in tools assert "idf_tools_check" in tools assert "idf_tools_install" in tools assert "idf_env_info" in tools def test_tier2_tools_registered(self, mock_app, config): IDFIntegration(mock_app, config) tools = mock_app._registered_tools assert "idf_build_project" in tools assert "idf_flash_project" in tools assert "idf_monitor" in tools def test_all_seven_tools(self, mock_app, config): IDFIntegration(mock_app, config) assert len(mock_app._registered_tools) == 7 def test_resource_registered(self, mock_app, config): IDFIntegration(mock_app, config) assert "esp://idf/status" in mock_app._registered_resources def test_prompt_registered(self, mock_app, config): IDFIntegration(mock_app, config) assert "idf_setup_target" in mock_app._registered_prompts class TestRunIdfTools: """Tests for _run_idf_tools subprocess runner.""" @pytest.mark.asyncio async def test_success_case(self, integration): mock_proc = AsyncMock() mock_proc.communicate = AsyncMock(return_value=(b"tool output", b"")) mock_proc.returncode = 0 with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await integration._run_idf_tools(["list"]) assert result["success"] is True assert result["output"] == "tool output" @pytest.mark.asyncio async def test_nonzero_exit_code(self, integration): mock_proc = AsyncMock() mock_proc.communicate = AsyncMock(return_value=(b"", b"fatal error")) mock_proc.returncode = 1 with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await integration._run_idf_tools(["check"]) assert result["success"] is False assert "fatal error" in result["error"] @pytest.mark.asyncio async def test_timeout_kills_process(self, integration): mock_proc = AsyncMock() mock_proc.communicate = AsyncMock(side_effect=asyncio.TimeoutError) mock_proc.returncode = None mock_proc.kill = MagicMock() mock_proc.wait = AsyncMock() with patch("asyncio.create_subprocess_exec", return_value=mock_proc): with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError): result = await integration._run_idf_tools(["list"], timeout=0.1) assert result["success"] is False assert "Timeout" in result["error"] mock_proc.kill.assert_called_once() @pytest.mark.asyncio async def test_file_not_found(self, integration): with patch( "asyncio.create_subprocess_exec", side_effect=FileNotFoundError("python3 missing") ): result = await integration._run_idf_tools(["list"]) assert result["success"] is False assert "not found" in result["error"] @pytest.mark.asyncio async def test_no_idf_path(self, integration): integration.config.esp_idf_path = None result = await integration._run_idf_tools(["list"]) assert result["success"] is False assert "not configured" in result["error"] @pytest.mark.asyncio async def test_stderr_output_captured(self, integration): mock_proc = AsyncMock() mock_proc.communicate = AsyncMock(return_value=(b"stdout", b"stderr info")) mock_proc.returncode = 0 with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await integration._run_idf_tools(["list"]) assert result["success"] is True assert result["stderr"] == "stderr info" @pytest.mark.asyncio async def test_custom_env_passed(self, integration): mock_proc = AsyncMock() mock_proc.communicate = AsyncMock(return_value=(b"ok", b"")) mock_proc.returncode = 0 with patch("asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: await integration._run_idf_tools(["list"], env={"CUSTOM_VAR": "test"}) call_kwargs = mock_exec.call_args env = call_kwargs.kwargs["env"] assert env["CUSTOM_VAR"] == "test" assert env["IDF_PATH"] == str(integration.config.esp_idf_path) class TestRunIdfPy: """Tests for _run_idf_py subprocess runner.""" def _mock_env(self, integration): idf_path = str(integration.config.esp_idf_path) return {"PATH": "/usr/bin", "IDF_PATH": idf_path} @pytest.mark.asyncio async def test_unavailable_returns_error(self, integration): integration.config.get_idf_available = MagicMock(return_value=False) result = await integration._run_idf_py(["build"]) assert result["success"] is False assert "not available" in result["error"] assert "hint" in result @pytest.mark.asyncio async def test_success_case(self, integration): integration.config.get_idf_available = MagicMock(return_value=True) integration._build_idf_env = AsyncMock(return_value=self._mock_env(integration)) mock_proc = AsyncMock() mock_proc.communicate = AsyncMock(return_value=(b"build ok", b"")) mock_proc.returncode = 0 with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await integration._run_idf_py(["build"]) assert result["success"] is True assert result["output"] == "build ok" @pytest.mark.asyncio async def test_build_env_failure(self, integration): integration.config.get_idf_available = MagicMock(return_value=True) integration._build_idf_env = AsyncMock(return_value=None) result = await integration._run_idf_py(["build"]) assert result["success"] is False assert "Failed to export" in result["error"] @pytest.mark.asyncio async def test_timeout_cleanup(self, integration): integration.config.get_idf_available = MagicMock(return_value=True) integration._build_idf_env = AsyncMock(return_value=self._mock_env(integration)) mock_proc = AsyncMock() mock_proc.communicate = AsyncMock(side_effect=asyncio.TimeoutError) mock_proc.returncode = None mock_proc.kill = MagicMock() mock_proc.wait = AsyncMock() with patch("asyncio.create_subprocess_exec", return_value=mock_proc): with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError): result = await integration._run_idf_py(["build"], timeout=0.1) assert result["success"] is False assert "Timeout" in result["error"] @pytest.mark.asyncio async def test_file_not_found(self, integration): integration.config.get_idf_available = MagicMock(return_value=True) integration._build_idf_env = AsyncMock(return_value=self._mock_env(integration)) with patch( "asyncio.create_subprocess_exec", side_effect=FileNotFoundError("not found") ): result = await integration._run_idf_py(["build"]) assert result["success"] is False assert "not found" in result["error"] @pytest.mark.asyncio async def test_nonzero_exit_code(self, integration): integration.config.get_idf_available = MagicMock(return_value=True) integration._build_idf_env = AsyncMock(return_value=self._mock_env(integration)) mock_proc = AsyncMock() mock_proc.communicate = AsyncMock(return_value=(b"output", b"error detail")) mock_proc.returncode = 2 with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await integration._run_idf_py(["build"]) assert result["success"] is False assert "error detail" in result["error"] class TestBuildIdfEnv: """Tests for _build_idf_env.""" @pytest.mark.asyncio async def test_parses_export_output(self, integration): export_output = "IDF_TOOLS_PATH=/home/user/.espressif\nPATH=/tools/bin\n" integration._run_idf_tools = AsyncMock( return_value={"success": True, "output": export_output, "stderr": ""} ) env = await integration._build_idf_env() assert env is not None assert env["IDF_TOOLS_PATH"] == "/home/user/.espressif" assert env["IDF_PATH"] == str(integration.config.esp_idf_path) @pytest.mark.asyncio async def test_path_prepended(self, integration): export_output = "PATH=/new/tools/bin\n" integration._run_idf_tools = AsyncMock( return_value={"success": True, "output": export_output, "stderr": ""} ) env = await integration._build_idf_env() assert env is not None # The exported PATH should be prepended to the existing PATH assert env["PATH"].startswith("/new/tools/bin") @pytest.mark.asyncio async def test_returns_none_on_failure(self, integration): integration._run_idf_tools = AsyncMock( return_value={"success": False, "error": "script missing"} ) env = await integration._build_idf_env() assert env is None class TestLoadToolsJson: """Tests for _load_tools_json with mtime-based caching.""" @pytest.mark.asyncio async def test_loads_on_first_call(self, integration, config): tools_json_path = config.esp_idf_path / "tools" / "tools.json" tools_data = {"version": 2, "tools": [{"name": "cmake"}]} tools_json_path.write_text(json.dumps(tools_data)) result = await integration._load_tools_json() assert result is not None assert result["version"] == 2 assert len(result["tools"]) == 1 @pytest.mark.asyncio async def test_cache_hit_same_mtime(self, integration, config): tools_json_path = config.esp_idf_path / "tools" / "tools.json" tools_data = {"version": 2, "tools": [{"name": "cmake"}]} tools_json_path.write_text(json.dumps(tools_data)) result1 = await integration._load_tools_json() assert result1 is not None # Second call should return cached version without re-reading result2 = await integration._load_tools_json() assert result2 is result1 # Same object reference = cache hit @pytest.mark.asyncio async def test_cache_invalidation_on_mtime_change(self, integration, config): tools_json_path = config.esp_idf_path / "tools" / "tools.json" # Write initial data tools_data_v1 = {"version": 1, "tools": []} tools_json_path.write_text(json.dumps(tools_data_v1)) result1 = await integration._load_tools_json() assert result1 is not None assert result1["version"] == 1 # Modify file with new content and force a different mtime tools_data_v2 = {"version": 2, "tools": [{"name": "new-tool"}]} tools_json_path.write_text(json.dumps(tools_data_v2)) # Bump mtime to ensure it differs (some filesystems have 1s resolution) new_mtime = tools_json_path.stat().st_mtime + 2.0 os.utime(tools_json_path, (new_mtime, new_mtime)) result2 = await integration._load_tools_json() assert result2 is not None assert result2["version"] == 2 @pytest.mark.asyncio async def test_returns_none_when_no_idf_path(self, integration): integration.config.esp_idf_path = None result = await integration._load_tools_json() assert result is None @pytest.mark.asyncio async def test_returns_none_when_file_missing(self, integration, config): # tools.json does not exist by default in our tmp_path fixture tools_json_path = config.esp_idf_path / "tools" / "tools.json" if tools_json_path.exists(): tools_json_path.unlink() result = await integration._load_tools_json() assert result is None class TestToolsForTarget: """Tests for _tools_for_target.""" def test_returns_matching_tools(self, integration): tools_json = { "tools": [ { "name": "xtensa-esp-elf", "description": "Xtensa compiler", "supported_targets": ["esp32", "esp32s2", "esp32s3"], "versions": [{"name": "14.2.0", "status": "recommended"}], }, { "name": "riscv32-esp-elf", "description": "RISC-V compiler", "supported_targets": ["esp32c3", "esp32c6", "esp32h2"], "versions": [{"name": "14.2.0", "status": "recommended"}], }, ] } result = integration._tools_for_target(tools_json, "esp32") assert len(result) == 1 assert result[0]["name"] == "xtensa-esp-elf" assert result[0]["version"] == "14.2.0" def test_handles_all_supported_targets(self, integration): tools_json = { "tools": [ { "name": "cmake", "description": "Build system", "supported_targets": "all", "versions": [{"name": "3.24.0", "status": "recommended"}], }, { "name": "xtensa-esp-elf", "description": "Xtensa compiler", "supported_targets": ["esp32"], "versions": [{"name": "14.2.0", "status": "recommended"}], }, ] } result = integration._tools_for_target(tools_json, "esp32") assert len(result) == 2 names = [t["name"] for t in result] assert "cmake" in names assert "xtensa-esp-elf" in names def test_no_matching_target(self, integration): tools_json = { "tools": [ { "name": "xtensa-esp-elf", "description": "Xtensa compiler", "supported_targets": ["esp32"], "versions": [{"name": "14.2.0", "status": "recommended"}], }, ] } result = integration._tools_for_target(tools_json, "esp32c3") assert len(result) == 0 def test_supported_targets_converted_to_list(self, integration): tools_json = { "tools": [ { "name": "cmake", "description": "Build system", "supported_targets": "all", "versions": [{"name": "3.24.0", "status": "recommended"}], }, ] } result = integration._tools_for_target(tools_json, "esp32") assert result[0]["supported_targets"] == ["all"] def test_empty_tools(self, integration): tools_json = {"tools": []} result = integration._tools_for_target(tools_json, "esp32") assert result == [] def test_picks_recommended_version(self, integration): tools_json = { "tools": [ { "name": "cmake", "supported_targets": "all", "versions": [ {"name": "3.20.0", "status": "deprecated"}, {"name": "3.24.0", "status": "recommended"}, {"name": "3.28.0", "status": "preview"}, ], }, ] } result = integration._tools_for_target(tools_json, "esp32") assert result[0]["version"] == "3.24.0" # ------------------------------------------------------------------ # # 4. TARGET_ARCH constant tests # ------------------------------------------------------------------ # class TestTargetArch: """Tests for the TARGET_ARCH constant.""" def test_all_ten_targets_present(self): expected = { "esp32", "esp32s2", "esp32s3", "esp32c2", "esp32c3", "esp32c5", "esp32c6", "esp32c61", "esp32h2", "esp32p4", } assert set(TARGET_ARCH.keys()) == expected def test_count(self): assert len(TARGET_ARCH) == 10 def test_only_valid_architectures(self): valid_archs = {"xtensa", "riscv"} for target, arch in TARGET_ARCH.items(): assert arch in valid_archs, f"{target} has unexpected arch {arch!r}" def test_xtensa_targets(self): xtensa_targets = {k for k, v in TARGET_ARCH.items() if v == "xtensa"} assert xtensa_targets == {"esp32", "esp32s2", "esp32s3"} def test_riscv_targets(self): riscv_targets = {k for k, v in TARGET_ARCH.items() if v == "riscv"} expected = {"esp32c2", "esp32c3", "esp32c5", "esp32c6", "esp32c61", "esp32h2", "esp32p4"} assert riscv_targets == expected