From c9de63cf291555d79d7abf16213694a57cf68b6e Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Fri, 22 May 2026 14:49:00 -0600 Subject: [PATCH] Security hardening + CalVer 2026.05.22 for first PyPI publish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Margaret Hamilton pre-publish review found 5 blockers + 9 flags. All correctness/security issues fixed; H6 (connection pooling perf) deferred. caching.py — comprehensive hardening: - B3: base64.b64decode now uses validate=True (no silent mangling) - B4: MCP_ALLOW_LOCAL_FILES evaluated per request, not at import - B5: extension allowlist + 0o700 temp dir + 0o600 files + O_EXCL writes - B2+H5: MCP_MAX_UPLOAD_BYTES / MCP_MAX_DOWNLOAD_BYTES caps (50MB default), enforced pre-decode and during chunked downloads - H1: env var parsing strip()+lower(), truthy set {true,1,yes,on} - H3: UUID-based unique temp paths replace SHA-prefix collision risk - H7: ZIP magic bytes disambiguated via [Content_Types].xml peek - H8: stronger CSV heuristic (commas/tabs + UTF-8 + no NULs) - H9: specific exceptions in cache I/O with logged warnings - New: upload_cleanup_scope() context manager + ContextVar tracker decorators.py: - cleanup_temp_uploads decorator wraps tool methods, auto-cleans temp upload files on return OR exception (B1+H4) validation.py: - OfficeFileError.__init__ scrubs /tmp/mcp_office_uploads/ paths from messages so server paths never leak to HTTP callers (H2) mixins/{universal,word,excel}.py: - @cleanup_temp_uploads applied to all 19 tool methods that resolve files tests/test_security_hardening.py: - 24 new tests, one per Hamilton finding, prove fixes work and catch regressions. Including end-to-end: temp file created → exists during scope → gone after scope exit (success AND exception paths) pyproject.toml: - version 0.1.0 → 2026.05.22 (CalVer per CLAUDE.md convention) - URLs updated GitHub → git.supported.systems/MCP/mcwaddams - Belt-and-suspenders sdist exclude list (defends against future include-list edits accidentally shipping CLAUDE.md, .env, etc.) --- pyproject.toml | 45 ++- src/mcwaddams/mixins/excel.py | 4 + src/mcwaddams/mixins/universal.py | 7 + src/mcwaddams/mixins/word.py | 11 + src/mcwaddams/utils/caching.py | 463 +++++++++++++++++++++--------- src/mcwaddams/utils/decorators.py | 26 +- src/mcwaddams/utils/validation.py | 16 +- tests/test_security_hardening.py | 285 ++++++++++++++++++ uv.lock | 2 +- 9 files changed, 708 insertions(+), 151 deletions(-) create mode 100644 tests/test_security_hardening.py diff --git a/pyproject.toml b/pyproject.toml index b7f14e3..1b0a0e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "mcwaddams" -version = "0.1.0" +version = "2026.05.22" description = "MCP server for Microsoft Office document processing. Named for Milton Waddams, who was relocated to the basement with boxes of legacy documents." authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}] readme = "README.md" @@ -64,9 +64,9 @@ enhanced = [ ] [project.urls] -Homepage = "https://github.com/ryanmalloy/mcwaddams" -Repository = "https://github.com/ryanmalloy/mcwaddams" -Issues = "https://github.com/ryanmalloy/mcwaddams/issues" +Homepage = "https://mcwaddams.l.supported.systems" +Repository = "https://git.supported.systems/MCP/mcwaddams" +Issues = "https://git.supported.systems/MCP/mcwaddams/issues" [project.scripts] mcwaddams = "mcwaddams.server:main" @@ -79,13 +79,46 @@ build-backend = "hatchling.build" packages = ["src/mcwaddams"] [tool.hatch.build.targets.sdist] +# Belt: only ship what's listed here. include = [ "/src", - "/tests", - "/examples", "/README.md", "/LICENSE", ] +# Suspenders: even if a file matches an include glob, drop it if it matches here. +# These guard against accidental inclusion if the include list grows. +exclude = [ + "CLAUDE.md", + ".env", + ".env.*", + ".mcp.json", + ".pytest_cache", + ".ruff_cache", + ".mypy_cache", + "htmlcov", + "dist", + "build", + "reports", + "audits", + "tests", + "examples", + "docs", + "ADVANCED_TOOLS_PLAN.md", + "IMPLEMENTATION_STATUS.md", + "QUICKSTART_DASHBOARD.md", + "TESTING_STRATEGY.md", + "*.docx", + "*.xlsx", + "*.pptx", + "*.doc", + "*.xls", + "*.ppt", + "*.json", + "Dockerfile", + "docker-compose.yml", + "Makefile", + "uv.lock", +] # Code quality tools [tool.black] diff --git a/src/mcwaddams/mixins/excel.py b/src/mcwaddams/mixins/excel.py index ce6b025..2e411bb 100755 --- a/src/mcwaddams/mixins/excel.py +++ b/src/mcwaddams/mixins/excel.py @@ -15,6 +15,7 @@ from ..utils import ( resolve_field_defaults, handle_office_errors ) +from ..utils.decorators import cleanup_temp_uploads # Common field description for file_content parameter @@ -38,6 +39,7 @@ class ExcelMixin(MCPMixin): detect_data_types=True, check_data_quality=True ) + @cleanup_temp_uploads async def analyze_excel_data( self, file_path: str = Field(description="Path to Excel document or URL"), @@ -183,6 +185,7 @@ class ExcelMixin(MCPMixin): include_values=True, analyze_dependencies=True ) + @cleanup_temp_uploads async def extract_excel_formulas( self, file_path: str = Field(description="Path to Excel document or URL"), @@ -294,6 +297,7 @@ class ExcelMixin(MCPMixin): y_columns=[], output_format="chartjs" ) + @cleanup_temp_uploads async def create_excel_chart_data( self, file_path: str = Field(description="Path to Excel document or URL"), diff --git a/src/mcwaddams/mixins/universal.py b/src/mcwaddams/mixins/universal.py index 420091a..033b019 100755 --- a/src/mcwaddams/mixins/universal.py +++ b/src/mcwaddams/mixins/universal.py @@ -14,6 +14,7 @@ from ..utils import ( resolve_office_file_path, validate_office_file, ) +from ..utils.decorators import cleanup_temp_uploads from ..resources import resource_store, EmbeddedResource, ResourceStore @@ -31,6 +32,7 @@ class UniversalMixin(MCPMixin): name="extract_text", description="Extract text content from Office documents with intelligent method selection. Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt), and CSV files. Uses multi-library fallback for maximum compatibility." ) + @cleanup_temp_uploads async def extract_text( self, file_path: str = Field(description="Path to Office document or URL"), @@ -90,6 +92,7 @@ class UniversalMixin(MCPMixin): name="extract_images", description="Extract images from Office documents with size filtering and format conversion." ) + @cleanup_temp_uploads async def extract_images( self, file_path: str = Field(description="Path to Office document or URL"), @@ -142,6 +145,7 @@ class UniversalMixin(MCPMixin): name="extract_metadata", description="Extract comprehensive metadata from Office documents." ) + @cleanup_temp_uploads async def extract_metadata( self, file_path: str = Field(description="Path to Office document or URL"), @@ -183,6 +187,7 @@ class UniversalMixin(MCPMixin): name="detect_office_format", description="Intelligent Office document format detection and analysis." ) + @cleanup_temp_uploads async def detect_office_format( self, file_path: str = Field(description="Path to Office document or URL"), @@ -208,6 +213,7 @@ class UniversalMixin(MCPMixin): name="analyze_document_health", description="Comprehensive document health and integrity analysis." ) + @cleanup_temp_uploads async def analyze_document_health( self, file_path: str = Field(description="Path to Office document or URL"), @@ -359,6 +365,7 @@ class UniversalMixin(MCPMixin): name="index_document", description="Scan and index all resources in a document (images, chapters, sheets, slides). Returns resource URIs that can be fetched individually. Use this before accessing resources via their URIs." ) + @cleanup_temp_uploads async def index_document( self, file_path: str = Field(description="Path to Office document or URL"), diff --git a/src/mcwaddams/mixins/word.py b/src/mcwaddams/mixins/word.py index 36418d5..6159041 100755 --- a/src/mcwaddams/mixins/word.py +++ b/src/mcwaddams/mixins/word.py @@ -15,6 +15,7 @@ from ..utils import ( resolve_field_defaults, handle_office_errors ) +from ..utils.decorators import cleanup_temp_uploads from ..pagination import paginate_document_conversion, PaginationParams @@ -48,6 +49,7 @@ class WordMixin(MCPMixin): session_id=None, return_all=False ) + @cleanup_temp_uploads async def convert_to_markdown( self, file_path: str = Field(description="Path to Office document or URL"), @@ -280,6 +282,7 @@ class WordMixin(MCPMixin): preserve_merged_cells=True, include_headers=True ) + @cleanup_temp_uploads async def extract_word_tables( self, file_path: str = Field(description="Path to Word document or URL"), @@ -457,6 +460,7 @@ class WordMixin(MCPMixin): extract_outline=True, analyze_styles=True ) + @cleanup_temp_uploads async def analyze_word_structure( self, file_path: str = Field(description="Path to Word document or URL"), @@ -653,6 +657,7 @@ class WordMixin(MCPMixin): description="Get a clean, structured outline of a Word document showing all headings, sections, and chapters with their locations. Perfect for understanding document structure before reading." ) @handle_office_errors("Document outline") + @cleanup_temp_uploads async def get_document_outline( self, file_path: str = Field(description="Path to Word document or URL"), @@ -774,6 +779,7 @@ class WordMixin(MCPMixin): description="Analyze a Word document for style inconsistencies, formatting issues, and potential problems like mismatched heading styles or missing chapters." ) @handle_office_errors("Style consistency check") + @cleanup_temp_uploads async def check_style_consistency( self, file_path: str = Field(description="Path to Word document or URL"), @@ -932,6 +938,7 @@ class WordMixin(MCPMixin): description="Search for text within a Word document and return matches with surrounding context and location information." ) @handle_office_errors("Document search") + @cleanup_temp_uploads async def search_document( self, file_path: str = Field(description="Path to Word document or URL"), @@ -1019,6 +1026,7 @@ class WordMixin(MCPMixin): description="Extract named entities (people, places, organizations) from a Word document using pattern-based recognition. Great for identifying key characters, locations, and institutions mentioned in the text." ) @handle_office_errors("Entity extraction") + @cleanup_temp_uploads async def extract_entities( self, file_path: str = Field(description="Path to Word document or URL"), @@ -1230,6 +1238,7 @@ class WordMixin(MCPMixin): description="Get brief summaries/previews of each chapter in a Word document. Extracts the opening sentences of each chapter to give a quick overview of content." ) @handle_office_errors("Chapter summaries") + @cleanup_temp_uploads async def get_chapter_summaries( self, file_path: str = Field(description="Path to Word document or URL"), @@ -1330,6 +1339,7 @@ class WordMixin(MCPMixin): description="Save your reading progress in a Word document. Creates a bookmark file to track which chapter/paragraph you're on, so you can resume reading later." ) @handle_office_errors("Save reading progress") + @cleanup_temp_uploads async def save_reading_progress( self, file_path: str = Field(description="Path to Word document"), @@ -1400,6 +1410,7 @@ class WordMixin(MCPMixin): description="Retrieve your saved reading progress for a Word document. Shows where you left off and your reading history." ) @handle_office_errors("Get reading progress") + @cleanup_temp_uploads async def get_reading_progress( self, file_path: str = Field(description="Path to Word document"), diff --git a/src/mcwaddams/utils/caching.py b/src/mcwaddams/utils/caching.py index 7ce869c..c1d1fc5 100644 --- a/src/mcwaddams/utils/caching.py +++ b/src/mcwaddams/utils/caching.py @@ -2,228 +2,318 @@ import os import time +import json +import uuid +import logging import hashlib import tempfile import base64 +import zipfile +import io +from contextvars import ContextVar from pathlib import Path -from typing import Optional, Dict, Any +from contextlib import asynccontextmanager +from typing import Optional, Dict, Any, AsyncIterator, List import aiofiles import aiohttp from urllib.parse import urlparse from .validation import OfficeFileError +logger = logging.getLogger(__name__) + +# Truthy values accepted for boolean env vars +_TRUTHY = frozenset({"true", "1", "yes", "on"}) + +# Allowlist of file extensions accepted from base64 uploads. +# Anything else gets coerced to .bin to avoid arbitrary-extension writes. +_ALLOWED_UPLOAD_EXTENSIONS = frozenset({ + ".docx", ".doc", + ".xlsx", ".xls", ".xlsm", + ".pptx", ".ppt", + ".csv", ".txt", + ".dotx", ".xltx", ".potx", +}) + +# Defaults for size limits — overridable via env vars. +_DEFAULT_MAX_UPLOAD_BYTES = 50 * 1024 * 1024 # 50 MB +_DEFAULT_MAX_DOWNLOAD_BYTES = 50 * 1024 * 1024 # 50 MB + + +# Per-request tracker for temp upload files. Set by tool-method decorator; +# _resolve_from_content registers paths here so the decorator can clean them up +# on exit (success OR exception). ContextVar is async-safe across concurrent tools. +_upload_tracker: ContextVar[Optional[List[Path]]] = ContextVar( + "mcwaddams_upload_tracker", default=None +) + + +def _register_temp_upload(path: Path) -> None: + """Register a temp upload for cleanup if a tracker is active.""" + tracker = _upload_tracker.get() + if tracker is not None: + tracker.append(path) + + +@asynccontextmanager +async def upload_cleanup_scope() -> AsyncIterator[None]: + """Track and clean up base64-upload temp files within the scope. + + Used by the @cleanup_temp_uploads decorator on tool methods. Any temp file + created by _resolve_from_content during the scope is deleted on exit, + regardless of whether the body raised or returned normally. + """ + tracker: List[Path] = [] + token = _upload_tracker.set(tracker) + try: + yield + finally: + _upload_tracker.reset(token) + for path in tracker: + try: + path.unlink(missing_ok=True) + except OSError as e: + logger.warning("Failed to clean up temp upload %s: %s", path, e) + + +def _env_truthy(value: Optional[str]) -> bool: + """Parse a truthy env var value, tolerant of whitespace and case.""" + if value is None: + return False + return value.strip().lower() in _TRUTHY + -# Environment variable to control local file access -# Default depends on transport mode: -# - stdio (local): allow local files by default -# - streamable-http (remote): block local files by default def _get_allow_local_files() -> bool: - """Determine if local file access is allowed based on transport mode.""" + """Determine if local file access is allowed based on transport mode. + + Evaluated at request time (not import time) so env changes after + import (test harnesses, embedded usage) are honored. + """ explicit = os.environ.get("MCP_ALLOW_LOCAL_FILES") if explicit is not None: - return explicit.lower() == "true" + return _env_truthy(explicit) - # If not explicitly set, default based on transport mode - transport = os.environ.get("MCP_TRANSPORT", "stdio").lower() + transport = (os.environ.get("MCP_TRANSPORT") or "stdio").strip().lower() return transport == "stdio" -MCP_ALLOW_LOCAL_FILES = _get_allow_local_files() + +def _get_max_upload_bytes() -> int: + """Max accepted size for base64 uploads, after decoding.""" + raw = os.environ.get("MCP_MAX_UPLOAD_BYTES") + if raw is not None: + try: + return max(1, int(raw.strip())) + except ValueError: + logger.warning( + "MCP_MAX_UPLOAD_BYTES=%r is not a valid integer; using default %d", + raw, _DEFAULT_MAX_UPLOAD_BYTES, + ) + return _DEFAULT_MAX_UPLOAD_BYTES + + +def _get_max_download_bytes() -> int: + """Max accepted size for URL downloads.""" + raw = os.environ.get("MCP_MAX_DOWNLOAD_BYTES") + if raw is not None: + try: + return max(1, int(raw.strip())) + except ValueError: + logger.warning( + "MCP_MAX_DOWNLOAD_BYTES=%r is not a valid integer; using default %d", + raw, _DEFAULT_MAX_DOWNLOAD_BYTES, + ) + return _DEFAULT_MAX_DOWNLOAD_BYTES class OfficeFileCache: """Simple file cache for downloaded Office documents.""" - + def __init__(self, cache_dir: Optional[str] = None, cache_duration: int = 3600): - """Initialize cache with optional custom directory and duration. - - Args: - cache_dir: Custom cache directory. If None, uses system temp. - cache_duration: Cache duration in seconds (default: 1 hour) - """ if cache_dir: self.cache_dir = Path(cache_dir) else: self.cache_dir = Path(tempfile.gettempdir()) / "mcp_office_cache" - + self.cache_duration = cache_duration - self.cache_dir.mkdir(exist_ok=True) - - # Cache metadata file + self.cache_dir.mkdir(exist_ok=True, mode=0o700) + self.metadata_file = self.cache_dir / "cache_metadata.json" self._metadata = self._load_metadata() - + def _load_metadata(self) -> Dict[str, Any]: - """Load cache metadata.""" + """Load cache metadata, tolerant of missing or corrupt files.""" + if not self.metadata_file.exists(): + return {} try: - if self.metadata_file.exists(): - import json - with open(self.metadata_file, 'r') as f: - return json.load(f) - except Exception: - pass - return {} - + with open(self.metadata_file, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, OSError) as e: + logger.warning("Cache metadata unreadable (%s); starting fresh.", e) + return {} + def _save_metadata(self) -> None: - """Save cache metadata.""" + """Save cache metadata; log on failure rather than swallow silently.""" try: - import json with open(self.metadata_file, 'w') as f: json.dump(self._metadata, f, indent=2) - except Exception: - pass - + except OSError as e: + logger.warning("Failed to write cache metadata: %s", e) + def _get_cache_key(self, url: str) -> str: - """Generate cache key for URL.""" return hashlib.sha256(url.encode()).hexdigest() - + def _get_cache_path(self, cache_key: str) -> Path: - """Get cache file path for cache key.""" return self.cache_dir / f"{cache_key}.office" - + def is_cached(self, url: str) -> bool: - """Check if URL is cached and still valid.""" cache_key = self._get_cache_key(url) - + if cache_key not in self._metadata: return False - + cache_info = self._metadata[cache_key] cache_path = self._get_cache_path(cache_key) - - # Check if file exists + if not cache_path.exists(): del self._metadata[cache_key] self._save_metadata() return False - - # Check if cache is still valid + cache_time = cache_info.get('cached_at', 0) if time.time() - cache_time > self.cache_duration: self._remove_cache_entry(cache_key) return False - + return True - + def get_cached_path(self, url: str) -> Optional[str]: - """Get cached file path for URL if available.""" if not self.is_cached(url): return None - cache_key = self._get_cache_key(url) - cache_path = self._get_cache_path(cache_key) - return str(cache_path) - + return str(self._get_cache_path(cache_key)) + async def cache_url(self, url: str, timeout: int = 30) -> str: - """Download and cache file from URL.""" + """Download and cache file from URL with size-cap enforcement.""" cache_key = self._get_cache_key(url) cache_path = self._get_cache_path(cache_key) - - # Download file + max_bytes = _get_max_download_bytes() + try: async with aiohttp.ClientSession() as session: async with session.get(url, timeout=timeout) as response: response.raise_for_status() - - # Get response metadata + + # Reject based on declared Content-Length before reading. + content_length_hdr = response.headers.get('content-length') + if content_length_hdr is not None: + try: + declared = int(content_length_hdr) + if declared > max_bytes: + raise OfficeFileError( + f"Remote file too large: {declared} bytes " + f"(max {max_bytes})" + ) + except ValueError: + pass # malformed header — fall through to chunk-counted enforcement + content_type = response.headers.get('content-type', '') - content_length = response.headers.get('content-length') last_modified = response.headers.get('last-modified') - - # Write to cache file + + bytes_written = 0 async with aiofiles.open(cache_path, 'wb') as f: async for chunk in response.content.iter_chunked(8192): + bytes_written += len(chunk) + if bytes_written > max_bytes: + raise OfficeFileError( + f"Remote file exceeded {max_bytes} bytes during download" + ) await f.write(chunk) - - # Update metadata + self._metadata[cache_key] = { 'url': url, 'cached_at': time.time(), 'content_type': content_type, - 'content_length': content_length, + 'content_length': content_length_hdr, 'last_modified': last_modified, - 'file_size': cache_path.stat().st_size + 'file_size': cache_path.stat().st_size, } self._save_metadata() - + return str(cache_path) - + + except OfficeFileError: + # Clean up partial file before re-raising + if cache_path.exists(): + try: + cache_path.unlink() + except OSError: + pass + raise except Exception as e: - # Clean up on error if cache_path.exists(): try: cache_path.unlink() except OSError: pass raise OfficeFileError(f"Failed to download and cache file: {str(e)}") - + def _remove_cache_entry(self, cache_key: str) -> None: - """Remove cache entry and file.""" cache_path = self._get_cache_path(cache_key) - - # Remove file if cache_path.exists(): try: cache_path.unlink() - except OSError: - pass - - # Remove metadata + except OSError as e: + logger.warning("Failed to unlink cache file %s: %s", cache_path, e) if cache_key in self._metadata: del self._metadata[cache_key] self._save_metadata() - + def clear_cache(self) -> None: - """Clear all cached files.""" for cache_key in list(self._metadata.keys()): self._remove_cache_entry(cache_key) - + def cleanup_expired(self) -> int: - """Remove expired cache entries. Returns number of entries removed.""" current_time = time.time() expired_keys = [] - + for cache_key, cache_info in self._metadata.items(): cache_time = cache_info.get('cached_at', 0) if current_time - cache_time > self.cache_duration: expired_keys.append(cache_key) - + for cache_key in expired_keys: self._remove_cache_entry(cache_key) - + return len(expired_keys) - + def get_cache_stats(self) -> Dict[str, Any]: - """Get cache statistics.""" total_files = len(self._metadata) total_size = 0 expired_count = 0 current_time = time.time() - + for cache_key, cache_info in self._metadata.items(): cache_path = self._get_cache_path(cache_key) if cache_path.exists(): total_size += cache_path.stat().st_size - + cache_time = cache_info.get('cached_at', 0) if current_time - cache_time > self.cache_duration: expired_count += 1 - + return { 'total_files': total_files, 'total_size_bytes': total_size, 'total_size_mb': round(total_size / (1024 * 1024), 2), 'expired_files': expired_count, 'cache_directory': str(self.cache_dir), - 'cache_duration_hours': self.cache_duration / 3600 + 'cache_duration_hours': self.cache_duration / 3600, } -# Global cache instance _global_cache: Optional[OfficeFileCache] = None def get_cache() -> OfficeFileCache: - """Get global cache instance.""" global _global_cache if _global_cache is None: _global_cache = OfficeFileCache() @@ -234,9 +324,9 @@ async def resolve_office_file_path( file_path: str, use_cache: bool = True, file_content: Optional[str] = None, - filename: Optional[str] = None + filename: Optional[str] = None, ) -> str: - """Resolve file path, downloading from URL if necessary, or decode inline content. + """Resolve a file reference to a local path. Args: file_path: Local file path or URL (ignored if file_content provided) @@ -245,105 +335,196 @@ async def resolve_office_file_path( filename: Original filename for extension detection (used with file_content) Returns: - Local file path (temp file if from content, downloaded if from URL) + Local file path. Callers MUST clean up temp files when file_content was used — + prefer the `resolved_office_file()` context manager which handles cleanup automatically. Security: - When MCP_ALLOW_LOCAL_FILES=false (default for HTTP transport): - - Local file paths are rejected - - Only URLs and file_content are allowed - - This prevents hosted servers from accessing server-side files + When MCP_ALLOW_LOCAL_FILES is false (default for HTTP transport), + local filesystem paths are rejected. Env var is read at request time. """ - # Priority 1: If file_content is provided, decode and write to temp file if file_content: return await _resolve_from_content(file_content, filename or file_path) - # Check if it's a URL parsed = urlparse(file_path) is_url = bool(parsed.scheme and parsed.netloc) if not is_url: - # Local file path - check if allowed - if not MCP_ALLOW_LOCAL_FILES: + if not _get_allow_local_files(): raise OfficeFileError( "Local file access is disabled for this server. " - "Please use file_content parameter to upload document data, " - "or provide a URL. Set MCP_ALLOW_LOCAL_FILES=true to enable local files." + "Use the file_content parameter to upload document data, " + "or provide an https:// URL." ) return file_path - # Validate URL scheme if parsed.scheme not in ['http', 'https']: raise OfficeFileError(f"Unsupported URL scheme: {parsed.scheme}") cache = get_cache() - # Check cache first if use_cache and cache.is_cached(file_path): cached_path = cache.get_cached_path(file_path) if cached_path: return cached_path - # Download and cache if use_cache: return await cache.cache_url(file_path) else: - # Direct download without caching from .validation import download_office_file return await download_office_file(file_path) -async def _resolve_from_content(file_content: str, filename_hint: str) -> str: - """Decode base64 content and write to a temp file. +@asynccontextmanager +async def resolved_office_file( + file_path: str, + use_cache: bool = True, + file_content: Optional[str] = None, + filename: Optional[str] = None, +) -> AsyncIterator[str]: + """Async context manager: resolves path, cleans up temp files on exit. - Args: - file_content: Base64-encoded file data - filename_hint: Filename or path to extract extension from + Use this from tool implementations instead of calling resolve_office_file_path + directly when a base64 upload might be involved. Temp files created from + file_content are deleted on exit (success OR exception). URL cache and local + paths are left alone. - Returns: - Path to temporary file containing decoded content + Example: + async with resolved_office_file(file_path, file_content=file_content) as local_path: + return process(local_path) """ + is_temp_upload = bool(file_content) + local_path: Optional[str] = None try: - # Decode base64 content - content_bytes = base64.b64decode(file_content) - except Exception as e: + local_path = await resolve_office_file_path( + file_path, + use_cache=use_cache, + file_content=file_content, + filename=filename, + ) + yield local_path + finally: + if is_temp_upload and local_path: + try: + Path(local_path).unlink(missing_ok=True) + except OSError as e: + logger.warning("Failed to clean up temp upload %s: %s", local_path, e) + + +def _scrub_temp_path(message: str) -> str: + """Remove server-side temp upload paths from error messages.""" + temp_root = str(Path(tempfile.gettempdir()) / "mcp_office_uploads") + if temp_root in message: + return message.replace(temp_root, "") + return message + + +async def _resolve_from_content(file_content: str, filename_hint: str) -> str: + """Decode base64 content and write to a temp file with strict validation.""" + # Pre-decode size check (base64 expands by ~4/3) + max_bytes = _get_max_upload_bytes() + encoded_len = len(file_content) + max_encoded = (max_bytes * 4 // 3) + 4 # +4 for padding slack + if encoded_len > max_encoded: + raise OfficeFileError( + f"Upload too large: encoded size {encoded_len} exceeds limit " + f"(max {max_bytes} decoded bytes). Set MCP_MAX_UPLOAD_BYTES to override." + ) + + # Strict base64 decode — rejects garbage instead of silently mangling + try: + content_bytes = base64.b64decode(file_content, validate=True) + except (ValueError, base64.binascii.Error) as e: raise OfficeFileError(f"Invalid base64 content: {str(e)}") - # Extract extension from filename hint - ext = Path(filename_hint).suffix.lower() - if not ext: - # Try to detect from content magic bytes + if len(content_bytes) > max_bytes: + raise OfficeFileError( + f"Upload too large: {len(content_bytes)} bytes (max {max_bytes}). " + f"Set MCP_MAX_UPLOAD_BYTES to override." + ) + + # Extension determination — allowlist first, magic bytes as fallback, + # default to .bin for anything we don't recognize. + raw_ext = Path(filename_hint).suffix.lower() if filename_hint else "" + if raw_ext and raw_ext in _ALLOWED_UPLOAD_EXTENSIONS: + ext = raw_ext + else: ext = _detect_extension_from_bytes(content_bytes) - # Create temp file with correct extension + # Locked-down temp dir: owner-only access temp_dir = Path(tempfile.gettempdir()) / "mcp_office_uploads" - temp_dir.mkdir(exist_ok=True) + temp_dir.mkdir(exist_ok=True, mode=0o700) + # Re-apply mode in case dir existed with looser perms + try: + os.chmod(temp_dir, 0o700) + except OSError: + pass - # Generate unique filename - content_hash = hashlib.sha256(content_bytes).hexdigest()[:12] - temp_path = temp_dir / f"upload_{content_hash}{ext}" + # Unique filename — UUID prevents concurrent-write collisions + unique = uuid.uuid4().hex[:16] + temp_path = temp_dir / f"upload_{unique}{ext}" - # Write content to temp file - async with aiofiles.open(temp_path, 'wb') as f: - await f.write(content_bytes) + # Atomic-ish write: O_EXCL ensures we never overwrite + fd = os.open( + str(temp_path), + os.O_WRONLY | os.O_CREAT | os.O_EXCL, + 0o600, + ) + try: + async with aiofiles.open(fd, 'wb') as f: + await f.write(content_bytes) + except Exception: + try: + temp_path.unlink(missing_ok=True) + except OSError: + pass + raise + + # Register for automatic cleanup by the active tool-method scope (if any). + _register_temp_upload(temp_path) return str(temp_path) def _detect_extension_from_bytes(content: bytes) -> str: - """Detect file extension from magic bytes.""" - # ZIP-based formats (docx, xlsx, pptx) + """Detect file extension from magic bytes, with ZIP disambiguation.""" + # ZIP-based formats — peek inside to tell docx/xlsx/pptx apart if content[:4] == b'PK\x03\x04': - # Could be docx, xlsx, or pptx - default to .docx - # Full detection would require reading internal XML - return ".docx" + return _disambiguate_zip_format(content) # OLE Compound Document (doc, xls, ppt) if content[:8] == b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1': return ".doc" - # CSV (text-based, starts with printable characters) - if content[:1].isalpha() or content[:1] in b'"\'': - return ".csv" + # Stronger CSV heuristic: comma or tab in first KB, no NUL bytes, + # mostly printable ASCII or UTF-8 + head = content[:1024] + if head and b'\x00' not in head: + if b',' in head or b'\t' in head or b';' in head: + try: + head.decode('utf-8') + return ".csv" + except UnicodeDecodeError: + pass - # Default - return ".bin" \ No newline at end of file + return ".bin" + + +def _disambiguate_zip_format(content: bytes) -> str: + """Inspect [Content_Types].xml inside a ZIP to identify the Office format.""" + try: + with zipfile.ZipFile(io.BytesIO(content)) as zf: + try: + types_xml = zf.read("[Content_Types].xml").decode("utf-8", errors="replace") + except KeyError: + return ".bin" + + if "wordprocessingml" in types_xml: + return ".docx" + if "spreadsheetml" in types_xml: + return ".xlsx" + if "presentationml" in types_xml: + return ".pptx" + except (zipfile.BadZipFile, OSError): + pass + + return ".bin" diff --git a/src/mcwaddams/utils/decorators.py b/src/mcwaddams/utils/decorators.py index 2be3058..c4c0812 100644 --- a/src/mcwaddams/utils/decorators.py +++ b/src/mcwaddams/utils/decorators.py @@ -1,7 +1,8 @@ """ Decorators for MCP Office Tools. -Provides common patterns for error handling and Pydantic field resolution. +Provides common patterns for error handling, temp-file cleanup, +and Pydantic field resolution. """ from functools import wraps @@ -9,11 +10,34 @@ from typing import Any, Callable, TypeVar from pydantic.fields import FieldInfo +from .caching import upload_cleanup_scope from .validation import OfficeFileError T = TypeVar('T') +def cleanup_temp_uploads(func: Callable[..., T]) -> Callable[..., T]: + """Auto-clean base64-upload temp files created during the wrapped call. + + Wrap each MCP tool method that may receive a `file_content` parameter. + Any temp file written by `_resolve_from_content` during the call is deleted + when the function returns or raises. Safe for concurrent calls + (uses ContextVar — per-task isolation). + + Usage: + @mcp_tool(...) + @cleanup_temp_uploads + async def extract_text(self, file_path: str, file_content: str = None, ...): + local_path = await resolve_office_file_path(file_path, file_content=file_content, ...) + return process(local_path) + """ + @wraps(func) + async def wrapper(*args, **kwargs): + async with upload_cleanup_scope(): + return await func(*args, **kwargs) + return wrapper + + def resolve_field_defaults(**defaults: Any) -> Callable: """ Decorator to resolve Pydantic Field defaults for direct function calls. diff --git a/src/mcwaddams/utils/validation.py b/src/mcwaddams/utils/validation.py index db50ab0..92e69c9 100644 --- a/src/mcwaddams/utils/validation.py +++ b/src/mcwaddams/utils/validation.py @@ -1,6 +1,7 @@ """File validation utilities for Office documents.""" import os +import tempfile from pathlib import Path from typing import Dict, Any, Optional from urllib.parse import urlparse @@ -16,8 +17,19 @@ except ImportError: class OfficeFileError(Exception): - """Custom exception for Office file processing errors.""" - pass + """Custom exception for Office file processing errors. + + Sanitizes internal upload temp paths from messages so server-side + paths never leak to remote (HTTP transport) callers. + """ + + # Path prefix to strip from error messages (single source of truth) + _UPLOAD_TEMP_PREFIX = str(Path(tempfile.gettempdir()) / "mcp_office_uploads") + + def __init__(self, message: str = ""): + if message and self._UPLOAD_TEMP_PREFIX in message: + message = message.replace(self._UPLOAD_TEMP_PREFIX, "") + super().__init__(message) # Office format MIME types and extensions diff --git a/tests/test_security_hardening.py b/tests/test_security_hardening.py new file mode 100644 index 0000000..e62ae17 --- /dev/null +++ b/tests/test_security_hardening.py @@ -0,0 +1,285 @@ +"""Verification tests for Margaret Hamilton review blockers. + +Each test maps directly to a finding from the pre-publish review. These tests +exist to prove the fixes work as advertised and to prevent regressions. +""" + +import base64 +import os +import tempfile +from pathlib import Path +from unittest.mock import patch + +import pytest + +from mcwaddams.utils.caching import ( + _detect_extension_from_bytes, + _env_truthy, + _get_allow_local_files, + _get_max_upload_bytes, + _resolve_from_content, + resolve_office_file_path, + upload_cleanup_scope, +) +from mcwaddams.utils.decorators import cleanup_temp_uploads +from mcwaddams.utils.validation import OfficeFileError + + +def _minimal_docx_bytes() -> bytes: + """Smallest plausible ZIP that looks like a docx for magic-byte tests.""" + import io + import zipfile + + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr( + "[Content_Types].xml", + '', + ) + return buf.getvalue() + + +class TestB3_Base64Validation: + """B3: b64decode must reject garbage instead of silently mangling.""" + + @pytest.mark.asyncio + async def test_garbage_input_rejected(self): + with pytest.raises(OfficeFileError, match="Invalid base64"): + await _resolve_from_content("not base64 at all", "x.docx") + + @pytest.mark.asyncio + async def test_whitespace_in_b64_rejected(self): + valid_b64 = base64.b64encode(b"hello").decode() + # Inject a non-b64 char in the middle + corrupted = valid_b64[:4] + "!!" + valid_b64[4:] + with pytest.raises(OfficeFileError, match="Invalid base64"): + await _resolve_from_content(corrupted, "x.txt") + + +class TestB4_EnvVarAtRequestTime: + """B4: env var must be read at request time, not import time.""" + + @pytest.mark.asyncio + async def test_env_change_after_import_takes_effect(self): + with patch.dict(os.environ, {"MCP_TRANSPORT": "stdio"}, clear=False): + os.environ.pop("MCP_ALLOW_LOCAL_FILES", None) + assert _get_allow_local_files() is True + + with patch.dict(os.environ, {"MCP_TRANSPORT": "streamable-http"}, clear=False): + os.environ.pop("MCP_ALLOW_LOCAL_FILES", None) + assert _get_allow_local_files() is False + + +class TestH1_EnvVarHygiene: + """H1: env var parsing must tolerate whitespace, accept truthy set.""" + + def test_truthy_values(self): + for v in ("true", "True", "TRUE", "1", "yes", "on", " true ", "YES\n"): + assert _env_truthy(v) is True, f"Expected {v!r} → True" + + def test_falsy_values(self): + for v in ("false", "0", "no", "off", "", None, "maybe", "2"): + assert _env_truthy(v) is False, f"Expected {v!r} → False" + + @pytest.mark.asyncio + async def test_transport_with_trailing_space_defaults_correctly(self): + with patch.dict(os.environ, {"MCP_TRANSPORT": " stdio "}, clear=False): + os.environ.pop("MCP_ALLOW_LOCAL_FILES", None) + assert _get_allow_local_files() is True + + +class TestB5_ExtensionAllowlist: + """B5: only known Office extensions allowed; unknown coerced to .bin.""" + + @pytest.mark.asyncio + async def test_executable_extension_blocked(self): + content = base64.b64encode(b"\x7fELF\x02\x01\x01\x00").decode() + path = await _resolve_from_content(content, "evil.sh") + try: + assert not str(path).endswith(".sh") + assert str(path).endswith(".bin") # falls through to magic-bytes default + finally: + Path(path).unlink(missing_ok=True) + + @pytest.mark.asyncio + async def test_known_extension_preserved(self): + content = base64.b64encode(_minimal_docx_bytes()).decode() + path = await _resolve_from_content(content, "report.docx") + try: + assert str(path).endswith(".docx") + finally: + Path(path).unlink(missing_ok=True) + + @pytest.mark.asyncio + async def test_path_traversal_in_filename_ignored(self): + content = base64.b64encode(b"hello,world\n").decode() + path = await _resolve_from_content(content, "../../etc/passwd") + try: + # No path traversal — file lands in our temp dir, not /etc + assert "/etc/" not in str(path) + assert "mcp_office_uploads" in str(path) + finally: + Path(path).unlink(missing_ok=True) + + +class TestB2_SizeLimits: + """B2: oversized base64 input must be rejected before/after decode.""" + + @pytest.mark.asyncio + async def test_oversized_input_rejected(self): + with patch.dict(os.environ, {"MCP_MAX_UPLOAD_BYTES": "1024"}): + # Create 2KB of data → 2.7KB base64 + payload = base64.b64encode(b"A" * 2048).decode() + with pytest.raises(OfficeFileError, match="too large"): + await _resolve_from_content(payload, "x.txt") + + def test_max_upload_bytes_env_parsing(self): + with patch.dict(os.environ, {"MCP_MAX_UPLOAD_BYTES": "12345"}): + assert _get_max_upload_bytes() == 12345 + + def test_max_upload_bytes_bad_value_falls_back(self): + with patch.dict(os.environ, {"MCP_MAX_UPLOAD_BYTES": "not-a-number"}): + # Should not raise; should log warning and use default + assert _get_max_upload_bytes() > 0 + + +class TestB1_TempFileCleanup: + """B1: temp files must be cleaned up after each tool invocation.""" + + @pytest.mark.asyncio + async def test_cleanup_scope_removes_uploads(self): + temp_root = Path(tempfile.gettempdir()) / "mcp_office_uploads" + content = base64.b64encode(_minimal_docx_bytes()).decode() + + async with upload_cleanup_scope(): + path = await _resolve_from_content(content, "report.docx") + assert Path(path).exists() + inside_path = path + + # After the scope exits, the file must be gone + assert not Path(inside_path).exists(), ( + f"Temp file leaked: {inside_path} still exists after scope exit" + ) + + @pytest.mark.asyncio + async def test_cleanup_scope_removes_uploads_on_exception(self): + content = base64.b64encode(_minimal_docx_bytes()).decode() + inside_path = None + + with pytest.raises(RuntimeError, match="simulated"): + async with upload_cleanup_scope(): + inside_path = await _resolve_from_content(content, "report.docx") + assert Path(inside_path).exists() + raise RuntimeError("simulated tool failure") + + assert inside_path is not None + assert not Path(inside_path).exists(), ( + "Temp file must be cleaned up even when tool raises" + ) + + @pytest.mark.asyncio + async def test_decorator_applies_cleanup_to_tool_method(self): + captured_path = {} + + @cleanup_temp_uploads + async def fake_tool(file_content): + path = await _resolve_from_content(file_content, "x.docx") + captured_path["p"] = path + return path + + content = base64.b64encode(_minimal_docx_bytes()).decode() + result = await fake_tool(content) + + assert captured_path["p"] == result + assert not Path(result).exists(), "Decorator failed to clean up" + + +class TestH3_UniqueTempPaths: + """H3: concurrent uploads with same content must not collide.""" + + @pytest.mark.asyncio + async def test_two_resolves_get_different_paths(self): + content = base64.b64encode(_minimal_docx_bytes()).decode() + p1 = await _resolve_from_content(content, "x.docx") + p2 = await _resolve_from_content(content, "x.docx") + try: + assert p1 != p2, "Identical content must still get unique temp paths" + finally: + Path(p1).unlink(missing_ok=True) + Path(p2).unlink(missing_ok=True) + + +class TestH7_ZipDisambiguation: + """H7: ZIP magic bytes must disambiguate docx/xlsx/pptx.""" + + def test_docx_detected_via_content_types(self): + ext = _detect_extension_from_bytes(_minimal_docx_bytes()) + assert ext == ".docx" + + def test_unknown_zip_returns_bin(self): + import io + import zipfile + + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr("hello.txt", "not an office doc") + ext = _detect_extension_from_bytes(buf.getvalue()) + # No [Content_Types].xml → .bin + assert ext == ".bin" + + +class TestH8_CsvDetection: + """H8: CSV detection must require commas/tabs + valid UTF-8.""" + + def test_binary_garbage_not_csv(self): + # Printable first byte but binary tail with NUL — must NOT be classified as CSV + garbage = b"Aabcdef\x00binary\xff\xfedata" + ext = _detect_extension_from_bytes(garbage) + assert ext != ".csv" + + def test_actual_csv_detected(self): + csv = b"name,age,city\nAlice,30,NYC\nBob,25,LA\n" + ext = _detect_extension_from_bytes(csv) + assert ext == ".csv" + + +class TestH2_ErrorPathScrubbing: + """H2: OfficeFileError must never leak server-side upload paths.""" + + def test_error_message_scrubs_temp_path(self): + leaked = "/tmp/mcp_office_uploads/upload_deadbeef.docx is corrupt" + e = OfficeFileError(leaked) + msg = str(e) + assert "/tmp/mcp_office_uploads/" not in msg + assert "" in msg + + def test_clean_error_messages_unchanged(self): + msg = "Unsupported URL scheme: ftp" + assert str(OfficeFileError(msg)) == msg + + +class TestLocalFileBoundary: + """Combined: the security boundary must hold across env config permutations.""" + + @pytest.mark.asyncio + async def test_local_file_blocked_in_http_mode(self): + with patch.dict( + os.environ, + {"MCP_TRANSPORT": "streamable-http", "MCP_ALLOW_LOCAL_FILES": ""}, + clear=False, + ): + os.environ.pop("MCP_ALLOW_LOCAL_FILES", None) + with pytest.raises(OfficeFileError, match="Local file access"): + await resolve_office_file_path("/etc/passwd") + + @pytest.mark.asyncio + async def test_local_file_allowed_in_stdio_mode(self, tmp_path): + test_file = tmp_path / "test.txt" + test_file.write_text("hello") + with patch.dict(os.environ, {"MCP_TRANSPORT": "stdio"}, clear=False): + os.environ.pop("MCP_ALLOW_LOCAL_FILES", None) + result = await resolve_office_file_path(str(test_file)) + assert result == str(test_file) diff --git a/uv.lock b/uv.lock index 8de8216..833e34f 100644 --- a/uv.lock +++ b/uv.lock @@ -1538,7 +1538,7 @@ wheels = [ [[package]] name = "mcwaddams" -version = "0.1.0" +version = "2026.5.22" source = { editable = "." } dependencies = [ { name = "aiofiles" },