
- Add PathValidator class for preventing path traversal attacks - Add SecureSubprocessRunner for safe command execution - Replace unsafe XML parsing with defusedxml for security - Add comprehensive input validation tools for circuit generation - Include security dependencies (defusedxml, bandit) in pyproject.toml - Add security scanning job to CI/CD pipeline - Add comprehensive test coverage for security utilities - Add timeout constants for safe operation limits - Add boundary validation for component positioning This establishes a strong security foundation for the KiCad MCP server by implementing defense-in-depth security measures across all input vectors and external process interactions. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
295 lines
9.5 KiB
Python
295 lines
9.5 KiB
Python
"""
|
|
Secure subprocess execution utilities for KiCad MCP.
|
|
|
|
Provides safe subprocess execution with input validation,
|
|
timeout enforcement, and security controls.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import subprocess # nosec B404 - subprocess usage is secured with validation
|
|
|
|
from ..config import TIMEOUT_CONSTANTS
|
|
from .kicad_cli import get_kicad_cli_path
|
|
from .path_validator import PathValidator, get_default_validator
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SecureSubprocessError(Exception):
|
|
"""Raised when secure subprocess operations fail."""
|
|
|
|
pass
|
|
|
|
|
|
class SecureSubprocessRunner:
|
|
"""
|
|
Secure subprocess runner with validation and safety controls.
|
|
|
|
Provides methods for safely executing KiCad CLI commands and other
|
|
subprocess operations with proper input validation and security controls.
|
|
"""
|
|
|
|
def __init__(self, path_validator: PathValidator | None = None):
|
|
"""
|
|
Initialize secure subprocess runner.
|
|
|
|
Args:
|
|
path_validator: Path validator to use (defaults to global instance)
|
|
"""
|
|
self.path_validator = path_validator or get_default_validator()
|
|
self.default_timeout = TIMEOUT_CONSTANTS["subprocess_default"]
|
|
|
|
def run_kicad_command(
|
|
self,
|
|
command_args: list[str],
|
|
input_files: list[str] | None = None,
|
|
output_files: list[str] | None = None,
|
|
working_dir: str | None = None,
|
|
timeout: float | None = None,
|
|
capture_output: bool = True,
|
|
) -> subprocess.CompletedProcess:
|
|
"""
|
|
Run a KiCad CLI command with security validation.
|
|
|
|
Args:
|
|
command_args: Command arguments (excluding the kicad-cli executable)
|
|
input_files: List of input file paths to validate
|
|
output_files: List of output file paths to validate
|
|
working_dir: Working directory for command execution
|
|
timeout: Command timeout in seconds
|
|
capture_output: Whether to capture stdout/stderr
|
|
|
|
Returns:
|
|
CompletedProcess result
|
|
|
|
Raises:
|
|
SecureSubprocessError: If validation fails or command fails
|
|
KiCadCLIError: If KiCad CLI not found
|
|
PathValidationError: If path validation fails
|
|
"""
|
|
# Get and validate KiCad CLI path
|
|
kicad_cli = get_kicad_cli_path(required=True)
|
|
|
|
# Validate input files
|
|
if input_files:
|
|
for file_path in input_files:
|
|
self.path_validator.validate_path(file_path, must_exist=True)
|
|
|
|
# Validate output file directories
|
|
if output_files:
|
|
for file_path in output_files:
|
|
output_dir = os.path.dirname(file_path)
|
|
if output_dir: # Only validate if there's a directory component
|
|
self.path_validator.validate_directory(output_dir, must_exist=True)
|
|
|
|
# Validate working directory
|
|
if working_dir:
|
|
working_dir = self.path_validator.validate_directory(working_dir, must_exist=True)
|
|
|
|
# Construct full command
|
|
full_command = [kicad_cli] + command_args
|
|
|
|
# Log command for debugging (sanitized)
|
|
logger.debug(f"Executing KiCad command: {' '.join(full_command)}")
|
|
|
|
try:
|
|
return self._run_subprocess(
|
|
full_command,
|
|
working_dir=working_dir,
|
|
timeout=timeout or self.default_timeout,
|
|
capture_output=capture_output,
|
|
)
|
|
except subprocess.SubprocessError as e:
|
|
raise SecureSubprocessError(f"KiCad command failed: {e}") from e
|
|
|
|
async def run_kicad_command_async(
|
|
self,
|
|
command_args: list[str],
|
|
input_files: list[str] | None = None,
|
|
output_files: list[str] | None = None,
|
|
working_dir: str | None = None,
|
|
timeout: float | None = None,
|
|
) -> subprocess.CompletedProcess:
|
|
"""
|
|
Async version of run_kicad_command.
|
|
|
|
Args:
|
|
command_args: Command arguments (excluding the kicad-cli executable)
|
|
input_files: List of input file paths to validate
|
|
output_files: List of output file paths to validate
|
|
working_dir: Working directory for command execution
|
|
timeout: Command timeout in seconds
|
|
|
|
Returns:
|
|
CompletedProcess result
|
|
"""
|
|
# Run in thread pool to avoid blocking event loop
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(
|
|
None,
|
|
self.run_kicad_command,
|
|
command_args,
|
|
input_files,
|
|
output_files,
|
|
working_dir,
|
|
timeout,
|
|
True, # capture_output
|
|
)
|
|
|
|
def run_safe_command(
|
|
self,
|
|
command: list[str],
|
|
working_dir: str | None = None,
|
|
timeout: float | None = None,
|
|
allowed_commands: list[str] | None = None,
|
|
capture_output: bool = True,
|
|
) -> subprocess.CompletedProcess:
|
|
"""
|
|
Run a general command with security validation.
|
|
|
|
Args:
|
|
command: Full command list including executable
|
|
working_dir: Working directory for command execution
|
|
timeout: Command timeout in seconds
|
|
allowed_commands: List of allowed executables (whitelist)
|
|
capture_output: Whether to capture stdout/stderr
|
|
|
|
Returns:
|
|
CompletedProcess result
|
|
|
|
Raises:
|
|
SecureSubprocessError: If validation fails or command fails
|
|
"""
|
|
if not command:
|
|
raise SecureSubprocessError("Command cannot be empty")
|
|
|
|
executable = command[0]
|
|
|
|
# Validate executable against whitelist if provided
|
|
if allowed_commands and executable not in allowed_commands:
|
|
raise SecureSubprocessError(f"Command '{executable}' not in allowed list")
|
|
|
|
# Validate working directory
|
|
if working_dir:
|
|
working_dir = self.path_validator.validate_directory(working_dir, must_exist=True)
|
|
|
|
# Log command for debugging (sanitized)
|
|
logger.debug(f"Executing safe command: {' '.join(command)}")
|
|
|
|
try:
|
|
return self._run_subprocess(
|
|
command,
|
|
working_dir=working_dir,
|
|
timeout=timeout or self.default_timeout,
|
|
capture_output=capture_output,
|
|
)
|
|
except subprocess.SubprocessError as e:
|
|
raise SecureSubprocessError(f"Command failed: {e}") from e
|
|
|
|
def create_temp_file(
|
|
self, suffix: str = "", prefix: str = "kicad_mcp_", content: str | None = None
|
|
) -> str:
|
|
"""
|
|
Create a temporary file within validated directories.
|
|
|
|
Args:
|
|
suffix: File suffix/extension
|
|
prefix: File prefix
|
|
content: Optional content to write to file
|
|
|
|
Returns:
|
|
Path to created temporary file
|
|
"""
|
|
temp_path = self.path_validator.create_safe_temp_path(prefix.rstrip("_"), suffix)
|
|
|
|
if content is not None:
|
|
with open(temp_path, "w", encoding="utf-8") as f:
|
|
f.write(content)
|
|
|
|
return temp_path
|
|
|
|
def _run_subprocess(
|
|
self,
|
|
command: list[str],
|
|
working_dir: str | None = None,
|
|
timeout: float = TIMEOUT_CONSTANTS["subprocess_default"],
|
|
capture_output: bool = True,
|
|
) -> subprocess.CompletedProcess:
|
|
"""
|
|
Internal subprocess runner with consistent settings.
|
|
|
|
Args:
|
|
command: Command to execute
|
|
working_dir: Working directory
|
|
timeout: Timeout in seconds
|
|
capture_output: Whether to capture output
|
|
|
|
Returns:
|
|
CompletedProcess result
|
|
|
|
Raises:
|
|
subprocess.SubprocessError: If command fails
|
|
"""
|
|
kwargs = {
|
|
"timeout": timeout,
|
|
"cwd": working_dir,
|
|
"text": True,
|
|
}
|
|
|
|
if capture_output:
|
|
kwargs.update(
|
|
{
|
|
"capture_output": True,
|
|
"check": False, # Don't raise on non-zero exit code
|
|
}
|
|
)
|
|
|
|
return subprocess.run(command, **kwargs) # nosec B603 - input is validated
|
|
|
|
|
|
# Global secure subprocess runner instance
|
|
_subprocess_runner = None
|
|
|
|
|
|
def get_subprocess_runner() -> SecureSubprocessRunner:
|
|
"""Get the global secure subprocess runner instance."""
|
|
global _subprocess_runner
|
|
if _subprocess_runner is None:
|
|
_subprocess_runner = SecureSubprocessRunner()
|
|
return _subprocess_runner
|
|
|
|
|
|
def run_kicad_command(
|
|
command_args: list[str],
|
|
input_files: list[str] | None = None,
|
|
output_files: list[str] | None = None,
|
|
working_dir: str | None = None,
|
|
timeout: float | None = None,
|
|
) -> subprocess.CompletedProcess:
|
|
"""Convenience function to run KiCad command."""
|
|
return get_subprocess_runner().run_kicad_command(
|
|
command_args, input_files, output_files, working_dir, timeout
|
|
)
|
|
|
|
|
|
async def run_kicad_command_async(
|
|
command_args: list[str],
|
|
input_files: list[str] | None = None,
|
|
output_files: list[str] | None = None,
|
|
working_dir: str | None = None,
|
|
timeout: float | None = None,
|
|
) -> subprocess.CompletedProcess:
|
|
"""Convenience function to run KiCad command asynchronously."""
|
|
return await get_subprocess_runner().run_kicad_command_async(
|
|
command_args, input_files, output_files, working_dir, timeout
|
|
)
|
|
|
|
|
|
def create_temp_file(
|
|
suffix: str = "", prefix: str = "kicad_mcp_", content: str | None = None
|
|
) -> str:
|
|
"""Convenience function to create temporary file."""
|
|
return get_subprocess_runner().create_temp_file(suffix, prefix, content)
|