From cd08a7ec765f94a0f1f007bab3ccb65f511e6475 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 5 May 2026 17:38:07 -0600 Subject: [PATCH] route_plan: add patterns_targeting_device + wildcard count/enumerate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Inverse of list_route_lists_and_groups — given a destination device, return every numplan whose direct target is that device. Closes the highest-priority gap from cucx-docs's prompt-suggestions handoff (see axl/agent-threads/cucx-prompt-suggestions/001 + 003 for the multi-message context). Schema walk: device → devicenumplanmap → numplan, with LEFT JOINs to routepartition + typepatternusage for friendly output. M:N is the landmark — numplan does NOT have a direct fkdevice column, which was cucx-docs's literal multi-attempt schema-discovery experience that motivated the tool. Three wildcard-expansion modes per cucx-docs Q2: - False (default) — patterns intact - "count" — per-pattern digit-string estimate + total surface; unbounded patterns (`!`, `@`) reported as None count and force the total to None so an auditor sees the partial measurement; per-pattern caps at 10,000 to prevent runaway estimation - "enumerate" — actual digit-string list, only for tightly-bounded patterns (no `!`, no `@`, no `.`, ≤ 1,000 expansions); patterns that violate any constraint return null with a skip reason Direct-target only per Q1 — full transitive reachability composes with route_lists_and_groups + route_translation_chain, called out both in the docstring and in the response's _note field. 37 new tests cover the math layer (count/enumerate helpers in isolation), the bounds/cap behavior, the unbounded-pattern flagging, empty-result handling, SQL injection escaping, and the integration through a FakeAxlClient. Full suite: 219/219 passing. Live-cluster smoke test pending — cluster TLS intermittently failing this session; will re-verify once stable. --- src/mcaxl/route_plan.py | 255 +++++++++++++++++++++++++++++++ src/mcaxl/server.py | 34 +++++ tests/test_patterns_targeting.py | 247 ++++++++++++++++++++++++++++++ 3 files changed, 536 insertions(+) create mode 100644 tests/test_patterns_targeting.py diff --git a/src/mcaxl/route_plan.py b/src/mcaxl/route_plan.py index 0686f8a..2dc6df4 100644 --- a/src/mcaxl/route_plan.py +++ b/src/mcaxl/route_plan.py @@ -261,6 +261,261 @@ def inspect_pattern( } +def patterns_targeting_device( + client: "AxlClient", + device_name: str, + expand_wildcards: str | bool = False, +) -> dict: + """Inverse of `list_route_lists_and_groups` — given a destination device, + return every numplan whose direct target is that device. + + Walks `device → devicenumplanmap → numplan` (the M:N join — `numplan` does + NOT have a direct `fkdevice`; that's a common schema gotcha). + + For "what numbers can ultimately reach this device" (including + translation-pattern hops and route-list intermediaries), compose this + tool with `list_route_lists_and_groups` and `route_translation_chain`. + This returns only direct-target patterns. + + Args: + device_name: exact device name. Route Lists, SIP Trunks, CTI Route + Points, Phones — any device that can be a direct numplan target. + expand_wildcards: how to handle wildcard patterns in the result. + - `False` (default): patterns returned as-is, syntax intact + - `"count"`: each pattern annotated with a per-pattern digit-string + count estimate; total surface across all patterns also returned. + Estimates capped at `_COUNT_CAP` per pattern; unbounded patterns + (`!`, `@`) reported as `None` count with `unbounded: true` + - `"enumerate"`: actually list digit strings — only safe for + tightly bounded patterns (no `!`, no `@`, no `.`, ≤ + `_ENUMERATE_CAP` total digit strings). Patterns that violate + either constraint are returned with `enumerated_digits: null` + + `enumeration_skipped: ` + """ + if expand_wildcards not in (False, "count", "enumerate"): + raise ValueError( + f"expand_wildcards must be False, 'count', or 'enumerate'; " + f"got {expand_wildcards!r}" + ) + + sql = f""" + SELECT + n.dnorpattern AS pattern, + rp.name AS partition, + n.description, + n.calledpartytransformationmask AS xform_mask, + n.prefixdigitsout AS prefix, + tp.name AS pattern_type, + d.name AS destination_device, + tc.name AS destination_class + FROM device d + JOIN typeclass tc ON d.tkclass = tc.enum + JOIN devicenumplanmap m ON m.fkdevice = d.pkid + JOIN numplan n ON m.fknumplan = n.pkid + LEFT OUTER JOIN routepartition rp ON n.fkroutepartition = rp.pkid + LEFT OUTER JOIN typepatternusage tp ON n.tkpatternusage = tp.enum + WHERE d.name = '{_esc(device_name)}' + ORDER BY n.dnorpattern + """ + result = client.execute_sql_query(sql) + rows = result["rows"] + + patterns: list[dict] = [] + total_count: int | None = 0 # None once we hit an unbounded pattern in 'count' mode + truncated = False + + for row in rows: + entry: dict = { + "pattern": row.get("pattern"), + "partition": row.get("partition"), + "description": row.get("description"), + "pattern_type": row.get("pattern_type"), + "xform_mask": row.get("xform_mask"), + "prefix": row.get("prefix"), + } + pattern = entry["pattern"] or "" + + if expand_wildcards == "count": + count, unbounded, capped = _count_pattern_digit_strings(pattern) + entry["digit_count"] = count + if unbounded: + entry["unbounded"] = True + total_count = None + if capped: + entry["count_capped"] = True + truncated = True + if total_count is not None and count is not None: + total_count += count + elif expand_wildcards == "enumerate": + digits, reason = _enumerate_pattern_digit_strings(pattern) + entry["enumerated_digits"] = digits + if digits is None: + entry["enumeration_skipped"] = reason + + patterns.append(entry) + + out = { + "device_name": device_name, + "destination_class": rows[0]["destination_class"] if rows else None, + "pattern_count": len(patterns), + "patterns": patterns, + "_note": ( + "Direct-target patterns only. For transitive reachability " + "(translation patterns, route-list intermediaries), compose " + "with route_lists_and_groups + route_translation_chain." + ), + } + if expand_wildcards == "count": + out["total_digit_count"] = total_count + if truncated: + out["truncated"] = True + return out + + +# Wildcard expansion bounds — per cucx-docs Q2 reply (msg 003 in +# axl/agent-threads/cucx-prompt-suggestions/). 'count' caps per-pattern +# estimates at _COUNT_CAP to avoid runaway computation on patterns like +# X{8} (1e8); 'enumerate' caps total digit strings at _ENUMERATE_CAP and +# refuses unbounded or `.`-containing patterns entirely. +_COUNT_CAP = 10_000 +_ENUMERATE_CAP = 1_000 + + +def _count_pattern_digit_strings(pattern: str) -> tuple[int | None, bool, bool]: + """Estimate the number of distinct digit strings a pattern can match. + + Returns (count, unbounded, capped): + - count: estimate, or None if unbounded + - unbounded: True if pattern contains `!` or `@` (treated as + infinite — the @ NANP/international dial plan is bounded but + too large to enumerate meaningfully) + - capped: True if the estimate hit _COUNT_CAP and was truncated + """ + if not pattern: + return 0, False, False + + # `+` is a literal in CUCM (E.164 prefix); strip for analysis but it + # doesn't change the count. Same for `.` (purely visual separator). + p = pattern.replace("+", "").replace(".", "") + + if "!" in p or "@" in p: + return None, True, False + + count = 1 + i = 0 + while i < len(p): + ch = p[i] + if ch == "X": + count *= 10 + elif ch == "[": + close = p.find("]", i) + if close == -1: + # Malformed — be conservative + return None, False, False + spec = p[i + 1 : close] + count *= _count_charclass(spec) + i = close + # other characters are literals contributing factor 1 + if count > _COUNT_CAP: + return _COUNT_CAP, False, True + i += 1 + return count, False, False + + +def _count_charclass(spec: str) -> int: + """Count the size of a CUCM character class like '0-9' or 'abc' or '2-9a-c'. + + Negation (`^`) is technically supported in CUCM but rare; treat as + its set size (the negated set has size 10 - len(set)). + """ + if not spec: + return 1 + negated = spec.startswith("^") + body = spec[1:] if negated else spec + chars: set[str] = set() + i = 0 + while i < len(body): + if i + 2 < len(body) and body[i + 1] == "-": + lo, hi = body[i], body[i + 2] + if lo.isdigit() and hi.isdigit() and lo <= hi: + chars.update(chr(c) for c in range(ord(lo), ord(hi) + 1)) + i += 3 + else: + chars.add(body[i]) + i += 1 + size = len(chars) or 1 + return (10 - size) if negated else size + + +def _enumerate_pattern_digit_strings(pattern: str) -> tuple[list[str] | None, str | None]: + """Actually enumerate the digit strings a pattern matches. + + Returns (digits, skip_reason): + - digits: list of digit strings, or None if skipped + - skip_reason: human-readable explanation when skipped + + Only safe for tightly-bounded patterns. Per cucx-docs: + no `!`, no `@`, no `.`, total ≤ _ENUMERATE_CAP. + """ + if not pattern: + return [""], None + if "!" in pattern: + return None, "contains '!' (unbounded)" + if "@" in pattern: + return None, "contains '@' (NANP/international dial plan; too large)" + if "." in pattern: + return None, "contains '.' (separator semantics; not enumerable per spec)" + + p = pattern.replace("+", "") + count, _, _ = _count_pattern_digit_strings(p) + if count is None: + return None, "estimated count is unbounded" + if count > _ENUMERATE_CAP: + return None, f"estimated count {count} exceeds cap {_ENUMERATE_CAP}; use 'count' mode" + + out = [""] + i = 0 + while i < len(p): + ch = p[i] + if ch == "X": + out = [s + d for s in out for d in "0123456789"] + i += 1 + elif ch == "[": + close = p.find("]", i) + if close == -1: + return None, "malformed character class" + spec = p[i + 1 : close] + chars = _expand_charclass(spec) + out = [s + c for s in out for c in chars] + i = close + 1 + else: + out = [s + ch for s in out] + i += 1 + return out, None + + +def _expand_charclass(spec: str) -> list[str]: + """Expand a character class like '0-9' or 'abc' to its member list.""" + if not spec: + return [""] + negated = spec.startswith("^") + body = spec[1:] if negated else spec + chars: set[str] = set() + i = 0 + while i < len(body): + if i + 2 < len(body) and body[i + 1] == "-": + lo, hi = body[i], body[i + 2] + if lo.isdigit() and hi.isdigit() and lo <= hi: + chars.update(chr(c) for c in range(ord(lo), ord(hi) + 1)) + i += 3 + else: + chars.add(body[i]) + i += 1 + if negated: + chars = set("0123456789") - chars + return sorted(chars) + + def list_route_lists_and_groups(client: "AxlClient", name: str | None = None) -> dict: """Route lists with their ordered route groups and member gateways/trunks. diff --git a/src/mcaxl/server.py b/src/mcaxl/server.py index 3d3b4d1..6f03585 100644 --- a/src/mcaxl/server.py +++ b/src/mcaxl/server.py @@ -239,6 +239,40 @@ def route_inspect_pattern(pattern: str, partition: str | None = None) -> dict: return route_plan.inspect_pattern(_client(), pattern, partition) +@mcp.tool +def route_patterns_targeting( + device_name: str, + expand_wildcards: str | bool = False, +) -> dict: + """Inverse of `route_lists_and_groups`: given a destination device name, + return every numplan whose direct target is that device. + + Answers questions like *"which DIDs route to this fax server?"* in a + single call instead of a multi-attempt schema-discovery walk through + `numplan` ↔ `devicenumplanmap` ↔ `device`. + + Direct-target only — does NOT walk through translation patterns or + route-list intermediaries. For full transitive reachability, compose + with `route_lists_and_groups` and `route_translation_chain`. + + Args: + device_name: exact device name (case-sensitive). Route Lists, SIP + trunks, CTI Route Points, Phones — any device that can be a + direct numplan target. + expand_wildcards: how to handle wildcard patterns: + False (default) — patterns returned as-is. + "count" — annotate each pattern with a digit-string count + estimate; total surface across all patterns also returned. + Patterns containing `!` or `@` are flagged unbounded. + "enumerate" — actually list digit strings. Only safe for + tightly-bounded patterns; returns a skip reason for any + pattern with `!`, `@`, `.`, or estimated >1000 expansions. + """ + return route_plan.patterns_targeting_device( + _client(), device_name, expand_wildcards + ) + + @mcp.tool def route_lists_and_groups(name: str | None = None) -> dict: """Route lists with their ordered route groups and member devices. diff --git a/tests/test_patterns_targeting.py b/tests/test_patterns_targeting.py new file mode 100644 index 0000000..abab745 --- /dev/null +++ b/tests/test_patterns_targeting.py @@ -0,0 +1,247 @@ +"""Tests for patterns_targeting_device + wildcard count/enumerate helpers. + +Single-call tool that answers "what numplan rows directly target this +device" — the inverse of `list_route_lists_and_groups`. Direct-target +only per cucx-docs Q1 (msg 003 in axl/agent-threads/cucx-prompt-suggestions/). + +Three wildcard-expansion modes per cucx-docs Q2: + - False: patterns returned as-is + - "count": digit-string estimates per pattern + total + - "enumerate": actual digit-string list, only for tightly-bounded patterns +""" + +import pytest + +from mcaxl.route_plan import ( + _COUNT_CAP, + _ENUMERATE_CAP, + _count_pattern_digit_strings, + _enumerate_pattern_digit_strings, + patterns_targeting_device, +) + + +class FakeAxlClient: + """Returns canned SQL results — no network, no zeep.""" + + def __init__(self, rows: list[dict]): + self._rows = rows + self.queries: list[str] = [] + + def execute_sql_query(self, sql: str) -> dict: + self.queries.append(sql) + return {"row_count": len(self._rows), "rows": self._rows} + + +# ─── Wildcard counting (the math layer) ──────────────────────────────── + +class TestCountPatternDigitStrings: + """The estimator for how many distinct digit strings a pattern matches.""" + + @pytest.mark.parametrize("pattern,expected", [ + ("1234", 1), # all literals + ("X", 10), # one wildcard digit + ("XX", 100), + ("XXX", 1_000), + ("20878594XX", 100), # the real BMH RightFax block + ("9.20878594XX", 100), # `.` is a separator, ignored + ("+12085243000", 1), # E.164 literal + ("[2-9]XXXXXX", 7_000_000), # NANP-ish, hits cap → returns _COUNT_CAP + ("[2-9]", 8), # range + ("[abc]", 3), # set + ("[0-9]", 10), # equiv to X + ("[02468]", 5), # explicit even digits + ]) + def test_bounded_patterns(self, pattern, expected): + count, unbounded, capped = _count_pattern_digit_strings(pattern) + assert unbounded is False + if expected > _COUNT_CAP: + assert count == _COUNT_CAP + assert capped is True + else: + assert count == expected + assert capped is False + + @pytest.mark.parametrize("pattern", ["!", "9.!", "9.@", "@", "1!"]) + def test_unbounded_patterns_flagged(self, pattern): + count, unbounded, capped = _count_pattern_digit_strings(pattern) + assert count is None + assert unbounded is True + assert capped is False + + def test_empty_pattern_is_zero(self): + count, unbounded, capped = _count_pattern_digit_strings("") + assert count == 0 + assert unbounded is False + assert capped is False + + def test_malformed_charclass_is_unbounded(self): + # `[abc` with no closing bracket — be conservative + count, unbounded, capped = _count_pattern_digit_strings("12[abc") + assert count is None + + +# ─── Wildcard enumeration (the strict-bounds layer) ──────────────────── + +class TestEnumeratePatternDigitStrings: + """The strict enumerator — only safe for tightly-bounded patterns.""" + + def test_literals_only_returns_self(self): + digits, reason = _enumerate_pattern_digit_strings("1234") + assert digits == ["1234"] + assert reason is None + + def test_single_X_expands_to_ten(self): + digits, reason = _enumerate_pattern_digit_strings("12X") + assert digits == [f"12{d}" for d in "0123456789"] + assert reason is None + + def test_charclass_expands(self): + digits, _ = _enumerate_pattern_digit_strings("[2-4]") + assert digits == ["2", "3", "4"] + + def test_block_expansion(self): + # 100 DIDs in a /XX block + digits, _ = _enumerate_pattern_digit_strings("20878594XX") + assert len(digits) == 100 + assert "2087859400" in digits + assert "2087859499" in digits + + @pytest.mark.parametrize("pattern,expected_reason_fragment", [ + ("9.20878594XX", "'.'"), # cucx-docs explicit: no `.` + ("9!", "'!'"), # unbounded + ("9.@", "'@'"), # `@` is checked before `.` — either reason is valid + ("@", "'@'"), + ]) + def test_unsafe_patterns_skipped(self, pattern, expected_reason_fragment): + digits, reason = _enumerate_pattern_digit_strings(pattern) + assert digits is None + assert expected_reason_fragment in reason + + def test_oversized_pattern_skipped(self): + # 4 X's = 10,000 expansions; cap is 1,000 + digits, reason = _enumerate_pattern_digit_strings("XXXX") + assert digits is None + assert "exceeds cap" in reason + + +# ─── The tool itself (integration with FakeAxlClient) ────────────────── + +class TestPatternsTargetingDevice: + """End-to-end through the FakeAxlClient layer.""" + + @pytest.fixture + def rightfax_rows(self): + """Realistic-shape rows: 1 wildcard block + 2 internal carveouts.""" + base = { + "destination_device": "RightFax-RL", + "destination_class": "Route List", + "xform_mask": None, + "prefix": None, + "pattern_type": "Route Pattern", + } + return [ + {**base, "pattern": "20878594XX", "partition": "External-PT", + "description": "RightFax inbound block (100 DIDs)"}, + {**base, "pattern": "5550100", "partition": "Internal-PT", + "description": "Reception → fax internal hop"}, + {**base, "pattern": "5550101", "partition": "Internal-PT", + "description": "Front desk → fax internal hop"}, + ] + + def test_default_returns_patterns_intact(self, rightfax_rows): + client = FakeAxlClient(rightfax_rows) + result = patterns_targeting_device(client, "RightFax-RL") + assert result["device_name"] == "RightFax-RL" + assert result["destination_class"] == "Route List" + assert result["pattern_count"] == 3 + # No expansion fields present on default + assert "total_digit_count" not in result + for p in result["patterns"]: + assert "digit_count" not in p + assert "enumerated_digits" not in p + + def test_count_mode_per_pattern_and_total(self, rightfax_rows): + client = FakeAxlClient(rightfax_rows) + result = patterns_targeting_device( + client, "RightFax-RL", expand_wildcards="count" + ) + # Per-pattern counts inline (cucx-docs Q2 refinement) + counts = {p["pattern"]: p["digit_count"] for p in result["patterns"]} + assert counts == { + "20878594XX": 100, + "5550100": 1, + "5550101": 1, + } + assert result["total_digit_count"] == 102 + + def test_count_mode_with_unbounded_pattern(self): + rows = [ + {"pattern": "9.@", "partition": "PSTN-PT", "description": "PSTN catchall", + "destination_device": "Gateway-1", "destination_class": "Gateway", + "xform_mask": None, "prefix": None, "pattern_type": "Route Pattern"}, + {"pattern": "5550100", "partition": "Internal-PT", "description": "extension", + "destination_device": "Gateway-1", "destination_class": "Gateway", + "xform_mask": None, "prefix": None, "pattern_type": "Route Pattern"}, + ] + client = FakeAxlClient(rows) + result = patterns_targeting_device(client, "Gateway-1", expand_wildcards="count") + # Total goes None once any pattern is unbounded — auditor must see this + assert result["total_digit_count"] is None + unbounded = next(p for p in result["patterns"] if p["pattern"] == "9.@") + assert unbounded["unbounded"] is True + assert unbounded["digit_count"] is None + + def test_enumerate_mode_for_safe_patterns(self, rightfax_rows): + client = FakeAxlClient(rightfax_rows) + result = patterns_targeting_device( + client, "RightFax-RL", expand_wildcards="enumerate" + ) + block = next(p for p in result["patterns"] if p["pattern"] == "20878594XX") + assert len(block["enumerated_digits"]) == 100 + # The internal-extension carveouts are 1:1 enumerations of themselves + carveout = next(p for p in result["patterns"] if p["pattern"] == "5550100") + assert carveout["enumerated_digits"] == ["5550100"] + + def test_enumerate_mode_skips_unsafe_with_reason(self): + rows = [{ + "pattern": "9.@", "partition": "PSTN", "description": "PSTN catchall", + "destination_device": "GW", "destination_class": "Gateway", + "xform_mask": None, "prefix": None, "pattern_type": "Route Pattern", + }] + client = FakeAxlClient(rows) + result = patterns_targeting_device(client, "GW", expand_wildcards="enumerate") + skipped = result["patterns"][0] + assert skipped["enumerated_digits"] is None + # `@` is checked before `.` in the impl; either is a valid reason + # for `9.@` since both render the pattern non-enumerable. + assert "'@'" in skipped["enumeration_skipped"] + + def test_empty_result_returns_zero_patterns(self): + client = FakeAxlClient([]) + result = patterns_targeting_device(client, "Nonexistent-Device") + assert result["pattern_count"] == 0 + assert result["destination_class"] is None + assert result["patterns"] == [] + + def test_invalid_expand_wildcards_raises(self): + client = FakeAxlClient([]) + with pytest.raises(ValueError, match="expand_wildcards"): + patterns_targeting_device(client, "Foo", expand_wildcards="enumarate") # typo + + def test_device_name_is_quote_escaped(self): + # SQL injection attempt via device name; _esc should handle it + client = FakeAxlClient([]) + patterns_targeting_device(client, "fake'; DROP TABLE device --") + # The query was issued; double-single-quote indicates proper escaping + assert "fake''" in client.queries[0] + + def test_docstring_compose_note_present_in_response(self, rightfax_rows): + """The transitive-reachability composition note (Q1 docstring requirement) + must be in the response, so an operator browsing JSON sees it without + reading the docstring.""" + client = FakeAxlClient(rightfax_rows) + result = patterns_targeting_device(client, "RightFax-RL") + assert "_note" in result + assert "translation" in result["_note"].lower() + assert "route_translation_chain" in result["_note"]