diff --git a/src/mcp_cucm_axl/route_plan.py b/src/mcp_cucm_axl/route_plan.py index edfb1a1..5e92e54 100644 --- a/src/mcp_cucm_axl/route_plan.py +++ b/src/mcp_cucm_axl/route_plan.py @@ -517,6 +517,89 @@ _CSS_REFERENCE_QUERIES: dict[str, dict] = { WHERE rl.fkcallingsearchspace = '{pkid}' """, }, + # PRIMARY device CSS — the field the GUI sets when you assign "Calling + # Search Space" to a phone, gateway, or trunk. Not suffixed; spelled out + # as fkcallingsearchspace (no _xxx). This is where most CSS assignments + # actually live; missing it produced false-zero impact analyses. + "device_primary_css": { + "table": "device", "column": "fkcallingsearchspace", + "sql": """ + SELECT d.name AS name, tc.name AS context, d.description AS description + FROM device d LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum + WHERE d.fkcallingsearchspace = '{pkid}' + """, + }, + "device_cgpn_unknown_css": { + "table": "device", "column": "fkcallingsearchspace_cgpnunknown", + "sql": """ + SELECT d.name AS name, tc.name AS context, d.description AS description + FROM device d LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum + WHERE d.fkcallingsearchspace_cgpnunknown = '{pkid}' + """, + }, + # Line-level CSS for phone-line monitoring (BLF, presence) + "line_monitoring_css": { + "table": "devicenumplanmap", "column": "fkcallingsearchspace_monitoring", + "sql": """ + SELECT d.name AS name, np.dnorpattern AS context, d.description AS description + FROM devicenumplanmap dnm + JOIN device d ON dnm.fkdevice = d.pkid + JOIN numplan np ON dnm.fknumplan = np.pkid + WHERE dnm.fkcallingsearchspace_monitoring = '{pkid}' + """, + }, + # H.323 / SIP gateway called-party transformation CSSs + "gateway_h323_called_xform_css": { + "table": "h323device", "column": "fkcallingsearchspace_cntdpntransform", + "sql": """ + SELECT d.name AS name, tc.name AS context, d.description AS description + FROM h323device h + JOIN device d ON h.fkdevice = d.pkid + LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum + WHERE h.fkcallingsearchspace_cntdpntransform = '{pkid}' + """, + }, + "gateway_sip_called_xform_css": { + "table": "sipdevice", "column": "fkcallingsearchspace_cntdpntransform", + "sql": """ + SELECT d.name AS name, tc.name AS context, d.description AS description + FROM sipdevice s + JOIN device d ON s.fkdevice = d.pkid + LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum + WHERE s.fkcallingsearchspace_cntdpntransform = '{pkid}' + """, + }, + # Hunt pilot queue CSSs (max-wait, no-agent, queue-full destinations) + "huntpilot_max_wait_css": { + "table": "huntpilotqueue", "column": "fkcallingsearchspace_maxwaittime", + "sql": """ + SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description + FROM huntpilotqueue hpq + JOIN numplan np ON hpq.fknumplan = np.pkid + LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid + WHERE hpq.fkcallingsearchspace_maxwaittime = '{pkid}' + """, + }, + "huntpilot_no_agent_css": { + "table": "huntpilotqueue", "column": "fkcallingsearchspace_noagent", + "sql": """ + SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description + FROM huntpilotqueue hpq + JOIN numplan np ON hpq.fknumplan = np.pkid + LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid + WHERE hpq.fkcallingsearchspace_noagent = '{pkid}' + """, + }, + "huntpilot_queue_full_css": { + "table": "huntpilotqueue", "column": "fkcallingsearchspace_pilotqueuefull", + "sql": """ + SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description + FROM huntpilotqueue hpq + JOIN numplan np ON hpq.fknumplan = np.pkid + LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid + WHERE hpq.fkcallingsearchspace_pilotqueuefull = '{pkid}' + """, + }, } diff --git a/src/mcp_cucm_axl/sql_validator.py b/src/mcp_cucm_axl/sql_validator.py index f2edb99..6e2c048 100644 --- a/src/mcp_cucm_axl/sql_validator.py +++ b/src/mcp_cucm_axl/sql_validator.py @@ -14,6 +14,8 @@ import re _COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL) _COMMENT_LINE = re.compile(r"--[^\n]*") +# Match Informix string literals: single-quoted, with '' as escaped quote. +_STRING_LITERAL = re.compile(r"'(?:''|[^'])*'", re.DOTALL) _FORBIDDEN = { "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", "TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME", @@ -31,17 +33,30 @@ def validate_select(query: str) -> str: Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects anything else, and any query containing forbidden keywords as standalone - tokens. + tokens *outside* string literals. + + The cleaned query (with comments stripped) is what gets returned and sent + to AXL — string literals are NOT modified, only ignored during keyword + tokenization. So a query selecting WHERE name = 'Call Forward-CSS' is + safe: the literal "Call" inside quotes is invisible to the keyword check, + while the actual SQL with the unmodified literal travels intact to AXL. """ if not query or not query.strip(): raise SqlValidationError("Query is empty.") - stripped = _COMMENT_BLOCK.sub(" ", query) - stripped = _COMMENT_LINE.sub(" ", stripped).strip().rstrip(";").strip() - if not stripped: + cleaned = _COMMENT_BLOCK.sub(" ", query) + cleaned = _COMMENT_LINE.sub(" ", cleaned).strip().rstrip(";").strip() + if not cleaned: raise SqlValidationError("Query is empty after stripping comments.") - upper_tokens = [t.upper() for t in _WORD_RE.findall(stripped)] + # Strip string literals before tokenizing so that words inside quoted + # values (e.g. CSS names containing "Call", DN descriptions containing + # "DELETE") don't trip the forbidden-keyword check. The cleaned query + # we return still contains the literals — only the analysis copy strips + # them. + for_analysis = _STRING_LITERAL.sub(" ", cleaned) + + upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)] if not upper_tokens: raise SqlValidationError("Query contains no SQL keywords.") @@ -58,4 +73,4 @@ def validate_select(query: str) -> str: f"This server is read-only." ) - return stripped + return cleaned diff --git a/tests/test_sql_validator.py b/tests/test_sql_validator.py index ba8a10e..42c1449 100644 --- a/tests/test_sql_validator.py +++ b/tests/test_sql_validator.py @@ -82,3 +82,45 @@ class TestEdgeCases: def test_select_with_subquery(self): q = "SELECT name FROM device WHERE pkid IN (SELECT fkdevice FROM numplan)" assert "SELECT name FROM device" in validate_select(q) + + +class TestStringLiterals: + """Forbidden keywords inside string literals must be ignored. + + Otherwise CSS names like 'Call Forward-CSS', DN descriptions containing + 'DELETE' (e.g., 'Delete this voicemail line'), or partition names with + 'INSERT' would all fail to query, even though the SQL itself is read-only. + """ + + def test_call_inside_string_literal_passes(self): + q = "SELECT pkid FROM callingsearchspace WHERE name = 'Call Forward-CSS'" + result = validate_select(q) + assert "Call Forward-CSS" in result # literal preserved + + def test_delete_inside_string_literal_passes(self): + q = "SELECT pkid FROM numplan WHERE description = 'Delete after audit'" + result = validate_select(q) + assert "Delete after audit" in result + + def test_drop_inside_string_literal_passes(self): + q = "SELECT pkid FROM numplan WHERE description = 'DROP table backup'" + assert validate_select(q) + + def test_actual_drop_outside_literal_still_blocked(self): + with pytest.raises(SqlValidationError, match="DROP"): + validate_select("SELECT 1 FROM device; DROP TABLE backup") + + def test_escaped_quote_in_literal(self): + # Informix uses '' (doubled) as escaped single quote within literals + q = "SELECT pkid FROM numplan WHERE description = 'O''Brien''s line'" + result = validate_select(q) + assert "O''Brien''s line" in result + + def test_keyword_just_outside_literal_blocked(self): + # The literal 'safe text' is fine; the trailing DROP is not. + with pytest.raises(SqlValidationError, match="DROP"): + validate_select("SELECT 1 FROM device WHERE x = 'safe text' OR DROP") + + def test_multiple_literals(self): + q = "SELECT 1 FROM numplan WHERE name = 'CALL' AND description = 'UPDATE pending'" + assert validate_select(q)