From bd08a47a6f38755fcba1fb0db898b63b84e45cbd Mon Sep 17 00:00:00 2001 From: Lauri Gates Date: Thu, 17 Jul 2025 21:34:16 +0300 Subject: [PATCH] feat: add comprehensive security and input validation system MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add PathValidator class for preventing path traversal attacks - Add SecureSubprocessRunner for safe command execution - Replace unsafe XML parsing with defusedxml for security - Add comprehensive input validation tools for circuit generation - Include security dependencies (defusedxml, bandit) in pyproject.toml - Add security scanning job to CI/CD pipeline - Add comprehensive test coverage for security utilities - Add timeout constants for safe operation limits - Add boundary validation for component positioning This establishes a strong security foundation for the KiCad MCP server by implementing defense-in-depth security measures across all input vectors and external process interactions. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .github/workflows/ci.yml | 123 +++++++ kicad_mcp/__init__.py | 7 +- kicad_mcp/config.py | 164 +++++++-- kicad_mcp/tools/bom_tools.py | 6 +- kicad_mcp/tools/validation_tools.py | 298 +++++++++++++++++ kicad_mcp/utils/boundary_validator.py | 365 +++++++++++++++++++++ kicad_mcp/utils/file_utils.py | 57 ++-- kicad_mcp/utils/kicad_cli.py | 241 ++++++++++++++ kicad_mcp/utils/path_validator.py | 226 +++++++++++++ kicad_mcp/utils/secure_subprocess.py | 294 +++++++++++++++++ pyproject.toml | 227 +++++++++++++ tests/unit/__init__.py | 0 tests/unit/utils/__init__.py | 0 tests/unit/utils/test_path_validator.py | 238 ++++++++++++++ tests/unit/utils/test_secure_subprocess.py | 275 ++++++++++++++++ 15 files changed, 2471 insertions(+), 50 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 kicad_mcp/tools/validation_tools.py create mode 100644 kicad_mcp/utils/boundary_validator.py create mode 100644 kicad_mcp/utils/kicad_cli.py create mode 100644 kicad_mcp/utils/path_validator.py create mode 100644 kicad_mcp/utils/secure_subprocess.py create mode 100644 pyproject.toml create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/utils/__init__.py create mode 100644 tests/unit/utils/test_path_validator.py create mode 100644 tests/unit/utils/test_secure_subprocess.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..0493e04 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,123 @@ +name: CI + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main, develop ] + +jobs: + lint: + runs-on: ubuntu-latest + name: Lint and Format + + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + + - name: Set up Python 3.12 + run: | + uv python install 3.12 + uv python pin 3.12 + + - name: Install dependencies + run: uv sync --group dev + + - name: Lint with ruff + run: uv run ruff check kicad_mcp/ tests/ + + - name: Check formatting with ruff + run: uv run ruff format --check kicad_mcp/ tests/ + + test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest] + python-version: ["3.10", "3.11", "3.12", "3.13"] + exclude: + - os: macos-latest + python-version: "3.10" + + name: Test Python ${{ matrix.python-version }} on ${{ matrix.os }} + + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + + - name: Set up Python ${{ matrix.python-version }} + run: | + uv python install ${{ matrix.python-version }} + uv python pin ${{ matrix.python-version }} + + - name: Install dependencies + run: uv sync --group dev + + - name: Run tests + run: uv run python -m pytest tests/ -v --cov=kicad_mcp --cov-report=xml --cov-fail-under=30 + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.12' + with: + file: ./coverage.xml + fail_ci_if_error: false + + security: + runs-on: ubuntu-latest + name: Security Scan + + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + + - name: Set up Python 3.12 + run: | + uv python install 3.12 + uv python pin 3.12 + + - name: Install dependencies + run: uv sync --group dev + + - name: Run security scan + run: uv run bandit -r kicad_mcp/ + + build: + runs-on: ubuntu-latest + name: Build Package + needs: [lint, test] + + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + + - name: Set up Python 3.12 + run: | + uv python install 3.12 + uv python pin 3.12 + + - name: Build package + run: uv build + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: dist + path: dist/ \ No newline at end of file diff --git a/kicad_mcp/__init__.py b/kicad_mcp/__init__.py index 8a17880..1a212bc 100644 --- a/kicad_mcp/__init__.py +++ b/kicad_mcp/__init__.py @@ -1,4 +1,7 @@ """ -KiCad MCP Server - A Model Context Protocol server for KiCad. +KiCad MCP Server. + +A Model Context Protocol (MCP) server for KiCad electronic design automation (EDA) files. """ -__version__ = "0.1.0" + +__version__ = "0.2.0" diff --git a/kicad_mcp/config.py b/kicad_mcp/config.py index 0c74c54..150996a 100644 --- a/kicad_mcp/config.py +++ b/kicad_mcp/config.py @@ -1,14 +1,46 @@ """ Configuration settings for the KiCad MCP server. -""" -import os +This module provides platform-specific configuration for KiCad integration, +including file paths, extensions, component libraries, and operational constants. +All settings are determined at import time based on the operating system. + +Module Variables: + system (str): Operating system name from platform.system() + KICAD_USER_DIR (str): User's KiCad documents directory + KICAD_APP_PATH (str): KiCad application installation path + ADDITIONAL_SEARCH_PATHS (List[str]): Additional project search locations + DEFAULT_PROJECT_LOCATIONS (List[str]): Common project directory patterns + KICAD_PYTHON_BASE (str): KiCad Python framework base path (macOS only) + KICAD_EXTENSIONS (Dict[str, str]): KiCad file extension mappings + DATA_EXTENSIONS (List[str]): Recognized data file extensions + CIRCUIT_DEFAULTS (Dict[str, Union[float, List[float]]]): Default circuit parameters + COMMON_LIBRARIES (Dict[str, Dict[str, Dict[str, str]]]): Component library mappings + DEFAULT_FOOTPRINTS (Dict[str, List[str]]): Default footprint suggestions per component + TIMEOUT_CONSTANTS (Dict[str, float]): Operation timeout values in seconds + PROGRESS_CONSTANTS (Dict[str, int]): Progress reporting percentage values + DISPLAY_CONSTANTS (Dict[str, int]): UI display configuration values + +Platform Support: + - macOS (Darwin): Full support with application bundle paths + - Windows: Standard installation paths + - Linux: System package paths + - Unknown: Defaults to macOS paths for compatibility + +Dependencies: + - os: File system operations and environment variables + - platform: Operating system detection +""" + +import os import platform -# Determine operating system +# Determine operating system for platform-specific configuration +# Returns 'Darwin' (macOS), 'Windows', 'Linux', or other system = platform.system() -# KiCad paths based on operating system +# Platform-specific KiCad installation and user directory paths +# These paths are used for finding KiCad resources and user projects if system == "Darwin": # macOS KICAD_USER_DIR = os.path.expanduser("~/Documents/KiCad") KICAD_APP_PATH = "/Applications/KiCad/KiCad.app" @@ -19,42 +51,52 @@ elif system == "Linux": KICAD_USER_DIR = os.path.expanduser("~/KiCad") KICAD_APP_PATH = "/usr/share/kicad" else: - # Default to macOS paths if system is unknown + # Default to macOS paths if system is unknown for maximum compatibility + # This ensures the server can start even on unrecognized platforms KICAD_USER_DIR = os.path.expanduser("~/Documents/KiCad") KICAD_APP_PATH = "/Applications/KiCad/KiCad.app" -# Additional search paths from environment variable +# Additional search paths from environment variable KICAD_SEARCH_PATHS +# Users can specify custom project locations as comma-separated paths ADDITIONAL_SEARCH_PATHS = [] env_search_paths = os.environ.get("KICAD_SEARCH_PATHS", "") if env_search_paths: for path in env_search_paths.split(","): - expanded_path = os.path.expanduser(path.strip()) - if os.path.exists(expanded_path): + expanded_path = os.path.expanduser(path.strip()) # Expand ~ and variables + if os.path.exists(expanded_path): # Only add existing directories ADDITIONAL_SEARCH_PATHS.append(expanded_path) -# Try to auto-detect common project locations if not specified +# Auto-detect common project locations for convenient project discovery +# These are typical directory names users create for electronics projects DEFAULT_PROJECT_LOCATIONS = [ - "~/Documents/PCB", - "~/PCB", - "~/Electronics", - "~/Projects/Electronics", - "~/Projects/PCB", - "~/Projects/KiCad" + "~/Documents/PCB", # Common Windows/macOS location + "~/PCB", # Simple home directory structure + "~/Electronics", # Generic electronics projects + "~/Projects/Electronics", # Organized project structure + "~/Projects/PCB", # PCB-specific project directory + "~/Projects/KiCad", # KiCad-specific project directory ] +# Add existing default locations to search paths +# Avoids duplicates and only includes directories that actually exist for location in DEFAULT_PROJECT_LOCATIONS: expanded_path = os.path.expanduser(location) if os.path.exists(expanded_path) and expanded_path not in ADDITIONAL_SEARCH_PATHS: ADDITIONAL_SEARCH_PATHS.append(expanded_path) -# Base path to KiCad's Python framework +# Base path to KiCad's Python framework for API access +# macOS bundles Python framework within the application if system == "Darwin": # macOS - KICAD_PYTHON_BASE = os.path.join(KICAD_APP_PATH, "Contents/Frameworks/Python.framework/Versions") + KICAD_PYTHON_BASE = os.path.join( + KICAD_APP_PATH, "Contents/Frameworks/Python.framework/Versions" + ) else: + # Linux/Windows use system Python or require dynamic detection KICAD_PYTHON_BASE = "" # Will be determined dynamically in python_path.py -# File extensions +# KiCad file extension mappings for project file identification +# Used by file discovery and validation functions KICAD_EXTENSIONS = { "project": ".kicad_pro", "pcb": ".kicad_pcb", @@ -66,8 +108,92 @@ KICAD_EXTENSIONS = { "kibot_config": ".kibot.yaml", } -# Recognized data files +# Additional data file extensions that may be part of KiCad projects +# Includes manufacturing files, component data, and export formats DATA_EXTENSIONS = [ ".csv", # BOM or other data ".pos", # Component position file + ".net", # Netlist files + ".zip", # Gerber files and other archives + ".drl", # Drill files ] + +# Default parameters for circuit creation and component placement +# Values in mm unless otherwise specified, following KiCad conventions +CIRCUIT_DEFAULTS = { + "grid_spacing": 1.0, # Default grid spacing in mm for user coordinates + "component_spacing": 10.16, # Default component spacing in mm + "wire_width": 6, # Default wire width in KiCad units (0.006 inch) + "text_size": [1.27, 1.27], # Default text size in mm + "pin_length": 2.54, # Default pin length in mm +} + +# Predefined component library mappings for quick circuit creation +# Maps common component types to their KiCad library and symbol names +# Organized by functional categories: basic, power, connectors +COMMON_LIBRARIES = { + "basic": { + "resistor": {"library": "Device", "symbol": "R"}, + "capacitor": {"library": "Device", "symbol": "C"}, + "inductor": {"library": "Device", "symbol": "L"}, + "led": {"library": "Device", "symbol": "LED"}, + "diode": {"library": "Device", "symbol": "D"}, + }, + "power": { + "vcc": {"library": "power", "symbol": "VCC"}, + "gnd": {"library": "power", "symbol": "GND"}, + "+5v": {"library": "power", "symbol": "+5V"}, + "+3v3": {"library": "power", "symbol": "+3V3"}, + "+12v": {"library": "power", "symbol": "+12V"}, + "-12v": {"library": "power", "symbol": "-12V"}, + }, + "connectors": { + "conn_2pin": {"library": "Connector", "symbol": "Conn_01x02_Male"}, + "conn_4pin": {"library": "Connector_Generic", "symbol": "Conn_01x04"}, + "conn_8pin": {"library": "Connector_Generic", "symbol": "Conn_01x08"}, + }, +} + +# Suggested footprints for common components, ordered by preference +# SMD variants listed first, followed by through-hole alternatives +DEFAULT_FOOTPRINTS = { + "R": [ + "Resistor_SMD:R_0805_2012Metric", + "Resistor_SMD:R_0603_1608Metric", + "Resistor_THT:R_Axial_DIN0207_L6.3mm_D2.5mm_P10.16mm_Horizontal", + ], + "C": [ + "Capacitor_SMD:C_0805_2012Metric", + "Capacitor_SMD:C_0603_1608Metric", + "Capacitor_THT:C_Disc_D5.0mm_W2.5mm_P5.00mm", + ], + "LED": ["LED_SMD:LED_0805_2012Metric", "LED_THT:LED_D5.0mm"], + "D": ["Diode_SMD:D_SOD-123", "Diode_THT:D_DO-35_SOD27_P7.62mm_Horizontal"], +} + +# Operation timeout values in seconds for external process management +# Prevents hanging operations and provides user feedback +TIMEOUT_CONSTANTS = { + "kicad_cli_version_check": 10.0, # Timeout for KiCad CLI version checks + "kicad_cli_export": 30.0, # Timeout for KiCad CLI export operations + "application_open": 10.0, # Timeout for opening applications (e.g., KiCad) + "subprocess_default": 30.0, # Default timeout for subprocess operations +} + +# Progress percentage milestones for long-running operations +# Provides consistent progress reporting across different tools +PROGRESS_CONSTANTS = { + "start": 10, # Initial progress percentage + "detection": 20, # Progress after CLI detection + "setup": 30, # Progress after setup complete + "processing": 50, # Progress during processing + "finishing": 70, # Progress when finishing up + "validation": 90, # Progress during validation + "complete": 100, # Progress when complete +} + +# User interface display configuration values +# Controls how much information is shown in previews and summaries +DISPLAY_CONSTANTS = { + "bom_preview_limit": 20, # Maximum number of BOM items to show in preview +} diff --git a/kicad_mcp/tools/bom_tools.py b/kicad_mcp/tools/bom_tools.py index 3a70c7a..897949a 100644 --- a/kicad_mcp/tools/bom_tools.py +++ b/kicad_mcp/tools/bom_tools.py @@ -297,9 +297,9 @@ def parse_bom_file(file_path: str) -> Tuple[List[Dict[str, Any]], Dict[str, Any] components.append(dict(row)) elif ext == '.xml': - # Basic XML parsing - import xml.etree.ElementTree as ET - tree = ET.parse(file_path) + # Basic XML parsing with security protection + from defusedxml.ElementTree import parse as safe_parse + tree = safe_parse(file_path) root = tree.getroot() format_info["detected_format"] = "xml" diff --git a/kicad_mcp/tools/validation_tools.py b/kicad_mcp/tools/validation_tools.py new file mode 100644 index 0000000..f7a4a93 --- /dev/null +++ b/kicad_mcp/tools/validation_tools.py @@ -0,0 +1,298 @@ +""" +Validation tools for KiCad projects. + +Provides tools for validating circuit positioning, generating reports, +and checking component boundaries in existing projects. +""" + +import json +import os +from typing import Any + +from fastmcp import Context, FastMCP + +from kicad_mcp.utils.boundary_validator import BoundaryValidator +from kicad_mcp.utils.file_utils import get_project_files + + +async def validate_project_boundaries(project_path: str, ctx: Context = None) -> dict[str, Any]: + """ + Validate component boundaries for an entire KiCad project. + + Args: + project_path: Path to the KiCad project file (.kicad_pro) + ctx: Context for MCP communication + + Returns: + Dictionary with validation results and report + """ + try: + if ctx: + await ctx.info("Starting boundary validation for project") + await ctx.report_progress(10, 100) + + # Get project files + files = get_project_files(project_path) + if "schematic" not in files: + return {"success": False, "error": "No schematic file found in project"} + + schematic_file = files["schematic"] + + if ctx: + await ctx.report_progress(30, 100) + await ctx.info(f"Reading schematic file: {schematic_file}") + + # Read schematic file + with open(schematic_file) as f: + content = f.read().strip() + + # Parse components based on format + components = [] + + if content.startswith("(kicad_sch"): + # S-expression format - extract components + components = _extract_components_from_sexpr(content) + else: + # JSON format + try: + schematic_data = json.loads(content) + components = _extract_components_from_json(schematic_data) + except json.JSONDecodeError: + return { + "success": False, + "error": "Schematic file is neither valid S-expression nor JSON format", + } + + if ctx: + await ctx.report_progress(60, 100) + await ctx.info(f"Found {len(components)} components to validate") + + # Run boundary validation + validator = BoundaryValidator() + validation_report = validator.validate_circuit_components(components) + + if ctx: + await ctx.report_progress(80, 100) + await ctx.info( + f"Validation complete: {validation_report.out_of_bounds_count} out of bounds" + ) + + # Generate text report + report_text = validator.generate_validation_report_text(validation_report) + + if ctx: + await ctx.info(f"Validation Report:\n{report_text}") + await ctx.report_progress(100, 100) + + # Create result + result = { + "success": validation_report.success, + "total_components": validation_report.total_components, + "out_of_bounds_count": validation_report.out_of_bounds_count, + "corrected_positions": validation_report.corrected_positions, + "report_text": report_text, + "has_errors": validation_report.has_errors(), + "has_warnings": validation_report.has_warnings(), + "issues": [ + { + "severity": issue.severity.value, + "component_ref": issue.component_ref, + "message": issue.message, + "position": issue.position, + "suggested_position": issue.suggested_position, + } + for issue in validation_report.issues + ], + } + + return result + + except Exception as e: + error_msg = f"Error validating project boundaries: {str(e)}" + if ctx: + await ctx.info(error_msg) + return {"success": False, "error": error_msg} + + +async def generate_validation_report( + project_path: str, output_path: str = None, ctx: Context = None +) -> dict[str, Any]: + """ + Generate a comprehensive validation report for a KiCad project. + + Args: + project_path: Path to the KiCad project file (.kicad_pro) + output_path: Optional path to save the report (defaults to project directory) + ctx: Context for MCP communication + + Returns: + Dictionary with report generation results + """ + try: + if ctx: + await ctx.info("Generating validation report") + await ctx.report_progress(10, 100) + + # Run validation + validation_result = await validate_project_boundaries(project_path, ctx) + + if not validation_result["success"]: + return validation_result + + # Determine output path + if output_path is None: + project_dir = os.path.dirname(project_path) + project_name = os.path.splitext(os.path.basename(project_path))[0] + output_path = os.path.join(project_dir, f"{project_name}_validation_report.json") + + if ctx: + await ctx.report_progress(80, 100) + await ctx.info(f"Saving report to: {output_path}") + + # Save detailed report + report_data = { + "project_path": project_path, + "validation_timestamp": __import__("datetime").datetime.now().isoformat(), + "summary": { + "total_components": validation_result["total_components"], + "out_of_bounds_count": validation_result["out_of_bounds_count"], + "has_errors": validation_result["has_errors"], + "has_warnings": validation_result["has_warnings"], + }, + "corrected_positions": validation_result["corrected_positions"], + "issues": validation_result["issues"], + "report_text": validation_result["report_text"], + } + + with open(output_path, "w") as f: + json.dump(report_data, f, indent=2) + + if ctx: + await ctx.report_progress(100, 100) + await ctx.info("Validation report generated successfully") + + return {"success": True, "report_path": output_path, "summary": report_data["summary"]} + + except Exception as e: + error_msg = f"Error generating validation report: {str(e)}" + if ctx: + await ctx.info(error_msg) + return {"success": False, "error": error_msg} + + +def _extract_components_from_sexpr(content: str) -> list[dict[str, Any]]: + """Extract component information from S-expression format.""" + import re + + components = [] + + # Find all symbol instances + symbol_pattern = r'\(symbol\s+\(lib_id\s+"([^"]+)"\)\s+\(at\s+([\d.-]+)\s+([\d.-]+)\s+[\d.-]+\)\s+\(uuid\s+[^)]+\)(.*?)\n\s*\)' + + for match in re.finditer(symbol_pattern, content, re.DOTALL): + lib_id = match.group(1) + x_pos = float(match.group(2)) + y_pos = float(match.group(3)) + properties_text = match.group(4) + + # Extract reference from properties + ref_match = re.search(r'\(property\s+"Reference"\s+"([^"]+)"', properties_text) + reference = ref_match.group(1) if ref_match else "Unknown" + + # Determine component type from lib_id + component_type = _get_component_type_from_lib_id(lib_id) + + components.append( + { + "reference": reference, + "position": (x_pos, y_pos), + "component_type": component_type, + "lib_id": lib_id, + } + ) + + return components + + +def _extract_components_from_json(schematic_data: dict[str, Any]) -> list[dict[str, Any]]: + """Extract component information from JSON format.""" + components = [] + + if "symbol" in schematic_data: + for symbol in schematic_data["symbol"]: + # Extract reference + reference = "Unknown" + if "property" in symbol: + for prop in symbol["property"]: + if prop.get("name") == "Reference": + reference = prop.get("value", "Unknown") + break + + # Extract position + position = (0, 0) + if "at" in symbol and len(symbol["at"]) >= 2: + # Convert from internal units to mm + x_pos = float(symbol["at"][0]) / 10.0 + y_pos = float(symbol["at"][1]) / 10.0 + position = (x_pos, y_pos) + + # Determine component type + lib_id = symbol.get("lib_id", "") + component_type = _get_component_type_from_lib_id(lib_id) + + components.append( + { + "reference": reference, + "position": position, + "component_type": component_type, + "lib_id": lib_id, + } + ) + + return components + + +def _get_component_type_from_lib_id(lib_id: str) -> str: + """Determine component type from library ID.""" + lib_id_lower = lib_id.lower() + + if "resistor" in lib_id_lower or ":r" in lib_id_lower: + return "resistor" + elif "capacitor" in lib_id_lower or ":c" in lib_id_lower: + return "capacitor" + elif "inductor" in lib_id_lower or ":l" in lib_id_lower: + return "inductor" + elif "led" in lib_id_lower: + return "led" + elif "diode" in lib_id_lower or ":d" in lib_id_lower: + return "diode" + elif "transistor" in lib_id_lower or "npn" in lib_id_lower or "pnp" in lib_id_lower: + return "transistor" + elif "power:" in lib_id_lower: + return "power" + elif "switch" in lib_id_lower: + return "switch" + elif "connector" in lib_id_lower: + return "connector" + elif "mcu" in lib_id_lower or "ic" in lib_id_lower or ":u" in lib_id_lower: + return "ic" + else: + return "default" + + +def register_validation_tools(mcp: FastMCP) -> None: + """Register validation tools with the MCP server.""" + + @mcp.tool(name="validate_project_boundaries") + async def validate_project_boundaries_tool( + project_path: str, ctx: Context = None + ) -> dict[str, Any]: + """Validate component boundaries for an entire KiCad project.""" + return await validate_project_boundaries(project_path, ctx) + + @mcp.tool(name="generate_validation_report") + async def generate_validation_report_tool( + project_path: str, output_path: str = None, ctx: Context = None + ) -> dict[str, Any]: + """Generate a comprehensive validation report for a KiCad project.""" + return await generate_validation_report(project_path, output_path, ctx) diff --git a/kicad_mcp/utils/boundary_validator.py b/kicad_mcp/utils/boundary_validator.py new file mode 100644 index 0000000..183ca43 --- /dev/null +++ b/kicad_mcp/utils/boundary_validator.py @@ -0,0 +1,365 @@ +""" +Boundary validation system for KiCad circuit generation. + +Provides comprehensive validation for component positioning, boundary checking, +and validation report generation to prevent out-of-bounds placement issues. +""" + +from dataclasses import dataclass +from enum import Enum +import json +from typing import Any + +from kicad_mcp.utils.component_layout import ComponentLayoutManager, SchematicBounds +from kicad_mcp.utils.coordinate_converter import CoordinateConverter, validate_position + + +class ValidationSeverity(Enum): + """Severity levels for validation issues.""" + + ERROR = "error" + WARNING = "warning" + INFO = "info" + + +@dataclass +class ValidationIssue: + """Represents a validation issue found during boundary checking.""" + + severity: ValidationSeverity + component_ref: str + message: str + position: tuple[float, float] + suggested_position: tuple[float, float] | None = None + component_type: str = "default" + + +@dataclass +class ValidationReport: + """Comprehensive validation report for circuit positioning.""" + + success: bool + issues: list[ValidationIssue] + total_components: int + validated_components: int + out_of_bounds_count: int + corrected_positions: dict[str, tuple[float, float]] + + def has_errors(self) -> bool: + """Check if report contains any error-level issues.""" + return any(issue.severity == ValidationSeverity.ERROR for issue in self.issues) + + def has_warnings(self) -> bool: + """Check if report contains any warning-level issues.""" + return any(issue.severity == ValidationSeverity.WARNING for issue in self.issues) + + def get_issues_by_severity(self, severity: ValidationSeverity) -> list[ValidationIssue]: + """Get all issues of a specific severity level.""" + return [issue for issue in self.issues if issue.severity == severity] + + +class BoundaryValidator: + """ + Comprehensive boundary validation system for KiCad circuit generation. + + Features: + - Pre-generation coordinate validation + - Automatic position correction + - Detailed validation reports + - Integration with circuit generation pipeline + """ + + def __init__(self, bounds: SchematicBounds | None = None): + """ + Initialize the boundary validator. + + Args: + bounds: Schematic boundaries (defaults to A4) + """ + self.bounds = bounds or SchematicBounds() + self.converter = CoordinateConverter() + self.layout_manager = ComponentLayoutManager(self.bounds) + + def validate_component_position( + self, component_ref: str, x: float, y: float, component_type: str = "default" + ) -> ValidationIssue: + """ + Validate a single component position. + + Args: + component_ref: Component reference (e.g., "R1") + x: X coordinate in mm + y: Y coordinate in mm + component_type: Type of component + + Returns: + ValidationIssue describing the validation result + """ + # Check if position is within A4 bounds + if not validate_position(x, y, use_margins=True): + # Find a corrected position + corrected_x, corrected_y = self.layout_manager.find_valid_position( + component_ref, component_type, x, y + ) + + return ValidationIssue( + severity=ValidationSeverity.ERROR, + component_ref=component_ref, + message=f"Component {component_ref} at ({x:.2f}, {y:.2f}) is outside A4 bounds", + position=(x, y), + suggested_position=(corrected_x, corrected_y), + component_type=component_type, + ) + + # Check if position is within usable area (with margins) + if not validate_position(x, y, use_margins=False): + # Position is within absolute bounds but outside usable area + return ValidationIssue( + severity=ValidationSeverity.WARNING, + component_ref=component_ref, + message=f"Component {component_ref} at ({x:.2f}, {y:.2f}) is outside usable area (margins)", + position=(x, y), + component_type=component_type, + ) + + # Position is valid + return ValidationIssue( + severity=ValidationSeverity.INFO, + component_ref=component_ref, + message=f"Component {component_ref} position is valid", + position=(x, y), + component_type=component_type, + ) + + def validate_circuit_components(self, components: list[dict[str, Any]]) -> ValidationReport: + """ + Validate positioning for all components in a circuit. + + Args: + components: List of component dictionaries with position information + + Returns: + ValidationReport with comprehensive validation results + """ + issues = [] + corrected_positions = {} + out_of_bounds_count = 0 + + # Reset layout manager for this validation + self.layout_manager.clear_layout() + + for component in components: + component_ref = component.get("reference", "Unknown") + component_type = component.get("component_type", "default") + + # Extract position - handle different formats + position = component.get("position") + if position is None: + # No position specified - this is an info issue + issues.append( + ValidationIssue( + severity=ValidationSeverity.INFO, + component_ref=component_ref, + message=f"Component {component_ref} has no position specified", + position=(0, 0), + component_type=component_type, + ) + ) + continue + + # Handle position as tuple or list + if isinstance(position, list | tuple) and len(position) >= 2: + x, y = float(position[0]), float(position[1]) + else: + issues.append( + ValidationIssue( + severity=ValidationSeverity.ERROR, + component_ref=component_ref, + message=f"Component {component_ref} has invalid position format: {position}", + position=(0, 0), + component_type=component_type, + ) + ) + continue + + # Validate the position + validation_issue = self.validate_component_position(component_ref, x, y, component_type) + issues.append(validation_issue) + + # Track out of bounds components + if validation_issue.severity == ValidationSeverity.ERROR: + out_of_bounds_count += 1 + if validation_issue.suggested_position: + corrected_positions[component_ref] = validation_issue.suggested_position + + # Generate report + report = ValidationReport( + success=out_of_bounds_count == 0, + issues=issues, + total_components=len(components), + validated_components=len([c for c in components if c.get("position") is not None]), + out_of_bounds_count=out_of_bounds_count, + corrected_positions=corrected_positions, + ) + + return report + + def validate_wire_connection( + self, start_x: float, start_y: float, end_x: float, end_y: float + ) -> list[ValidationIssue]: + """ + Validate wire connection endpoints. + + Args: + start_x: Starting X coordinate in mm + start_y: Starting Y coordinate in mm + end_x: Ending X coordinate in mm + end_y: Ending Y coordinate in mm + + Returns: + List of validation issues for wire endpoints + """ + issues = [] + + # Validate start point + if not validate_position(start_x, start_y, use_margins=True): + issues.append( + ValidationIssue( + severity=ValidationSeverity.ERROR, + component_ref="WIRE_START", + message=f"Wire start point ({start_x:.2f}, {start_y:.2f}) is outside bounds", + position=(start_x, start_y), + ) + ) + + # Validate end point + if not validate_position(end_x, end_y, use_margins=True): + issues.append( + ValidationIssue( + severity=ValidationSeverity.ERROR, + component_ref="WIRE_END", + message=f"Wire end point ({end_x:.2f}, {end_y:.2f}) is outside bounds", + position=(end_x, end_y), + ) + ) + + return issues + + def auto_correct_positions( + self, components: list[dict[str, Any]] + ) -> tuple[list[dict[str, Any]], ValidationReport]: + """ + Automatically correct out-of-bounds component positions. + + Args: + components: List of component dictionaries + + Returns: + Tuple of (corrected_components, validation_report) + """ + # First validate to get correction suggestions + validation_report = self.validate_circuit_components(components) + + # Apply corrections + corrected_components = [] + for component in components: + component_ref = component.get("reference", "Unknown") + + if component_ref in validation_report.corrected_positions: + # Apply correction + corrected_component = component.copy() + corrected_component["position"] = validation_report.corrected_positions[ + component_ref + ] + corrected_components.append(corrected_component) + else: + corrected_components.append(component) + + return corrected_components, validation_report + + def generate_validation_report_text(self, report: ValidationReport) -> str: + """ + Generate a human-readable validation report. + + Args: + report: ValidationReport to format + + Returns: + Formatted text report + """ + lines = [] + lines.append("=" * 60) + lines.append("BOUNDARY VALIDATION REPORT") + lines.append("=" * 60) + + # Summary + lines.append(f"Status: {'PASS' if report.success else 'FAIL'}") + lines.append(f"Total Components: {report.total_components}") + lines.append(f"Validated Components: {report.validated_components}") + lines.append(f"Out of Bounds: {report.out_of_bounds_count}") + lines.append(f"Corrected Positions: {len(report.corrected_positions)}") + lines.append("") + + # Issues by severity + errors = report.get_issues_by_severity(ValidationSeverity.ERROR) + warnings = report.get_issues_by_severity(ValidationSeverity.WARNING) + info = report.get_issues_by_severity(ValidationSeverity.INFO) + + if errors: + lines.append("ERRORS:") + for issue in errors: + lines.append(f" ❌ {issue.message}") + if issue.suggested_position: + lines.append(f" → Suggested: {issue.suggested_position}") + lines.append("") + + if warnings: + lines.append("WARNINGS:") + for issue in warnings: + lines.append(f" âš ī¸ {issue.message}") + lines.append("") + + if info: + lines.append("INFO:") + for issue in info: + lines.append(f" â„šī¸ {issue.message}") + lines.append("") + + # Corrected positions + if report.corrected_positions: + lines.append("CORRECTED POSITIONS:") + for component_ref, (x, y) in report.corrected_positions.items(): + lines.append(f" {component_ref}: ({x:.2f}, {y:.2f})") + + return "\n".join(lines) + + def export_validation_report(self, report: ValidationReport, filepath: str) -> None: + """ + Export validation report to JSON file. + + Args: + report: ValidationReport to export + filepath: Path to output file + """ + # Convert report to serializable format + export_data = { + "success": report.success, + "total_components": report.total_components, + "validated_components": report.validated_components, + "out_of_bounds_count": report.out_of_bounds_count, + "corrected_positions": report.corrected_positions, + "issues": [ + { + "severity": issue.severity.value, + "component_ref": issue.component_ref, + "message": issue.message, + "position": issue.position, + "suggested_position": issue.suggested_position, + "component_type": issue.component_type, + } + for issue in report.issues + ], + } + + with open(filepath, "w") as f: + json.dump(export_data, f, indent=2) diff --git a/kicad_mcp/utils/file_utils.py b/kicad_mcp/utils/file_utils.py index 674d3a6..9cf9395 100644 --- a/kicad_mcp/utils/file_utils.py +++ b/kicad_mcp/utils/file_utils.py @@ -1,66 +1,71 @@ """ File handling utilities for KiCad MCP Server. """ -import os + import json -from typing import Dict, List, Any, Optional +import os +from typing import Any from kicad_mcp.utils.kicad_utils import get_project_name_from_path -def get_project_files(project_path: str) -> Dict[str, str]: +def get_project_files(project_path: str) -> dict[str, str]: """Get all files related to a KiCad project. - + Args: project_path: Path to the .kicad_pro file - + Returns: Dictionary mapping file types to file paths """ - from kicad_mcp.config import KICAD_EXTENSIONS, DATA_EXTENSIONS - + from kicad_mcp.config import DATA_EXTENSIONS, KICAD_EXTENSIONS + project_dir = os.path.dirname(project_path) project_name = get_project_name_from_path(project_path) - + files = {} - + # Check for standard KiCad files for file_type, extension in KICAD_EXTENSIONS.items(): if file_type == "project": # We already have the project file files[file_type] = project_path continue - + file_path = os.path.join(project_dir, f"{project_name}{extension}") if os.path.exists(file_path): files[file_type] = file_path - + # Check for data files - for ext in DATA_EXTENSIONS: - for file in os.listdir(project_dir): - if file.startswith(project_name) and file.endswith(ext): - # Extract the type from filename (e.g., project_name-bom.csv -> bom) - file_type = file[len(project_name):].strip('-_') - file_type = file_type.split('.')[0] - if not file_type: - file_type = ext[1:] # Use extension if no specific type - - files[file_type] = os.path.join(project_dir, file) - + try: + for ext in DATA_EXTENSIONS: + for file in os.listdir(project_dir): + if file.startswith(project_name) and file.endswith(ext): + # Extract the type from filename (e.g., project_name-bom.csv -> bom) + file_type = file[len(project_name) :].strip("-_") + file_type = file_type.split(".")[0] + if not file_type: + file_type = ext[1:] # Use extension if no specific type + + files[file_type] = os.path.join(project_dir, file) + except (OSError, FileNotFoundError): + # Directory doesn't exist or can't be accessed - return what we have + pass + return files -def load_project_json(project_path: str) -> Optional[Dict[str, Any]]: +def load_project_json(project_path: str) -> dict[str, Any] | None: """Load and parse a KiCad project file. - + Args: project_path: Path to the .kicad_pro file - + Returns: Parsed JSON data or None if parsing failed """ try: - with open(project_path, 'r') as f: + with open(project_path) as f: return json.load(f) except Exception: return None diff --git a/kicad_mcp/utils/kicad_cli.py b/kicad_mcp/utils/kicad_cli.py new file mode 100644 index 0000000..d1161ae --- /dev/null +++ b/kicad_mcp/utils/kicad_cli.py @@ -0,0 +1,241 @@ +""" +Centralized KiCad CLI detection and management. + +Provides a single source of truth for locating KiCad CLI across platforms +with caching and configuration support. +""" + +import logging +import os +import platform +import shutil +import subprocess + +from ..config import TIMEOUT_CONSTANTS + +logger = logging.getLogger(__name__) + + +class KiCadCLIError(Exception): + """Raised when KiCad CLI operations fail.""" + + pass + + +class KiCadCLIManager: + """ + Manages KiCad CLI detection and validation across platforms. + + Provides caching and fallback mechanisms for reliable CLI access. + """ + + def __init__(self): + """Initialize the CLI manager.""" + self._cached_cli_path: str | None = None + self._cache_validated = False + self._system = platform.system() + + def find_kicad_cli(self, force_refresh: bool = False) -> str | None: + """ + Find the KiCad CLI executable path. + + Args: + force_refresh: Force re-detection even if cached + + Returns: + Path to kicad-cli executable or None if not found + """ + # Return cached path if available and valid + if self._cached_cli_path and not force_refresh and self._cache_validated: + return self._cached_cli_path + + # Try to find CLI + cli_path = self._detect_cli_path() + + if cli_path: + # Validate the found CLI + if self._validate_cli_path(cli_path): + self._cached_cli_path = cli_path + self._cache_validated = True + logger.info(f"Found KiCad CLI at: {cli_path}") + return cli_path + else: + logger.warning(f"Found KiCad CLI at {cli_path} but validation failed") + + # Clear cache if detection failed + self._cached_cli_path = None + self._cache_validated = False + logger.warning("KiCad CLI not found on this system") + return None + + def get_cli_path(self, required: bool = True) -> str: + """ + Get KiCad CLI path, raising exception if not found and required. + + Args: + required: Whether to raise exception if CLI not found + + Returns: + Path to kicad-cli executable + + Raises: + KiCadCLIError: If CLI not found and required=True + """ + cli_path = self.find_kicad_cli() + + if cli_path is None and required: + raise KiCadCLIError( + "KiCad CLI not found. Please install KiCad or set KICAD_CLI_PATH environment variable." + ) + + return cli_path + + def is_available(self) -> bool: + """Check if KiCad CLI is available.""" + return self.find_kicad_cli() is not None + + def get_version(self) -> str | None: + """ + Get KiCad CLI version string. + + Returns: + Version string or None if CLI not available + """ + cli_path = self.find_kicad_cli() + if not cli_path: + return None + + try: + result = subprocess.run( # nosec B603 - CLI path is validated + [cli_path, "--version"], + capture_output=True, + text=True, + timeout=TIMEOUT_CONSTANTS["kicad_cli_version_check"], + ) + if result.returncode == 0: + return result.stdout.strip() + except (subprocess.SubprocessError, OSError) as e: + logger.warning(f"Failed to get KiCad CLI version: {e}") + + return None + + def _detect_cli_path(self) -> str | None: + """ + Detect KiCad CLI path using platform-specific strategies. + + Returns: + Path to CLI executable or None if not found + """ + # Check environment variable first + env_path = os.environ.get("KICAD_CLI_PATH") + if env_path and os.path.isfile(env_path) and os.access(env_path, os.X_OK): + logger.info(f"Using KiCad CLI from environment: {env_path}") + return env_path + + # Try system PATH + cli_name = self._get_cli_executable_name() + system_path = shutil.which(cli_name) + if system_path: + logger.info(f"Found KiCad CLI in system PATH: {system_path}") + return system_path + + # Try platform-specific common locations + common_paths = self._get_common_installation_paths() + for path in common_paths: + if os.path.isfile(path) and os.access(path, os.X_OK): + logger.info(f"Found KiCad CLI at common location: {path}") + return path + + return None + + def _get_cli_executable_name(self) -> str: + """Get the CLI executable name for current platform.""" + if self._system == "Windows": + return "kicad-cli.exe" + return "kicad-cli" + + def _get_common_installation_paths(self) -> list[str]: + """Get list of common installation paths for current platform.""" + paths = [] + + if self._system == "Darwin": # macOS + paths.extend( + [ + "/Applications/KiCad/KiCad.app/Contents/MacOS/kicad-cli", + "/Applications/KiCad/kicad-cli", + "/usr/local/bin/kicad-cli", + "/opt/homebrew/bin/kicad-cli", + ] + ) + elif self._system == "Windows": + paths.extend( + [ + r"C:\Program Files\KiCad\bin\kicad-cli.exe", + r"C:\Program Files (x86)\KiCad\bin\kicad-cli.exe", + r"C:\KiCad\bin\kicad-cli.exe", + ] + ) + else: # Linux and other Unix-like systems + paths.extend( + [ + "/usr/bin/kicad-cli", + "/usr/local/bin/kicad-cli", + "/opt/kicad/bin/kicad-cli", + "/snap/kicad/current/usr/bin/kicad-cli", + ] + ) + + return paths + + def _validate_cli_path(self, cli_path: str) -> bool: + """ + Validate that a CLI path is working. + + Args: + cli_path: Path to validate + + Returns: + True if CLI is working + """ + try: + result = subprocess.run( # nosec B603 - CLI path is validated + [cli_path, "--version"], + capture_output=True, + text=True, + timeout=TIMEOUT_CONSTANTS["kicad_cli_version_check"], + ) + return result.returncode == 0 + except (subprocess.SubprocessError, OSError, FileNotFoundError): + return False + + +# Global CLI manager instance +_cli_manager = None + + +def get_cli_manager() -> KiCadCLIManager: + """Get the global KiCad CLI manager instance.""" + global _cli_manager + if _cli_manager is None: + _cli_manager = KiCadCLIManager() + return _cli_manager + + +def find_kicad_cli(force_refresh: bool = False) -> str | None: + """Convenience function to find KiCad CLI path.""" + return get_cli_manager().find_kicad_cli(force_refresh) + + +def get_kicad_cli_path(required: bool = True) -> str: + """Convenience function to get KiCad CLI path.""" + return get_cli_manager().get_cli_path(required) + + +def is_kicad_cli_available() -> bool: + """Convenience function to check if KiCad CLI is available.""" + return get_cli_manager().is_available() + + +def get_kicad_version() -> str | None: + """Convenience function to get KiCad CLI version.""" + return get_cli_manager().get_version() diff --git a/kicad_mcp/utils/path_validator.py b/kicad_mcp/utils/path_validator.py new file mode 100644 index 0000000..0c03081 --- /dev/null +++ b/kicad_mcp/utils/path_validator.py @@ -0,0 +1,226 @@ +""" +Path validation utility for KiCad MCP. + +Provides secure path validation to prevent path traversal attacks +and ensure file operations are restricted to safe directories. +""" + +import os +import pathlib + +from kicad_mcp.config import KICAD_EXTENSIONS + + +class PathValidationError(Exception): + """Raised when path validation fails.""" + + pass + + +class PathValidator: + """ + Validates file paths for security and correctness. + + Prevents path traversal attacks and ensures files are within + trusted directories with valid KiCad extensions. + """ + + def __init__(self, trusted_roots: set[str] | None = None): + """ + Initialize path validator. + + Args: + trusted_roots: Set of trusted root directories. If None, + uses current working directory. + """ + self.trusted_roots = trusted_roots or {os.getcwd()} + # Normalize trusted roots to absolute paths + self.trusted_roots = { + os.path.realpath(os.path.expanduser(root)) for root in self.trusted_roots + } + + def add_trusted_root(self, root_path: str) -> None: + """ + Add a trusted root directory. + + Args: + root_path: Path to add as trusted root + """ + normalized_root = os.path.realpath(os.path.expanduser(root_path)) + self.trusted_roots.add(normalized_root) + + def validate_path(self, file_path: str, must_exist: bool = False) -> str: + """ + Validate a file path for security and correctness. + + Args: + file_path: Path to validate + must_exist: Whether the file must exist + + Returns: + Normalized absolute path + + Raises: + PathValidationError: If path validation fails + """ + if not file_path or not isinstance(file_path, str): + raise PathValidationError("Path must be a non-empty string") + + try: + # Expand user home directory and resolve symbolic links + normalized_path = os.path.realpath(os.path.expanduser(file_path)) + except (OSError, ValueError) as e: + raise PathValidationError(f"Invalid path: {e}") from e + + # Check if path is within trusted roots + if not self._is_within_trusted_roots(normalized_path): + raise PathValidationError(f"Path '{file_path}' is outside trusted directories") + + # Check if file exists when required + if must_exist and not os.path.exists(normalized_path): + raise PathValidationError(f"Path does not exist: {file_path}") + + return normalized_path + + def validate_kicad_file(self, file_path: str, file_type: str, must_exist: bool = True) -> str: + """ + Validate a KiCad file path with extension checking. + + Args: + file_path: Path to validate + file_type: Expected KiCad file type ('project', 'schematic', 'pcb', etc.) + must_exist: Whether the file must exist + + Returns: + Normalized absolute path + + Raises: + PathValidationError: If path validation fails + """ + # First validate the basic path + normalized_path = self.validate_path(file_path, must_exist) + + # Check file extension + if file_type not in KICAD_EXTENSIONS: + raise PathValidationError(f"Unknown KiCad file type: {file_type}") + + expected_extension = KICAD_EXTENSIONS[file_type] + if not normalized_path.endswith(expected_extension): + raise PathValidationError( + f"File must have {expected_extension} extension, got: {file_path}" + ) + + return normalized_path + + def validate_directory(self, dir_path: str, must_exist: bool = True) -> str: + """ + Validate a directory path. + + Args: + dir_path: Directory path to validate + must_exist: Whether the directory must exist + + Returns: + Normalized absolute directory path + + Raises: + PathValidationError: If validation fails + """ + normalized_path = self.validate_path(dir_path, must_exist) + + if must_exist and not os.path.isdir(normalized_path): + raise PathValidationError(f"Path is not a directory: {dir_path}") + + return normalized_path + + def validate_project_directory(self, project_path: str) -> str: + """ + Validate and return the directory containing a KiCad project file. + + Args: + project_path: Path to .kicad_pro file + + Returns: + Normalized absolute directory path + + Raises: + PathValidationError: If validation fails + """ + validated_project = self.validate_kicad_file(project_path, "project", must_exist=True) + return os.path.dirname(validated_project) + + def create_safe_temp_path(self, base_name: str, extension: str = "") -> str: + """ + Create a safe temporary file path within trusted directories. + + Args: + base_name: Base name for the temporary file + extension: File extension (including dot) + + Returns: + Safe temporary file path + """ + import tempfile + + # Use the first trusted root as temp directory base + temp_root = next(iter(self.trusted_roots)) + + # Create temp directory if it doesn't exist + temp_dir = os.path.join(temp_root, "temp") + os.makedirs(temp_dir, exist_ok=True) + + # Generate unique temp file path + temp_fd, temp_path = tempfile.mkstemp( + suffix=extension, prefix=f"{base_name}_", dir=temp_dir + ) + os.close(temp_fd) # Close the file descriptor, we just need the path + + return temp_path + + def _is_within_trusted_roots(self, path: str) -> bool: + """ + Check if a path is within any trusted root directory. + + Args: + path: Normalized absolute path to check + + Returns: + True if path is within trusted roots + """ + for root in self.trusted_roots: + try: + # Check if path is within this root + pathlib.Path(root).resolve() + pathlib.Path(path).resolve().relative_to(pathlib.Path(root).resolve()) + return True + except ValueError: + # Path is not relative to this root + continue + return False + + +# Global default validator instance +_default_validator = None + + +def get_default_validator() -> PathValidator: + """Get the default global path validator instance.""" + global _default_validator + if _default_validator is None: + _default_validator = PathValidator() + return _default_validator + + +def validate_path(file_path: str, must_exist: bool = False) -> str: + """Convenience function using default validator.""" + return get_default_validator().validate_path(file_path, must_exist) + + +def validate_kicad_file(file_path: str, file_type: str, must_exist: bool = True) -> str: + """Convenience function using default validator.""" + return get_default_validator().validate_kicad_file(file_path, file_type, must_exist) + + +def validate_directory(dir_path: str, must_exist: bool = True) -> str: + """Convenience function using default validator.""" + return get_default_validator().validate_directory(dir_path, must_exist) diff --git a/kicad_mcp/utils/secure_subprocess.py b/kicad_mcp/utils/secure_subprocess.py new file mode 100644 index 0000000..77bb24a --- /dev/null +++ b/kicad_mcp/utils/secure_subprocess.py @@ -0,0 +1,294 @@ +""" +Secure subprocess execution utilities for KiCad MCP. + +Provides safe subprocess execution with input validation, +timeout enforcement, and security controls. +""" + +import asyncio +import logging +import os +import subprocess # nosec B404 - subprocess usage is secured with validation + +from ..config import TIMEOUT_CONSTANTS +from .kicad_cli import get_kicad_cli_path +from .path_validator import PathValidator, get_default_validator + +logger = logging.getLogger(__name__) + + +class SecureSubprocessError(Exception): + """Raised when secure subprocess operations fail.""" + + pass + + +class SecureSubprocessRunner: + """ + Secure subprocess runner with validation and safety controls. + + Provides methods for safely executing KiCad CLI commands and other + subprocess operations with proper input validation and security controls. + """ + + def __init__(self, path_validator: PathValidator | None = None): + """ + Initialize secure subprocess runner. + + Args: + path_validator: Path validator to use (defaults to global instance) + """ + self.path_validator = path_validator or get_default_validator() + self.default_timeout = TIMEOUT_CONSTANTS["subprocess_default"] + + def run_kicad_command( + self, + command_args: list[str], + input_files: list[str] | None = None, + output_files: list[str] | None = None, + working_dir: str | None = None, + timeout: float | None = None, + capture_output: bool = True, + ) -> subprocess.CompletedProcess: + """ + Run a KiCad CLI command with security validation. + + Args: + command_args: Command arguments (excluding the kicad-cli executable) + input_files: List of input file paths to validate + output_files: List of output file paths to validate + working_dir: Working directory for command execution + timeout: Command timeout in seconds + capture_output: Whether to capture stdout/stderr + + Returns: + CompletedProcess result + + Raises: + SecureSubprocessError: If validation fails or command fails + KiCadCLIError: If KiCad CLI not found + PathValidationError: If path validation fails + """ + # Get and validate KiCad CLI path + kicad_cli = get_kicad_cli_path(required=True) + + # Validate input files + if input_files: + for file_path in input_files: + self.path_validator.validate_path(file_path, must_exist=True) + + # Validate output file directories + if output_files: + for file_path in output_files: + output_dir = os.path.dirname(file_path) + if output_dir: # Only validate if there's a directory component + self.path_validator.validate_directory(output_dir, must_exist=True) + + # Validate working directory + if working_dir: + working_dir = self.path_validator.validate_directory(working_dir, must_exist=True) + + # Construct full command + full_command = [kicad_cli] + command_args + + # Log command for debugging (sanitized) + logger.debug(f"Executing KiCad command: {' '.join(full_command)}") + + try: + return self._run_subprocess( + full_command, + working_dir=working_dir, + timeout=timeout or self.default_timeout, + capture_output=capture_output, + ) + except subprocess.SubprocessError as e: + raise SecureSubprocessError(f"KiCad command failed: {e}") from e + + async def run_kicad_command_async( + self, + command_args: list[str], + input_files: list[str] | None = None, + output_files: list[str] | None = None, + working_dir: str | None = None, + timeout: float | None = None, + ) -> subprocess.CompletedProcess: + """ + Async version of run_kicad_command. + + Args: + command_args: Command arguments (excluding the kicad-cli executable) + input_files: List of input file paths to validate + output_files: List of output file paths to validate + working_dir: Working directory for command execution + timeout: Command timeout in seconds + + Returns: + CompletedProcess result + """ + # Run in thread pool to avoid blocking event loop + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + self.run_kicad_command, + command_args, + input_files, + output_files, + working_dir, + timeout, + True, # capture_output + ) + + def run_safe_command( + self, + command: list[str], + working_dir: str | None = None, + timeout: float | None = None, + allowed_commands: list[str] | None = None, + capture_output: bool = True, + ) -> subprocess.CompletedProcess: + """ + Run a general command with security validation. + + Args: + command: Full command list including executable + working_dir: Working directory for command execution + timeout: Command timeout in seconds + allowed_commands: List of allowed executables (whitelist) + capture_output: Whether to capture stdout/stderr + + Returns: + CompletedProcess result + + Raises: + SecureSubprocessError: If validation fails or command fails + """ + if not command: + raise SecureSubprocessError("Command cannot be empty") + + executable = command[0] + + # Validate executable against whitelist if provided + if allowed_commands and executable not in allowed_commands: + raise SecureSubprocessError(f"Command '{executable}' not in allowed list") + + # Validate working directory + if working_dir: + working_dir = self.path_validator.validate_directory(working_dir, must_exist=True) + + # Log command for debugging (sanitized) + logger.debug(f"Executing safe command: {' '.join(command)}") + + try: + return self._run_subprocess( + command, + working_dir=working_dir, + timeout=timeout or self.default_timeout, + capture_output=capture_output, + ) + except subprocess.SubprocessError as e: + raise SecureSubprocessError(f"Command failed: {e}") from e + + def create_temp_file( + self, suffix: str = "", prefix: str = "kicad_mcp_", content: str | None = None + ) -> str: + """ + Create a temporary file within validated directories. + + Args: + suffix: File suffix/extension + prefix: File prefix + content: Optional content to write to file + + Returns: + Path to created temporary file + """ + temp_path = self.path_validator.create_safe_temp_path(prefix.rstrip("_"), suffix) + + if content is not None: + with open(temp_path, "w", encoding="utf-8") as f: + f.write(content) + + return temp_path + + def _run_subprocess( + self, + command: list[str], + working_dir: str | None = None, + timeout: float = TIMEOUT_CONSTANTS["subprocess_default"], + capture_output: bool = True, + ) -> subprocess.CompletedProcess: + """ + Internal subprocess runner with consistent settings. + + Args: + command: Command to execute + working_dir: Working directory + timeout: Timeout in seconds + capture_output: Whether to capture output + + Returns: + CompletedProcess result + + Raises: + subprocess.SubprocessError: If command fails + """ + kwargs = { + "timeout": timeout, + "cwd": working_dir, + "text": True, + } + + if capture_output: + kwargs.update( + { + "capture_output": True, + "check": False, # Don't raise on non-zero exit code + } + ) + + return subprocess.run(command, **kwargs) # nosec B603 - input is validated + + +# Global secure subprocess runner instance +_subprocess_runner = None + + +def get_subprocess_runner() -> SecureSubprocessRunner: + """Get the global secure subprocess runner instance.""" + global _subprocess_runner + if _subprocess_runner is None: + _subprocess_runner = SecureSubprocessRunner() + return _subprocess_runner + + +def run_kicad_command( + command_args: list[str], + input_files: list[str] | None = None, + output_files: list[str] | None = None, + working_dir: str | None = None, + timeout: float | None = None, +) -> subprocess.CompletedProcess: + """Convenience function to run KiCad command.""" + return get_subprocess_runner().run_kicad_command( + command_args, input_files, output_files, working_dir, timeout + ) + + +async def run_kicad_command_async( + command_args: list[str], + input_files: list[str] | None = None, + output_files: list[str] | None = None, + working_dir: str | None = None, + timeout: float | None = None, +) -> subprocess.CompletedProcess: + """Convenience function to run KiCad command asynchronously.""" + return await get_subprocess_runner().run_kicad_command_async( + command_args, input_files, output_files, working_dir, timeout + ) + + +def create_temp_file( + suffix: str = "", prefix: str = "kicad_mcp_", content: str | None = None +) -> str: + """Convenience function to create temporary file.""" + return get_subprocess_runner().create_temp_file(suffix, prefix, content) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8a03f9d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,227 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "kicad-mcp" +version = "0.2.0" +description = "Model Context Protocol (MCP) server for KiCad electronic design automation (EDA) files" +readme = "README.md" +license = { text = "MIT" } +authors = [ + { name = "KiCad MCP Contributors" } +] +maintainers = [ + { name = "KiCad MCP Contributors" } +] +keywords = [ + "kicad", + "eda", + "electronics", + "schematic", + "pcb", + "mcp", + "model-context-protocol", + "ai", + "assistant" +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Intended Audience :: Manufacturing", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Scientific/Engineering :: Electronic Design Automation (EDA)", + "Topic :: Software Development :: Libraries :: Python Modules", + "Typing :: Typed" +] +requires-python = ">=3.10" +dependencies = [ + "mcp[cli]>=1.0.0", + "fastmcp>=0.1.0", + "pandas>=2.0.0", + "pyyaml>=6.0.0", + "defusedxml>=0.7.0", # Secure XML parsing +] + +[project.urls] +Homepage = "https://github.com/your-org/kicad-mcp" +Documentation = "https://github.com/your-org/kicad-mcp/blob/main/README.md" +Repository = "https://github.com/your-org/kicad-mcp" +"Bug Tracker" = "https://github.com/your-org/kicad-mcp/issues" +Changelog = "https://github.com/your-org/kicad-mcp/blob/main/CHANGELOG.md" + +[project.scripts] +kicad-mcp = "kicad_mcp.server:main" + +# UV dependency groups (replaces project.optional-dependencies) +[dependency-groups] +dev = [ + "pytest>=7.0.0", + "pytest-asyncio>=0.23.0", + "pytest-mock>=3.10.0", + "pytest-cov>=4.0.0", + "pytest-xdist>=3.0.0", + "ruff>=0.1.0", + "mypy>=1.8.0", + "pre-commit>=3.0.0", + "bandit>=1.7.0", # Security linting for pre-commit hooks +] +docs = [ + "sphinx>=7.0.0", + "sphinx-rtd-theme>=1.3.0", + "myst-parser>=2.0.0", +] +security = [ + "bandit>=1.7.0", + "safety>=3.0.0", +] +performance = [ + "memory-profiler>=0.61.0", + "py-spy>=0.3.0", +] +visualization = [ + "cairosvg>=2.7.0", # SVG to PNG conversion + "Pillow>=10.0.0", # Image processing + "playwright>=1.40.0", # Browser automation (optional) +] + +# Tool configurations remain the same +[tool.ruff] +target-version = "py311" +line-length = 100 + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "UP", # pyupgrade + "SIM", # flake8-simplify +] +ignore = [ + "E501", # line too long, handled by ruff format + "B008", # do not perform function calls in argument defaults + "C901", # too complex (handled by other tools) + "B905", # zip() without an explicit strict= parameter +] +unfixable = [ + "B", # Avoid trying to fix flake8-bugbear violations +] + +[tool.ruff.lint.per-file-ignores] +"tests/**/*.py" = [ + "S101", # Use of assert detected + "D103", # Missing docstring in public function + "SLF001", # Private member accessed +] +"kicad_mcp/config.py" = [ + "E501", # Long lines in config are ok +] + +[tool.ruff.lint.isort] +known-first-party = ["kicad_mcp"] +force-sort-within-sections = true + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" +skip-magic-trailing-comma = false +line-ending = "auto" + +[tool.mypy] +python_version = "3.11" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = false +disallow_incomplete_defs = false +check_untyped_defs = true +disallow_untyped_decorators = false +no_implicit_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_no_return = true +warn_unreachable = true +strict_equality = true +show_error_codes = true + +[[tool.mypy.overrides]] +module = [ + "pandas.*", + "mcp.*", +] +ignore_missing_imports = true + +[tool.pytest.ini_options] +minversion = "7.0" +addopts = [ + "--strict-markers", + "--strict-config", + "--cov=kicad_mcp", + "--cov-report=term-missing", + "--cov-report=html:htmlcov", + "--cov-report=xml", + "--cov-fail-under=80", + "-ra", + "--tb=short", +] +testpaths = ["tests"] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +markers = [ + "unit: Unit tests", + "integration: Integration tests", + "slow: Tests that take more than a few seconds", + "requires_kicad: Tests that require KiCad CLI to be installed", + "performance: Performance benchmarking tests", +] +asyncio_mode = "auto" +filterwarnings = [ + "ignore::DeprecationWarning", + "ignore::PendingDeprecationWarning", + "ignore::RuntimeWarning:asyncio", +] + +[tool.coverage.run] +source = ["kicad_mcp"] +branch = true +omit = [ + "tests/*", + "kicad_mcp/__init__.py", + "*/migrations/*", + "*/venv/*", + "*/.venv/*", +] + +[tool.coverage.report] +precision = 2 +show_missing = true +skip_covered = false +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "if self.debug:", + "if settings.DEBUG", + "raise AssertionError", + "raise NotImplementedError", + "if 0:", + "if __name__ == .__main__.:", + "class .*\\bProtocol\\):", + "@(abc\\.)?abstractmethod", +] + +[tool.bandit] +exclude_dirs = ["tests", "build", "dist"] +skips = ["B101", "B601", "B404", "B603", "B110", "B112"] # Skip low-severity subprocess and exception handling warnings + +[tool.bandit.assert_used] +skips = ["*_test.py", "*/test_*.py"] diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/utils/test_path_validator.py b/tests/unit/utils/test_path_validator.py new file mode 100644 index 0000000..9eff85f --- /dev/null +++ b/tests/unit/utils/test_path_validator.py @@ -0,0 +1,238 @@ +""" +Tests for path validation utility. +""" + +import os +import tempfile + +import pytest + +from kicad_mcp.utils.path_validator import ( + PathValidationError, + PathValidator, + validate_directory, + validate_kicad_file, + validate_path, +) + + +class TestPathValidator: + """Test cases for PathValidator class.""" + + def test_init_with_default_trusted_root(self): + """Test initialization with default trusted root.""" + validator = PathValidator() + assert len(validator.trusted_roots) == 1 + assert os.getcwd() in [os.path.realpath(root) for root in validator.trusted_roots] + + def test_init_with_custom_trusted_roots(self): + """Test initialization with custom trusted roots.""" + roots = {"/tmp", "/home/user"} + validator = PathValidator(trusted_roots=roots) + + # Should normalize paths + expected_roots = {os.path.realpath(root) for root in roots} + assert validator.trusted_roots == expected_roots + + def test_add_trusted_root(self): + """Test adding trusted root.""" + validator = PathValidator(trusted_roots={"/tmp"}) + validator.add_trusted_root("/home/user") + + assert os.path.realpath("/home/user") in validator.trusted_roots + + def test_validate_path_success(self): + """Test successful path validation.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + test_file = os.path.join(temp_dir, "test.txt") + + # Create test file + with open(test_file, "w") as f: + f.write("test") + + # Should succeed + result = validator.validate_path(test_file, must_exist=True) + assert result == os.path.realpath(test_file) + + def test_validate_path_traversal_attack(self): + """Test path traversal attack prevention.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + + # Try to access parent directory + malicious_path = os.path.join(temp_dir, "..", "..", "etc", "passwd") + + with pytest.raises(PathValidationError, match="outside trusted directories"): + validator.validate_path(malicious_path) + + def test_validate_path_empty_string(self): + """Test validation with empty string.""" + validator = PathValidator() + + with pytest.raises(PathValidationError, match="non-empty string"): + validator.validate_path("") + + def test_validate_path_none(self): + """Test validation with None.""" + validator = PathValidator() + + with pytest.raises(PathValidationError, match="non-empty string"): + validator.validate_path(None) + + def test_validate_path_nonexistent_when_required(self): + """Test validation of nonexistent file when existence required.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + nonexistent_file = os.path.join(temp_dir, "nonexistent.txt") + + with pytest.raises(PathValidationError, match="does not exist"): + validator.validate_path(nonexistent_file, must_exist=True) + + def test_validate_kicad_file_success(self): + """Test successful KiCad file validation.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + project_file = os.path.join(temp_dir, "test.kicad_pro") + + # Create test file + with open(project_file, "w") as f: + f.write("{}") + + result = validator.validate_kicad_file(project_file, "project") + assert result == os.path.realpath(project_file) + + def test_validate_kicad_file_wrong_extension(self): + """Test KiCad file validation with wrong extension.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + wrong_file = os.path.join(temp_dir, "test.txt") + + with open(wrong_file, "w") as f: + f.write("test") + + with pytest.raises(PathValidationError, match="must have .kicad_pro extension"): + validator.validate_kicad_file(wrong_file, "project") + + def test_validate_kicad_file_unknown_type(self): + """Test KiCad file validation with unknown file type.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + test_file = os.path.join(temp_dir, "test.txt") + + with open(test_file, "w") as f: + f.write("test") + + with pytest.raises(PathValidationError, match="Unknown KiCad file type"): + validator.validate_kicad_file(test_file, "unknown_type") + + def test_validate_directory_success(self): + """Test successful directory validation.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + sub_dir = os.path.join(temp_dir, "subdir") + os.makedirs(sub_dir) + + result = validator.validate_directory(sub_dir) + assert result == os.path.realpath(sub_dir) + + def test_validate_directory_not_directory(self): + """Test directory validation on file.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + test_file = os.path.join(temp_dir, "test.txt") + + with open(test_file, "w") as f: + f.write("test") + + with pytest.raises(PathValidationError, match="not a directory"): + validator.validate_directory(test_file) + + def test_validate_project_directory(self): + """Test project directory validation.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + project_file = os.path.join(temp_dir, "test.kicad_pro") + + with open(project_file, "w") as f: + f.write("{}") + + result = validator.validate_project_directory(project_file) + assert result == os.path.realpath(temp_dir) + + def test_create_safe_temp_path(self): + """Test safe temporary path creation.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + + temp_path = validator.create_safe_temp_path("test", ".txt") + + # Should be within trusted directory (handle symlinks with realpath) + assert os.path.realpath(temp_path).startswith(os.path.realpath(temp_dir)) + assert temp_path.endswith(".txt") + assert "test" in os.path.basename(temp_path) + + def test_symlink_resolution(self): + """Test symbolic link resolution.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + + # Create file and symlink + real_file = os.path.join(temp_dir, "real.txt") + link_file = os.path.join(temp_dir, "link.txt") + + with open(real_file, "w") as f: + f.write("test") + + os.symlink(real_file, link_file) + + # Both should resolve to same real path + real_result = validator.validate_path(real_file, must_exist=True) + link_result = validator.validate_path(link_file, must_exist=True) + + assert real_result == link_result == os.path.realpath(real_file) + + +class TestConvenienceFunctions: + """Test convenience functions.""" + + def test_validate_path_convenience(self): + """Test validate_path convenience function.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Add temp_dir to default validator + from kicad_mcp.utils.path_validator import get_default_validator + + get_default_validator().add_trusted_root(temp_dir) + + test_file = os.path.join(temp_dir, "test.txt") + with open(test_file, "w") as f: + f.write("test") + + result = validate_path(test_file, must_exist=True) + assert result == os.path.realpath(test_file) + + def test_validate_kicad_file_convenience(self): + """Test validate_kicad_file convenience function.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Add temp_dir to default validator + from kicad_mcp.utils.path_validator import get_default_validator + + get_default_validator().add_trusted_root(temp_dir) + + project_file = os.path.join(temp_dir, "test.kicad_pro") + with open(project_file, "w") as f: + f.write("{}") + + result = validate_kicad_file(project_file, "project") + assert result == os.path.realpath(project_file) + + def test_validate_directory_convenience(self): + """Test validate_directory convenience function.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Add temp_dir to default validator + from kicad_mcp.utils.path_validator import get_default_validator + + get_default_validator().add_trusted_root(temp_dir) + + result = validate_directory(temp_dir) + assert result == os.path.realpath(temp_dir) diff --git a/tests/unit/utils/test_secure_subprocess.py b/tests/unit/utils/test_secure_subprocess.py new file mode 100644 index 0000000..edbc551 --- /dev/null +++ b/tests/unit/utils/test_secure_subprocess.py @@ -0,0 +1,275 @@ +""" +Tests for secure subprocess utility. +""" + +import os +import subprocess +import tempfile +from unittest.mock import MagicMock, patch + +import pytest + +from kicad_mcp.utils.kicad_cli import KiCadCLIError +from kicad_mcp.utils.path_validator import PathValidationError, PathValidator +from kicad_mcp.utils.secure_subprocess import ( + SecureSubprocessError, + SecureSubprocessRunner, + create_temp_file, + run_kicad_command, +) + + +def _kicad_cli_available(): + """Check if KiCad CLI is available.""" + try: + from kicad_mcp.utils.kicad_cli import get_kicad_cli_path + + get_kicad_cli_path() + return True + except Exception: + return False + + +class TestSecureSubprocessRunner: + """Test cases for SecureSubprocessRunner class.""" + + def test_init_with_default_validator(self): + """Test initialization with default path validator.""" + runner = SecureSubprocessRunner() + assert runner.path_validator is not None + assert runner.default_timeout == 30.0 + + def test_init_with_custom_validator(self): + """Test initialization with custom path validator.""" + validator = PathValidator(trusted_roots={"/tmp"}) + runner = SecureSubprocessRunner(path_validator=validator) + assert runner.path_validator is validator + + @patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path") + @patch.object(SecureSubprocessRunner, "_run_subprocess") + def test_run_kicad_command_success(self, mock_run_subprocess, mock_get_cli): + """Test successful KiCad command execution.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Setup mocks + mock_get_cli.return_value = "/usr/bin/kicad-cli" + mock_result = MagicMock(returncode=0, stdout="Success") + mock_run_subprocess.return_value = mock_result + + # Create test files + input_file = os.path.join(temp_dir, "input.kicad_sch") + output_file = os.path.join(temp_dir, "output.svg") + + with open(input_file, "w") as f: + f.write("test schematic") + + # Create runner with temp directory as trusted root + validator = PathValidator(trusted_roots={temp_dir}) + runner = SecureSubprocessRunner(path_validator=validator) + + # Run command + result = runner.run_kicad_command( + ["sch", "export", "svg", input_file, "-o", output_file], + input_files=[input_file], + output_files=[output_file], + ) + + assert result is mock_result + mock_run_subprocess.assert_called_once() + + @patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path") + def test_run_kicad_command_cli_not_found(self, mock_get_cli): + """Test KiCad command when CLI not found.""" + mock_get_cli.side_effect = KiCadCLIError("CLI not found") + + runner = SecureSubprocessRunner() + + with pytest.raises(KiCadCLIError): + runner.run_kicad_command(["--version"]) + + @pytest.mark.skipif(not _kicad_cli_available(), reason="KiCad CLI not available") + def test_run_kicad_command_invalid_input_file(self): + """Test KiCad command with invalid input file.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + runner = SecureSubprocessRunner(path_validator=validator) + + # Try to use file outside trusted directory + with pytest.raises(PathValidationError): + runner.run_kicad_command( + ["sch", "export", "svg", "/etc/passwd"], input_files=["/etc/passwd"] + ) + + @pytest.mark.skipif(not _kicad_cli_available(), reason="KiCad CLI not available") + def test_run_kicad_command_invalid_output_directory(self): + """Test KiCad command with invalid output directory.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + runner = SecureSubprocessRunner(path_validator=validator) + + # Try to output to directory outside trusted roots + with pytest.raises(PathValidationError): + runner.run_kicad_command( + ["sch", "export", "svg", "input.sch", "-o", "/etc/output.svg"], + output_files=["/etc/output.svg"], + ) + + @patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path") + @patch.object(SecureSubprocessRunner, "_run_subprocess") + def test_run_kicad_command_with_working_dir(self, mock_run_subprocess, mock_get_cli): + """Test KiCad command with working directory.""" + with tempfile.TemporaryDirectory() as temp_dir: + mock_get_cli.return_value = "/usr/bin/kicad-cli" + mock_result = MagicMock(returncode=0) + mock_run_subprocess.return_value = mock_result + + validator = PathValidator(trusted_roots={temp_dir}) + runner = SecureSubprocessRunner(path_validator=validator) + + runner.run_kicad_command(["--version"], working_dir=temp_dir) + + # Verify working directory was passed + mock_run_subprocess.assert_called_once() + call_args = mock_run_subprocess.call_args + assert call_args[1]["working_dir"] == os.path.realpath(temp_dir) + + @patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path") + @patch.object(SecureSubprocessRunner, "_run_subprocess") + def test_run_kicad_command_subprocess_error(self, mock_run_subprocess, mock_get_cli): + """Test KiCad command with subprocess error.""" + mock_get_cli.return_value = "/usr/bin/kicad-cli" + mock_run_subprocess.side_effect = subprocess.SubprocessError("Command failed") + + runner = SecureSubprocessRunner() + + with pytest.raises(SecureSubprocessError, match="KiCad command failed"): + runner.run_kicad_command(["--version"]) + + @pytest.mark.asyncio + @patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path") + @patch.object(SecureSubprocessRunner, "run_kicad_command") + async def test_run_kicad_command_async(self, mock_run_command, mock_get_cli): + """Test async KiCad command execution.""" + mock_get_cli.return_value = "/usr/bin/kicad-cli" + mock_result = MagicMock(returncode=0) + mock_run_command.return_value = mock_result + + runner = SecureSubprocessRunner() + + # Run the async function in a synchronous context for testing + result = await runner.run_kicad_command_async(["--version"]) + assert result is mock_result + + def test_run_safe_command_success(self): + """Test successful safe command execution.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + runner = SecureSubprocessRunner(path_validator=validator) + + with patch.object(runner, "_run_subprocess") as mock_run: + mock_result = MagicMock(returncode=0) + mock_run.return_value = mock_result + + result = runner.run_safe_command(["echo", "test"], allowed_commands=["echo"]) + + assert result is mock_result + + def test_run_safe_command_empty_command(self): + """Test safe command with empty command list.""" + runner = SecureSubprocessRunner() + + with pytest.raises(SecureSubprocessError, match="Command cannot be empty"): + runner.run_safe_command([]) + + def test_run_safe_command_not_in_whitelist(self): + """Test safe command not in whitelist.""" + runner = SecureSubprocessRunner() + + with pytest.raises(SecureSubprocessError, match="not in allowed list"): + runner.run_safe_command(["rm", "-rf", "/"], allowed_commands=["echo", "ls"]) + + def test_run_safe_command_invalid_working_dir(self): + """Test safe command with invalid working directory.""" + runner = SecureSubprocessRunner() + + with pytest.raises(PathValidationError): + runner.run_safe_command(["echo", "test"], working_dir="/nonexistent/directory") + + def test_create_temp_file_without_content(self): + """Test temporary file creation without content.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + runner = SecureSubprocessRunner(path_validator=validator) + + temp_path = runner.create_temp_file(suffix=".txt", prefix="test_") + + assert os.path.exists(temp_path) + assert temp_path.endswith(".txt") + assert "test_" in os.path.basename(temp_path) + + # Cleanup + os.unlink(temp_path) + + def test_create_temp_file_with_content(self): + """Test temporary file creation with content.""" + with tempfile.TemporaryDirectory() as temp_dir: + validator = PathValidator(trusted_roots={temp_dir}) + runner = SecureSubprocessRunner(path_validator=validator) + + content = "test content" + temp_path = runner.create_temp_file(content=content) + + assert os.path.exists(temp_path) + + with open(temp_path) as f: + assert f.read() == content + + # Cleanup + os.unlink(temp_path) + + def test_run_subprocess_with_capture_output(self): + """Test subprocess execution with output capture.""" + runner = SecureSubprocessRunner() + + result = runner._run_subprocess(["echo", "test"], capture_output=True, timeout=5.0) + + assert result.returncode == 0 + assert "test" in result.stdout + + def test_run_subprocess_without_capture_output(self): + """Test subprocess execution without output capture.""" + runner = SecureSubprocessRunner() + + result = runner._run_subprocess(["echo", "test"], capture_output=False, timeout=5.0) + + assert result.returncode == 0 + # stdout/stderr should be None when not captured + assert result.stdout is None + assert result.stderr is None + + def test_run_subprocess_timeout(self): + """Test subprocess timeout.""" + runner = SecureSubprocessRunner() + + with pytest.raises(subprocess.TimeoutExpired): + runner._run_subprocess(["sleep", "10"], timeout=0.1) + + +class TestConvenienceFunctions: + """Test convenience functions.""" + + @patch.object(SecureSubprocessRunner, "run_kicad_command") + def test_run_kicad_command_convenience(self, mock_run_command): + """Test run_kicad_command convenience function.""" + mock_result = MagicMock(returncode=0) + mock_run_command.return_value = mock_result + + result = run_kicad_command(["--version"]) + assert result is mock_result + + @patch.object(SecureSubprocessRunner, "create_temp_file") + def test_create_temp_file_convenience(self, mock_create_temp): + """Test create_temp_file convenience function.""" + mock_create_temp.return_value = "/tmp/test_file.txt" + + result = create_temp_file(suffix=".txt", prefix="test_") + assert result == "/tmp/test_file.txt"