kicad-mcp/kicad_mcp/utils/secure_subprocess.py
Lauri Gates bd08a47a6f feat: add comprehensive security and input validation system
- Add PathValidator class for preventing path traversal attacks
- Add SecureSubprocessRunner for safe command execution
- Replace unsafe XML parsing with defusedxml for security
- Add comprehensive input validation tools for circuit generation
- Include security dependencies (defusedxml, bandit) in pyproject.toml
- Add security scanning job to CI/CD pipeline
- Add comprehensive test coverage for security utilities
- Add timeout constants for safe operation limits
- Add boundary validation for component positioning

This establishes a strong security foundation for the KiCad MCP server
by implementing defense-in-depth security measures across all input
vectors and external process interactions.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-17 21:34:16 +03:00

295 lines
9.5 KiB
Python

"""
Secure subprocess execution utilities for KiCad MCP.
Provides safe subprocess execution with input validation,
timeout enforcement, and security controls.
"""
import asyncio
import logging
import os
import subprocess # nosec B404 - subprocess usage is secured with validation
from ..config import TIMEOUT_CONSTANTS
from .kicad_cli import get_kicad_cli_path
from .path_validator import PathValidator, get_default_validator
logger = logging.getLogger(__name__)
class SecureSubprocessError(Exception):
"""Raised when secure subprocess operations fail."""
pass
class SecureSubprocessRunner:
"""
Secure subprocess runner with validation and safety controls.
Provides methods for safely executing KiCad CLI commands and other
subprocess operations with proper input validation and security controls.
"""
def __init__(self, path_validator: PathValidator | None = None):
"""
Initialize secure subprocess runner.
Args:
path_validator: Path validator to use (defaults to global instance)
"""
self.path_validator = path_validator or get_default_validator()
self.default_timeout = TIMEOUT_CONSTANTS["subprocess_default"]
def run_kicad_command(
self,
command_args: list[str],
input_files: list[str] | None = None,
output_files: list[str] | None = None,
working_dir: str | None = None,
timeout: float | None = None,
capture_output: bool = True,
) -> subprocess.CompletedProcess:
"""
Run a KiCad CLI command with security validation.
Args:
command_args: Command arguments (excluding the kicad-cli executable)
input_files: List of input file paths to validate
output_files: List of output file paths to validate
working_dir: Working directory for command execution
timeout: Command timeout in seconds
capture_output: Whether to capture stdout/stderr
Returns:
CompletedProcess result
Raises:
SecureSubprocessError: If validation fails or command fails
KiCadCLIError: If KiCad CLI not found
PathValidationError: If path validation fails
"""
# Get and validate KiCad CLI path
kicad_cli = get_kicad_cli_path(required=True)
# Validate input files
if input_files:
for file_path in input_files:
self.path_validator.validate_path(file_path, must_exist=True)
# Validate output file directories
if output_files:
for file_path in output_files:
output_dir = os.path.dirname(file_path)
if output_dir: # Only validate if there's a directory component
self.path_validator.validate_directory(output_dir, must_exist=True)
# Validate working directory
if working_dir:
working_dir = self.path_validator.validate_directory(working_dir, must_exist=True)
# Construct full command
full_command = [kicad_cli] + command_args
# Log command for debugging (sanitized)
logger.debug(f"Executing KiCad command: {' '.join(full_command)}")
try:
return self._run_subprocess(
full_command,
working_dir=working_dir,
timeout=timeout or self.default_timeout,
capture_output=capture_output,
)
except subprocess.SubprocessError as e:
raise SecureSubprocessError(f"KiCad command failed: {e}") from e
async def run_kicad_command_async(
self,
command_args: list[str],
input_files: list[str] | None = None,
output_files: list[str] | None = None,
working_dir: str | None = None,
timeout: float | None = None,
) -> subprocess.CompletedProcess:
"""
Async version of run_kicad_command.
Args:
command_args: Command arguments (excluding the kicad-cli executable)
input_files: List of input file paths to validate
output_files: List of output file paths to validate
working_dir: Working directory for command execution
timeout: Command timeout in seconds
Returns:
CompletedProcess result
"""
# Run in thread pool to avoid blocking event loop
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self.run_kicad_command,
command_args,
input_files,
output_files,
working_dir,
timeout,
True, # capture_output
)
def run_safe_command(
self,
command: list[str],
working_dir: str | None = None,
timeout: float | None = None,
allowed_commands: list[str] | None = None,
capture_output: bool = True,
) -> subprocess.CompletedProcess:
"""
Run a general command with security validation.
Args:
command: Full command list including executable
working_dir: Working directory for command execution
timeout: Command timeout in seconds
allowed_commands: List of allowed executables (whitelist)
capture_output: Whether to capture stdout/stderr
Returns:
CompletedProcess result
Raises:
SecureSubprocessError: If validation fails or command fails
"""
if not command:
raise SecureSubprocessError("Command cannot be empty")
executable = command[0]
# Validate executable against whitelist if provided
if allowed_commands and executable not in allowed_commands:
raise SecureSubprocessError(f"Command '{executable}' not in allowed list")
# Validate working directory
if working_dir:
working_dir = self.path_validator.validate_directory(working_dir, must_exist=True)
# Log command for debugging (sanitized)
logger.debug(f"Executing safe command: {' '.join(command)}")
try:
return self._run_subprocess(
command,
working_dir=working_dir,
timeout=timeout or self.default_timeout,
capture_output=capture_output,
)
except subprocess.SubprocessError as e:
raise SecureSubprocessError(f"Command failed: {e}") from e
def create_temp_file(
self, suffix: str = "", prefix: str = "kicad_mcp_", content: str | None = None
) -> str:
"""
Create a temporary file within validated directories.
Args:
suffix: File suffix/extension
prefix: File prefix
content: Optional content to write to file
Returns:
Path to created temporary file
"""
temp_path = self.path_validator.create_safe_temp_path(prefix.rstrip("_"), suffix)
if content is not None:
with open(temp_path, "w", encoding="utf-8") as f:
f.write(content)
return temp_path
def _run_subprocess(
self,
command: list[str],
working_dir: str | None = None,
timeout: float = TIMEOUT_CONSTANTS["subprocess_default"],
capture_output: bool = True,
) -> subprocess.CompletedProcess:
"""
Internal subprocess runner with consistent settings.
Args:
command: Command to execute
working_dir: Working directory
timeout: Timeout in seconds
capture_output: Whether to capture output
Returns:
CompletedProcess result
Raises:
subprocess.SubprocessError: If command fails
"""
kwargs = {
"timeout": timeout,
"cwd": working_dir,
"text": True,
}
if capture_output:
kwargs.update(
{
"capture_output": True,
"check": False, # Don't raise on non-zero exit code
}
)
return subprocess.run(command, **kwargs) # nosec B603 - input is validated
# Global secure subprocess runner instance
_subprocess_runner = None
def get_subprocess_runner() -> SecureSubprocessRunner:
"""Get the global secure subprocess runner instance."""
global _subprocess_runner
if _subprocess_runner is None:
_subprocess_runner = SecureSubprocessRunner()
return _subprocess_runner
def run_kicad_command(
command_args: list[str],
input_files: list[str] | None = None,
output_files: list[str] | None = None,
working_dir: str | None = None,
timeout: float | None = None,
) -> subprocess.CompletedProcess:
"""Convenience function to run KiCad command."""
return get_subprocess_runner().run_kicad_command(
command_args, input_files, output_files, working_dir, timeout
)
async def run_kicad_command_async(
command_args: list[str],
input_files: list[str] | None = None,
output_files: list[str] | None = None,
working_dir: str | None = None,
timeout: float | None = None,
) -> subprocess.CompletedProcess:
"""Convenience function to run KiCad command asynchronously."""
return await get_subprocess_runner().run_kicad_command_async(
command_args, input_files, output_files, working_dir, timeout
)
def create_temp_file(
suffix: str = "", prefix: str = "kicad_mcp_", content: str | None = None
) -> str:
"""Convenience function to create temporary file."""
return get_subprocess_runner().create_temp_file(suffix, prefix, content)