Merge pull request #21 from laurigates/pr-3-security-input-validation
feat: add comprehensive security and input validation system
This commit is contained in:
commit
cc809c563c
28
.github/workflows/ci.yml
vendored
28
.github/workflows/ci.yml
vendored
@ -39,7 +39,10 @@ jobs:
|
|||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
os: [ubuntu-latest, macos-latest]
|
os: [ubuntu-latest, macos-latest]
|
||||||
python-version: ["3.10", "3.11", "3.12"]
|
python-version: ["3.10", "3.11", "3.12", "3.13"]
|
||||||
|
exclude:
|
||||||
|
- os: macos-latest
|
||||||
|
python-version: "3.10"
|
||||||
|
|
||||||
name: Test Python ${{ matrix.python-version }} on ${{ matrix.os }}
|
name: Test Python ${{ matrix.python-version }} on ${{ matrix.os }}
|
||||||
|
|
||||||
@ -69,6 +72,29 @@ jobs:
|
|||||||
file: ./coverage.xml
|
file: ./coverage.xml
|
||||||
fail_ci_if_error: false
|
fail_ci_if_error: false
|
||||||
|
|
||||||
|
security:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
name: Security Scan
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Install uv
|
||||||
|
uses: astral-sh/setup-uv@v4
|
||||||
|
with:
|
||||||
|
enable-cache: true
|
||||||
|
|
||||||
|
- name: Set up Python 3.12
|
||||||
|
run: |
|
||||||
|
uv python install 3.12
|
||||||
|
uv python pin 3.12
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: uv sync --group dev
|
||||||
|
|
||||||
|
- name: Run security scan
|
||||||
|
run: uv run bandit -r kicad_mcp/
|
||||||
|
|
||||||
build:
|
build:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
name: Build Package
|
name: Build Package
|
||||||
|
23
Makefile
23
Makefile
@ -2,18 +2,27 @@
|
|||||||
|
|
||||||
help:
|
help:
|
||||||
@echo "Available commands:"
|
@echo "Available commands:"
|
||||||
@echo " install Install dependencies"
|
@echo " install Install dependencies"
|
||||||
@echo " test Run tests"
|
@echo " test Run tests"
|
||||||
@echo " lint Run linting"
|
@echo " test <file> Run specific test file"
|
||||||
@echo " format Format code"
|
@echo " lint Run linting"
|
||||||
@echo " clean Clean build artifacts"
|
@echo " format Format code"
|
||||||
@echo " build Build package"
|
@echo " clean Clean build artifacts"
|
||||||
|
@echo " build Build package"
|
||||||
|
@echo " run Start the KiCad MCP server"
|
||||||
|
|
||||||
install:
|
install:
|
||||||
uv sync --group dev
|
uv sync --group dev
|
||||||
|
|
||||||
test:
|
test:
|
||||||
uv run python -m pytest tests/ -v
|
# Collect extra args; if none, use tests/
|
||||||
|
@files="$(filter-out $@,$(MAKECMDGOALS))"; \
|
||||||
|
if [ -z "$$files" ]; then files="tests/"; fi; \
|
||||||
|
uv run pytest $$files -v
|
||||||
|
|
||||||
|
# Prevent “No rule to make target …” errors for the extra args
|
||||||
|
%::
|
||||||
|
@:
|
||||||
|
|
||||||
lint:
|
lint:
|
||||||
uv run ruff check kicad_mcp/ tests/
|
uv run ruff check kicad_mcp/ tests/
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
"""
|
"""
|
||||||
KiCad MCP Server - A Model Context Protocol server for KiCad.
|
KiCad MCP Server.
|
||||||
|
|
||||||
|
A Model Context Protocol (MCP) server for KiCad electronic design automation (EDA) files.
|
||||||
"""
|
"""
|
||||||
from .server import *
|
from .server import *
|
||||||
from .config import *
|
from .config import *
|
||||||
|
@ -1,14 +1,46 @@
|
|||||||
"""
|
"""
|
||||||
Configuration settings for the KiCad MCP server.
|
Configuration settings for the KiCad MCP server.
|
||||||
"""
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
This module provides platform-specific configuration for KiCad integration,
|
||||||
|
including file paths, extensions, component libraries, and operational constants.
|
||||||
|
All settings are determined at import time based on the operating system.
|
||||||
|
|
||||||
|
Module Variables:
|
||||||
|
system (str): Operating system name from platform.system()
|
||||||
|
KICAD_USER_DIR (str): User's KiCad documents directory
|
||||||
|
KICAD_APP_PATH (str): KiCad application installation path
|
||||||
|
ADDITIONAL_SEARCH_PATHS (List[str]): Additional project search locations
|
||||||
|
DEFAULT_PROJECT_LOCATIONS (List[str]): Common project directory patterns
|
||||||
|
KICAD_PYTHON_BASE (str): KiCad Python framework base path (macOS only)
|
||||||
|
KICAD_EXTENSIONS (Dict[str, str]): KiCad file extension mappings
|
||||||
|
DATA_EXTENSIONS (List[str]): Recognized data file extensions
|
||||||
|
CIRCUIT_DEFAULTS (Dict[str, Union[float, List[float]]]): Default circuit parameters
|
||||||
|
COMMON_LIBRARIES (Dict[str, Dict[str, Dict[str, str]]]): Component library mappings
|
||||||
|
DEFAULT_FOOTPRINTS (Dict[str, List[str]]): Default footprint suggestions per component
|
||||||
|
TIMEOUT_CONSTANTS (Dict[str, float]): Operation timeout values in seconds
|
||||||
|
PROGRESS_CONSTANTS (Dict[str, int]): Progress reporting percentage values
|
||||||
|
DISPLAY_CONSTANTS (Dict[str, int]): UI display configuration values
|
||||||
|
|
||||||
|
Platform Support:
|
||||||
|
- macOS (Darwin): Full support with application bundle paths
|
||||||
|
- Windows: Standard installation paths
|
||||||
|
- Linux: System package paths
|
||||||
|
- Unknown: Defaults to macOS paths for compatibility
|
||||||
|
|
||||||
|
Dependencies:
|
||||||
|
- os: File system operations and environment variables
|
||||||
|
- platform: Operating system detection
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
|
||||||
# Determine operating system
|
# Determine operating system for platform-specific configuration
|
||||||
|
# Returns 'Darwin' (macOS), 'Windows', 'Linux', or other
|
||||||
system = platform.system()
|
system = platform.system()
|
||||||
|
|
||||||
# KiCad paths based on operating system
|
# Platform-specific KiCad installation and user directory paths
|
||||||
|
# These paths are used for finding KiCad resources and user projects
|
||||||
if system == "Darwin": # macOS
|
if system == "Darwin": # macOS
|
||||||
KICAD_USER_DIR = os.path.expanduser("~/Documents/KiCad")
|
KICAD_USER_DIR = os.path.expanduser("~/Documents/KiCad")
|
||||||
KICAD_APP_PATH = "/Applications/KiCad/KiCad.app"
|
KICAD_APP_PATH = "/Applications/KiCad/KiCad.app"
|
||||||
@ -19,42 +51,52 @@ elif system == "Linux":
|
|||||||
KICAD_USER_DIR = os.path.expanduser("~/KiCad")
|
KICAD_USER_DIR = os.path.expanduser("~/KiCad")
|
||||||
KICAD_APP_PATH = "/usr/share/kicad"
|
KICAD_APP_PATH = "/usr/share/kicad"
|
||||||
else:
|
else:
|
||||||
# Default to macOS paths if system is unknown
|
# Default to macOS paths if system is unknown for maximum compatibility
|
||||||
|
# This ensures the server can start even on unrecognized platforms
|
||||||
KICAD_USER_DIR = os.path.expanduser("~/Documents/KiCad")
|
KICAD_USER_DIR = os.path.expanduser("~/Documents/KiCad")
|
||||||
KICAD_APP_PATH = "/Applications/KiCad/KiCad.app"
|
KICAD_APP_PATH = "/Applications/KiCad/KiCad.app"
|
||||||
|
|
||||||
# Additional search paths from environment variable
|
# Additional search paths from environment variable KICAD_SEARCH_PATHS
|
||||||
|
# Users can specify custom project locations as comma-separated paths
|
||||||
ADDITIONAL_SEARCH_PATHS = []
|
ADDITIONAL_SEARCH_PATHS = []
|
||||||
env_search_paths = os.environ.get("KICAD_SEARCH_PATHS", "")
|
env_search_paths = os.environ.get("KICAD_SEARCH_PATHS", "")
|
||||||
if env_search_paths:
|
if env_search_paths:
|
||||||
for path in env_search_paths.split(","):
|
for path in env_search_paths.split(","):
|
||||||
expanded_path = os.path.expanduser(path.strip())
|
expanded_path = os.path.expanduser(path.strip()) # Expand ~ and variables
|
||||||
if os.path.exists(expanded_path):
|
if os.path.exists(expanded_path): # Only add existing directories
|
||||||
ADDITIONAL_SEARCH_PATHS.append(expanded_path)
|
ADDITIONAL_SEARCH_PATHS.append(expanded_path)
|
||||||
|
|
||||||
# Try to auto-detect common project locations if not specified
|
# Auto-detect common project locations for convenient project discovery
|
||||||
|
# These are typical directory names users create for electronics projects
|
||||||
DEFAULT_PROJECT_LOCATIONS = [
|
DEFAULT_PROJECT_LOCATIONS = [
|
||||||
"~/Documents/PCB",
|
"~/Documents/PCB", # Common Windows/macOS location
|
||||||
"~/PCB",
|
"~/PCB", # Simple home directory structure
|
||||||
"~/Electronics",
|
"~/Electronics", # Generic electronics projects
|
||||||
"~/Projects/Electronics",
|
"~/Projects/Electronics", # Organized project structure
|
||||||
"~/Projects/PCB",
|
"~/Projects/PCB", # PCB-specific project directory
|
||||||
"~/Projects/KiCad"
|
"~/Projects/KiCad", # KiCad-specific project directory
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Add existing default locations to search paths
|
||||||
|
# Avoids duplicates and only includes directories that actually exist
|
||||||
for location in DEFAULT_PROJECT_LOCATIONS:
|
for location in DEFAULT_PROJECT_LOCATIONS:
|
||||||
expanded_path = os.path.expanduser(location)
|
expanded_path = os.path.expanduser(location)
|
||||||
if os.path.exists(expanded_path) and expanded_path not in ADDITIONAL_SEARCH_PATHS:
|
if os.path.exists(expanded_path) and expanded_path not in ADDITIONAL_SEARCH_PATHS:
|
||||||
ADDITIONAL_SEARCH_PATHS.append(expanded_path)
|
ADDITIONAL_SEARCH_PATHS.append(expanded_path)
|
||||||
|
|
||||||
# Base path to KiCad's Python framework
|
# Base path to KiCad's Python framework for API access
|
||||||
|
# macOS bundles Python framework within the application
|
||||||
if system == "Darwin": # macOS
|
if system == "Darwin": # macOS
|
||||||
KICAD_PYTHON_BASE = os.path.join(KICAD_APP_PATH, "Contents/Frameworks/Python.framework/Versions")
|
KICAD_PYTHON_BASE = os.path.join(
|
||||||
|
KICAD_APP_PATH, "Contents/Frameworks/Python.framework/Versions"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
|
# Linux/Windows use system Python or require dynamic detection
|
||||||
KICAD_PYTHON_BASE = "" # Will be determined dynamically in python_path.py
|
KICAD_PYTHON_BASE = "" # Will be determined dynamically in python_path.py
|
||||||
|
|
||||||
|
|
||||||
# File extensions
|
# KiCad file extension mappings for project file identification
|
||||||
|
# Used by file discovery and validation functions
|
||||||
KICAD_EXTENSIONS = {
|
KICAD_EXTENSIONS = {
|
||||||
"project": ".kicad_pro",
|
"project": ".kicad_pro",
|
||||||
"pcb": ".kicad_pcb",
|
"pcb": ".kicad_pcb",
|
||||||
@ -66,8 +108,92 @@ KICAD_EXTENSIONS = {
|
|||||||
"kibot_config": ".kibot.yaml",
|
"kibot_config": ".kibot.yaml",
|
||||||
}
|
}
|
||||||
|
|
||||||
# Recognized data files
|
# Additional data file extensions that may be part of KiCad projects
|
||||||
|
# Includes manufacturing files, component data, and export formats
|
||||||
DATA_EXTENSIONS = [
|
DATA_EXTENSIONS = [
|
||||||
".csv", # BOM or other data
|
".csv", # BOM or other data
|
||||||
".pos", # Component position file
|
".pos", # Component position file
|
||||||
|
".net", # Netlist files
|
||||||
|
".zip", # Gerber files and other archives
|
||||||
|
".drl", # Drill files
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Default parameters for circuit creation and component placement
|
||||||
|
# Values in mm unless otherwise specified, following KiCad conventions
|
||||||
|
CIRCUIT_DEFAULTS = {
|
||||||
|
"grid_spacing": 1.0, # Default grid spacing in mm for user coordinates
|
||||||
|
"component_spacing": 10.16, # Default component spacing in mm
|
||||||
|
"wire_width": 6, # Default wire width in KiCad units (0.006 inch)
|
||||||
|
"text_size": [1.27, 1.27], # Default text size in mm
|
||||||
|
"pin_length": 2.54, # Default pin length in mm
|
||||||
|
}
|
||||||
|
|
||||||
|
# Predefined component library mappings for quick circuit creation
|
||||||
|
# Maps common component types to their KiCad library and symbol names
|
||||||
|
# Organized by functional categories: basic, power, connectors
|
||||||
|
COMMON_LIBRARIES = {
|
||||||
|
"basic": {
|
||||||
|
"resistor": {"library": "Device", "symbol": "R"},
|
||||||
|
"capacitor": {"library": "Device", "symbol": "C"},
|
||||||
|
"inductor": {"library": "Device", "symbol": "L"},
|
||||||
|
"led": {"library": "Device", "symbol": "LED"},
|
||||||
|
"diode": {"library": "Device", "symbol": "D"},
|
||||||
|
},
|
||||||
|
"power": {
|
||||||
|
"vcc": {"library": "power", "symbol": "VCC"},
|
||||||
|
"gnd": {"library": "power", "symbol": "GND"},
|
||||||
|
"+5v": {"library": "power", "symbol": "+5V"},
|
||||||
|
"+3v3": {"library": "power", "symbol": "+3V3"},
|
||||||
|
"+12v": {"library": "power", "symbol": "+12V"},
|
||||||
|
"-12v": {"library": "power", "symbol": "-12V"},
|
||||||
|
},
|
||||||
|
"connectors": {
|
||||||
|
"conn_2pin": {"library": "Connector", "symbol": "Conn_01x02_Male"},
|
||||||
|
"conn_4pin": {"library": "Connector_Generic", "symbol": "Conn_01x04"},
|
||||||
|
"conn_8pin": {"library": "Connector_Generic", "symbol": "Conn_01x08"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
# Suggested footprints for common components, ordered by preference
|
||||||
|
# SMD variants listed first, followed by through-hole alternatives
|
||||||
|
DEFAULT_FOOTPRINTS = {
|
||||||
|
"R": [
|
||||||
|
"Resistor_SMD:R_0805_2012Metric",
|
||||||
|
"Resistor_SMD:R_0603_1608Metric",
|
||||||
|
"Resistor_THT:R_Axial_DIN0207_L6.3mm_D2.5mm_P10.16mm_Horizontal",
|
||||||
|
],
|
||||||
|
"C": [
|
||||||
|
"Capacitor_SMD:C_0805_2012Metric",
|
||||||
|
"Capacitor_SMD:C_0603_1608Metric",
|
||||||
|
"Capacitor_THT:C_Disc_D5.0mm_W2.5mm_P5.00mm",
|
||||||
|
],
|
||||||
|
"LED": ["LED_SMD:LED_0805_2012Metric", "LED_THT:LED_D5.0mm"],
|
||||||
|
"D": ["Diode_SMD:D_SOD-123", "Diode_THT:D_DO-35_SOD27_P7.62mm_Horizontal"],
|
||||||
|
}
|
||||||
|
|
||||||
|
# Operation timeout values in seconds for external process management
|
||||||
|
# Prevents hanging operations and provides user feedback
|
||||||
|
TIMEOUT_CONSTANTS = {
|
||||||
|
"kicad_cli_version_check": 10.0, # Timeout for KiCad CLI version checks
|
||||||
|
"kicad_cli_export": 30.0, # Timeout for KiCad CLI export operations
|
||||||
|
"application_open": 10.0, # Timeout for opening applications (e.g., KiCad)
|
||||||
|
"subprocess_default": 30.0, # Default timeout for subprocess operations
|
||||||
|
}
|
||||||
|
|
||||||
|
# Progress percentage milestones for long-running operations
|
||||||
|
# Provides consistent progress reporting across different tools
|
||||||
|
PROGRESS_CONSTANTS = {
|
||||||
|
"start": 10, # Initial progress percentage
|
||||||
|
"detection": 20, # Progress after CLI detection
|
||||||
|
"setup": 30, # Progress after setup complete
|
||||||
|
"processing": 50, # Progress during processing
|
||||||
|
"finishing": 70, # Progress when finishing up
|
||||||
|
"validation": 90, # Progress during validation
|
||||||
|
"complete": 100, # Progress when complete
|
||||||
|
}
|
||||||
|
|
||||||
|
# User interface display configuration values
|
||||||
|
# Controls how much information is shown in previews and summaries
|
||||||
|
DISPLAY_CONSTANTS = {
|
||||||
|
"bom_preview_limit": 20, # Maximum number of BOM items to show in preview
|
||||||
|
}
|
||||||
|
@ -297,9 +297,9 @@ def parse_bom_file(file_path: str) -> Tuple[List[Dict[str, Any]], Dict[str, Any]
|
|||||||
components.append(dict(row))
|
components.append(dict(row))
|
||||||
|
|
||||||
elif ext == '.xml':
|
elif ext == '.xml':
|
||||||
# Basic XML parsing
|
# Basic XML parsing with security protection
|
||||||
import xml.etree.ElementTree as ET
|
from defusedxml.ElementTree import parse as safe_parse
|
||||||
tree = ET.parse(file_path)
|
tree = safe_parse(file_path)
|
||||||
root = tree.getroot()
|
root = tree.getroot()
|
||||||
|
|
||||||
format_info["detected_format"] = "xml"
|
format_info["detected_format"] = "xml"
|
||||||
|
298
kicad_mcp/tools/validation_tools.py
Normal file
298
kicad_mcp/tools/validation_tools.py
Normal file
@ -0,0 +1,298 @@
|
|||||||
|
"""
|
||||||
|
Validation tools for KiCad projects.
|
||||||
|
|
||||||
|
Provides tools for validating circuit positioning, generating reports,
|
||||||
|
and checking component boundaries in existing projects.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastmcp import Context, FastMCP
|
||||||
|
|
||||||
|
from kicad_mcp.utils.boundary_validator import BoundaryValidator
|
||||||
|
from kicad_mcp.utils.file_utils import get_project_files
|
||||||
|
|
||||||
|
|
||||||
|
async def validate_project_boundaries(project_path: str, ctx: Context = None) -> dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Validate component boundaries for an entire KiCad project.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
project_path: Path to the KiCad project file (.kicad_pro)
|
||||||
|
ctx: Context for MCP communication
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with validation results and report
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if ctx:
|
||||||
|
await ctx.info("Starting boundary validation for project")
|
||||||
|
await ctx.report_progress(10, 100)
|
||||||
|
|
||||||
|
# Get project files
|
||||||
|
files = get_project_files(project_path)
|
||||||
|
if "schematic" not in files:
|
||||||
|
return {"success": False, "error": "No schematic file found in project"}
|
||||||
|
|
||||||
|
schematic_file = files["schematic"]
|
||||||
|
|
||||||
|
if ctx:
|
||||||
|
await ctx.report_progress(30, 100)
|
||||||
|
await ctx.info(f"Reading schematic file: {schematic_file}")
|
||||||
|
|
||||||
|
# Read schematic file
|
||||||
|
with open(schematic_file) as f:
|
||||||
|
content = f.read().strip()
|
||||||
|
|
||||||
|
# Parse components based on format
|
||||||
|
components = []
|
||||||
|
|
||||||
|
if content.startswith("(kicad_sch"):
|
||||||
|
# S-expression format - extract components
|
||||||
|
components = _extract_components_from_sexpr(content)
|
||||||
|
else:
|
||||||
|
# JSON format
|
||||||
|
try:
|
||||||
|
schematic_data = json.loads(content)
|
||||||
|
components = _extract_components_from_json(schematic_data)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": "Schematic file is neither valid S-expression nor JSON format",
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx:
|
||||||
|
await ctx.report_progress(60, 100)
|
||||||
|
await ctx.info(f"Found {len(components)} components to validate")
|
||||||
|
|
||||||
|
# Run boundary validation
|
||||||
|
validator = BoundaryValidator()
|
||||||
|
validation_report = validator.validate_circuit_components(components)
|
||||||
|
|
||||||
|
if ctx:
|
||||||
|
await ctx.report_progress(80, 100)
|
||||||
|
await ctx.info(
|
||||||
|
f"Validation complete: {validation_report.out_of_bounds_count} out of bounds"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Generate text report
|
||||||
|
report_text = validator.generate_validation_report_text(validation_report)
|
||||||
|
|
||||||
|
if ctx:
|
||||||
|
await ctx.info(f"Validation Report:\n{report_text}")
|
||||||
|
await ctx.report_progress(100, 100)
|
||||||
|
|
||||||
|
# Create result
|
||||||
|
result = {
|
||||||
|
"success": validation_report.success,
|
||||||
|
"total_components": validation_report.total_components,
|
||||||
|
"out_of_bounds_count": validation_report.out_of_bounds_count,
|
||||||
|
"corrected_positions": validation_report.corrected_positions,
|
||||||
|
"report_text": report_text,
|
||||||
|
"has_errors": validation_report.has_errors(),
|
||||||
|
"has_warnings": validation_report.has_warnings(),
|
||||||
|
"issues": [
|
||||||
|
{
|
||||||
|
"severity": issue.severity.value,
|
||||||
|
"component_ref": issue.component_ref,
|
||||||
|
"message": issue.message,
|
||||||
|
"position": issue.position,
|
||||||
|
"suggested_position": issue.suggested_position,
|
||||||
|
}
|
||||||
|
for issue in validation_report.issues
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = f"Error validating project boundaries: {str(e)}"
|
||||||
|
if ctx:
|
||||||
|
await ctx.info(error_msg)
|
||||||
|
return {"success": False, "error": error_msg}
|
||||||
|
|
||||||
|
|
||||||
|
async def generate_validation_report(
|
||||||
|
project_path: str, output_path: str = None, ctx: Context = None
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Generate a comprehensive validation report for a KiCad project.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
project_path: Path to the KiCad project file (.kicad_pro)
|
||||||
|
output_path: Optional path to save the report (defaults to project directory)
|
||||||
|
ctx: Context for MCP communication
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with report generation results
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if ctx:
|
||||||
|
await ctx.info("Generating validation report")
|
||||||
|
await ctx.report_progress(10, 100)
|
||||||
|
|
||||||
|
# Run validation
|
||||||
|
validation_result = await validate_project_boundaries(project_path, ctx)
|
||||||
|
|
||||||
|
if not validation_result["success"]:
|
||||||
|
return validation_result
|
||||||
|
|
||||||
|
# Determine output path
|
||||||
|
if output_path is None:
|
||||||
|
project_dir = os.path.dirname(project_path)
|
||||||
|
project_name = os.path.splitext(os.path.basename(project_path))[0]
|
||||||
|
output_path = os.path.join(project_dir, f"{project_name}_validation_report.json")
|
||||||
|
|
||||||
|
if ctx:
|
||||||
|
await ctx.report_progress(80, 100)
|
||||||
|
await ctx.info(f"Saving report to: {output_path}")
|
||||||
|
|
||||||
|
# Save detailed report
|
||||||
|
report_data = {
|
||||||
|
"project_path": project_path,
|
||||||
|
"validation_timestamp": __import__("datetime").datetime.now().isoformat(),
|
||||||
|
"summary": {
|
||||||
|
"total_components": validation_result["total_components"],
|
||||||
|
"out_of_bounds_count": validation_result["out_of_bounds_count"],
|
||||||
|
"has_errors": validation_result["has_errors"],
|
||||||
|
"has_warnings": validation_result["has_warnings"],
|
||||||
|
},
|
||||||
|
"corrected_positions": validation_result["corrected_positions"],
|
||||||
|
"issues": validation_result["issues"],
|
||||||
|
"report_text": validation_result["report_text"],
|
||||||
|
}
|
||||||
|
|
||||||
|
with open(output_path, "w") as f:
|
||||||
|
json.dump(report_data, f, indent=2)
|
||||||
|
|
||||||
|
if ctx:
|
||||||
|
await ctx.report_progress(100, 100)
|
||||||
|
await ctx.info("Validation report generated successfully")
|
||||||
|
|
||||||
|
return {"success": True, "report_path": output_path, "summary": report_data["summary"]}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = f"Error generating validation report: {str(e)}"
|
||||||
|
if ctx:
|
||||||
|
await ctx.info(error_msg)
|
||||||
|
return {"success": False, "error": error_msg}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_components_from_sexpr(content: str) -> list[dict[str, Any]]:
|
||||||
|
"""Extract component information from S-expression format."""
|
||||||
|
import re
|
||||||
|
|
||||||
|
components = []
|
||||||
|
|
||||||
|
# Find all symbol instances
|
||||||
|
symbol_pattern = r'\(symbol\s+\(lib_id\s+"([^"]+)"\)\s+\(at\s+([\d.-]+)\s+([\d.-]+)\s+[\d.-]+\)\s+\(uuid\s+[^)]+\)(.*?)\n\s*\)'
|
||||||
|
|
||||||
|
for match in re.finditer(symbol_pattern, content, re.DOTALL):
|
||||||
|
lib_id = match.group(1)
|
||||||
|
x_pos = float(match.group(2))
|
||||||
|
y_pos = float(match.group(3))
|
||||||
|
properties_text = match.group(4)
|
||||||
|
|
||||||
|
# Extract reference from properties
|
||||||
|
ref_match = re.search(r'\(property\s+"Reference"\s+"([^"]+)"', properties_text)
|
||||||
|
reference = ref_match.group(1) if ref_match else "Unknown"
|
||||||
|
|
||||||
|
# Determine component type from lib_id
|
||||||
|
component_type = _get_component_type_from_lib_id(lib_id)
|
||||||
|
|
||||||
|
components.append(
|
||||||
|
{
|
||||||
|
"reference": reference,
|
||||||
|
"position": (x_pos, y_pos),
|
||||||
|
"component_type": component_type,
|
||||||
|
"lib_id": lib_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return components
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_components_from_json(schematic_data: dict[str, Any]) -> list[dict[str, Any]]:
|
||||||
|
"""Extract component information from JSON format."""
|
||||||
|
components = []
|
||||||
|
|
||||||
|
if "symbol" in schematic_data:
|
||||||
|
for symbol in schematic_data["symbol"]:
|
||||||
|
# Extract reference
|
||||||
|
reference = "Unknown"
|
||||||
|
if "property" in symbol:
|
||||||
|
for prop in symbol["property"]:
|
||||||
|
if prop.get("name") == "Reference":
|
||||||
|
reference = prop.get("value", "Unknown")
|
||||||
|
break
|
||||||
|
|
||||||
|
# Extract position
|
||||||
|
position = (0, 0)
|
||||||
|
if "at" in symbol and len(symbol["at"]) >= 2:
|
||||||
|
# Convert from internal units to mm
|
||||||
|
x_pos = float(symbol["at"][0]) / 10.0
|
||||||
|
y_pos = float(symbol["at"][1]) / 10.0
|
||||||
|
position = (x_pos, y_pos)
|
||||||
|
|
||||||
|
# Determine component type
|
||||||
|
lib_id = symbol.get("lib_id", "")
|
||||||
|
component_type = _get_component_type_from_lib_id(lib_id)
|
||||||
|
|
||||||
|
components.append(
|
||||||
|
{
|
||||||
|
"reference": reference,
|
||||||
|
"position": position,
|
||||||
|
"component_type": component_type,
|
||||||
|
"lib_id": lib_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return components
|
||||||
|
|
||||||
|
|
||||||
|
def _get_component_type_from_lib_id(lib_id: str) -> str:
|
||||||
|
"""Determine component type from library ID."""
|
||||||
|
lib_id_lower = lib_id.lower()
|
||||||
|
|
||||||
|
if "resistor" in lib_id_lower or ":r" in lib_id_lower:
|
||||||
|
return "resistor"
|
||||||
|
elif "capacitor" in lib_id_lower or ":c" in lib_id_lower:
|
||||||
|
return "capacitor"
|
||||||
|
elif "inductor" in lib_id_lower or ":l" in lib_id_lower:
|
||||||
|
return "inductor"
|
||||||
|
elif "led" in lib_id_lower:
|
||||||
|
return "led"
|
||||||
|
elif "diode" in lib_id_lower or ":d" in lib_id_lower:
|
||||||
|
return "diode"
|
||||||
|
elif "transistor" in lib_id_lower or "npn" in lib_id_lower or "pnp" in lib_id_lower:
|
||||||
|
return "transistor"
|
||||||
|
elif "power:" in lib_id_lower:
|
||||||
|
return "power"
|
||||||
|
elif "switch" in lib_id_lower:
|
||||||
|
return "switch"
|
||||||
|
elif "connector" in lib_id_lower:
|
||||||
|
return "connector"
|
||||||
|
elif "mcu" in lib_id_lower or "ic" in lib_id_lower or ":u" in lib_id_lower:
|
||||||
|
return "ic"
|
||||||
|
else:
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
|
||||||
|
def register_validation_tools(mcp: FastMCP) -> None:
|
||||||
|
"""Register validation tools with the MCP server."""
|
||||||
|
|
||||||
|
@mcp.tool(name="validate_project_boundaries")
|
||||||
|
async def validate_project_boundaries_tool(
|
||||||
|
project_path: str, ctx: Context = None
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Validate component boundaries for an entire KiCad project."""
|
||||||
|
return await validate_project_boundaries(project_path, ctx)
|
||||||
|
|
||||||
|
@mcp.tool(name="generate_validation_report")
|
||||||
|
async def generate_validation_report_tool(
|
||||||
|
project_path: str, output_path: str = None, ctx: Context = None
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Generate a comprehensive validation report for a KiCad project."""
|
||||||
|
return await generate_validation_report(project_path, output_path, ctx)
|
365
kicad_mcp/utils/boundary_validator.py
Normal file
365
kicad_mcp/utils/boundary_validator.py
Normal file
@ -0,0 +1,365 @@
|
|||||||
|
"""
|
||||||
|
Boundary validation system for KiCad circuit generation.
|
||||||
|
|
||||||
|
Provides comprehensive validation for component positioning, boundary checking,
|
||||||
|
and validation report generation to prevent out-of-bounds placement issues.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import Enum
|
||||||
|
import json
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from kicad_mcp.utils.component_layout import ComponentLayoutManager, SchematicBounds
|
||||||
|
from kicad_mcp.utils.coordinate_converter import CoordinateConverter, validate_position
|
||||||
|
|
||||||
|
|
||||||
|
class ValidationSeverity(Enum):
|
||||||
|
"""Severity levels for validation issues."""
|
||||||
|
|
||||||
|
ERROR = "error"
|
||||||
|
WARNING = "warning"
|
||||||
|
INFO = "info"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ValidationIssue:
|
||||||
|
"""Represents a validation issue found during boundary checking."""
|
||||||
|
|
||||||
|
severity: ValidationSeverity
|
||||||
|
component_ref: str
|
||||||
|
message: str
|
||||||
|
position: tuple[float, float]
|
||||||
|
suggested_position: tuple[float, float] | None = None
|
||||||
|
component_type: str = "default"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ValidationReport:
|
||||||
|
"""Comprehensive validation report for circuit positioning."""
|
||||||
|
|
||||||
|
success: bool
|
||||||
|
issues: list[ValidationIssue]
|
||||||
|
total_components: int
|
||||||
|
validated_components: int
|
||||||
|
out_of_bounds_count: int
|
||||||
|
corrected_positions: dict[str, tuple[float, float]]
|
||||||
|
|
||||||
|
def has_errors(self) -> bool:
|
||||||
|
"""Check if report contains any error-level issues."""
|
||||||
|
return any(issue.severity == ValidationSeverity.ERROR for issue in self.issues)
|
||||||
|
|
||||||
|
def has_warnings(self) -> bool:
|
||||||
|
"""Check if report contains any warning-level issues."""
|
||||||
|
return any(issue.severity == ValidationSeverity.WARNING for issue in self.issues)
|
||||||
|
|
||||||
|
def get_issues_by_severity(self, severity: ValidationSeverity) -> list[ValidationIssue]:
|
||||||
|
"""Get all issues of a specific severity level."""
|
||||||
|
return [issue for issue in self.issues if issue.severity == severity]
|
||||||
|
|
||||||
|
|
||||||
|
class BoundaryValidator:
|
||||||
|
"""
|
||||||
|
Comprehensive boundary validation system for KiCad circuit generation.
|
||||||
|
|
||||||
|
Features:
|
||||||
|
- Pre-generation coordinate validation
|
||||||
|
- Automatic position correction
|
||||||
|
- Detailed validation reports
|
||||||
|
- Integration with circuit generation pipeline
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, bounds: SchematicBounds | None = None):
|
||||||
|
"""
|
||||||
|
Initialize the boundary validator.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bounds: Schematic boundaries (defaults to A4)
|
||||||
|
"""
|
||||||
|
self.bounds = bounds or SchematicBounds()
|
||||||
|
self.converter = CoordinateConverter()
|
||||||
|
self.layout_manager = ComponentLayoutManager(self.bounds)
|
||||||
|
|
||||||
|
def validate_component_position(
|
||||||
|
self, component_ref: str, x: float, y: float, component_type: str = "default"
|
||||||
|
) -> ValidationIssue:
|
||||||
|
"""
|
||||||
|
Validate a single component position.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
component_ref: Component reference (e.g., "R1")
|
||||||
|
x: X coordinate in mm
|
||||||
|
y: Y coordinate in mm
|
||||||
|
component_type: Type of component
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ValidationIssue describing the validation result
|
||||||
|
"""
|
||||||
|
# Check if position is within A4 bounds
|
||||||
|
if not validate_position(x, y, use_margins=True):
|
||||||
|
# Find a corrected position
|
||||||
|
corrected_x, corrected_y = self.layout_manager.find_valid_position(
|
||||||
|
component_ref, component_type, x, y
|
||||||
|
)
|
||||||
|
|
||||||
|
return ValidationIssue(
|
||||||
|
severity=ValidationSeverity.ERROR,
|
||||||
|
component_ref=component_ref,
|
||||||
|
message=f"Component {component_ref} at ({x:.2f}, {y:.2f}) is outside A4 bounds",
|
||||||
|
position=(x, y),
|
||||||
|
suggested_position=(corrected_x, corrected_y),
|
||||||
|
component_type=component_type,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if position is within usable area (with margins)
|
||||||
|
if not validate_position(x, y, use_margins=False):
|
||||||
|
# Position is within absolute bounds but outside usable area
|
||||||
|
return ValidationIssue(
|
||||||
|
severity=ValidationSeverity.WARNING,
|
||||||
|
component_ref=component_ref,
|
||||||
|
message=f"Component {component_ref} at ({x:.2f}, {y:.2f}) is outside usable area (margins)",
|
||||||
|
position=(x, y),
|
||||||
|
component_type=component_type,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Position is valid
|
||||||
|
return ValidationIssue(
|
||||||
|
severity=ValidationSeverity.INFO,
|
||||||
|
component_ref=component_ref,
|
||||||
|
message=f"Component {component_ref} position is valid",
|
||||||
|
position=(x, y),
|
||||||
|
component_type=component_type,
|
||||||
|
)
|
||||||
|
|
||||||
|
def validate_circuit_components(self, components: list[dict[str, Any]]) -> ValidationReport:
|
||||||
|
"""
|
||||||
|
Validate positioning for all components in a circuit.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
components: List of component dictionaries with position information
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ValidationReport with comprehensive validation results
|
||||||
|
"""
|
||||||
|
issues = []
|
||||||
|
corrected_positions = {}
|
||||||
|
out_of_bounds_count = 0
|
||||||
|
|
||||||
|
# Reset layout manager for this validation
|
||||||
|
self.layout_manager.clear_layout()
|
||||||
|
|
||||||
|
for component in components:
|
||||||
|
component_ref = component.get("reference", "Unknown")
|
||||||
|
component_type = component.get("component_type", "default")
|
||||||
|
|
||||||
|
# Extract position - handle different formats
|
||||||
|
position = component.get("position")
|
||||||
|
if position is None:
|
||||||
|
# No position specified - this is an info issue
|
||||||
|
issues.append(
|
||||||
|
ValidationIssue(
|
||||||
|
severity=ValidationSeverity.INFO,
|
||||||
|
component_ref=component_ref,
|
||||||
|
message=f"Component {component_ref} has no position specified",
|
||||||
|
position=(0, 0),
|
||||||
|
component_type=component_type,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Handle position as tuple or list
|
||||||
|
if isinstance(position, list | tuple) and len(position) >= 2:
|
||||||
|
x, y = float(position[0]), float(position[1])
|
||||||
|
else:
|
||||||
|
issues.append(
|
||||||
|
ValidationIssue(
|
||||||
|
severity=ValidationSeverity.ERROR,
|
||||||
|
component_ref=component_ref,
|
||||||
|
message=f"Component {component_ref} has invalid position format: {position}",
|
||||||
|
position=(0, 0),
|
||||||
|
component_type=component_type,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Validate the position
|
||||||
|
validation_issue = self.validate_component_position(component_ref, x, y, component_type)
|
||||||
|
issues.append(validation_issue)
|
||||||
|
|
||||||
|
# Track out of bounds components
|
||||||
|
if validation_issue.severity == ValidationSeverity.ERROR:
|
||||||
|
out_of_bounds_count += 1
|
||||||
|
if validation_issue.suggested_position:
|
||||||
|
corrected_positions[component_ref] = validation_issue.suggested_position
|
||||||
|
|
||||||
|
# Generate report
|
||||||
|
report = ValidationReport(
|
||||||
|
success=out_of_bounds_count == 0,
|
||||||
|
issues=issues,
|
||||||
|
total_components=len(components),
|
||||||
|
validated_components=len([c for c in components if c.get("position") is not None]),
|
||||||
|
out_of_bounds_count=out_of_bounds_count,
|
||||||
|
corrected_positions=corrected_positions,
|
||||||
|
)
|
||||||
|
|
||||||
|
return report
|
||||||
|
|
||||||
|
def validate_wire_connection(
|
||||||
|
self, start_x: float, start_y: float, end_x: float, end_y: float
|
||||||
|
) -> list[ValidationIssue]:
|
||||||
|
"""
|
||||||
|
Validate wire connection endpoints.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
start_x: Starting X coordinate in mm
|
||||||
|
start_y: Starting Y coordinate in mm
|
||||||
|
end_x: Ending X coordinate in mm
|
||||||
|
end_y: Ending Y coordinate in mm
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of validation issues for wire endpoints
|
||||||
|
"""
|
||||||
|
issues = []
|
||||||
|
|
||||||
|
# Validate start point
|
||||||
|
if not validate_position(start_x, start_y, use_margins=True):
|
||||||
|
issues.append(
|
||||||
|
ValidationIssue(
|
||||||
|
severity=ValidationSeverity.ERROR,
|
||||||
|
component_ref="WIRE_START",
|
||||||
|
message=f"Wire start point ({start_x:.2f}, {start_y:.2f}) is outside bounds",
|
||||||
|
position=(start_x, start_y),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Validate end point
|
||||||
|
if not validate_position(end_x, end_y, use_margins=True):
|
||||||
|
issues.append(
|
||||||
|
ValidationIssue(
|
||||||
|
severity=ValidationSeverity.ERROR,
|
||||||
|
component_ref="WIRE_END",
|
||||||
|
message=f"Wire end point ({end_x:.2f}, {end_y:.2f}) is outside bounds",
|
||||||
|
position=(end_x, end_y),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return issues
|
||||||
|
|
||||||
|
def auto_correct_positions(
|
||||||
|
self, components: list[dict[str, Any]]
|
||||||
|
) -> tuple[list[dict[str, Any]], ValidationReport]:
|
||||||
|
"""
|
||||||
|
Automatically correct out-of-bounds component positions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
components: List of component dictionaries
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (corrected_components, validation_report)
|
||||||
|
"""
|
||||||
|
# First validate to get correction suggestions
|
||||||
|
validation_report = self.validate_circuit_components(components)
|
||||||
|
|
||||||
|
# Apply corrections
|
||||||
|
corrected_components = []
|
||||||
|
for component in components:
|
||||||
|
component_ref = component.get("reference", "Unknown")
|
||||||
|
|
||||||
|
if component_ref in validation_report.corrected_positions:
|
||||||
|
# Apply correction
|
||||||
|
corrected_component = component.copy()
|
||||||
|
corrected_component["position"] = validation_report.corrected_positions[
|
||||||
|
component_ref
|
||||||
|
]
|
||||||
|
corrected_components.append(corrected_component)
|
||||||
|
else:
|
||||||
|
corrected_components.append(component)
|
||||||
|
|
||||||
|
return corrected_components, validation_report
|
||||||
|
|
||||||
|
def generate_validation_report_text(self, report: ValidationReport) -> str:
|
||||||
|
"""
|
||||||
|
Generate a human-readable validation report.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
report: ValidationReport to format
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Formatted text report
|
||||||
|
"""
|
||||||
|
lines = []
|
||||||
|
lines.append("=" * 60)
|
||||||
|
lines.append("BOUNDARY VALIDATION REPORT")
|
||||||
|
lines.append("=" * 60)
|
||||||
|
|
||||||
|
# Summary
|
||||||
|
lines.append(f"Status: {'PASS' if report.success else 'FAIL'}")
|
||||||
|
lines.append(f"Total Components: {report.total_components}")
|
||||||
|
lines.append(f"Validated Components: {report.validated_components}")
|
||||||
|
lines.append(f"Out of Bounds: {report.out_of_bounds_count}")
|
||||||
|
lines.append(f"Corrected Positions: {len(report.corrected_positions)}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Issues by severity
|
||||||
|
errors = report.get_issues_by_severity(ValidationSeverity.ERROR)
|
||||||
|
warnings = report.get_issues_by_severity(ValidationSeverity.WARNING)
|
||||||
|
info = report.get_issues_by_severity(ValidationSeverity.INFO)
|
||||||
|
|
||||||
|
if errors:
|
||||||
|
lines.append("ERRORS:")
|
||||||
|
for issue in errors:
|
||||||
|
lines.append(f" ❌ {issue.message}")
|
||||||
|
if issue.suggested_position:
|
||||||
|
lines.append(f" → Suggested: {issue.suggested_position}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
if warnings:
|
||||||
|
lines.append("WARNINGS:")
|
||||||
|
for issue in warnings:
|
||||||
|
lines.append(f" ⚠️ {issue.message}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
if info:
|
||||||
|
lines.append("INFO:")
|
||||||
|
for issue in info:
|
||||||
|
lines.append(f" ℹ️ {issue.message}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Corrected positions
|
||||||
|
if report.corrected_positions:
|
||||||
|
lines.append("CORRECTED POSITIONS:")
|
||||||
|
for component_ref, (x, y) in report.corrected_positions.items():
|
||||||
|
lines.append(f" {component_ref}: ({x:.2f}, {y:.2f})")
|
||||||
|
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
def export_validation_report(self, report: ValidationReport, filepath: str) -> None:
|
||||||
|
"""
|
||||||
|
Export validation report to JSON file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
report: ValidationReport to export
|
||||||
|
filepath: Path to output file
|
||||||
|
"""
|
||||||
|
# Convert report to serializable format
|
||||||
|
export_data = {
|
||||||
|
"success": report.success,
|
||||||
|
"total_components": report.total_components,
|
||||||
|
"validated_components": report.validated_components,
|
||||||
|
"out_of_bounds_count": report.out_of_bounds_count,
|
||||||
|
"corrected_positions": report.corrected_positions,
|
||||||
|
"issues": [
|
||||||
|
{
|
||||||
|
"severity": issue.severity.value,
|
||||||
|
"component_ref": issue.component_ref,
|
||||||
|
"message": issue.message,
|
||||||
|
"position": issue.position,
|
||||||
|
"suggested_position": issue.suggested_position,
|
||||||
|
"component_type": issue.component_type,
|
||||||
|
}
|
||||||
|
for issue in report.issues
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
with open(filepath, "w") as f:
|
||||||
|
json.dump(export_data, f, indent=2)
|
@ -1,14 +1,15 @@
|
|||||||
"""
|
"""
|
||||||
File handling utilities for KiCad MCP Server.
|
File handling utilities for KiCad MCP Server.
|
||||||
"""
|
"""
|
||||||
import os
|
|
||||||
import json
|
import json
|
||||||
from typing import Dict, List, Any, Optional
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from kicad_mcp.utils.kicad_utils import get_project_name_from_path
|
from kicad_mcp.utils.kicad_utils import get_project_name_from_path
|
||||||
|
|
||||||
|
|
||||||
def get_project_files(project_path: str) -> Dict[str, str]:
|
def get_project_files(project_path: str) -> dict[str, str]:
|
||||||
"""Get all files related to a KiCad project.
|
"""Get all files related to a KiCad project.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -17,7 +18,7 @@ def get_project_files(project_path: str) -> Dict[str, str]:
|
|||||||
Returns:
|
Returns:
|
||||||
Dictionary mapping file types to file paths
|
Dictionary mapping file types to file paths
|
||||||
"""
|
"""
|
||||||
from kicad_mcp.config import KICAD_EXTENSIONS, DATA_EXTENSIONS
|
from kicad_mcp.config import DATA_EXTENSIONS, KICAD_EXTENSIONS
|
||||||
|
|
||||||
project_dir = os.path.dirname(project_path)
|
project_dir = os.path.dirname(project_path)
|
||||||
project_name = get_project_name_from_path(project_path)
|
project_name = get_project_name_from_path(project_path)
|
||||||
@ -36,21 +37,25 @@ def get_project_files(project_path: str) -> Dict[str, str]:
|
|||||||
files[file_type] = file_path
|
files[file_type] = file_path
|
||||||
|
|
||||||
# Check for data files
|
# Check for data files
|
||||||
for ext in DATA_EXTENSIONS:
|
try:
|
||||||
for file in os.listdir(project_dir):
|
for ext in DATA_EXTENSIONS:
|
||||||
if file.startswith(project_name) and file.endswith(ext):
|
for file in os.listdir(project_dir):
|
||||||
# Extract the type from filename (e.g., project_name-bom.csv -> bom)
|
if file.startswith(project_name) and file.endswith(ext):
|
||||||
file_type = file[len(project_name):].strip('-_')
|
# Extract the type from filename (e.g., project_name-bom.csv -> bom)
|
||||||
file_type = file_type.split('.')[0]
|
file_type = file[len(project_name) :].strip("-_")
|
||||||
if not file_type:
|
file_type = file_type.split(".")[0]
|
||||||
file_type = ext[1:] # Use extension if no specific type
|
if not file_type:
|
||||||
|
file_type = ext[1:] # Use extension if no specific type
|
||||||
|
|
||||||
files[file_type] = os.path.join(project_dir, file)
|
files[file_type] = os.path.join(project_dir, file)
|
||||||
|
except (OSError, FileNotFoundError):
|
||||||
|
# Directory doesn't exist or can't be accessed - return what we have
|
||||||
|
pass
|
||||||
|
|
||||||
return files
|
return files
|
||||||
|
|
||||||
|
|
||||||
def load_project_json(project_path: str) -> Optional[Dict[str, Any]]:
|
def load_project_json(project_path: str) -> dict[str, Any] | None:
|
||||||
"""Load and parse a KiCad project file.
|
"""Load and parse a KiCad project file.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -60,7 +65,7 @@ def load_project_json(project_path: str) -> Optional[Dict[str, Any]]:
|
|||||||
Parsed JSON data or None if parsing failed
|
Parsed JSON data or None if parsing failed
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
with open(project_path, 'r') as f:
|
with open(project_path) as f:
|
||||||
return json.load(f)
|
return json.load(f)
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
241
kicad_mcp/utils/kicad_cli.py
Normal file
241
kicad_mcp/utils/kicad_cli.py
Normal file
@ -0,0 +1,241 @@
|
|||||||
|
"""
|
||||||
|
Centralized KiCad CLI detection and management.
|
||||||
|
|
||||||
|
Provides a single source of truth for locating KiCad CLI across platforms
|
||||||
|
with caching and configuration support.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import platform
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from ..config import TIMEOUT_CONSTANTS
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class KiCadCLIError(Exception):
|
||||||
|
"""Raised when KiCad CLI operations fail."""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KiCadCLIManager:
|
||||||
|
"""
|
||||||
|
Manages KiCad CLI detection and validation across platforms.
|
||||||
|
|
||||||
|
Provides caching and fallback mechanisms for reliable CLI access.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Initialize the CLI manager."""
|
||||||
|
self._cached_cli_path: str | None = None
|
||||||
|
self._cache_validated = False
|
||||||
|
self._system = platform.system()
|
||||||
|
|
||||||
|
def find_kicad_cli(self, force_refresh: bool = False) -> str | None:
|
||||||
|
"""
|
||||||
|
Find the KiCad CLI executable path.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
force_refresh: Force re-detection even if cached
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to kicad-cli executable or None if not found
|
||||||
|
"""
|
||||||
|
# Return cached path if available and valid
|
||||||
|
if self._cached_cli_path and not force_refresh and self._cache_validated:
|
||||||
|
return self._cached_cli_path
|
||||||
|
|
||||||
|
# Try to find CLI
|
||||||
|
cli_path = self._detect_cli_path()
|
||||||
|
|
||||||
|
if cli_path:
|
||||||
|
# Validate the found CLI
|
||||||
|
if self._validate_cli_path(cli_path):
|
||||||
|
self._cached_cli_path = cli_path
|
||||||
|
self._cache_validated = True
|
||||||
|
logger.info(f"Found KiCad CLI at: {cli_path}")
|
||||||
|
return cli_path
|
||||||
|
else:
|
||||||
|
logger.warning(f"Found KiCad CLI at {cli_path} but validation failed")
|
||||||
|
|
||||||
|
# Clear cache if detection failed
|
||||||
|
self._cached_cli_path = None
|
||||||
|
self._cache_validated = False
|
||||||
|
logger.warning("KiCad CLI not found on this system")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_cli_path(self, required: bool = True) -> str:
|
||||||
|
"""
|
||||||
|
Get KiCad CLI path, raising exception if not found and required.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
required: Whether to raise exception if CLI not found
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to kicad-cli executable
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
KiCadCLIError: If CLI not found and required=True
|
||||||
|
"""
|
||||||
|
cli_path = self.find_kicad_cli()
|
||||||
|
|
||||||
|
if cli_path is None and required:
|
||||||
|
raise KiCadCLIError(
|
||||||
|
"KiCad CLI not found. Please install KiCad or set KICAD_CLI_PATH environment variable."
|
||||||
|
)
|
||||||
|
|
||||||
|
return cli_path
|
||||||
|
|
||||||
|
def is_available(self) -> bool:
|
||||||
|
"""Check if KiCad CLI is available."""
|
||||||
|
return self.find_kicad_cli() is not None
|
||||||
|
|
||||||
|
def get_version(self) -> str | None:
|
||||||
|
"""
|
||||||
|
Get KiCad CLI version string.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Version string or None if CLI not available
|
||||||
|
"""
|
||||||
|
cli_path = self.find_kicad_cli()
|
||||||
|
if not cli_path:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = subprocess.run( # nosec B603 - CLI path is validated
|
||||||
|
[cli_path, "--version"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=TIMEOUT_CONSTANTS["kicad_cli_version_check"],
|
||||||
|
)
|
||||||
|
if result.returncode == 0:
|
||||||
|
return result.stdout.strip()
|
||||||
|
except (subprocess.SubprocessError, OSError) as e:
|
||||||
|
logger.warning(f"Failed to get KiCad CLI version: {e}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _detect_cli_path(self) -> str | None:
|
||||||
|
"""
|
||||||
|
Detect KiCad CLI path using platform-specific strategies.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to CLI executable or None if not found
|
||||||
|
"""
|
||||||
|
# Check environment variable first
|
||||||
|
env_path = os.environ.get("KICAD_CLI_PATH")
|
||||||
|
if env_path and os.path.isfile(env_path) and os.access(env_path, os.X_OK):
|
||||||
|
logger.info(f"Using KiCad CLI from environment: {env_path}")
|
||||||
|
return env_path
|
||||||
|
|
||||||
|
# Try system PATH
|
||||||
|
cli_name = self._get_cli_executable_name()
|
||||||
|
system_path = shutil.which(cli_name)
|
||||||
|
if system_path:
|
||||||
|
logger.info(f"Found KiCad CLI in system PATH: {system_path}")
|
||||||
|
return system_path
|
||||||
|
|
||||||
|
# Try platform-specific common locations
|
||||||
|
common_paths = self._get_common_installation_paths()
|
||||||
|
for path in common_paths:
|
||||||
|
if os.path.isfile(path) and os.access(path, os.X_OK):
|
||||||
|
logger.info(f"Found KiCad CLI at common location: {path}")
|
||||||
|
return path
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_cli_executable_name(self) -> str:
|
||||||
|
"""Get the CLI executable name for current platform."""
|
||||||
|
if self._system == "Windows":
|
||||||
|
return "kicad-cli.exe"
|
||||||
|
return "kicad-cli"
|
||||||
|
|
||||||
|
def _get_common_installation_paths(self) -> list[str]:
|
||||||
|
"""Get list of common installation paths for current platform."""
|
||||||
|
paths = []
|
||||||
|
|
||||||
|
if self._system == "Darwin": # macOS
|
||||||
|
paths.extend(
|
||||||
|
[
|
||||||
|
"/Applications/KiCad/KiCad.app/Contents/MacOS/kicad-cli",
|
||||||
|
"/Applications/KiCad/kicad-cli",
|
||||||
|
"/usr/local/bin/kicad-cli",
|
||||||
|
"/opt/homebrew/bin/kicad-cli",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
elif self._system == "Windows":
|
||||||
|
paths.extend(
|
||||||
|
[
|
||||||
|
r"C:\Program Files\KiCad\bin\kicad-cli.exe",
|
||||||
|
r"C:\Program Files (x86)\KiCad\bin\kicad-cli.exe",
|
||||||
|
r"C:\KiCad\bin\kicad-cli.exe",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
else: # Linux and other Unix-like systems
|
||||||
|
paths.extend(
|
||||||
|
[
|
||||||
|
"/usr/bin/kicad-cli",
|
||||||
|
"/usr/local/bin/kicad-cli",
|
||||||
|
"/opt/kicad/bin/kicad-cli",
|
||||||
|
"/snap/kicad/current/usr/bin/kicad-cli",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
return paths
|
||||||
|
|
||||||
|
def _validate_cli_path(self, cli_path: str) -> bool:
|
||||||
|
"""
|
||||||
|
Validate that a CLI path is working.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cli_path: Path to validate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if CLI is working
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = subprocess.run( # nosec B603 - CLI path is validated
|
||||||
|
[cli_path, "--version"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=TIMEOUT_CONSTANTS["kicad_cli_version_check"],
|
||||||
|
)
|
||||||
|
return result.returncode == 0
|
||||||
|
except (subprocess.SubprocessError, OSError, FileNotFoundError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# Global CLI manager instance
|
||||||
|
_cli_manager = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_cli_manager() -> KiCadCLIManager:
|
||||||
|
"""Get the global KiCad CLI manager instance."""
|
||||||
|
global _cli_manager
|
||||||
|
if _cli_manager is None:
|
||||||
|
_cli_manager = KiCadCLIManager()
|
||||||
|
return _cli_manager
|
||||||
|
|
||||||
|
|
||||||
|
def find_kicad_cli(force_refresh: bool = False) -> str | None:
|
||||||
|
"""Convenience function to find KiCad CLI path."""
|
||||||
|
return get_cli_manager().find_kicad_cli(force_refresh)
|
||||||
|
|
||||||
|
|
||||||
|
def get_kicad_cli_path(required: bool = True) -> str:
|
||||||
|
"""Convenience function to get KiCad CLI path."""
|
||||||
|
return get_cli_manager().get_cli_path(required)
|
||||||
|
|
||||||
|
|
||||||
|
def is_kicad_cli_available() -> bool:
|
||||||
|
"""Convenience function to check if KiCad CLI is available."""
|
||||||
|
return get_cli_manager().is_available()
|
||||||
|
|
||||||
|
|
||||||
|
def get_kicad_version() -> str | None:
|
||||||
|
"""Convenience function to get KiCad CLI version."""
|
||||||
|
return get_cli_manager().get_version()
|
226
kicad_mcp/utils/path_validator.py
Normal file
226
kicad_mcp/utils/path_validator.py
Normal file
@ -0,0 +1,226 @@
|
|||||||
|
"""
|
||||||
|
Path validation utility for KiCad MCP.
|
||||||
|
|
||||||
|
Provides secure path validation to prevent path traversal attacks
|
||||||
|
and ensure file operations are restricted to safe directories.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
|
||||||
|
from kicad_mcp.config import KICAD_EXTENSIONS
|
||||||
|
|
||||||
|
|
||||||
|
class PathValidationError(Exception):
|
||||||
|
"""Raised when path validation fails."""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class PathValidator:
|
||||||
|
"""
|
||||||
|
Validates file paths for security and correctness.
|
||||||
|
|
||||||
|
Prevents path traversal attacks and ensures files are within
|
||||||
|
trusted directories with valid KiCad extensions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, trusted_roots: set[str] | None = None):
|
||||||
|
"""
|
||||||
|
Initialize path validator.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
trusted_roots: Set of trusted root directories. If None,
|
||||||
|
uses current working directory.
|
||||||
|
"""
|
||||||
|
self.trusted_roots = trusted_roots or {os.getcwd()}
|
||||||
|
# Normalize trusted roots to absolute paths
|
||||||
|
self.trusted_roots = {
|
||||||
|
os.path.realpath(os.path.expanduser(root)) for root in self.trusted_roots
|
||||||
|
}
|
||||||
|
|
||||||
|
def add_trusted_root(self, root_path: str) -> None:
|
||||||
|
"""
|
||||||
|
Add a trusted root directory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
root_path: Path to add as trusted root
|
||||||
|
"""
|
||||||
|
normalized_root = os.path.realpath(os.path.expanduser(root_path))
|
||||||
|
self.trusted_roots.add(normalized_root)
|
||||||
|
|
||||||
|
def validate_path(self, file_path: str, must_exist: bool = False) -> str:
|
||||||
|
"""
|
||||||
|
Validate a file path for security and correctness.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path: Path to validate
|
||||||
|
must_exist: Whether the file must exist
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Normalized absolute path
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PathValidationError: If path validation fails
|
||||||
|
"""
|
||||||
|
if not file_path or not isinstance(file_path, str):
|
||||||
|
raise PathValidationError("Path must be a non-empty string")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Expand user home directory and resolve symbolic links
|
||||||
|
normalized_path = os.path.realpath(os.path.expanduser(file_path))
|
||||||
|
except (OSError, ValueError) as e:
|
||||||
|
raise PathValidationError(f"Invalid path: {e}") from e
|
||||||
|
|
||||||
|
# Check if path is within trusted roots
|
||||||
|
if not self._is_within_trusted_roots(normalized_path):
|
||||||
|
raise PathValidationError(f"Path '{file_path}' is outside trusted directories")
|
||||||
|
|
||||||
|
# Check if file exists when required
|
||||||
|
if must_exist and not os.path.exists(normalized_path):
|
||||||
|
raise PathValidationError(f"Path does not exist: {file_path}")
|
||||||
|
|
||||||
|
return normalized_path
|
||||||
|
|
||||||
|
def validate_kicad_file(self, file_path: str, file_type: str, must_exist: bool = True) -> str:
|
||||||
|
"""
|
||||||
|
Validate a KiCad file path with extension checking.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path: Path to validate
|
||||||
|
file_type: Expected KiCad file type ('project', 'schematic', 'pcb', etc.)
|
||||||
|
must_exist: Whether the file must exist
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Normalized absolute path
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PathValidationError: If path validation fails
|
||||||
|
"""
|
||||||
|
# First validate the basic path
|
||||||
|
normalized_path = self.validate_path(file_path, must_exist)
|
||||||
|
|
||||||
|
# Check file extension
|
||||||
|
if file_type not in KICAD_EXTENSIONS:
|
||||||
|
raise PathValidationError(f"Unknown KiCad file type: {file_type}")
|
||||||
|
|
||||||
|
expected_extension = KICAD_EXTENSIONS[file_type]
|
||||||
|
if not normalized_path.endswith(expected_extension):
|
||||||
|
raise PathValidationError(
|
||||||
|
f"File must have {expected_extension} extension, got: {file_path}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return normalized_path
|
||||||
|
|
||||||
|
def validate_directory(self, dir_path: str, must_exist: bool = True) -> str:
|
||||||
|
"""
|
||||||
|
Validate a directory path.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
dir_path: Directory path to validate
|
||||||
|
must_exist: Whether the directory must exist
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Normalized absolute directory path
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PathValidationError: If validation fails
|
||||||
|
"""
|
||||||
|
normalized_path = self.validate_path(dir_path, must_exist)
|
||||||
|
|
||||||
|
if must_exist and not os.path.isdir(normalized_path):
|
||||||
|
raise PathValidationError(f"Path is not a directory: {dir_path}")
|
||||||
|
|
||||||
|
return normalized_path
|
||||||
|
|
||||||
|
def validate_project_directory(self, project_path: str) -> str:
|
||||||
|
"""
|
||||||
|
Validate and return the directory containing a KiCad project file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
project_path: Path to .kicad_pro file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Normalized absolute directory path
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PathValidationError: If validation fails
|
||||||
|
"""
|
||||||
|
validated_project = self.validate_kicad_file(project_path, "project", must_exist=True)
|
||||||
|
return os.path.dirname(validated_project)
|
||||||
|
|
||||||
|
def create_safe_temp_path(self, base_name: str, extension: str = "") -> str:
|
||||||
|
"""
|
||||||
|
Create a safe temporary file path within trusted directories.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
base_name: Base name for the temporary file
|
||||||
|
extension: File extension (including dot)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Safe temporary file path
|
||||||
|
"""
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
# Use the first trusted root as temp directory base
|
||||||
|
temp_root = next(iter(self.trusted_roots))
|
||||||
|
|
||||||
|
# Create temp directory if it doesn't exist
|
||||||
|
temp_dir = os.path.join(temp_root, "temp")
|
||||||
|
os.makedirs(temp_dir, exist_ok=True)
|
||||||
|
|
||||||
|
# Generate unique temp file path
|
||||||
|
temp_fd, temp_path = tempfile.mkstemp(
|
||||||
|
suffix=extension, prefix=f"{base_name}_", dir=temp_dir
|
||||||
|
)
|
||||||
|
os.close(temp_fd) # Close the file descriptor, we just need the path
|
||||||
|
|
||||||
|
return temp_path
|
||||||
|
|
||||||
|
def _is_within_trusted_roots(self, path: str) -> bool:
|
||||||
|
"""
|
||||||
|
Check if a path is within any trusted root directory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path: Normalized absolute path to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if path is within trusted roots
|
||||||
|
"""
|
||||||
|
for root in self.trusted_roots:
|
||||||
|
try:
|
||||||
|
# Check if path is within this root
|
||||||
|
pathlib.Path(root).resolve()
|
||||||
|
pathlib.Path(path).resolve().relative_to(pathlib.Path(root).resolve())
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
# Path is not relative to this root
|
||||||
|
continue
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# Global default validator instance
|
||||||
|
_default_validator = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_default_validator() -> PathValidator:
|
||||||
|
"""Get the default global path validator instance."""
|
||||||
|
global _default_validator
|
||||||
|
if _default_validator is None:
|
||||||
|
_default_validator = PathValidator()
|
||||||
|
return _default_validator
|
||||||
|
|
||||||
|
|
||||||
|
def validate_path(file_path: str, must_exist: bool = False) -> str:
|
||||||
|
"""Convenience function using default validator."""
|
||||||
|
return get_default_validator().validate_path(file_path, must_exist)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_kicad_file(file_path: str, file_type: str, must_exist: bool = True) -> str:
|
||||||
|
"""Convenience function using default validator."""
|
||||||
|
return get_default_validator().validate_kicad_file(file_path, file_type, must_exist)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_directory(dir_path: str, must_exist: bool = True) -> str:
|
||||||
|
"""Convenience function using default validator."""
|
||||||
|
return get_default_validator().validate_directory(dir_path, must_exist)
|
294
kicad_mcp/utils/secure_subprocess.py
Normal file
294
kicad_mcp/utils/secure_subprocess.py
Normal file
@ -0,0 +1,294 @@
|
|||||||
|
"""
|
||||||
|
Secure subprocess execution utilities for KiCad MCP.
|
||||||
|
|
||||||
|
Provides safe subprocess execution with input validation,
|
||||||
|
timeout enforcement, and security controls.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import subprocess # nosec B404 - subprocess usage is secured with validation
|
||||||
|
|
||||||
|
from ..config import TIMEOUT_CONSTANTS
|
||||||
|
from .kicad_cli import get_kicad_cli_path
|
||||||
|
from .path_validator import PathValidator, get_default_validator
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SecureSubprocessError(Exception):
|
||||||
|
"""Raised when secure subprocess operations fail."""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SecureSubprocessRunner:
|
||||||
|
"""
|
||||||
|
Secure subprocess runner with validation and safety controls.
|
||||||
|
|
||||||
|
Provides methods for safely executing KiCad CLI commands and other
|
||||||
|
subprocess operations with proper input validation and security controls.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, path_validator: PathValidator | None = None):
|
||||||
|
"""
|
||||||
|
Initialize secure subprocess runner.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path_validator: Path validator to use (defaults to global instance)
|
||||||
|
"""
|
||||||
|
self.path_validator = path_validator or get_default_validator()
|
||||||
|
self.default_timeout = TIMEOUT_CONSTANTS["subprocess_default"]
|
||||||
|
|
||||||
|
def run_kicad_command(
|
||||||
|
self,
|
||||||
|
command_args: list[str],
|
||||||
|
input_files: list[str] | None = None,
|
||||||
|
output_files: list[str] | None = None,
|
||||||
|
working_dir: str | None = None,
|
||||||
|
timeout: float | None = None,
|
||||||
|
capture_output: bool = True,
|
||||||
|
) -> subprocess.CompletedProcess:
|
||||||
|
"""
|
||||||
|
Run a KiCad CLI command with security validation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command_args: Command arguments (excluding the kicad-cli executable)
|
||||||
|
input_files: List of input file paths to validate
|
||||||
|
output_files: List of output file paths to validate
|
||||||
|
working_dir: Working directory for command execution
|
||||||
|
timeout: Command timeout in seconds
|
||||||
|
capture_output: Whether to capture stdout/stderr
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CompletedProcess result
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
SecureSubprocessError: If validation fails or command fails
|
||||||
|
KiCadCLIError: If KiCad CLI not found
|
||||||
|
PathValidationError: If path validation fails
|
||||||
|
"""
|
||||||
|
# Get and validate KiCad CLI path
|
||||||
|
kicad_cli = get_kicad_cli_path(required=True)
|
||||||
|
|
||||||
|
# Validate input files
|
||||||
|
if input_files:
|
||||||
|
for file_path in input_files:
|
||||||
|
self.path_validator.validate_path(file_path, must_exist=True)
|
||||||
|
|
||||||
|
# Validate output file directories
|
||||||
|
if output_files:
|
||||||
|
for file_path in output_files:
|
||||||
|
output_dir = os.path.dirname(file_path)
|
||||||
|
if output_dir: # Only validate if there's a directory component
|
||||||
|
self.path_validator.validate_directory(output_dir, must_exist=True)
|
||||||
|
|
||||||
|
# Validate working directory
|
||||||
|
if working_dir:
|
||||||
|
working_dir = self.path_validator.validate_directory(working_dir, must_exist=True)
|
||||||
|
|
||||||
|
# Construct full command
|
||||||
|
full_command = [kicad_cli] + command_args
|
||||||
|
|
||||||
|
# Log command for debugging (sanitized)
|
||||||
|
logger.debug(f"Executing KiCad command: {' '.join(full_command)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
return self._run_subprocess(
|
||||||
|
full_command,
|
||||||
|
working_dir=working_dir,
|
||||||
|
timeout=timeout or self.default_timeout,
|
||||||
|
capture_output=capture_output,
|
||||||
|
)
|
||||||
|
except subprocess.SubprocessError as e:
|
||||||
|
raise SecureSubprocessError(f"KiCad command failed: {e}") from e
|
||||||
|
|
||||||
|
async def run_kicad_command_async(
|
||||||
|
self,
|
||||||
|
command_args: list[str],
|
||||||
|
input_files: list[str] | None = None,
|
||||||
|
output_files: list[str] | None = None,
|
||||||
|
working_dir: str | None = None,
|
||||||
|
timeout: float | None = None,
|
||||||
|
) -> subprocess.CompletedProcess:
|
||||||
|
"""
|
||||||
|
Async version of run_kicad_command.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command_args: Command arguments (excluding the kicad-cli executable)
|
||||||
|
input_files: List of input file paths to validate
|
||||||
|
output_files: List of output file paths to validate
|
||||||
|
working_dir: Working directory for command execution
|
||||||
|
timeout: Command timeout in seconds
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CompletedProcess result
|
||||||
|
"""
|
||||||
|
# Run in thread pool to avoid blocking event loop
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
return await loop.run_in_executor(
|
||||||
|
None,
|
||||||
|
self.run_kicad_command,
|
||||||
|
command_args,
|
||||||
|
input_files,
|
||||||
|
output_files,
|
||||||
|
working_dir,
|
||||||
|
timeout,
|
||||||
|
True, # capture_output
|
||||||
|
)
|
||||||
|
|
||||||
|
def run_safe_command(
|
||||||
|
self,
|
||||||
|
command: list[str],
|
||||||
|
working_dir: str | None = None,
|
||||||
|
timeout: float | None = None,
|
||||||
|
allowed_commands: list[str] | None = None,
|
||||||
|
capture_output: bool = True,
|
||||||
|
) -> subprocess.CompletedProcess:
|
||||||
|
"""
|
||||||
|
Run a general command with security validation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: Full command list including executable
|
||||||
|
working_dir: Working directory for command execution
|
||||||
|
timeout: Command timeout in seconds
|
||||||
|
allowed_commands: List of allowed executables (whitelist)
|
||||||
|
capture_output: Whether to capture stdout/stderr
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CompletedProcess result
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
SecureSubprocessError: If validation fails or command fails
|
||||||
|
"""
|
||||||
|
if not command:
|
||||||
|
raise SecureSubprocessError("Command cannot be empty")
|
||||||
|
|
||||||
|
executable = command[0]
|
||||||
|
|
||||||
|
# Validate executable against whitelist if provided
|
||||||
|
if allowed_commands and executable not in allowed_commands:
|
||||||
|
raise SecureSubprocessError(f"Command '{executable}' not in allowed list")
|
||||||
|
|
||||||
|
# Validate working directory
|
||||||
|
if working_dir:
|
||||||
|
working_dir = self.path_validator.validate_directory(working_dir, must_exist=True)
|
||||||
|
|
||||||
|
# Log command for debugging (sanitized)
|
||||||
|
logger.debug(f"Executing safe command: {' '.join(command)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
return self._run_subprocess(
|
||||||
|
command,
|
||||||
|
working_dir=working_dir,
|
||||||
|
timeout=timeout or self.default_timeout,
|
||||||
|
capture_output=capture_output,
|
||||||
|
)
|
||||||
|
except subprocess.SubprocessError as e:
|
||||||
|
raise SecureSubprocessError(f"Command failed: {e}") from e
|
||||||
|
|
||||||
|
def create_temp_file(
|
||||||
|
self, suffix: str = "", prefix: str = "kicad_mcp_", content: str | None = None
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Create a temporary file within validated directories.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
suffix: File suffix/extension
|
||||||
|
prefix: File prefix
|
||||||
|
content: Optional content to write to file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to created temporary file
|
||||||
|
"""
|
||||||
|
temp_path = self.path_validator.create_safe_temp_path(prefix.rstrip("_"), suffix)
|
||||||
|
|
||||||
|
if content is not None:
|
||||||
|
with open(temp_path, "w", encoding="utf-8") as f:
|
||||||
|
f.write(content)
|
||||||
|
|
||||||
|
return temp_path
|
||||||
|
|
||||||
|
def _run_subprocess(
|
||||||
|
self,
|
||||||
|
command: list[str],
|
||||||
|
working_dir: str | None = None,
|
||||||
|
timeout: float = TIMEOUT_CONSTANTS["subprocess_default"],
|
||||||
|
capture_output: bool = True,
|
||||||
|
) -> subprocess.CompletedProcess:
|
||||||
|
"""
|
||||||
|
Internal subprocess runner with consistent settings.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: Command to execute
|
||||||
|
working_dir: Working directory
|
||||||
|
timeout: Timeout in seconds
|
||||||
|
capture_output: Whether to capture output
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CompletedProcess result
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
subprocess.SubprocessError: If command fails
|
||||||
|
"""
|
||||||
|
kwargs = {
|
||||||
|
"timeout": timeout,
|
||||||
|
"cwd": working_dir,
|
||||||
|
"text": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
if capture_output:
|
||||||
|
kwargs.update(
|
||||||
|
{
|
||||||
|
"capture_output": True,
|
||||||
|
"check": False, # Don't raise on non-zero exit code
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return subprocess.run(command, **kwargs) # nosec B603 - input is validated
|
||||||
|
|
||||||
|
|
||||||
|
# Global secure subprocess runner instance
|
||||||
|
_subprocess_runner = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_subprocess_runner() -> SecureSubprocessRunner:
|
||||||
|
"""Get the global secure subprocess runner instance."""
|
||||||
|
global _subprocess_runner
|
||||||
|
if _subprocess_runner is None:
|
||||||
|
_subprocess_runner = SecureSubprocessRunner()
|
||||||
|
return _subprocess_runner
|
||||||
|
|
||||||
|
|
||||||
|
def run_kicad_command(
|
||||||
|
command_args: list[str],
|
||||||
|
input_files: list[str] | None = None,
|
||||||
|
output_files: list[str] | None = None,
|
||||||
|
working_dir: str | None = None,
|
||||||
|
timeout: float | None = None,
|
||||||
|
) -> subprocess.CompletedProcess:
|
||||||
|
"""Convenience function to run KiCad command."""
|
||||||
|
return get_subprocess_runner().run_kicad_command(
|
||||||
|
command_args, input_files, output_files, working_dir, timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def run_kicad_command_async(
|
||||||
|
command_args: list[str],
|
||||||
|
input_files: list[str] | None = None,
|
||||||
|
output_files: list[str] | None = None,
|
||||||
|
working_dir: str | None = None,
|
||||||
|
timeout: float | None = None,
|
||||||
|
) -> subprocess.CompletedProcess:
|
||||||
|
"""Convenience function to run KiCad command asynchronously."""
|
||||||
|
return await get_subprocess_runner().run_kicad_command_async(
|
||||||
|
command_args, input_files, output_files, working_dir, timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def create_temp_file(
|
||||||
|
suffix: str = "", prefix: str = "kicad_mcp_", content: str | None = None
|
||||||
|
) -> str:
|
||||||
|
"""Convenience function to create temporary file."""
|
||||||
|
return get_subprocess_runner().create_temp_file(suffix, prefix, content)
|
165
pyproject.toml
165
pyproject.toml
@ -11,22 +11,42 @@ license = { text = "MIT" }
|
|||||||
authors = [
|
authors = [
|
||||||
{ name = "KiCad MCP Contributors" }
|
{ name = "KiCad MCP Contributors" }
|
||||||
]
|
]
|
||||||
|
maintainers = [
|
||||||
|
{ name = "KiCad MCP Contributors" }
|
||||||
|
]
|
||||||
|
keywords = [
|
||||||
|
"kicad",
|
||||||
|
"eda",
|
||||||
|
"electronics",
|
||||||
|
"schematic",
|
||||||
|
"pcb",
|
||||||
|
"mcp",
|
||||||
|
"model-context-protocol",
|
||||||
|
"ai",
|
||||||
|
"assistant"
|
||||||
|
]
|
||||||
|
classifiers = [
|
||||||
|
"Development Status :: 4 - Beta",
|
||||||
|
"Intended Audience :: Developers",
|
||||||
|
"Intended Audience :: Manufacturing",
|
||||||
|
"License :: OSI Approved :: MIT License",
|
||||||
|
"Operating System :: OS Independent",
|
||||||
|
"Programming Language :: Python :: 3",
|
||||||
|
"Programming Language :: Python :: 3.10",
|
||||||
|
"Programming Language :: Python :: 3.11",
|
||||||
|
"Programming Language :: Python :: 3.12",
|
||||||
|
"Programming Language :: Python :: 3.13",
|
||||||
|
"Topic :: Scientific/Engineering :: Electronic Design Automation (EDA)",
|
||||||
|
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||||
|
"Typing :: Typed"
|
||||||
|
]
|
||||||
requires-python = ">=3.10"
|
requires-python = ">=3.10"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"mcp[cli]>=1.0.0",
|
"mcp[cli]>=1.0.0",
|
||||||
"pandas>=2.0.0",
|
|
||||||
"fastmcp>=2.0.0",
|
"fastmcp>=2.0.0",
|
||||||
]
|
"pandas>=2.0.0",
|
||||||
classifiers = [
|
"pyyaml>=6.0.0",
|
||||||
"Programming Language :: Python :: 3",
|
"defusedxml>=0.7.0", # Secure XML parsing
|
||||||
"Programming Language :: Python :: 3.10",
|
|
||||||
"Programming Language :: Python :: 3.11",
|
|
||||||
"Programming Language :: Python :: 3.12",
|
|
||||||
"License :: OSI Approved :: MIT License",
|
|
||||||
"Operating System :: OS Independent",
|
|
||||||
"Intended Audience :: Developers",
|
|
||||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
|
||||||
"Topic :: Scientific/Engineering :: Electronic Design Automation (EDA)",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
@ -47,6 +67,25 @@ dev = [
|
|||||||
"ruff>=0.1.0",
|
"ruff>=0.1.0",
|
||||||
"mypy>=1.8.0",
|
"mypy>=1.8.0",
|
||||||
"pre-commit>=3.0.0",
|
"pre-commit>=3.0.0",
|
||||||
|
"bandit>=1.7.0", # Security linting for pre-commit hooks
|
||||||
|
]
|
||||||
|
docs = [
|
||||||
|
"sphinx>=7.0.0",
|
||||||
|
"sphinx-rtd-theme>=1.3.0",
|
||||||
|
"myst-parser>=2.0.0",
|
||||||
|
]
|
||||||
|
security = [
|
||||||
|
"bandit>=1.7.0",
|
||||||
|
"safety>=3.0.0",
|
||||||
|
]
|
||||||
|
performance = [
|
||||||
|
"memory-profiler>=0.61.0",
|
||||||
|
"py-spy>=0.3.0",
|
||||||
|
]
|
||||||
|
visualization = [
|
||||||
|
"cairosvg>=2.7.0", # SVG to PNG conversion
|
||||||
|
"Pillow>=10.0.0", # Image processing
|
||||||
|
"playwright>=1.40.0", # Browser automation (optional)
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
@ -60,16 +99,64 @@ select = [
|
|||||||
"F", # pyflakes
|
"F", # pyflakes
|
||||||
"I", # isort
|
"I", # isort
|
||||||
"B", # flake8-bugbear
|
"B", # flake8-bugbear
|
||||||
|
"C4", # flake8-comprehensions
|
||||||
|
"UP", # pyupgrade
|
||||||
|
"SIM", # flake8-simplify
|
||||||
"UP", # pyupgrade
|
"UP", # pyupgrade
|
||||||
]
|
]
|
||||||
ignore = [
|
ignore = [
|
||||||
"E501", # line too long, handled by ruff format
|
"E501", # line too long, handled by ruff format
|
||||||
"B008", # do not perform function calls in argument defaults
|
"B008", # do not perform function calls in argument defaults
|
||||||
|
"C901", # too complex (handled by other tools)
|
||||||
|
"B905", # zip() without an explicit strict= parameter
|
||||||
]
|
]
|
||||||
|
unfixable = [
|
||||||
|
"B", # Avoid trying to fix flake8-bugbear violations
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.ruff.lint.per-file-ignores]
|
||||||
|
"tests/**/*.py" = [
|
||||||
|
"S101", # Use of assert detected
|
||||||
|
"D103", # Missing docstring in public function
|
||||||
|
"SLF001", # Private member accessed
|
||||||
|
]
|
||||||
|
"kicad_mcp/config.py" = [
|
||||||
|
"E501", # Long lines in config are ok
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.ruff.lint.isort]
|
||||||
|
known-first-party = ["kicad_mcp"]
|
||||||
|
force-sort-within-sections = true
|
||||||
|
|
||||||
[tool.ruff.format]
|
[tool.ruff.format]
|
||||||
quote-style = "double"
|
quote-style = "double"
|
||||||
indent-style = "space"
|
indent-style = "space"
|
||||||
|
skip-magic-trailing-comma = false
|
||||||
|
line-ending = "auto"
|
||||||
|
|
||||||
|
[tool.mypy]
|
||||||
|
python_version = "3.11"
|
||||||
|
warn_return_any = true
|
||||||
|
warn_unused_configs = true
|
||||||
|
disallow_untyped_defs = false
|
||||||
|
disallow_incomplete_defs = false
|
||||||
|
check_untyped_defs = true
|
||||||
|
disallow_untyped_decorators = false
|
||||||
|
no_implicit_optional = true
|
||||||
|
warn_redundant_casts = true
|
||||||
|
warn_unused_ignores = true
|
||||||
|
warn_no_return = true
|
||||||
|
warn_unreachable = true
|
||||||
|
strict_equality = true
|
||||||
|
show_error_codes = true
|
||||||
|
|
||||||
|
[[tool.mypy.overrides]]
|
||||||
|
module = [
|
||||||
|
"pandas.*",
|
||||||
|
"mcp.*",
|
||||||
|
]
|
||||||
|
ignore_missing_imports = true
|
||||||
|
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
minversion = "7.0"
|
minversion = "7.0"
|
||||||
@ -80,10 +167,62 @@ addopts = [
|
|||||||
"--cov-report=term-missing",
|
"--cov-report=term-missing",
|
||||||
"--cov-report=html:htmlcov",
|
"--cov-report=html:htmlcov",
|
||||||
"--cov-report=xml",
|
"--cov-report=xml",
|
||||||
"--cov-fail-under=30",
|
"--cov-fail-under=80",
|
||||||
|
"-ra",
|
||||||
|
"--tb=short",
|
||||||
]
|
]
|
||||||
testpaths = ["tests"]
|
testpaths = ["tests"]
|
||||||
|
python_files = ["test_*.py"]
|
||||||
|
python_classes = ["Test*"]
|
||||||
|
python_functions = ["test_*"]
|
||||||
|
markers = [
|
||||||
|
"unit: Unit tests",
|
||||||
|
"integration: Integration tests",
|
||||||
|
"slow: Tests that take more than a few seconds",
|
||||||
|
"requires_kicad: Tests that require KiCad CLI to be installed",
|
||||||
|
"performance: Performance benchmarking tests",
|
||||||
|
]
|
||||||
asyncio_mode = "auto"
|
asyncio_mode = "auto"
|
||||||
|
filterwarnings = [
|
||||||
|
"ignore::DeprecationWarning",
|
||||||
|
"ignore::PendingDeprecationWarning",
|
||||||
|
"ignore::RuntimeWarning:asyncio",
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.coverage.run]
|
||||||
|
source = ["kicad_mcp"]
|
||||||
|
branch = true
|
||||||
|
omit = [
|
||||||
|
"tests/*",
|
||||||
|
"kicad_mcp/__init__.py",
|
||||||
|
"*/migrations/*",
|
||||||
|
"*/venv/*",
|
||||||
|
"*/.venv/*",
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.coverage.report]
|
||||||
|
precision = 2
|
||||||
|
show_missing = true
|
||||||
|
skip_covered = false
|
||||||
|
exclude_lines = [
|
||||||
|
"pragma: no cover",
|
||||||
|
"def __repr__",
|
||||||
|
"if self.debug:",
|
||||||
|
"if settings.DEBUG",
|
||||||
|
"raise AssertionError",
|
||||||
|
"raise NotImplementedError",
|
||||||
|
"if 0:",
|
||||||
|
"if __name__ == .__main__.:",
|
||||||
|
"class .*\\bProtocol\\):",
|
||||||
|
"@(abc\\.)?abstractmethod",
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.bandit]
|
||||||
|
exclude_dirs = ["tests", "build", "dist"]
|
||||||
|
skips = ["B101", "B601", "B404", "B603", "B110", "B112"] # Skip low-severity subprocess and exception handling warnings
|
||||||
|
|
||||||
|
[tool.bandit.assert_used]
|
||||||
|
skips = ["*_test.py", "*/test_*.py"]
|
||||||
|
|
||||||
[tool.setuptools.packages.find]
|
[tool.setuptools.packages.find]
|
||||||
where = ["."]
|
where = ["."]
|
||||||
|
0
tests/unit/utils/__init__.py
Normal file
0
tests/unit/utils/__init__.py
Normal file
238
tests/unit/utils/test_path_validator.py
Normal file
238
tests/unit/utils/test_path_validator.py
Normal file
@ -0,0 +1,238 @@
|
|||||||
|
"""
|
||||||
|
Tests for path validation utility.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from kicad_mcp.utils.path_validator import (
|
||||||
|
PathValidationError,
|
||||||
|
PathValidator,
|
||||||
|
validate_directory,
|
||||||
|
validate_kicad_file,
|
||||||
|
validate_path,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPathValidator:
|
||||||
|
"""Test cases for PathValidator class."""
|
||||||
|
|
||||||
|
def test_init_with_default_trusted_root(self):
|
||||||
|
"""Test initialization with default trusted root."""
|
||||||
|
validator = PathValidator()
|
||||||
|
assert len(validator.trusted_roots) == 1
|
||||||
|
assert os.getcwd() in [os.path.realpath(root) for root in validator.trusted_roots]
|
||||||
|
|
||||||
|
def test_init_with_custom_trusted_roots(self):
|
||||||
|
"""Test initialization with custom trusted roots."""
|
||||||
|
roots = {"/tmp", "/home/user"}
|
||||||
|
validator = PathValidator(trusted_roots=roots)
|
||||||
|
|
||||||
|
# Should normalize paths
|
||||||
|
expected_roots = {os.path.realpath(root) for root in roots}
|
||||||
|
assert validator.trusted_roots == expected_roots
|
||||||
|
|
||||||
|
def test_add_trusted_root(self):
|
||||||
|
"""Test adding trusted root."""
|
||||||
|
validator = PathValidator(trusted_roots={"/tmp"})
|
||||||
|
validator.add_trusted_root("/home/user")
|
||||||
|
|
||||||
|
assert os.path.realpath("/home/user") in validator.trusted_roots
|
||||||
|
|
||||||
|
def test_validate_path_success(self):
|
||||||
|
"""Test successful path validation."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
test_file = os.path.join(temp_dir, "test.txt")
|
||||||
|
|
||||||
|
# Create test file
|
||||||
|
with open(test_file, "w") as f:
|
||||||
|
f.write("test")
|
||||||
|
|
||||||
|
# Should succeed
|
||||||
|
result = validator.validate_path(test_file, must_exist=True)
|
||||||
|
assert result == os.path.realpath(test_file)
|
||||||
|
|
||||||
|
def test_validate_path_traversal_attack(self):
|
||||||
|
"""Test path traversal attack prevention."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
|
||||||
|
# Try to access parent directory
|
||||||
|
malicious_path = os.path.join(temp_dir, "..", "..", "etc", "passwd")
|
||||||
|
|
||||||
|
with pytest.raises(PathValidationError, match="outside trusted directories"):
|
||||||
|
validator.validate_path(malicious_path)
|
||||||
|
|
||||||
|
def test_validate_path_empty_string(self):
|
||||||
|
"""Test validation with empty string."""
|
||||||
|
validator = PathValidator()
|
||||||
|
|
||||||
|
with pytest.raises(PathValidationError, match="non-empty string"):
|
||||||
|
validator.validate_path("")
|
||||||
|
|
||||||
|
def test_validate_path_none(self):
|
||||||
|
"""Test validation with None."""
|
||||||
|
validator = PathValidator()
|
||||||
|
|
||||||
|
with pytest.raises(PathValidationError, match="non-empty string"):
|
||||||
|
validator.validate_path(None)
|
||||||
|
|
||||||
|
def test_validate_path_nonexistent_when_required(self):
|
||||||
|
"""Test validation of nonexistent file when existence required."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
nonexistent_file = os.path.join(temp_dir, "nonexistent.txt")
|
||||||
|
|
||||||
|
with pytest.raises(PathValidationError, match="does not exist"):
|
||||||
|
validator.validate_path(nonexistent_file, must_exist=True)
|
||||||
|
|
||||||
|
def test_validate_kicad_file_success(self):
|
||||||
|
"""Test successful KiCad file validation."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
project_file = os.path.join(temp_dir, "test.kicad_pro")
|
||||||
|
|
||||||
|
# Create test file
|
||||||
|
with open(project_file, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
result = validator.validate_kicad_file(project_file, "project")
|
||||||
|
assert result == os.path.realpath(project_file)
|
||||||
|
|
||||||
|
def test_validate_kicad_file_wrong_extension(self):
|
||||||
|
"""Test KiCad file validation with wrong extension."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
wrong_file = os.path.join(temp_dir, "test.txt")
|
||||||
|
|
||||||
|
with open(wrong_file, "w") as f:
|
||||||
|
f.write("test")
|
||||||
|
|
||||||
|
with pytest.raises(PathValidationError, match="must have .kicad_pro extension"):
|
||||||
|
validator.validate_kicad_file(wrong_file, "project")
|
||||||
|
|
||||||
|
def test_validate_kicad_file_unknown_type(self):
|
||||||
|
"""Test KiCad file validation with unknown file type."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
test_file = os.path.join(temp_dir, "test.txt")
|
||||||
|
|
||||||
|
with open(test_file, "w") as f:
|
||||||
|
f.write("test")
|
||||||
|
|
||||||
|
with pytest.raises(PathValidationError, match="Unknown KiCad file type"):
|
||||||
|
validator.validate_kicad_file(test_file, "unknown_type")
|
||||||
|
|
||||||
|
def test_validate_directory_success(self):
|
||||||
|
"""Test successful directory validation."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
sub_dir = os.path.join(temp_dir, "subdir")
|
||||||
|
os.makedirs(sub_dir)
|
||||||
|
|
||||||
|
result = validator.validate_directory(sub_dir)
|
||||||
|
assert result == os.path.realpath(sub_dir)
|
||||||
|
|
||||||
|
def test_validate_directory_not_directory(self):
|
||||||
|
"""Test directory validation on file."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
test_file = os.path.join(temp_dir, "test.txt")
|
||||||
|
|
||||||
|
with open(test_file, "w") as f:
|
||||||
|
f.write("test")
|
||||||
|
|
||||||
|
with pytest.raises(PathValidationError, match="not a directory"):
|
||||||
|
validator.validate_directory(test_file)
|
||||||
|
|
||||||
|
def test_validate_project_directory(self):
|
||||||
|
"""Test project directory validation."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
project_file = os.path.join(temp_dir, "test.kicad_pro")
|
||||||
|
|
||||||
|
with open(project_file, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
result = validator.validate_project_directory(project_file)
|
||||||
|
assert result == os.path.realpath(temp_dir)
|
||||||
|
|
||||||
|
def test_create_safe_temp_path(self):
|
||||||
|
"""Test safe temporary path creation."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
|
||||||
|
temp_path = validator.create_safe_temp_path("test", ".txt")
|
||||||
|
|
||||||
|
# Should be within trusted directory (handle symlinks with realpath)
|
||||||
|
assert os.path.realpath(temp_path).startswith(os.path.realpath(temp_dir))
|
||||||
|
assert temp_path.endswith(".txt")
|
||||||
|
assert "test" in os.path.basename(temp_path)
|
||||||
|
|
||||||
|
def test_symlink_resolution(self):
|
||||||
|
"""Test symbolic link resolution."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
|
||||||
|
# Create file and symlink
|
||||||
|
real_file = os.path.join(temp_dir, "real.txt")
|
||||||
|
link_file = os.path.join(temp_dir, "link.txt")
|
||||||
|
|
||||||
|
with open(real_file, "w") as f:
|
||||||
|
f.write("test")
|
||||||
|
|
||||||
|
os.symlink(real_file, link_file)
|
||||||
|
|
||||||
|
# Both should resolve to same real path
|
||||||
|
real_result = validator.validate_path(real_file, must_exist=True)
|
||||||
|
link_result = validator.validate_path(link_file, must_exist=True)
|
||||||
|
|
||||||
|
assert real_result == link_result == os.path.realpath(real_file)
|
||||||
|
|
||||||
|
|
||||||
|
class TestConvenienceFunctions:
|
||||||
|
"""Test convenience functions."""
|
||||||
|
|
||||||
|
def test_validate_path_convenience(self):
|
||||||
|
"""Test validate_path convenience function."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
# Add temp_dir to default validator
|
||||||
|
from kicad_mcp.utils.path_validator import get_default_validator
|
||||||
|
|
||||||
|
get_default_validator().add_trusted_root(temp_dir)
|
||||||
|
|
||||||
|
test_file = os.path.join(temp_dir, "test.txt")
|
||||||
|
with open(test_file, "w") as f:
|
||||||
|
f.write("test")
|
||||||
|
|
||||||
|
result = validate_path(test_file, must_exist=True)
|
||||||
|
assert result == os.path.realpath(test_file)
|
||||||
|
|
||||||
|
def test_validate_kicad_file_convenience(self):
|
||||||
|
"""Test validate_kicad_file convenience function."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
# Add temp_dir to default validator
|
||||||
|
from kicad_mcp.utils.path_validator import get_default_validator
|
||||||
|
|
||||||
|
get_default_validator().add_trusted_root(temp_dir)
|
||||||
|
|
||||||
|
project_file = os.path.join(temp_dir, "test.kicad_pro")
|
||||||
|
with open(project_file, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
|
|
||||||
|
result = validate_kicad_file(project_file, "project")
|
||||||
|
assert result == os.path.realpath(project_file)
|
||||||
|
|
||||||
|
def test_validate_directory_convenience(self):
|
||||||
|
"""Test validate_directory convenience function."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
# Add temp_dir to default validator
|
||||||
|
from kicad_mcp.utils.path_validator import get_default_validator
|
||||||
|
|
||||||
|
get_default_validator().add_trusted_root(temp_dir)
|
||||||
|
|
||||||
|
result = validate_directory(temp_dir)
|
||||||
|
assert result == os.path.realpath(temp_dir)
|
275
tests/unit/utils/test_secure_subprocess.py
Normal file
275
tests/unit/utils/test_secure_subprocess.py
Normal file
@ -0,0 +1,275 @@
|
|||||||
|
"""
|
||||||
|
Tests for secure subprocess utility.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from kicad_mcp.utils.kicad_cli import KiCadCLIError
|
||||||
|
from kicad_mcp.utils.path_validator import PathValidationError, PathValidator
|
||||||
|
from kicad_mcp.utils.secure_subprocess import (
|
||||||
|
SecureSubprocessError,
|
||||||
|
SecureSubprocessRunner,
|
||||||
|
create_temp_file,
|
||||||
|
run_kicad_command,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _kicad_cli_available():
|
||||||
|
"""Check if KiCad CLI is available."""
|
||||||
|
try:
|
||||||
|
from kicad_mcp.utils.kicad_cli import get_kicad_cli_path
|
||||||
|
|
||||||
|
get_kicad_cli_path()
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecureSubprocessRunner:
|
||||||
|
"""Test cases for SecureSubprocessRunner class."""
|
||||||
|
|
||||||
|
def test_init_with_default_validator(self):
|
||||||
|
"""Test initialization with default path validator."""
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
assert runner.path_validator is not None
|
||||||
|
assert runner.default_timeout == 30.0
|
||||||
|
|
||||||
|
def test_init_with_custom_validator(self):
|
||||||
|
"""Test initialization with custom path validator."""
|
||||||
|
validator = PathValidator(trusted_roots={"/tmp"})
|
||||||
|
runner = SecureSubprocessRunner(path_validator=validator)
|
||||||
|
assert runner.path_validator is validator
|
||||||
|
|
||||||
|
@patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path")
|
||||||
|
@patch.object(SecureSubprocessRunner, "_run_subprocess")
|
||||||
|
def test_run_kicad_command_success(self, mock_run_subprocess, mock_get_cli):
|
||||||
|
"""Test successful KiCad command execution."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
# Setup mocks
|
||||||
|
mock_get_cli.return_value = "/usr/bin/kicad-cli"
|
||||||
|
mock_result = MagicMock(returncode=0, stdout="Success")
|
||||||
|
mock_run_subprocess.return_value = mock_result
|
||||||
|
|
||||||
|
# Create test files
|
||||||
|
input_file = os.path.join(temp_dir, "input.kicad_sch")
|
||||||
|
output_file = os.path.join(temp_dir, "output.svg")
|
||||||
|
|
||||||
|
with open(input_file, "w") as f:
|
||||||
|
f.write("test schematic")
|
||||||
|
|
||||||
|
# Create runner with temp directory as trusted root
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
runner = SecureSubprocessRunner(path_validator=validator)
|
||||||
|
|
||||||
|
# Run command
|
||||||
|
result = runner.run_kicad_command(
|
||||||
|
["sch", "export", "svg", input_file, "-o", output_file],
|
||||||
|
input_files=[input_file],
|
||||||
|
output_files=[output_file],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result is mock_result
|
||||||
|
mock_run_subprocess.assert_called_once()
|
||||||
|
|
||||||
|
@patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path")
|
||||||
|
def test_run_kicad_command_cli_not_found(self, mock_get_cli):
|
||||||
|
"""Test KiCad command when CLI not found."""
|
||||||
|
mock_get_cli.side_effect = KiCadCLIError("CLI not found")
|
||||||
|
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
with pytest.raises(KiCadCLIError):
|
||||||
|
runner.run_kicad_command(["--version"])
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not _kicad_cli_available(), reason="KiCad CLI not available")
|
||||||
|
def test_run_kicad_command_invalid_input_file(self):
|
||||||
|
"""Test KiCad command with invalid input file."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
runner = SecureSubprocessRunner(path_validator=validator)
|
||||||
|
|
||||||
|
# Try to use file outside trusted directory
|
||||||
|
with pytest.raises(PathValidationError):
|
||||||
|
runner.run_kicad_command(
|
||||||
|
["sch", "export", "svg", "/etc/passwd"], input_files=["/etc/passwd"]
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not _kicad_cli_available(), reason="KiCad CLI not available")
|
||||||
|
def test_run_kicad_command_invalid_output_directory(self):
|
||||||
|
"""Test KiCad command with invalid output directory."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
runner = SecureSubprocessRunner(path_validator=validator)
|
||||||
|
|
||||||
|
# Try to output to directory outside trusted roots
|
||||||
|
with pytest.raises(PathValidationError):
|
||||||
|
runner.run_kicad_command(
|
||||||
|
["sch", "export", "svg", "input.sch", "-o", "/etc/output.svg"],
|
||||||
|
output_files=["/etc/output.svg"],
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path")
|
||||||
|
@patch.object(SecureSubprocessRunner, "_run_subprocess")
|
||||||
|
def test_run_kicad_command_with_working_dir(self, mock_run_subprocess, mock_get_cli):
|
||||||
|
"""Test KiCad command with working directory."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
mock_get_cli.return_value = "/usr/bin/kicad-cli"
|
||||||
|
mock_result = MagicMock(returncode=0)
|
||||||
|
mock_run_subprocess.return_value = mock_result
|
||||||
|
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
runner = SecureSubprocessRunner(path_validator=validator)
|
||||||
|
|
||||||
|
runner.run_kicad_command(["--version"], working_dir=temp_dir)
|
||||||
|
|
||||||
|
# Verify working directory was passed
|
||||||
|
mock_run_subprocess.assert_called_once()
|
||||||
|
call_args = mock_run_subprocess.call_args
|
||||||
|
assert call_args[1]["working_dir"] == os.path.realpath(temp_dir)
|
||||||
|
|
||||||
|
@patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path")
|
||||||
|
@patch.object(SecureSubprocessRunner, "_run_subprocess")
|
||||||
|
def test_run_kicad_command_subprocess_error(self, mock_run_subprocess, mock_get_cli):
|
||||||
|
"""Test KiCad command with subprocess error."""
|
||||||
|
mock_get_cli.return_value = "/usr/bin/kicad-cli"
|
||||||
|
mock_run_subprocess.side_effect = subprocess.SubprocessError("Command failed")
|
||||||
|
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
with pytest.raises(SecureSubprocessError, match="KiCad command failed"):
|
||||||
|
runner.run_kicad_command(["--version"])
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("kicad_mcp.utils.secure_subprocess.get_kicad_cli_path")
|
||||||
|
@patch.object(SecureSubprocessRunner, "run_kicad_command")
|
||||||
|
async def test_run_kicad_command_async(self, mock_run_command, mock_get_cli):
|
||||||
|
"""Test async KiCad command execution."""
|
||||||
|
mock_get_cli.return_value = "/usr/bin/kicad-cli"
|
||||||
|
mock_result = MagicMock(returncode=0)
|
||||||
|
mock_run_command.return_value = mock_result
|
||||||
|
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
# Run the async function in a synchronous context for testing
|
||||||
|
result = await runner.run_kicad_command_async(["--version"])
|
||||||
|
assert result is mock_result
|
||||||
|
|
||||||
|
def test_run_safe_command_success(self):
|
||||||
|
"""Test successful safe command execution."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
runner = SecureSubprocessRunner(path_validator=validator)
|
||||||
|
|
||||||
|
with patch.object(runner, "_run_subprocess") as mock_run:
|
||||||
|
mock_result = MagicMock(returncode=0)
|
||||||
|
mock_run.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.run_safe_command(["echo", "test"], allowed_commands=["echo"])
|
||||||
|
|
||||||
|
assert result is mock_result
|
||||||
|
|
||||||
|
def test_run_safe_command_empty_command(self):
|
||||||
|
"""Test safe command with empty command list."""
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
with pytest.raises(SecureSubprocessError, match="Command cannot be empty"):
|
||||||
|
runner.run_safe_command([])
|
||||||
|
|
||||||
|
def test_run_safe_command_not_in_whitelist(self):
|
||||||
|
"""Test safe command not in whitelist."""
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
with pytest.raises(SecureSubprocessError, match="not in allowed list"):
|
||||||
|
runner.run_safe_command(["rm", "-rf", "/"], allowed_commands=["echo", "ls"])
|
||||||
|
|
||||||
|
def test_run_safe_command_invalid_working_dir(self):
|
||||||
|
"""Test safe command with invalid working directory."""
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
with pytest.raises(PathValidationError):
|
||||||
|
runner.run_safe_command(["echo", "test"], working_dir="/nonexistent/directory")
|
||||||
|
|
||||||
|
def test_create_temp_file_without_content(self):
|
||||||
|
"""Test temporary file creation without content."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
runner = SecureSubprocessRunner(path_validator=validator)
|
||||||
|
|
||||||
|
temp_path = runner.create_temp_file(suffix=".txt", prefix="test_")
|
||||||
|
|
||||||
|
assert os.path.exists(temp_path)
|
||||||
|
assert temp_path.endswith(".txt")
|
||||||
|
assert "test_" in os.path.basename(temp_path)
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
os.unlink(temp_path)
|
||||||
|
|
||||||
|
def test_create_temp_file_with_content(self):
|
||||||
|
"""Test temporary file creation with content."""
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
validator = PathValidator(trusted_roots={temp_dir})
|
||||||
|
runner = SecureSubprocessRunner(path_validator=validator)
|
||||||
|
|
||||||
|
content = "test content"
|
||||||
|
temp_path = runner.create_temp_file(content=content)
|
||||||
|
|
||||||
|
assert os.path.exists(temp_path)
|
||||||
|
|
||||||
|
with open(temp_path) as f:
|
||||||
|
assert f.read() == content
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
os.unlink(temp_path)
|
||||||
|
|
||||||
|
def test_run_subprocess_with_capture_output(self):
|
||||||
|
"""Test subprocess execution with output capture."""
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
result = runner._run_subprocess(["echo", "test"], capture_output=True, timeout=5.0)
|
||||||
|
|
||||||
|
assert result.returncode == 0
|
||||||
|
assert "test" in result.stdout
|
||||||
|
|
||||||
|
def test_run_subprocess_without_capture_output(self):
|
||||||
|
"""Test subprocess execution without output capture."""
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
result = runner._run_subprocess(["echo", "test"], capture_output=False, timeout=5.0)
|
||||||
|
|
||||||
|
assert result.returncode == 0
|
||||||
|
# stdout/stderr should be None when not captured
|
||||||
|
assert result.stdout is None
|
||||||
|
assert result.stderr is None
|
||||||
|
|
||||||
|
def test_run_subprocess_timeout(self):
|
||||||
|
"""Test subprocess timeout."""
|
||||||
|
runner = SecureSubprocessRunner()
|
||||||
|
|
||||||
|
with pytest.raises(subprocess.TimeoutExpired):
|
||||||
|
runner._run_subprocess(["sleep", "10"], timeout=0.1)
|
||||||
|
|
||||||
|
|
||||||
|
class TestConvenienceFunctions:
|
||||||
|
"""Test convenience functions."""
|
||||||
|
|
||||||
|
@patch.object(SecureSubprocessRunner, "run_kicad_command")
|
||||||
|
def test_run_kicad_command_convenience(self, mock_run_command):
|
||||||
|
"""Test run_kicad_command convenience function."""
|
||||||
|
mock_result = MagicMock(returncode=0)
|
||||||
|
mock_run_command.return_value = mock_result
|
||||||
|
|
||||||
|
result = run_kicad_command(["--version"])
|
||||||
|
assert result is mock_result
|
||||||
|
|
||||||
|
@patch.object(SecureSubprocessRunner, "create_temp_file")
|
||||||
|
def test_create_temp_file_convenience(self, mock_create_temp):
|
||||||
|
"""Test create_temp_file convenience function."""
|
||||||
|
mock_create_temp.return_value = "/tmp/test_file.txt"
|
||||||
|
|
||||||
|
result = create_temp_file(suffix=".txt", prefix="test_")
|
||||||
|
assert result == "/tmp/test_file.txt"
|
Loading…
x
Reference in New Issue
Block a user