Hamilton review fixes: validator literal preservation, cache cluster id, CSS impact partial-failure reporting

Three findings from a Margaret Hamilton-style review of the MCP server,
fixed with regression tests written first (red → green). One bonus
finding (huntpilotqueue column name) was surfaced by the third fix
itself: exactly the audit-trust failure mode that fix exists to expose.

CRITICAL #1 — sql_validator: comment-strip mutated string literals.

The cleaned query returned by validate_select() is what travels to AXL.
Previously, the comment-strip pass ran before the literal-aware pass,
so `--` or `/* */` markers inside a string literal were silently eaten:

  input:  WHERE description = 'Smith -- old line'
  to AXL: WHERE description = 'Smith    (truncated mid-literal)

The LLM saw rows that looked plausible but were not what its query
asked for. "Confidently wrong" is exactly the failure mode the review
was hunting.

Fix: only strip comments on the analysis-only copy used for keyword
detection. The cleaned output preserves the input verbatim (modulo
trailing semicolon and outer whitespace). 6 new tests covering literal
preservation across `--`, `/* */`, LIKE patterns with embedded comment
markers, and forbidden keywords inside real comments.
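
A minimal Python sketch of the fixed ordering follows. It uses simple stand-in patterns because the real `_STRING_LITERAL`, `_COMMENT_BLOCK`, `_COMMENT_LINE`, and forbidden-keyword list are not shown in this commit. Those regexes, the `FORBIDDEN` set, and the name `validate_select_sketch` are therefore hypothetical. The wire query is only trimmed. Literals and then comments are removed from a separate analysis copy.

```python
import re

# Hypothetical stand-ins for the validator's module-level patterns.
_STRING_LITERAL = re.compile(r"'(?:[^']|'')*'")  # single-quoted, '' as escape
_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
_COMMENT_LINE = re.compile(r"--[^\n]*")
_WORD_RE = re.compile(r"[A-Za-z_]+")
FORBIDDEN = {"INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE"}  # assumed subset


class SqlValidationError(ValueError):
    pass


def validate_select_sketch(query: str) -> str:
    if not query or not query.strip():
        raise SqlValidationError("Query is empty.")

    # Wire query: trim outer whitespace and trailing semicolons, nothing else.
    cleaned = query.strip().rstrip(";").strip()
    if not cleaned:
        raise SqlValidationError("Query is empty after trimming.")

    # Analysis copy only. Literals go first, so a '--' or '/*' inside quotes
    # is already gone by the time the comment patterns run.
    analysis = _STRING_LITERAL.sub(" ", cleaned)
    analysis = _COMMENT_BLOCK.sub(" ", analysis)
    analysis = _COMMENT_LINE.sub(" ", analysis)

    tokens = [t.upper() for t in _WORD_RE.findall(analysis)]
    if not tokens or tokens[0] not in {"SELECT", "WITH"}:
        raise SqlValidationError("Only SELECT/WITH queries are accepted.")
    bad = FORBIDDEN.intersection(tokens)
    if bad:
        raise SqlValidationError(f"Forbidden keyword(s): {sorted(bad)}")
    return cleaned  # unmodified apart from the trim above


q = "SELECT * FROM numplan WHERE description = 'Smith -- old line';"
fixed = validate_select_sketch(q)

# Pre-fix behavior for contrast: stripping comments from the wire query.
buggy = _COMMENT_LINE.sub(" ", q).strip()
```

The last line reproduces the pre-fix failure. Running the line-comment pattern over the wire query leaves `... = 'Smith`, which is the truncated literal shown above. `fixed` keeps the literal intact.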

CRITICAL #2 — cache key omitted cluster identity.

The on-disk cache key was `method::args_json`. An operator swapping
AXL_URL between test and prod (or between two clusters) would silently
serve stale data from cluster A as if from cluster B. The audit
report would be confidently wrong with no signal anything happened.

Fix: AxlCache now takes cluster_id and prefixes all keys with it.
Server bootstrap derives cluster_id as a 12-char SHA-256 prefix of
AXL_URL. cache_stats() surfaces both the current cluster_id and a
`foreign_cluster_entries` count so an env-swap is visible. Schema
migration handles pre-fix cache files via PRAGMA table_info introspection
plus a one-shot ALTER TABLE ADD COLUMN. 6 new tests covering isolation,
shared-id sharing, stats reporting, construction without a cluster_id,
legacy DB upgrade, and per-cluster clear() scoping.
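
The cluster-id derivation and the legacy-schema upgrade can be sketched with the standard library alone. The helper names `derive_cluster_id`, `make_key`, and `migrate` are hypothetical, and so are the example URLs. The key format, the 12-character SHA-256 prefix, and the `ALTER TABLE` statement follow the diff below.

```python
import hashlib
import json
import sqlite3


def derive_cluster_id(axl_url: str) -> str:
    # First 12 hex chars of SHA-256: compact, and keeps the raw URL out of keys.
    return hashlib.sha256(axl_url.encode()).hexdigest()[:12]


def make_key(cluster_id: str, method: str, kwargs: dict) -> str:
    # Cluster prefix first; sort_keys makes the key independent of dict order.
    return f"{cluster_id}::{method}::{json.dumps(kwargs, sort_keys=True, default=str)}"


def migrate(conn: sqlite3.Connection) -> bool:
    """Add cluster_id to a legacy axl_cache table. Returns True if it altered."""
    cols = {row[1] for row in conn.execute("PRAGMA table_info(axl_cache)")}
    if "cluster_id" in cols:
        return False
    # SQLite requires a non-NULL default when adding a NOT NULL column.
    conn.execute("ALTER TABLE axl_cache ADD COLUMN cluster_id TEXT NOT NULL DEFAULT ''")
    return True


# A pre-fix table (no cluster_id column) holding one legacy row.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE axl_cache (cache_key TEXT PRIMARY KEY, method TEXT NOT NULL, "
    "args_json TEXT NOT NULL, result_json TEXT NOT NULL, "
    "created_at REAL NOT NULL, expires_at REAL NOT NULL)"
)
conn.execute("INSERT INTO axl_cache VALUES ('legacy-key', 'oldMethod', '{}', '1', 0, 9e9)")

altered_first = migrate(conn)   # adds the column
altered_again = migrate(conn)   # no-op on an already-migrated table

prod_id = derive_cluster_id("https://cucm-prod.example.invalid:8443/axl/")
visible_to_prod = conn.execute(
    "SELECT COUNT(*) FROM axl_cache WHERE cluster_id = ?", (prod_id,)
).fetchone()[0]
foreign_to_prod = conn.execute(
    "SELECT COUNT(*) FROM axl_cache WHERE cluster_id != ?", (prod_id,)
).fetchone()[0]
```

The legacy row lands in the `''` bucket. It is invisible to a client bound to a real cluster id, but it is still counted as foreign, which is what `foreign_cluster_entries` reports.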

MAJOR #3 — find_devices_using_css summary undercounted partial failures.

The function is per-category resilient (one failed query doesn't kill
the whole impact analysis), but per-category failures never propagated
up to the top-level summary: total_returned and any_truncated only
reflected SUCCESSFUL categories. An LLM consuming "47 references" had
no way to know that 5 categories errored and the real number was
likely much higher.

Fix: response now includes complete: bool, categories_with_errors: int,
and error_categories: [list]. The LLM/auditor sees the partial-failure
state and can decide whether to act on incomplete data. 5 new tests
using a FakeAxlClient stand-in to simulate per-category failures.
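
The summary logic reduces to a few lines over the per-category results. This sketch assumes, as the diff below does, that each entry in `grouped` carries either `returned_count`/`truncated` or an `error` key. `summarize` is a hypothetical helper, since the commit computes these values inline in `find_devices_using_css`. The category payloads are made up for illustration.

```python
def summarize(grouped: dict[str, dict]) -> dict:
    # Errored categories contribute 0 here, so this sum alone can't reveal them.
    total_returned = sum(c.get("returned_count", 0) for c in grouped.values())
    any_truncated = any(c.get("truncated") for c in grouped.values())
    error_categories = sorted(
        label for label, cat in grouped.items() if "error" in cat
    )
    return {
        "total_returned": total_returned,
        "any_truncated": any_truncated,
        "complete": not error_categories,
        "categories_with_errors": len(error_categories),
        "error_categories": error_categories,
    }


# Illustrative payloads: two successful categories, two that errored.
grouped = {
    "device_reroute_css": {"returned_count": 40, "truncated": False},
    "device_cgpn_unknown_css": {"returned_count": 7, "truncated": False},
    "huntpilot_no_agent_css": {"error": "Column (fknumplan) not found"},
    "huntpilot_max_wait_css": {"error": "Column (fknumplan) not found"},
}
summary = summarize(grouped)
```

Here `total_returned` is 47 with or without the fix. Only `complete: False` and the two listed categories tell the reader that the count is partial.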

BONUS finding (uncovered by Major #3 fix): huntpilotqueue join used
the wrong column. Three CSS impact categories (huntpilot_max_wait_css,
huntpilot_no_agent_css, huntpilot_queue_full_css) were silently
erroring with "Column (fknumplan) not found" because huntpilotqueue
joins via fknumplan_pilot, not fknumplan. With the Major #3 fix in
place, this surfaced immediately as `complete: False, error_categories:
[3 huntpilot_*]` against the live cluster. Fixed inline; live re-run
now reports `complete: True, total_returned: 163` for Internal-CSS.

87 unit tests passing (up from 70). A live-cluster smoke test
(cucm-pub.binghammemorial.org, CUCM 15.0.1.12900-234) verifies that all
three fixes, plus the bonus fix, work end-to-end.

Ryan Malloy 2026-04-25 23:09:55 -06:00
parent 82d8fbe563
commit dee5fdacda
7 changed files with 407 additions and 38 deletions

@@ -1,9 +1,14 @@
 """SQLite-backed TTL cache for AXL responses.

-Keyed on (method_name, sorted_kwargs_json). Cache survives server restarts,
-which makes exploratory audit sessions dramatically faster: the LLM can
-re-run the same `listPhone` queries across conversations without paying
+Keyed on (cluster_id, method_name, sorted_kwargs_json). Cache survives server
+restarts, which makes exploratory audit sessions dramatically faster: the LLM
+can re-run the same `listPhone` queries across conversations without paying
 the SOAP round-trip every time.
+
+Hamilton review CRITICAL #2: cache key now includes a `cluster_id` so that
+the same on-disk database can hold entries from multiple clusters without
+silently serving cluster A's data when bound to cluster B. Operators who
+swap `AXL_URL` between test and prod no longer see cross-cluster contamination.
 """

 from __future__ import annotations
@@ -15,30 +20,70 @@ from pathlib import Path
 from typing import Any


-SCHEMA = """
+# Split into TABLE_DDL (idempotent table creation) and INDEX_DDL (run AFTER
+# any column-adding migration, so indexes that reference newer columns don't
+# fail against legacy databases).
+TABLE_DDL = """
 CREATE TABLE IF NOT EXISTS axl_cache (
     cache_key TEXT PRIMARY KEY,
+    cluster_id TEXT NOT NULL DEFAULT '',
     method TEXT NOT NULL,
     args_json TEXT NOT NULL,
     result_json TEXT NOT NULL,
     created_at REAL NOT NULL,
     expires_at REAL NOT NULL
 );
+"""

+INDEX_DDL = """
 CREATE INDEX IF NOT EXISTS axl_cache_method_idx ON axl_cache(method);
 CREATE INDEX IF NOT EXISTS axl_cache_expires_idx ON axl_cache(expires_at);
+CREATE INDEX IF NOT EXISTS axl_cache_cluster_idx ON axl_cache(cluster_id);
 """


 class AxlCache:
     """SQLite TTL cache. Thread-safe via per-call connections."""

-    def __init__(self, db_path: Path, default_ttl: int):
+    def __init__(
+        self,
+        db_path: Path,
+        default_ttl: int,
+        cluster_id: str | None = None,
+    ):
         self.db_path = db_path
         self.default_ttl = default_ttl
+        # Empty string when unset; matches the column DEFAULT and keeps
+        # SQL filtering simple. Pre-fix databases will have '' for legacy
+        # entries, which is fine: a server now passing cluster_id="prod"
+        # won't see them, which is the correct cautious behavior.
+        self.cluster_id = cluster_id or ""
         self.db_path.parent.mkdir(parents=True, exist_ok=True)
         with self._conn() as c:
-            c.executescript(SCHEMA)
+            # 1) Make sure table exists (no-op if already present)
+            c.executescript(TABLE_DDL)
+            # 2) Bring legacy schemas forward (adds cluster_id if missing)
+            self._migrate(c)
+            # 3) NOW create indexes: safe because all columns exist
+            c.executescript(INDEX_DDL)
+
+    @staticmethod
+    def _migrate(c: sqlite3.Connection) -> None:
+        """Bring pre-existing databases up to the current schema.
+
+        `CREATE TABLE IF NOT EXISTS` is idempotent for table existence but
+        does not add columns to an already-existing table. Pre-fix caches
+        lack `cluster_id`; rather than failing the next INSERT with
+        `no such column`, we add it here. Defaults to '' which makes the
+        legacy entries belong to the "unknown cluster", invisible to any
+        new client passing an actual cluster_id, which is the cautious
+        outcome.
+        """
+        cols = {row[1] for row in c.execute("PRAGMA table_info(axl_cache)").fetchall()}
+        if "cluster_id" not in cols:
+            c.execute(
+                "ALTER TABLE axl_cache ADD COLUMN cluster_id TEXT NOT NULL DEFAULT ''"
+            )

     def _conn(self) -> sqlite3.Connection:
         conn = sqlite3.connect(self.db_path, isolation_level=None)
@@ -46,10 +91,13 @@ class AxlCache:
         conn.execute("PRAGMA synchronous=NORMAL")
         return conn

-    @staticmethod
-    def _make_key(method: str, kwargs: dict) -> str:
-        # sort_keys gives us a deterministic key regardless of dict order
-        return f"{method}::{json.dumps(kwargs, sort_keys=True, default=str)}"
+    def _make_key(self, method: str, kwargs: dict) -> str:
+        # cluster_id prefix isolates entries by cluster identity. sort_keys
+        # gives us a deterministic key regardless of dict order.
+        return (
+            f"{self.cluster_id}::{method}::"
+            f"{json.dumps(kwargs, sort_keys=True, default=str)}"
+        )

     def get(self, method: str, kwargs: dict) -> Any | None:
         if self.default_ttl <= 0:
@@ -75,11 +123,13 @@ class AxlCache:
             c.execute(
                 """
                 INSERT OR REPLACE INTO axl_cache
-                (cache_key, method, args_json, result_json, created_at, expires_at)
-                VALUES (?, ?, ?, ?, ?, ?)
+                (cache_key, cluster_id, method, args_json, result_json,
+                 created_at, expires_at)
+                VALUES (?, ?, ?, ?, ?, ?, ?)
                 """,
                 (
                     key,
+                    self.cluster_id,
                     method,
                     json.dumps(kwargs, sort_keys=True, default=str),
                     json.dumps(result, default=str),
@@ -91,39 +141,66 @@ class AxlCache:
     def stats(self) -> dict:
         now = time.time()
         with self._conn() as c:
-            total = c.execute("SELECT COUNT(*) FROM axl_cache").fetchone()[0]
+            # Entries scoped to THIS cluster_id. The on-disk file may also
+            # contain entries from other clusters; those are intentionally
+            # invisible here.
+            total = c.execute(
+                "SELECT COUNT(*) FROM axl_cache WHERE cluster_id = ?",
+                (self.cluster_id,),
+            ).fetchone()[0]
             live = c.execute(
-                "SELECT COUNT(*) FROM axl_cache WHERE expires_at > ?", (now,)
+                "SELECT COUNT(*) FROM axl_cache "
+                "WHERE cluster_id = ? AND expires_at > ?",
+                (self.cluster_id, now),
             ).fetchone()[0]
             by_method = {
                 row[0]: row[1]
                 for row in c.execute(
                     "SELECT method, COUNT(*) FROM axl_cache "
-                    "WHERE expires_at > ? GROUP BY method ORDER BY 2 DESC",
-                    (now,),
+                    "WHERE cluster_id = ? AND expires_at > ? "
+                    "GROUP BY method ORDER BY 2 DESC",
+                    (self.cluster_id, now),
                 ).fetchall()
             }
+            # Diagnostic: how many entries from OTHER clusters live in the
+            # same file. Useful for spotting an env-var swap that would
+            # otherwise be invisible.
+            foreign = c.execute(
+                "SELECT COUNT(*) FROM axl_cache WHERE cluster_id != ?",
+                (self.cluster_id,),
+            ).fetchone()[0]
         return {
             "db_path": str(self.db_path),
+            "cluster_id": self.cluster_id,
             "default_ttl_seconds": self.default_ttl,
             "total_entries": total,
             "live_entries": live,
             "expired_entries": total - live,
+            "foreign_cluster_entries": foreign,
             "by_method": by_method,
         }

     def clear(self, method_pattern: str | None = None) -> int:
+        # Only clears entries for THIS cluster: never touches a sibling
+        # cluster's cached data even if it lives in the same file.
         with self._conn() as c:
             if method_pattern:
                 cursor = c.execute(
-                    "DELETE FROM axl_cache WHERE method LIKE ?",
-                    (method_pattern.replace("*", "%"),),
+                    "DELETE FROM axl_cache "
+                    "WHERE cluster_id = ? AND method LIKE ?",
+                    (self.cluster_id, method_pattern.replace("*", "%")),
                 )
             else:
-                cursor = c.execute("DELETE FROM axl_cache")
+                cursor = c.execute(
+                    "DELETE FROM axl_cache WHERE cluster_id = ?",
+                    (self.cluster_id,),
+                )
             return cursor.rowcount

     def purge_expired(self) -> int:
+        # Purges expired entries across ALL clusters in this file.
+        # Expired entries are never useful regardless of which cluster
+        # they belong to, so per-cluster scoping isn't needed here.
         with self._conn() as c:
             cursor = c.execute("DELETE FROM axl_cache WHERE expires_at <= ?", (time.time(),))
             return cursor.rowcount

@@ -575,7 +575,7 @@ _CSS_REFERENCE_QUERIES: dict[str, dict] = {
         "sql": """
             SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
             FROM huntpilotqueue hpq
-            JOIN numplan np ON hpq.fknumplan = np.pkid
+            JOIN numplan np ON hpq.fknumplan_pilot = np.pkid
             LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
             WHERE hpq.fkcallingsearchspace_maxwaittime = '{pkid}'
         """,
@@ -585,7 +585,7 @@ _CSS_REFERENCE_QUERIES: dict[str, dict] = {
         "sql": """
             SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
             FROM huntpilotqueue hpq
-            JOIN numplan np ON hpq.fknumplan = np.pkid
+            JOIN numplan np ON hpq.fknumplan_pilot = np.pkid
             LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
             WHERE hpq.fkcallingsearchspace_noagent = '{pkid}'
         """,
@@ -595,7 +595,7 @@ _CSS_REFERENCE_QUERIES: dict[str, dict] = {
         "sql": """
             SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
             FROM huntpilotqueue hpq
-            JOIN numplan np ON hpq.fknumplan = np.pkid
+            JOIN numplan np ON hpq.fknumplan_pilot = np.pkid
             LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
             WHERE hpq.fkcallingsearchspace_pilotqueuefull = '{pkid}'
         """,
@@ -678,11 +678,22 @@ def find_devices_using_css(
     total_returned = sum(c.get("returned_count", 0) for c in grouped.values())
     any_truncated = any(c.get("truncated") for c in grouped.values())
+    # Hamilton review MAJOR #3: per-category errors must propagate to the
+    # top-level summary, otherwise an LLM consuming `total_returned: 47`
+    # has no way to know that 5 categories errored and the real count is
+    # higher. "Software that understands itself reports its own degradation."
+    error_categories = sorted(
+        label for label, cat in grouped.items() if "error" in cat
+    )
+    complete = len(error_categories) == 0

     return {
         "css_name": css_name,
         "css_pkid": css_pkid,
         "total_returned": total_returned,
         "any_truncated": any_truncated,
+        "complete": complete,
+        "categories_with_errors": len(error_categories),
+        "error_categories": error_categories,
         "max_per_category": max_per_category,
         "references_by_category": grouped,
     }

@@ -558,9 +558,20 @@ def main() -> None:
     )
     cache_dir.mkdir(parents=True, exist_ok=True)
     ttl = int(os.environ.get("AXL_CACHE_TTL", "3600"))
-    _cache = AxlCache(cache_dir / "axl_responses.sqlite", default_ttl=ttl)
+    # Cluster-id derived from AXL_URL. Hash keeps the key compact and
+    # avoids leaking the URL into log output where the cache key gets
+    # printed. Hostname-only fallback when AXL_URL is unset (test mode).
+    import hashlib
+    axl_url_for_id = os.environ.get("AXL_URL", "no-axl-url-configured")
+    cluster_id = hashlib.sha256(axl_url_for_id.encode()).hexdigest()[:12]
+    _cache = AxlCache(
+        cache_dir / "axl_responses.sqlite",
+        default_ttl=ttl,
+        cluster_id=cluster_id,
+    )
     print(
-        f"[mcp-cucm-axl] cache: {_cache.db_path} (ttl={ttl}s)",
+        f"[mcp-cucm-axl] cache: {_cache.db_path} "
+        f"(ttl={ttl}s, cluster_id={cluster_id})",
         file=sys.stderr,
         flush=True,
     )

@@ -33,28 +33,36 @@ def validate_select(query: str) -> str:

     Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects
     anything else, and any query containing forbidden keywords as standalone
-    tokens *outside* string literals.
+    tokens *outside* string literals and comments.

-    The cleaned query (with comments stripped) is what gets returned and sent
-    to AXL: string literals are NOT modified, only ignored during keyword
-    tokenization. So a query selecting WHERE name = 'Call Forward-CSS' is
-    safe: the literal "Call" inside quotes is invisible to the keyword check,
-    while the actual SQL with the unmodified literal travels intact to AXL.
+    Hamilton review CRITICAL #1: the output we return MUST preserve the input
+    byte-for-byte (modulo trailing semicolon and outer whitespace). Earlier
+    versions ran a non-literal-aware comment strip on the output, which would
+    silently eat `--` and `/* */` markers that legitimately appeared inside
+    string literals like `WHERE description = 'Smith -- old line'`. The query
+    going to AXL must be exactly what the caller intended; comment stripping
+    is an analysis-only operation, never a mutation of the wire query.
     """
     if not query or not query.strip():
         raise SqlValidationError("Query is empty.")

-    cleaned = _COMMENT_BLOCK.sub(" ", query)
-    cleaned = _COMMENT_LINE.sub(" ", cleaned).strip().rstrip(";").strip()
+    # The query we'll send to AXL: original input, with only outer whitespace
+    # and a single trailing semicolon trimmed. NO mutation of literals or
+    # in-string comment markers.
+    cleaned = query.strip().rstrip(";").strip()
     if not cleaned:
-        raise SqlValidationError("Query is empty after stripping comments.")
+        raise SqlValidationError("Query is empty after trimming.")

-    # Strip string literals before tokenizing so that words inside quoted
-    # values (e.g. CSS names containing "Call", DN descriptions containing
-    # "DELETE") don't trip the forbidden-keyword check. The cleaned query
-    # we return still contains the literals; only the analysis copy strips
-    # them.
+    # Analysis-only copy: strip string literals AND comments (in either order
+    # is safe here, since each strip uses its own regex on a non-AXL-bound
+    # buffer). Order chosen: literals first, then comments, so that any
+    # comment markers genuinely outside literals can be detected.
     for_analysis = _STRING_LITERAL.sub(" ", cleaned)
+    for_analysis = _COMMENT_BLOCK.sub(" ", for_analysis)
+    for_analysis = _COMMENT_LINE.sub(" ", for_analysis)
+
+    if not for_analysis.strip():
+        raise SqlValidationError("Query is empty after stripping comments.")

     upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)]
     if not upper_tokens:

@@ -85,3 +85,95 @@ def test_purge_expired(tmp_path: Path):
     purged = c.purge_expired()
     assert purged == 1
     assert c.stats()["live_entries"] == 1
+
+
+class TestClusterIsolation:
+    """Hamilton review CRITICAL #2: cache key omitted cluster identity.
+
+    Prior to the fix, an `AXL_URL` swap (test to prod, or one cluster to another)
+    served stale results from cluster A as if from cluster B. The cache
+    couldn't tell the data came from a different mission. Now each cache
+    handle is bound to a cluster_id, and entries from a different cluster
+    must miss.
+    """
+
+    def test_different_cluster_ids_isolate_get(self, tmp_path: Path):
+        # Both caches point at the same DB file, but bound to different
+        # cluster IDs. A's writes must not be visible to B.
+        db = tmp_path / "shared.sqlite"
+        a = AxlCache(db, default_ttl=60, cluster_id="cluster-A")
+        b = AxlCache(db, default_ttl=60, cluster_id="cluster-B")
+        a.set("getCCMVersion", {}, {"version": "12.5"})
+        assert a.get("getCCMVersion", {}) == {"version": "12.5"}
+        assert b.get("getCCMVersion", {}) is None, (
+            "cluster-B must not see cluster-A's cached value"
+        )
+
+    def test_same_cluster_id_shares_cache(self, tmp_path: Path):
+        # Two handles with the SAME cluster_id should share results.
+        db = tmp_path / "shared.sqlite"
+        a = AxlCache(db, default_ttl=60, cluster_id="cluster-X")
+        a.set("listPhone", {"name": "SEP1"}, {"rows": ["one"]})
+        b = AxlCache(db, default_ttl=60, cluster_id="cluster-X")
+        assert b.get("listPhone", {"name": "SEP1"}) == {"rows": ["one"]}
+
+    def test_cluster_id_in_stats(self, tmp_path: Path):
+        c = AxlCache(tmp_path / "s.sqlite", default_ttl=60, cluster_id="cluster-Y")
+        c.set("getCCMVersion", {}, {"v": "15"})
+        stats = c.stats()
+        assert stats.get("cluster_id") == "cluster-Y", (
+            "stats must surface cluster_id so operators can verify which cluster they're caching"
+        )
+
+    def test_no_cluster_id_still_works_legacy(self, tmp_path: Path):
+        # Backward compat: no cluster_id keeps the old (but now risky) shape.
+        # The cache still functions; we just don't get isolation.
+        c = AxlCache(tmp_path / "legacy.sqlite", default_ttl=60)
+        c.set("x", {}, "y")
+        assert c.get("x", {}) == "y"
+
+    def test_clear_only_affects_current_cluster(self, tmp_path: Path):
+        db = tmp_path / "shared.sqlite"
+        a = AxlCache(db, default_ttl=60, cluster_id="cluster-A")
+        b = AxlCache(db, default_ttl=60, cluster_id="cluster-B")
+        a.set("x", {}, "from-A")
+        b.set("x", {}, "from-B")
+        deleted = a.clear()
+        assert deleted == 1, "clear() must only affect this cluster's entries"
+        assert b.get("x", {}) == "from-B", "cluster-B's entry must survive A's clear"
+
+    def test_migrate_legacy_database(self, tmp_path: Path):
+        """A cache database created before the cluster_id fix must
+        upgrade transparently: no `no such column` error on next INSERT.
+        """
+        import sqlite3
+        db = tmp_path / "legacy.sqlite"
+        # Manually create the OLD schema (no cluster_id column)
+        conn = sqlite3.connect(db)
+        conn.executescript(
+            """
+            CREATE TABLE axl_cache (
+                cache_key TEXT PRIMARY KEY,
+                method TEXT NOT NULL,
+                args_json TEXT NOT NULL,
+                result_json TEXT NOT NULL,
+                created_at REAL NOT NULL,
+                expires_at REAL NOT NULL
+            );
+            INSERT INTO axl_cache VALUES
+                ('legacy-key', 'oldMethod', '{}', '"old-value"', 0, 9999999999);
+            """
+        )
+        conn.commit()
+        conn.close()
+
+        # Open with the new code: must not raise, must add the column
+        c = AxlCache(db, default_ttl=60, cluster_id="new-cluster")
+        # The new client should NOT see the legacy entry (it has no cluster_id).
+        # This is the cautious behavior; legacy entries are isolated to the
+        # "unknown cluster" bucket.
+        assert c.get("oldMethod", {}) is None
+
+        # And it must be able to write/read its own entries
+        c.set("newMethod", {"a": 1}, "new-value")
+        assert c.get("newMethod", {"a": 1}) == "new-value"

tests/test_css_impact.py (new file)

@@ -0,0 +1,119 @@
+"""Hamilton review MAJOR #3: find_devices_using_css must surface partial failures.
+
+The function is per-category resilient by design: if one schema query fails,
+the others still produce results. But the top-level summary previously hid
+that some categories errored out: `total_returned` and `any_truncated` only
+reflected the SUCCESSFUL categories. An LLM consuming "47 references, low
+impact" wouldn't know that 5 categories errored and the real number is
+likely much higher.
+
+After the fix: the response includes `complete: bool`, `categories_with_errors`,
+and `error_categories`, so an LLM (or human auditor) can see the partial-failure
+state and act on it.
+"""
+
+import pytest
+from mcp_cucm_axl.route_plan import find_devices_using_css
+
+
+class FakeAxlClient:
+    """Minimal stand-in for AxlClient that lets us simulate per-query failures.
+
+    Returns a fake CSS pkid for the lookup query, then either a single fake row
+    or an exception based on substring matching.
+    """
+
+    def __init__(self, error_on_columns: list[str] | None = None):
+        self.error_on_columns = error_on_columns or []
+        self.queries: list[str] = []
+
+    def execute_sql_query(self, sql: str) -> dict:
+        self.queries.append(sql)
+        # The CSS lookup query: return a fake pkid
+        if "callingsearchspace WHERE name" in sql:
+            return {"row_count": 1, "rows": [{"pkid": "fake-css-pkid"}]}
+        # Any query referencing an "error trigger" column → simulate failure
+        for trigger in self.error_on_columns:
+            if trigger in sql:
+                raise RuntimeError(f"simulated cluster failure on {trigger}")
+        # Otherwise return one fake reference row so the category isn't empty
+        return {
+            "row_count": 1,
+            "rows": [{"name": "FakeRef", "context": "FakePart", "description": "fake"}],
+        }
+
+
+def test_no_errors_reports_complete():
+    """Baseline: when every category succeeds, complete=True and no error fields populated."""
+    client = FakeAxlClient()
+    result = find_devices_using_css(client, "Some-CSS")
+    assert result["complete"] is True
+    assert result["categories_with_errors"] == 0
+    assert result["error_categories"] == []
+    # And total_returned reflects the successful categories
+    assert result["total_returned"] >= 1
+
+
+def test_one_errored_category_marks_incomplete():
+    """The audit-trust failure mode: one category errors out and the summary lies.
+
+    Fix: complete=False, categories_with_errors >= 1.
+    """
+    client = FakeAxlClient(error_on_columns=["fkcallingsearchspace_cgpnunknown"])
+    result = find_devices_using_css(client, "Some-CSS")
+    assert result["complete"] is False, (
+        "complete must be False when any category errored"
+    )
+    assert result["categories_with_errors"] >= 1
+    assert "device_cgpn_unknown_css" in result["error_categories"]
+
+
+def test_multiple_errors_all_listed():
+    """All errored categories must be enumerated in error_categories."""
+    client = FakeAxlClient(
+        error_on_columns=[
+            "fkcallingsearchspace_cgpnunknown",
+            "fkcallingsearchspace_reroute",
+            "fkcallingsearchspace_pilotqueuefull",
+        ]
+    )
+    result = find_devices_using_css(client, "Some-CSS")
+    assert result["complete"] is False
+    assert result["categories_with_errors"] == 3
+    assert set(result["error_categories"]) == {
+        "device_cgpn_unknown_css",
+        "device_reroute_css",
+        "huntpilot_queue_full_css",
+    }
+
+
+def test_total_returned_does_not_include_error_categories():
+    """An errored category contributes 0 to total_returned (correct behavior).
+
+    What's NEW: the response also flags that the count is partial.
+    """
+    client = FakeAxlClient(error_on_columns=["fkcallingsearchspace_cgpnunknown"])
+    result = find_devices_using_css(client, "Some-CSS")
+    # The count itself is unchanged from before; what's new is the warning
+    assert result["complete"] is False
+    # The error category has no rows in references_by_category
+    err_cat = result["references_by_category"].get("device_cgpn_unknown_css", {})
+    assert "error" in err_cat
+
+
+def test_css_not_found_returns_error_not_partial():
+    """If the CSS lookup itself fails (CSS doesn't exist), we return the
+    'not found' error early, NOT a partial-failure response. Distinct
+    failure modes deserve distinct shapes.
+    """
+    class CssNotFoundClient:
+        def execute_sql_query(self, sql):
+            if "callingsearchspace WHERE name" in sql:
+                return {"row_count": 0, "rows": []}
+            return {"row_count": 1, "rows": [{}]}
+
+    result = find_devices_using_css(CssNotFoundClient(), "Nonexistent-CSS")
+    assert "error" in result
+    assert "complete" not in result, (
+        "CSS-not-found is a hard error; we shouldn't dress it up as partial"
+    )

@@ -124,3 +124,54 @@ class TestStringLiterals:
     def test_multiple_literals(self):
         q = "SELECT 1 FROM numplan WHERE name = 'CALL' AND description = 'UPDATE pending'"
         assert validate_select(q)
+
+
+class TestLiteralPreservedInOutput:
+    """Hamilton review CRITICAL #1: comment-strip mutated string literals.
+
+    The query SENT to AXL must preserve the literal contents byte-for-byte.
+    Previously, the comment-strip pass ran before the literal-aware pass,
+    so `--` or `/* */` inside a quoted string were silently eaten on the
+    way to the cluster. An LLM dialing `description LIKE '%-- old%'` got
+    a different query than it asked for.
+    """
+
+    def test_dash_dash_inside_literal_preserved(self):
+        q = "SELECT * FROM numplan WHERE description = 'Smith -- old line'"
+        result = validate_select(q)
+        assert "Smith -- old line" in result, (
+            f"line-comment marker inside literal must NOT be stripped; got: {result!r}"
+        )
+
+    def test_block_comment_marker_inside_literal_preserved(self):
+        q = "SELECT * FROM device WHERE name = 'before /* still in literal */ after'"
+        result = validate_select(q)
+        assert "/* still in literal */" in result
+        assert "before" in result and "after" in result
+
+    def test_like_pattern_with_dash_dash_preserved(self):
+        # Real-world case: an LLM searches for descriptions containing "--"
+        q = "SELECT pkid FROM numplan WHERE description LIKE '%-- old%'"
+        result = validate_select(q)
+        assert "'%-- old%'" in result
+
+    def test_actual_line_comment_outside_literal_still_handled(self):
+        # An actual --comment outside any literal is fine (AXL handles it),
+        # and the keyword check ignores it.
+        q = "SELECT 1 FROM device -- a real comment at the end"
+        result = validate_select(q)
+        # We don't strip from output, so the comment stays in the returned text.
+        # The important thing is the validator passes and a forbidden keyword
+        # in the comment wouldn't trip the check (covered separately).
+        assert "SELECT 1 FROM device" in result
+
+    def test_forbidden_keyword_inside_real_comment_does_not_trip(self):
+        # Real comment, with a forbidden keyword in it, should not trip the validator
+        q = "SELECT 1 FROM device -- TODO: someone DELETE the old test data"
+        result = validate_select(q)
+        assert "SELECT 1" in result
+
+    def test_block_literal_with_drop_inside_preserved(self):
+        q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'"
+        result = validate_select(q)
+        assert "'log: DROP detected'" in result