diff --git a/src/mcp_cucm_axl/cache.py b/src/mcp_cucm_axl/cache.py index c79dfce..78c1353 100644 --- a/src/mcp_cucm_axl/cache.py +++ b/src/mcp_cucm_axl/cache.py @@ -1,9 +1,14 @@ """SQLite-backed TTL cache for AXL responses. -Keyed on (method_name, sorted_kwargs_json). Cache survives server restarts, -which makes exploratory audit sessions dramatically faster — the LLM can -re-run the same `listPhone` queries across conversations without paying +Keyed on (cluster_id, method_name, sorted_kwargs_json). Cache survives server +restarts, which makes exploratory audit sessions dramatically faster — the LLM +can re-run the same `listPhone` queries across conversations without paying the SOAP round-trip every time. + +Hamilton review CRITICAL #2: cache key now includes a `cluster_id` so that +the same on-disk database can hold entries from multiple clusters without +silently serving cluster A's data when bound to cluster B. Operators who +swap `AXL_URL` between test and prod no longer see cross-cluster contamination. """ from __future__ import annotations @@ -15,30 +20,70 @@ from pathlib import Path from typing import Any -SCHEMA = """ +# Split into TABLE_DDL (idempotent table creation) and INDEX_DDL (run AFTER +# any column-adding migration, so indexes that reference newer columns don't +# fail against legacy databases). +TABLE_DDL = """ CREATE TABLE IF NOT EXISTS axl_cache ( cache_key TEXT PRIMARY KEY, + cluster_id TEXT NOT NULL DEFAULT '', method TEXT NOT NULL, args_json TEXT NOT NULL, result_json TEXT NOT NULL, created_at REAL NOT NULL, expires_at REAL NOT NULL ); +""" +INDEX_DDL = """ CREATE INDEX IF NOT EXISTS axl_cache_method_idx ON axl_cache(method); CREATE INDEX IF NOT EXISTS axl_cache_expires_idx ON axl_cache(expires_at); +CREATE INDEX IF NOT EXISTS axl_cache_cluster_idx ON axl_cache(cluster_id); """ class AxlCache: """SQLite TTL cache. Thread-safe via per-call connections.""" - def __init__(self, db_path: Path, default_ttl: int): + def __init__( + self, + db_path: Path, + default_ttl: int, + cluster_id: str | None = None, + ): self.db_path = db_path self.default_ttl = default_ttl + # Empty string when unset — matches the column DEFAULT and keeps + # SQL filtering simple. Pre-fix databases will have '' for legacy + # entries, which is fine: a server now passing cluster_id="prod" + # won't see them, which is the correct cautious behavior. + self.cluster_id = cluster_id or "" self.db_path.parent.mkdir(parents=True, exist_ok=True) with self._conn() as c: - c.executescript(SCHEMA) + # 1) Make sure table exists (no-op if already present) + c.executescript(TABLE_DDL) + # 2) Bring legacy schemas forward (adds cluster_id if missing) + self._migrate(c) + # 3) NOW create indexes — safe because all columns exist + c.executescript(INDEX_DDL) + + @staticmethod + def _migrate(c: sqlite3.Connection) -> None: + """Bring pre-existing databases up to the current schema. + + `CREATE TABLE IF NOT EXISTS` is idempotent for table existence but + does not add columns to an already-existing table. Pre-fix caches + lack `cluster_id`; rather than failing the next INSERT with + `no such column`, we add it here. Defaults to '' which makes the + legacy entries belong to the "unknown cluster" — invisible to any + new client passing an actual cluster_id, which is the cautious + outcome. + """ + cols = {row[1] for row in c.execute("PRAGMA table_info(axl_cache)").fetchall()} + if "cluster_id" not in cols: + c.execute( + "ALTER TABLE axl_cache ADD COLUMN cluster_id TEXT NOT NULL DEFAULT ''" + ) def _conn(self) -> sqlite3.Connection: conn = sqlite3.connect(self.db_path, isolation_level=None) @@ -46,10 +91,13 @@ class AxlCache: conn.execute("PRAGMA synchronous=NORMAL") return conn - @staticmethod - def _make_key(method: str, kwargs: dict) -> str: - # sort_keys gives us a deterministic key regardless of dict order - return f"{method}::{json.dumps(kwargs, sort_keys=True, default=str)}" + def _make_key(self, method: str, kwargs: dict) -> str: + # cluster_id prefix isolates entries by cluster identity. sort_keys + # gives us a deterministic key regardless of dict order. + return ( + f"{self.cluster_id}::{method}::" + f"{json.dumps(kwargs, sort_keys=True, default=str)}" + ) def get(self, method: str, kwargs: dict) -> Any | None: if self.default_ttl <= 0: @@ -75,11 +123,13 @@ class AxlCache: c.execute( """ INSERT OR REPLACE INTO axl_cache - (cache_key, method, args_json, result_json, created_at, expires_at) - VALUES (?, ?, ?, ?, ?, ?) + (cache_key, cluster_id, method, args_json, result_json, + created_at, expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?) """, ( key, + self.cluster_id, method, json.dumps(kwargs, sort_keys=True, default=str), json.dumps(result, default=str), @@ -91,39 +141,66 @@ class AxlCache: def stats(self) -> dict: now = time.time() with self._conn() as c: - total = c.execute("SELECT COUNT(*) FROM axl_cache").fetchone()[0] + # Entries scoped to THIS cluster_id. The on-disk file may also + # contain entries from other clusters; those are intentionally + # invisible here. + total = c.execute( + "SELECT COUNT(*) FROM axl_cache WHERE cluster_id = ?", + (self.cluster_id,), + ).fetchone()[0] live = c.execute( - "SELECT COUNT(*) FROM axl_cache WHERE expires_at > ?", (now,) + "SELECT COUNT(*) FROM axl_cache " + "WHERE cluster_id = ? AND expires_at > ?", + (self.cluster_id, now), ).fetchone()[0] by_method = { row[0]: row[1] for row in c.execute( "SELECT method, COUNT(*) FROM axl_cache " - "WHERE expires_at > ? GROUP BY method ORDER BY 2 DESC", - (now,), + "WHERE cluster_id = ? AND expires_at > ? " + "GROUP BY method ORDER BY 2 DESC", + (self.cluster_id, now), ).fetchall() } + # Diagnostic: how many entries from OTHER clusters live in the + # same file. Useful for spotting an env-var swap that would + # otherwise be invisible. + foreign = c.execute( + "SELECT COUNT(*) FROM axl_cache WHERE cluster_id != ?", + (self.cluster_id,), + ).fetchone()[0] return { "db_path": str(self.db_path), + "cluster_id": self.cluster_id, "default_ttl_seconds": self.default_ttl, "total_entries": total, "live_entries": live, "expired_entries": total - live, + "foreign_cluster_entries": foreign, "by_method": by_method, } def clear(self, method_pattern: str | None = None) -> int: + # Only clears entries for THIS cluster — never touches a sibling + # cluster's cached data even if it lives in the same file. with self._conn() as c: if method_pattern: cursor = c.execute( - "DELETE FROM axl_cache WHERE method LIKE ?", - (method_pattern.replace("*", "%"),), + "DELETE FROM axl_cache " + "WHERE cluster_id = ? AND method LIKE ?", + (self.cluster_id, method_pattern.replace("*", "%")), ) else: - cursor = c.execute("DELETE FROM axl_cache") + cursor = c.execute( + "DELETE FROM axl_cache WHERE cluster_id = ?", + (self.cluster_id,), + ) return cursor.rowcount def purge_expired(self) -> int: + # Purges expired entries across ALL clusters in this file. + # Expired entries are never useful regardless of which cluster + # they belong to, so per-cluster scoping isn't needed here. with self._conn() as c: cursor = c.execute("DELETE FROM axl_cache WHERE expires_at <= ?", (time.time(),)) return cursor.rowcount diff --git a/src/mcp_cucm_axl/route_plan.py b/src/mcp_cucm_axl/route_plan.py index 5e92e54..38c3303 100644 --- a/src/mcp_cucm_axl/route_plan.py +++ b/src/mcp_cucm_axl/route_plan.py @@ -575,7 +575,7 @@ _CSS_REFERENCE_QUERIES: dict[str, dict] = { "sql": """ SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description FROM huntpilotqueue hpq - JOIN numplan np ON hpq.fknumplan = np.pkid + JOIN numplan np ON hpq.fknumplan_pilot = np.pkid LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid WHERE hpq.fkcallingsearchspace_maxwaittime = '{pkid}' """, @@ -585,7 +585,7 @@ _CSS_REFERENCE_QUERIES: dict[str, dict] = { "sql": """ SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description FROM huntpilotqueue hpq - JOIN numplan np ON hpq.fknumplan = np.pkid + JOIN numplan np ON hpq.fknumplan_pilot = np.pkid LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid WHERE hpq.fkcallingsearchspace_noagent = '{pkid}' """, @@ -595,7 +595,7 @@ _CSS_REFERENCE_QUERIES: dict[str, dict] = { "sql": """ SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description FROM huntpilotqueue hpq - JOIN numplan np ON hpq.fknumplan = np.pkid + JOIN numplan np ON hpq.fknumplan_pilot = np.pkid LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid WHERE hpq.fkcallingsearchspace_pilotqueuefull = '{pkid}' """, @@ -678,11 +678,22 @@ def find_devices_using_css( total_returned = sum(c.get("returned_count", 0) for c in grouped.values()) any_truncated = any(c.get("truncated") for c in grouped.values()) + # Hamilton review MAJOR #3: per-category errors must propagate to the + # top-level summary, otherwise an LLM consuming `total_returned: 47` + # has no way to know that 5 categories errored and the real count is + # higher. "Software that understands itself reports its own degradation." + error_categories = sorted( + label for label, cat in grouped.items() if "error" in cat + ) + complete = len(error_categories) == 0 return { "css_name": css_name, "css_pkid": css_pkid, "total_returned": total_returned, "any_truncated": any_truncated, + "complete": complete, + "categories_with_errors": len(error_categories), + "error_categories": error_categories, "max_per_category": max_per_category, "references_by_category": grouped, } diff --git a/src/mcp_cucm_axl/server.py b/src/mcp_cucm_axl/server.py index 67bddf2..c98db50 100644 --- a/src/mcp_cucm_axl/server.py +++ b/src/mcp_cucm_axl/server.py @@ -558,9 +558,20 @@ def main() -> None: ) cache_dir.mkdir(parents=True, exist_ok=True) ttl = int(os.environ.get("AXL_CACHE_TTL", "3600")) - _cache = AxlCache(cache_dir / "axl_responses.sqlite", default_ttl=ttl) + # Cluster-id derived from AXL_URL. Hash keeps the key compact and + # avoids leaking the URL into log output where the cache key gets + # printed. Hostname-only fallback when AXL_URL is unset (test mode). + import hashlib + axl_url_for_id = os.environ.get("AXL_URL", "no-axl-url-configured") + cluster_id = hashlib.sha256(axl_url_for_id.encode()).hexdigest()[:12] + _cache = AxlCache( + cache_dir / "axl_responses.sqlite", + default_ttl=ttl, + cluster_id=cluster_id, + ) print( - f"[mcp-cucm-axl] cache: {_cache.db_path} (ttl={ttl}s)", + f"[mcp-cucm-axl] cache: {_cache.db_path} " + f"(ttl={ttl}s, cluster_id={cluster_id})", file=sys.stderr, flush=True, ) diff --git a/src/mcp_cucm_axl/sql_validator.py b/src/mcp_cucm_axl/sql_validator.py index 6e2c048..080d004 100644 --- a/src/mcp_cucm_axl/sql_validator.py +++ b/src/mcp_cucm_axl/sql_validator.py @@ -33,28 +33,36 @@ def validate_select(query: str) -> str: Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects anything else, and any query containing forbidden keywords as standalone - tokens *outside* string literals. + tokens *outside* string literals and comments. - The cleaned query (with comments stripped) is what gets returned and sent - to AXL — string literals are NOT modified, only ignored during keyword - tokenization. So a query selecting WHERE name = 'Call Forward-CSS' is - safe: the literal "Call" inside quotes is invisible to the keyword check, - while the actual SQL with the unmodified literal travels intact to AXL. + Hamilton review CRITICAL #1: the output we return MUST preserve the input + byte-for-byte (modulo trailing semicolon and outer whitespace). Earlier + versions ran a non-literal-aware comment strip on the output, which would + silently eat `--` and `/* */` markers that legitimately appeared inside + string literals like `WHERE description = 'Smith -- old line'`. The query + going to AXL must be exactly what the caller intended — comment stripping + is an analysis-only operation, never a mutation of the wire query. """ if not query or not query.strip(): raise SqlValidationError("Query is empty.") - cleaned = _COMMENT_BLOCK.sub(" ", query) - cleaned = _COMMENT_LINE.sub(" ", cleaned).strip().rstrip(";").strip() + # The query we'll send to AXL: original input, with only outer whitespace + # and a single trailing semicolon trimmed. NO mutation of literals or + # in-string comment markers. + cleaned = query.strip().rstrip(";").strip() if not cleaned: - raise SqlValidationError("Query is empty after stripping comments.") + raise SqlValidationError("Query is empty after trimming.") - # Strip string literals before tokenizing so that words inside quoted - # values (e.g. CSS names containing "Call", DN descriptions containing - # "DELETE") don't trip the forbidden-keyword check. The cleaned query - # we return still contains the literals — only the analysis copy strips - # them. + # Analysis-only copy: strip string literals AND comments (in either order + # is safe here, since each strip uses its own regex on a non-AXL-bound + # buffer). Order chosen: literals first, then comments, so that any + # comment markers genuinely outside literals can be detected. for_analysis = _STRING_LITERAL.sub(" ", cleaned) + for_analysis = _COMMENT_BLOCK.sub(" ", for_analysis) + for_analysis = _COMMENT_LINE.sub(" ", for_analysis) + + if not for_analysis.strip(): + raise SqlValidationError("Query is empty after stripping comments.") upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)] if not upper_tokens: diff --git a/tests/test_cache.py b/tests/test_cache.py index bf1a0df..335d99a 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -85,3 +85,95 @@ def test_purge_expired(tmp_path: Path): purged = c.purge_expired() assert purged == 1 assert c.stats()["live_entries"] == 1 + + +class TestClusterIsolation: + """Hamilton review CRITICAL #2: cache key omitted cluster identity. + + Prior to the fix, `AXL_URL` swap (test → prod, or one cluster to another) + served stale results from cluster A as if from cluster B. The cache + couldn't tell the data came from a different mission. Now each cache + handle is bound to a cluster_id, and entries from a different cluster + must miss. + """ + + def test_different_cluster_ids_isolate_get(self, tmp_path: Path): + # Both caches point at the same DB file, but bound to different + # cluster IDs. A's writes must not be visible to B. + db = tmp_path / "shared.sqlite" + a = AxlCache(db, default_ttl=60, cluster_id="cluster-A") + b = AxlCache(db, default_ttl=60, cluster_id="cluster-B") + + a.set("getCCMVersion", {}, {"version": "12.5"}) + assert a.get("getCCMVersion", {}) == {"version": "12.5"} + assert b.get("getCCMVersion", {}) is None, ( + "cluster-B must not see cluster-A's cached value" + ) + + def test_same_cluster_id_shares_cache(self, tmp_path: Path): + # Two handles with the SAME cluster_id should share results. + db = tmp_path / "shared.sqlite" + a = AxlCache(db, default_ttl=60, cluster_id="cluster-X") + a.set("listPhone", {"name": "SEP1"}, {"rows": ["one"]}) + b = AxlCache(db, default_ttl=60, cluster_id="cluster-X") + assert b.get("listPhone", {"name": "SEP1"}) == {"rows": ["one"]} + + def test_cluster_id_in_stats(self, tmp_path: Path): + c = AxlCache(tmp_path / "s.sqlite", default_ttl=60, cluster_id="cluster-Y") + c.set("getCCMVersion", {}, {"v": "15"}) + stats = c.stats() + assert stats.get("cluster_id") == "cluster-Y", ( + "stats must surface cluster_id so operators can verify which cluster they're caching" + ) + + def test_no_cluster_id_still_works_legacy(self, tmp_path: Path): + # Backward compat: no cluster_id keeps the old (but now risky) shape. + # The cache still functions; we just don't get isolation. + c = AxlCache(tmp_path / "legacy.sqlite", default_ttl=60) + c.set("x", {}, "y") + assert c.get("x", {}) == "y" + + def test_clear_only_affects_current_cluster(self, tmp_path: Path): + db = tmp_path / "shared.sqlite" + a = AxlCache(db, default_ttl=60, cluster_id="cluster-A") + b = AxlCache(db, default_ttl=60, cluster_id="cluster-B") + a.set("x", {}, "from-A") + b.set("x", {}, "from-B") + deleted = a.clear() + assert deleted == 1, "clear() must only affect this cluster's entries" + assert b.get("x", {}) == "from-B", "cluster-B's entry must survive A's clear" + + def test_migrate_legacy_database(self, tmp_path: Path): + """A cache database created before the cluster_id fix must + upgrade transparently — no `no such column` error on next INSERT. + """ + import sqlite3 + db = tmp_path / "legacy.sqlite" + # Manually create the OLD schema (no cluster_id column) + conn = sqlite3.connect(db) + conn.executescript( + """ + CREATE TABLE axl_cache ( + cache_key TEXT PRIMARY KEY, + method TEXT NOT NULL, + args_json TEXT NOT NULL, + result_json TEXT NOT NULL, + created_at REAL NOT NULL, + expires_at REAL NOT NULL + ); + INSERT INTO axl_cache VALUES + ('legacy-key', 'oldMethod', '{}', '"old-value"', 0, 9999999999); + """ + ) + conn.commit() + conn.close() + + # Open with the new code — must not raise, must add the column + c = AxlCache(db, default_ttl=60, cluster_id="new-cluster") + # The new client should NOT see the legacy entry (it has no cluster_id) + # — this is the cautious behavior; legacy entries are isolated to the + # "unknown cluster" bucket. + assert c.get("oldMethod", {}) is None + # And it must be able to write/read its own entries + c.set("newMethod", {"a": 1}, "new-value") + assert c.get("newMethod", {"a": 1}) == "new-value" diff --git a/tests/test_css_impact.py b/tests/test_css_impact.py new file mode 100644 index 0000000..cc13832 --- /dev/null +++ b/tests/test_css_impact.py @@ -0,0 +1,119 @@ +"""Hamilton review MAJOR #3: find_devices_using_css must surface partial failures. + +The function is per-category resilient by design — if one schema query fails, +the others still produce results. But the top-level summary previously hid +that some categories errored out: `total_returned` and `any_truncated` only +reflected the SUCCESSFUL categories. An LLM consuming "47 references, low +impact" wouldn't know that 5 categories errored and the real number is +likely much higher. + +After the fix: the response includes `complete: bool`, `categories_with_errors`, +and `error_categories`, so an LLM (or human auditor) can see the partial-failure +state and act on it. +""" + +import pytest + +from mcp_cucm_axl.route_plan import find_devices_using_css + + +class FakeAxlClient: + """Minimal stand-in for AxlClient that lets us simulate per-query failures. + + Returns a fake CSS pkid for the lookup query, then either a single fake row + or an exception based on substring matching. + """ + + def __init__(self, error_on_columns: list[str] | None = None): + self.error_on_columns = error_on_columns or [] + self.queries: list[str] = [] + + def execute_sql_query(self, sql: str) -> dict: + self.queries.append(sql) + # The CSS lookup query — return a fake pkid + if "callingsearchspace WHERE name" in sql: + return {"row_count": 1, "rows": [{"pkid": "fake-css-pkid"}]} + # Any query referencing an "error trigger" column → simulate failure + for trigger in self.error_on_columns: + if trigger in sql: + raise RuntimeError(f"simulated cluster failure on {trigger}") + # Otherwise return one fake reference row so the category isn't empty + return { + "row_count": 1, + "rows": [{"name": "FakeRef", "context": "FakePart", "description": "fake"}], + } + + +def test_no_errors_reports_complete(): + """Baseline: when every category succeeds, complete=True and no error fields populated.""" + client = FakeAxlClient() + result = find_devices_using_css(client, "Some-CSS") + assert result["complete"] is True + assert result["categories_with_errors"] == 0 + assert result["error_categories"] == [] + # And total_returned reflects the successful categories + assert result["total_returned"] >= 1 + + +def test_one_errored_category_marks_incomplete(): + """The audit-trust failure mode: one category errors out and the summary lies. + Fix: complete=False, categories_with_errors >= 1. + """ + client = FakeAxlClient(error_on_columns=["fkcallingsearchspace_cgpnunknown"]) + result = find_devices_using_css(client, "Some-CSS") + assert result["complete"] is False, ( + "complete must be False when any category errored" + ) + assert result["categories_with_errors"] >= 1 + assert "device_cgpn_unknown_css" in result["error_categories"] + + +def test_multiple_errors_all_listed(): + """All errored categories must be enumerated in error_categories.""" + client = FakeAxlClient( + error_on_columns=[ + "fkcallingsearchspace_cgpnunknown", + "fkcallingsearchspace_reroute", + "fkcallingsearchspace_pilotqueuefull", + ] + ) + result = find_devices_using_css(client, "Some-CSS") + assert result["complete"] is False + assert result["categories_with_errors"] == 3 + assert set(result["error_categories"]) == { + "device_cgpn_unknown_css", + "device_reroute_css", + "huntpilot_queue_full_css", + } + + +def test_total_returned_does_not_include_error_categories(): + """An errored category contributes 0 to total_returned (correct behavior). + What's NEW: the response also flags that the count is partial. + """ + client = FakeAxlClient(error_on_columns=["fkcallingsearchspace_cgpnunknown"]) + result = find_devices_using_css(client, "Some-CSS") + # The count itself is unchanged from before — what's new is the warning + assert result["complete"] is False + # The error category has no rows in references_by_category + err_cat = result["references_by_category"].get("device_cgpn_unknown_css", {}) + assert "error" in err_cat + + +def test_css_not_found_returns_error_not_partial(): + """If the CSS lookup itself fails (CSS doesn't exist), we return the + 'not found' error early, NOT a partial-failure response. Distinct + failure modes deserve distinct shapes. + """ + + class CssNotFoundClient: + def execute_sql_query(self, sql): + if "callingsearchspace WHERE name" in sql: + return {"row_count": 0, "rows": []} + return {"row_count": 1, "rows": [{}]} + + result = find_devices_using_css(CssNotFoundClient(), "Nonexistent-CSS") + assert "error" in result + assert "complete" not in result, ( + "CSS-not-found is a hard error; we shouldn't dress it up as partial" + ) diff --git a/tests/test_sql_validator.py b/tests/test_sql_validator.py index 42c1449..505c130 100644 --- a/tests/test_sql_validator.py +++ b/tests/test_sql_validator.py @@ -124,3 +124,54 @@ class TestStringLiterals: def test_multiple_literals(self): q = "SELECT 1 FROM numplan WHERE name = 'CALL' AND description = 'UPDATE pending'" assert validate_select(q) + + +class TestLiteralPreservedInOutput: + """Hamilton review CRITICAL #1: comment-strip mutated string literals. + + The query SENT to AXL must preserve the literal contents byte-for-byte. + Previously, the comment-strip pass ran before the literal-aware pass, + so `--` or `/* */` inside a quoted string were silently eaten on the + way to the cluster. An LLM dialing `description LIKE '%-- old%'` got + a different query than it asked for. + """ + + def test_dash_dash_inside_literal_preserved(self): + q = "SELECT * FROM numplan WHERE description = 'Smith -- old line'" + result = validate_select(q) + assert "Smith -- old line" in result, ( + f"line-comment marker inside literal must NOT be stripped; got: {result!r}" + ) + + def test_block_comment_marker_inside_literal_preserved(self): + q = "SELECT * FROM device WHERE name = 'before /* still in literal */ after'" + result = validate_select(q) + assert "/* still in literal */" in result + assert "before" in result and "after" in result + + def test_like_pattern_with_dash_dash_preserved(self): + # Real-world case: an LLM searches for descriptions containing "--" + q = "SELECT pkid FROM numplan WHERE description LIKE '%-- old%'" + result = validate_select(q) + assert "'%-- old%'" in result + + def test_actual_line_comment_outside_literal_still_handled(self): + # An actual --comment outside any literal is fine (AXL handles it), + # and the keyword check ignores it. + q = "SELECT 1 FROM device -- a real comment at the end" + result = validate_select(q) + # We don't strip from output, so the comment stays in the returned text. + # The important thing is the validator passes and a forbidden keyword + # in the comment wouldn't trip the check (covered separately). + assert "SELECT 1 FROM device" in result + + def test_forbidden_keyword_inside_real_comment_does_not_trip(self): + # Real comment, with a forbidden keyword in it, should not trip the validator + q = "SELECT 1 FROM device -- TODO: someone DELETE the old test data" + result = validate_select(q) + assert "SELECT 1" in result + + def test_block_literal_with_drop_inside_preserved(self): + q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'" + result = validate_select(q) + assert "'log: DROP detected'" in result