mcwaddams/tests/test_security_hardening.py
Ryan Malloy c9de63cf29
Some checks failed
Test Dashboard / test-and-dashboard (push) Has been cancelled
Security hardening + CalVer 2026.05.22 for first PyPI publish
Margaret Hamilton pre-publish review found 5 blockers + 9 flags. All
correctness/security issues fixed; H6 (connection pooling perf) deferred.

caching.py — comprehensive hardening:
- B3: base64.b64decode now uses validate=True (no silent mangling)
- B4: MCP_ALLOW_LOCAL_FILES evaluated per request, not at import
- B5: extension allowlist + 0o700 temp dir + 0o600 files + O_EXCL writes
- B2+H5: MCP_MAX_UPLOAD_BYTES / MCP_MAX_DOWNLOAD_BYTES caps (50MB default),
  enforced pre-decode and during chunked downloads
- H1: env var parsing strip()+lower(), truthy set {true,1,yes,on}
- H3: UUID-based unique temp paths replace SHA-prefix collision risk
- H7: ZIP magic bytes disambiguated via [Content_Types].xml peek
- H8: stronger CSV heuristic (commas/tabs + UTF-8 + no NULs)
- H9: specific exceptions in cache I/O with logged warnings
- New: upload_cleanup_scope() context manager + ContextVar tracker

decorators.py:
- cleanup_temp_uploads decorator wraps tool methods, auto-cleans temp
  upload files on return OR exception (B1+H4)

validation.py:
- OfficeFileError.__init__ scrubs /tmp/mcp_office_uploads/ paths from
  messages so server paths never leak to HTTP callers (H2)

mixins/{universal,word,excel}.py:
- @cleanup_temp_uploads applied to all 19 tool methods that resolve files

tests/test_security_hardening.py:
- 24 new tests, one per Hamilton finding, prove fixes work and catch
  regressions. Including end-to-end: temp file created → exists during
  scope → gone after scope exit (success AND exception paths)

pyproject.toml:
- version 0.1.0 → 2026.05.22 (CalVer per CLAUDE.md convention)
- URLs updated GitHub → git.supported.systems/MCP/mcwaddams
- Belt-and-suspenders sdist exclude list (defends against future
  include-list edits accidentally shipping CLAUDE.md, .env, etc.)
2026-05-22 14:49:00 -06:00

286 lines
10 KiB
Python

"""Verification tests for Margaret Hamilton review blockers.
Each test maps directly to a finding from the pre-publish review (blockers
B1-B5 and flags H1, H2, H3, H7 and H8). These tests exist to prove the fixes
work as advertised and to prevent regressions.
"""
import base64
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from mcwaddams.utils.caching import (
_detect_extension_from_bytes,
_env_truthy,
_get_allow_local_files,
_get_max_upload_bytes,
_resolve_from_content,
resolve_office_file_path,
upload_cleanup_scope,
)
from mcwaddams.utils.decorators import cleanup_temp_uploads
from mcwaddams.utils.validation import OfficeFileError
def _minimal_docx_bytes() -> bytes:
    """Build an in-memory ZIP that is just docx-like enough for magic-byte tests.

    The archive holds a single ``[Content_Types].xml`` member that declares a
    WordprocessingML main-document part at ``/word/document.xml``; no other
    members are written.

    Returns:
        The raw bytes of the ZIP archive.
    """
    import io
    import zipfile

    content_types_xml = (
        '<?xml version="1.0"?><Types xmlns="http://schemas.openxmlformats.org/'
        'package/2006/content-types"><Override PartName="/word/document.xml" '
        'ContentType="application/vnd.openxmlformats-officedocument.'
        'wordprocessingml.document.main+xml"/></Types>'
    )
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w") as package:
        package.writestr("[Content_Types].xml", content_types_xml)
    return archive.getvalue()
class TestB3_Base64Validation:
    """B3: malformed base64 must raise instead of being silently mangled."""

    @pytest.mark.asyncio
    async def test_garbage_input_rejected(self):
        """Markup that is not base64 at all raises ``Invalid base64``."""
        not_base64 = "<html>not base64 at all</html>"
        with pytest.raises(OfficeFileError, match="Invalid base64"):
            await _resolve_from_content(not_base64, "x.docx")

    @pytest.mark.asyncio
    async def test_whitespace_in_b64_rejected(self):
        """A valid payload with stray non-alphabet characters is rejected.

        NOTE: despite the method name, the injected characters are ``!!``,
        not whitespace.
        """
        encoded = base64.b64encode(b"hello").decode()
        # Splice two characters outside the base64 alphabet into the middle.
        head, tail = encoded[:4], encoded[4:]
        tampered = head + "!!" + tail
        with pytest.raises(OfficeFileError, match="Invalid base64"):
            await _resolve_from_content(tampered, "x.txt")
class TestB4_EnvVarAtRequestTime:
    """B4: the local-file policy must follow the environment at call time."""

    @pytest.mark.asyncio
    async def test_env_change_after_import_takes_effect(self):
        """Changing ``MCP_TRANSPORT`` between calls flips the returned policy.

        ``MCP_ALLOW_LOCAL_FILES`` is removed inside each patch so that only the
        transport setting is in play; ``patch.dict`` restores the environment
        when each ``with`` block exits.
        """
        expectations = (("stdio", True), ("streamable-http", False))
        for transport, expected in expectations:
            with patch.dict(os.environ, {"MCP_TRANSPORT": transport}, clear=False):
                os.environ.pop("MCP_ALLOW_LOCAL_FILES", None)
                assert _get_allow_local_files() is expected
class TestH1_EnvVarHygiene:
    """H1: env var parsing tolerates whitespace and accepts a truthy set."""

    def test_truthy_values(self):
        """Case variants and padded spellings of true/1/yes/on parse as True."""
        truthy_inputs = (
            "true", "True", "TRUE", "1", "yes", "on", " true ", "YES\n",
        )
        for raw in truthy_inputs:
            assert _env_truthy(raw) is True, f"Expected {raw!r} → True"

    def test_falsy_values(self):
        """Everything else, including ``None`` and the empty string, is False."""
        falsy_inputs = ("false", "0", "no", "off", "", None, "maybe", "2")
        for raw in falsy_inputs:
            assert _env_truthy(raw) is False, f"Expected {raw!r} → False"

    @pytest.mark.asyncio
    async def test_transport_with_trailing_space_defaults_correctly(self):
        """A space-padded ``stdio`` transport still yields the stdio default."""
        padded_transport = {"MCP_TRANSPORT": " stdio "}
        with patch.dict(os.environ, padded_transport, clear=False):
            # Drop any explicit override so the transport decides the result.
            os.environ.pop("MCP_ALLOW_LOCAL_FILES", None)
            assert _get_allow_local_files() is True
class TestB5_ExtensionAllowlist:
    """B5: only known Office extensions allowed; unknown coerced to .bin."""

    @pytest.mark.asyncio
    async def test_executable_extension_blocked(self):
        """An ELF-headed payload named ``evil.sh`` is stored with ``.bin``."""
        elf_header = b"\x7fELF\x02\x01\x01\x00"
        encoded = base64.b64encode(elf_header).decode()
        stored = await _resolve_from_content(encoded, "evil.sh")
        try:
            stored_name = str(stored)
            assert not stored_name.endswith(".sh")
            # The disallowed suffix falls through to the magic-bytes default.
            assert stored_name.endswith(".bin")
        finally:
            Path(stored).unlink(missing_ok=True)

    @pytest.mark.asyncio
    async def test_known_extension_preserved(self):
        """A docx-like payload named ``report.docx`` keeps its ``.docx`` suffix."""
        encoded = base64.b64encode(_minimal_docx_bytes()).decode()
        stored = await _resolve_from_content(encoded, "report.docx")
        try:
            assert str(stored).endswith(".docx")
        finally:
            Path(stored).unlink(missing_ok=True)

    @pytest.mark.asyncio
    async def test_path_traversal_in_filename_ignored(self):
        """A ``../../etc/passwd`` filename cannot steer the write location."""
        encoded = base64.b64encode(b"hello,world\n").decode()
        stored = await _resolve_from_content(encoded, "../../etc/passwd")
        try:
            stored_name = str(stored)
            # The file must land in the upload temp dir, not under /etc.
            assert "/etc/" not in stored_name
            assert "mcp_office_uploads" in stored_name
        finally:
            Path(stored).unlink(missing_ok=True)
class TestB2_SizeLimits:
    """B2: oversized base64 input must be rejected before/after decode."""

    @pytest.mark.asyncio
    async def test_oversized_input_rejected(self):
        """A 2048-byte payload is refused when the cap is set to 1024 bytes."""
        cap = {"MCP_MAX_UPLOAD_BYTES": "1024"}
        with patch.dict(os.environ, cap):
            # 2KB of raw data becomes roughly 2.7KB once base64-encoded.
            too_big = base64.b64encode(b"A" * 2048).decode()
            with pytest.raises(OfficeFileError, match="too large"):
                await _resolve_from_content(too_big, "x.txt")

    def test_max_upload_bytes_env_parsing(self):
        """A numeric ``MCP_MAX_UPLOAD_BYTES`` value is returned as an int."""
        cap = {"MCP_MAX_UPLOAD_BYTES": "12345"}
        with patch.dict(os.environ, cap):
            assert _get_max_upload_bytes() == 12345

    def test_max_upload_bytes_bad_value_falls_back(self):
        """A non-numeric value must not raise and must yield a positive cap."""
        cap = {"MCP_MAX_UPLOAD_BYTES": "not-a-number"}
        with patch.dict(os.environ, cap):
            # Only positivity is checked here, not the exact fallback value.
            assert _get_max_upload_bytes() > 0
class TestB1_TempFileCleanup:
    """B1: temp files must be cleaned up after each tool invocation."""

    @pytest.mark.asyncio
    async def test_cleanup_scope_removes_uploads(self):
        """An upload exists inside the scope and is gone once the scope exits."""
        content = base64.b64encode(_minimal_docx_bytes()).decode()
        async with upload_cleanup_scope():
            path = await _resolve_from_content(content, "report.docx")
            assert Path(path).exists()
        # After the scope exits, the file must be gone.
        assert not Path(path).exists(), (
            f"Temp file leaked: {path} still exists after scope exit"
        )

    @pytest.mark.asyncio
    async def test_cleanup_scope_removes_uploads_on_exception(self):
        """Cleanup also runs when the body of the scope raises."""
        content = base64.b64encode(_minimal_docx_bytes()).decode()
        inside_path = None
        with pytest.raises(RuntimeError, match="simulated"):
            async with upload_cleanup_scope():
                inside_path = await _resolve_from_content(content, "report.docx")
                assert Path(inside_path).exists()
                raise RuntimeError("simulated tool failure")
        # Guard against the resolve itself never having run.
        assert inside_path is not None
        assert not Path(inside_path).exists(), (
            "Temp file must be cleaned up even when tool raises"
        )

    @pytest.mark.asyncio
    async def test_decorator_applies_cleanup_to_tool_method(self):
        """``cleanup_temp_uploads`` removes the upload after the tool returns.

        The wrapped tool records whether the file existed while it was
        running; without that check the final "file is gone" assertion would
        also pass if no file had ever been written.
        """
        observed = {}

        @cleanup_temp_uploads
        async def fake_tool(file_content):
            path = await _resolve_from_content(file_content, "x.docx")
            observed["path"] = path
            observed["existed"] = Path(path).exists()
            return path

        content = base64.b64encode(_minimal_docx_bytes()).decode()
        result = await fake_tool(content)
        assert observed["path"] == result
        assert observed["existed"], "Upload was never written during the tool call"
        assert not Path(result).exists(), "Decorator failed to clean up"
class TestH3_UniqueTempPaths:
    """H3: concurrent uploads with same content must not collide."""

    @pytest.mark.asyncio
    async def test_two_resolves_get_different_paths(self):
        """Resolving identical content twice yields two distinct temp paths.

        Every path is recorded as soon as it is returned, so the ``finally``
        block removes whatever was created even if the second resolve raises.
        """
        content = base64.b64encode(_minimal_docx_bytes()).decode()
        created = []
        try:
            created.append(await _resolve_from_content(content, "x.docx"))
            created.append(await _resolve_from_content(content, "x.docx"))
            p1, p2 = created
            assert p1 != p2, "Identical content must still get unique temp paths"
        finally:
            # missing_ok keeps this safe if both entries name the same file.
            for leftover in created:
                Path(leftover).unlink(missing_ok=True)
class TestH7_ZipDisambiguation:
    """H7: ZIP magic bytes must disambiguate docx/xlsx/pptx."""

    def test_docx_detected_via_content_types(self):
        """The minimal docx-like archive is reported as ``.docx``."""
        assert _detect_extension_from_bytes(_minimal_docx_bytes()) == ".docx"

    def test_unknown_zip_returns_bin(self):
        """A ZIP holding only ``hello.txt`` is reported as ``.bin``."""
        import io
        import zipfile

        plain_zip = io.BytesIO()
        with zipfile.ZipFile(plain_zip, "w") as archive:
            archive.writestr("hello.txt", "not an office doc")
        detected = _detect_extension_from_bytes(plain_zip.getvalue())
        # No [Content_Types].xml → .bin
        assert detected == ".bin"
class TestH8_CsvDetection:
    """H8: CSV detection must require commas/tabs + valid UTF-8."""

    def test_binary_garbage_not_csv(self):
        """Bytes with a printable start but a NUL and 0xFF tail are not CSV."""
        # Printable first byte, then a binary tail containing NUL.
        binary_blob = b"Aabcdef\x00binary\xff\xfedata"
        assert _detect_extension_from_bytes(binary_blob) != ".csv"

    def test_actual_csv_detected(self):
        """Comma-separated text rows are reported as ``.csv``."""
        csv_rows = b"name,age,city\nAlice,30,NYC\nBob,25,LA\n"
        assert _detect_extension_from_bytes(csv_rows) == ".csv"
class TestH2_ErrorPathScrubbing:
    """H2: OfficeFileError must never leak server-side upload paths."""

    def test_error_message_scrubs_temp_path(self):
        """The upload directory is replaced by ``<uploaded file>`` in the message."""
        raw_message = "/tmp/mcp_office_uploads/upload_deadbeef.docx is corrupt"
        rendered = str(OfficeFileError(raw_message))
        assert "/tmp/mcp_office_uploads/" not in rendered
        assert "<uploaded file>" in rendered

    def test_clean_error_messages_unchanged(self):
        """A message without an upload path round-trips untouched."""
        plain_message = "Unsupported URL scheme: ftp"
        rendered = str(OfficeFileError(plain_message))
        assert rendered == plain_message
class TestLocalFileBoundary:
    """Combined: the security boundary must hold across env config permutations."""

    @pytest.mark.asyncio
    async def test_local_file_blocked_in_http_mode(self):
        """With an HTTP transport and no override, a local path is refused."""
        with patch.dict(
            os.environ, {"MCP_TRANSPORT": "streamable-http"}, clear=False
        ):
            # Only the transport is patched in; the override is removed so the
            # transport alone decides. patch.dict restores it on exit.
            os.environ.pop("MCP_ALLOW_LOCAL_FILES", None)
            with pytest.raises(OfficeFileError, match="Local file access"):
                await resolve_office_file_path("/etc/passwd")

    @pytest.mark.asyncio
    async def test_local_file_allowed_in_stdio_mode(self, tmp_path):
        """With the stdio transport, an existing local path is returned as-is."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("hello")
        with patch.dict(os.environ, {"MCP_TRANSPORT": "stdio"}, clear=False):
            os.environ.pop("MCP_ALLOW_LOCAL_FILES", None)
            result = await resolve_office_file_path(str(test_file))
            assert result == str(test_file)