Three findings from a margaret-hamilton-style review of the MCP server, fixed with regression tests written first (red → green). One bonus finding (huntpilotqueue column name) was surfaced by the third fix itself — exactly the audit-trust failure mode that fix exists to expose. CRITICAL #1 — sql_validator: comment-strip mutated string literals. The cleaned query returned by validate_select() is what travels to AXL. Previously, the comment-strip pass ran before the literal-aware pass, so `--` or `/* */` markers inside a string literal were silently eaten: input: WHERE description = 'Smith -- old line' to AXL: WHERE description = 'Smith (truncated mid-literal) The LLM saw rows that looked plausible but were not what its query asked for. "Confidently wrong" is exactly the failure mode the review was hunting. Fix: only strip comments on the analysis-only copy used for keyword detection. The cleaned output preserves the input verbatim (modulo trailing semicolon and outer whitespace). 6 new tests covering literal preservation across `--`, `/* */`, LIKE patterns with embedded comment markers, and forbidden keywords inside real comments. CRITICAL #2 — cache key omitted cluster identity. The on-disk cache key was `method::args_json`. An operator swapping AXL_URL between test and prod (or between two clusters) would silently serve stale data from cluster A as if from cluster B. The audit report would be confidently wrong with no signal anything happened. Fix: AxlCache now takes cluster_id and prefixes all keys with it. Server bootstrap derives cluster_id as a 12-char SHA-256 prefix of AXL_URL. cache_stats() surfaces both the current cluster_id and a `foreign_cluster_entries` count so an env-swap is visible. Schema migration handles pre-fix cache files via PRAGMA table_info introspection plus a one-shot ALTER TABLE ADD COLUMN. 5 new tests covering isolation, shared-id sharing, stats reporting, legacy DB upgrade, and per-cluster clear() scoping. MAJOR #3 — find_devices_using_css summary undercounted partial failures. The function is per-category resilient (one failed query doesn't kill the whole impact analysis), but the resilience never propagated up to the response. total_returned and any_truncated only reflected SUCCESSFUL categories. An LLM consuming "47 references" had no way to know 5 categories errored and the real number was likely much higher. Fix: response now includes complete: bool, categories_with_errors: int, and error_categories: [list]. The LLM/auditor sees the partial-failure state and can decide whether to act on incomplete data. 5 new tests using a FakeAxlClient stand-in to simulate per-category failures. BONUS finding (uncovered by Major #3 fix): huntpilotqueue join used the wrong column. Three CSS impact categories (huntpilot_max_wait_css, huntpilot_no_agent_css, huntpilot_queue_full_css) were silently erroring with "Column (fknumplan) not found" because huntpilotqueue joins via fknumplan_pilot, not fknumplan. With the Major #3 fix in place, this surfaced immediately as `complete: False, error_categories: [3 huntpilot_*]` against the live cluster. Fixed inline; live re-run now reports `complete: True, total_returned: 163` for Internal-CSS. 87 unit tests passing (up from 70). Live cluster smoke test (cucm-pub.binghammemorial.org, CUCM 15.0.1.12900-234) verifies all three fixes plus the bonus finding work end-to-end.
178 lines
7.4 KiB
Python
178 lines
7.4 KiB
Python
"""Tests for the SELECT-only SQL guardrail."""
|
|
|
|
import pytest
|
|
|
|
from mcp_cucm_axl.sql_validator import validate_select, SqlValidationError
|
|
|
|
|
|
class TestSelectAccepted:
|
|
def test_simple_select(self):
|
|
assert validate_select("SELECT * FROM device") == "SELECT * FROM device"
|
|
|
|
def test_with_cte(self):
|
|
q = "WITH x AS (SELECT 1 FROM systables) SELECT * FROM x"
|
|
assert validate_select(q) == q
|
|
|
|
def test_lowercase_select(self):
|
|
assert validate_select("select * from numplan") == "select * from numplan"
|
|
|
|
def test_trailing_semicolon_stripped(self):
|
|
assert validate_select("SELECT 1 FROM device;") == "SELECT 1 FROM device"
|
|
|
|
def test_block_comments_stripped(self):
|
|
q = "/* comment */ SELECT 1 FROM device"
|
|
cleaned = validate_select(q)
|
|
assert "SELECT 1 FROM device" in cleaned
|
|
|
|
def test_line_comments_stripped(self):
|
|
q = "-- a comment\nSELECT 1 FROM device"
|
|
cleaned = validate_select(q)
|
|
assert "SELECT 1 FROM device" in cleaned
|
|
|
|
|
|
class TestRejected:
|
|
def test_empty(self):
|
|
with pytest.raises(SqlValidationError, match="empty"):
|
|
validate_select("")
|
|
|
|
def test_whitespace_only(self):
|
|
with pytest.raises(SqlValidationError, match="empty"):
|
|
validate_select(" \n ")
|
|
|
|
def test_only_comments(self):
|
|
with pytest.raises(SqlValidationError, match="empty"):
|
|
validate_select("-- just a comment\n/* and another */")
|
|
|
|
def test_insert_rejected(self):
|
|
with pytest.raises(SqlValidationError, match="INSERT"):
|
|
validate_select("INSERT INTO device VALUES (1)")
|
|
|
|
def test_update_rejected(self):
|
|
with pytest.raises(SqlValidationError, match="UPDATE"):
|
|
validate_select("UPDATE device SET name='x' WHERE pkid='y'")
|
|
|
|
def test_delete_rejected(self):
|
|
with pytest.raises(SqlValidationError, match="DELETE"):
|
|
validate_select("DELETE FROM device WHERE pkid='y'")
|
|
|
|
def test_drop_rejected(self):
|
|
with pytest.raises(SqlValidationError, match="DROP"):
|
|
validate_select("DROP TABLE device")
|
|
|
|
def test_select_with_embedded_drop_rejected(self):
|
|
# Belt-and-suspenders: even if "DROP" appears in a quoted string-ish
|
|
# position our keyword filter still catches it. AXL would also reject
|
|
# this, but failing fast on the client saves a SOAP round-trip.
|
|
with pytest.raises(SqlValidationError, match="DROP"):
|
|
validate_select("SELECT 1 FROM device; DROP TABLE device")
|
|
|
|
def test_truncate_rejected(self):
|
|
with pytest.raises(SqlValidationError, match="TRUNCATE"):
|
|
validate_select("TRUNCATE TABLE device")
|
|
|
|
|
|
class TestEdgeCases:
|
|
def test_keyword_as_column_name_blocked(self):
|
|
# A column named "delete" would be blocked. This is acceptable —
|
|
# the data dictionary doesn't use SQL keywords as column names,
|
|
# and conservative blocking is the right call for v1.
|
|
with pytest.raises(SqlValidationError):
|
|
validate_select("SELECT delete FROM device")
|
|
|
|
def test_select_with_subquery(self):
|
|
q = "SELECT name FROM device WHERE pkid IN (SELECT fkdevice FROM numplan)"
|
|
assert "SELECT name FROM device" in validate_select(q)
|
|
|
|
|
|
class TestStringLiterals:
|
|
"""Forbidden keywords inside string literals must be ignored.
|
|
|
|
Otherwise CSS names like 'Call Forward-CSS', DN descriptions containing
|
|
'DELETE' (e.g., 'Delete this voicemail line'), or partition names with
|
|
'INSERT' would all fail to query, even though the SQL itself is read-only.
|
|
"""
|
|
|
|
def test_call_inside_string_literal_passes(self):
|
|
q = "SELECT pkid FROM callingsearchspace WHERE name = 'Call Forward-CSS'"
|
|
result = validate_select(q)
|
|
assert "Call Forward-CSS" in result # literal preserved
|
|
|
|
def test_delete_inside_string_literal_passes(self):
|
|
q = "SELECT pkid FROM numplan WHERE description = 'Delete after audit'"
|
|
result = validate_select(q)
|
|
assert "Delete after audit" in result
|
|
|
|
def test_drop_inside_string_literal_passes(self):
|
|
q = "SELECT pkid FROM numplan WHERE description = 'DROP table backup'"
|
|
assert validate_select(q)
|
|
|
|
def test_actual_drop_outside_literal_still_blocked(self):
|
|
with pytest.raises(SqlValidationError, match="DROP"):
|
|
validate_select("SELECT 1 FROM device; DROP TABLE backup")
|
|
|
|
def test_escaped_quote_in_literal(self):
|
|
# Informix uses '' (doubled) as escaped single quote within literals
|
|
q = "SELECT pkid FROM numplan WHERE description = 'O''Brien''s line'"
|
|
result = validate_select(q)
|
|
assert "O''Brien''s line" in result
|
|
|
|
def test_keyword_just_outside_literal_blocked(self):
|
|
# The literal 'safe text' is fine; the trailing DROP is not.
|
|
with pytest.raises(SqlValidationError, match="DROP"):
|
|
validate_select("SELECT 1 FROM device WHERE x = 'safe text' OR DROP")
|
|
|
|
def test_multiple_literals(self):
|
|
q = "SELECT 1 FROM numplan WHERE name = 'CALL' AND description = 'UPDATE pending'"
|
|
assert validate_select(q)
|
|
|
|
|
|
class TestLiteralPreservedInOutput:
|
|
"""Hamilton review CRITICAL #1: comment-strip mutated string literals.
|
|
|
|
The query SENT to AXL must preserve the literal contents byte-for-byte.
|
|
Previously, the comment-strip pass ran before the literal-aware pass,
|
|
so `--` or `/* */` inside a quoted string were silently eaten on the
|
|
way to the cluster. An LLM dialing `description LIKE '%-- old%'` got
|
|
a different query than it asked for.
|
|
"""
|
|
|
|
def test_dash_dash_inside_literal_preserved(self):
|
|
q = "SELECT * FROM numplan WHERE description = 'Smith -- old line'"
|
|
result = validate_select(q)
|
|
assert "Smith -- old line" in result, (
|
|
f"line-comment marker inside literal must NOT be stripped; got: {result!r}"
|
|
)
|
|
|
|
def test_block_comment_marker_inside_literal_preserved(self):
|
|
q = "SELECT * FROM device WHERE name = 'before /* still in literal */ after'"
|
|
result = validate_select(q)
|
|
assert "/* still in literal */" in result
|
|
assert "before" in result and "after" in result
|
|
|
|
def test_like_pattern_with_dash_dash_preserved(self):
|
|
# Real-world case: an LLM searches for descriptions containing "--"
|
|
q = "SELECT pkid FROM numplan WHERE description LIKE '%-- old%'"
|
|
result = validate_select(q)
|
|
assert "'%-- old%'" in result
|
|
|
|
def test_actual_line_comment_outside_literal_still_handled(self):
|
|
# An actual --comment outside any literal is fine (AXL handles it),
|
|
# and the keyword check ignores it.
|
|
q = "SELECT 1 FROM device -- a real comment at the end"
|
|
result = validate_select(q)
|
|
# We don't strip from output, so the comment stays in the returned text.
|
|
# The important thing is the validator passes and a forbidden keyword
|
|
# in the comment wouldn't trip the check (covered separately).
|
|
assert "SELECT 1 FROM device" in result
|
|
|
|
def test_forbidden_keyword_inside_real_comment_does_not_trip(self):
|
|
# Real comment, with a forbidden keyword in it, should not trip the validator
|
|
q = "SELECT 1 FROM device -- TODO: someone DELETE the old test data"
|
|
result = validate_select(q)
|
|
assert "SELECT 1" in result
|
|
|
|
def test_block_literal_with_drop_inside_preserved(self):
|
|
q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'"
|
|
result = validate_select(q)
|
|
assert "'log: DROP detected'" in result
|