route_plan: add patterns_targeting_device + wildcard count/enumerate

Inverse of list_route_lists_and_groups — given a destination device,
return every numplan whose direct target is that device. Closes the
highest-priority gap from cucx-docs's prompt-suggestions handoff
(see axl/agent-threads/cucx-prompt-suggestions/001 + 003 for the
multi-message context).

Schema walk: device → devicenumplanmap → numplan, with LEFT JOINs
to routepartition + typepatternusage for friendly output. M:N is the
landmark — numplan does NOT have a direct fkdevice column, which was
cucx-docs's literal multi-attempt schema-discovery experience that
motivated the tool.

Three wildcard-expansion modes per cucx-docs Q2:
  - False (default) — patterns intact
  - "count" — per-pattern digit-string estimate + total surface;
    unbounded patterns (`!`, `@`) reported as None count and force
    the total to None so an auditor sees the partial measurement;
    per-pattern caps at 10,000 to prevent runaway estimation
  - "enumerate" — actual digit-string list, only for tightly-bounded
    patterns (no `!`, no `@`, no `.`, ≤ 1,000 expansions); patterns
    that violate any constraint return null with a skip reason

Direct-target only per Q1 — full transitive reachability composes
with route_lists_and_groups + route_translation_chain, called out
both in the docstring and in the response's _note field.

37 new tests cover the math layer (count/enumerate helpers in
isolation), the bounds/cap behavior, the unbounded-pattern flagging,
empty-result handling, SQL injection escaping, and the integration
through a FakeAxlClient. Full suite: 219/219 passing.

Live-cluster smoke test pending — cluster TLS intermittently failing
this session; will re-verify once stable.
This commit is contained in:
Ryan Malloy 2026-05-05 17:38:07 -06:00
parent 38307aad67
commit cd08a7ec76
3 changed files with 536 additions and 0 deletions

View File

@ -261,6 +261,261 @@ def inspect_pattern(
}
def patterns_targeting_device(
client: "AxlClient",
device_name: str,
expand_wildcards: str | bool = False,
) -> dict:
"""Inverse of `list_route_lists_and_groups` — given a destination device,
return every numplan whose direct target is that device.
Walks `device devicenumplanmap numplan` (the M:N join `numplan` does
NOT have a direct `fkdevice`; that's a common schema gotcha).
For "what numbers can ultimately reach this device" (including
translation-pattern hops and route-list intermediaries), compose this
tool with `list_route_lists_and_groups` and `route_translation_chain`.
This returns only direct-target patterns.
Args:
device_name: exact device name. Route Lists, SIP Trunks, CTI Route
Points, Phones any device that can be a direct numplan target.
expand_wildcards: how to handle wildcard patterns in the result.
- `False` (default): patterns returned as-is, syntax intact
- `"count"`: each pattern annotated with a per-pattern digit-string
count estimate; total surface across all patterns also returned.
Estimates capped at `_COUNT_CAP` per pattern; unbounded patterns
(`!`, `@`) reported as `None` count with `unbounded: true`
- `"enumerate"`: actually list digit strings only safe for
tightly bounded patterns (no `!`, no `@`, no `.`,
`_ENUMERATE_CAP` total digit strings). Patterns that violate
either constraint are returned with `enumerated_digits: null` +
`enumeration_skipped: <reason>`
"""
if expand_wildcards not in (False, "count", "enumerate"):
raise ValueError(
f"expand_wildcards must be False, 'count', or 'enumerate'; "
f"got {expand_wildcards!r}"
)
sql = f"""
SELECT
n.dnorpattern AS pattern,
rp.name AS partition,
n.description,
n.calledpartytransformationmask AS xform_mask,
n.prefixdigitsout AS prefix,
tp.name AS pattern_type,
d.name AS destination_device,
tc.name AS destination_class
FROM device d
JOIN typeclass tc ON d.tkclass = tc.enum
JOIN devicenumplanmap m ON m.fkdevice = d.pkid
JOIN numplan n ON m.fknumplan = n.pkid
LEFT OUTER JOIN routepartition rp ON n.fkroutepartition = rp.pkid
LEFT OUTER JOIN typepatternusage tp ON n.tkpatternusage = tp.enum
WHERE d.name = '{_esc(device_name)}'
ORDER BY n.dnorpattern
"""
result = client.execute_sql_query(sql)
rows = result["rows"]
patterns: list[dict] = []
total_count: int | None = 0 # None once we hit an unbounded pattern in 'count' mode
truncated = False
for row in rows:
entry: dict = {
"pattern": row.get("pattern"),
"partition": row.get("partition"),
"description": row.get("description"),
"pattern_type": row.get("pattern_type"),
"xform_mask": row.get("xform_mask"),
"prefix": row.get("prefix"),
}
pattern = entry["pattern"] or ""
if expand_wildcards == "count":
count, unbounded, capped = _count_pattern_digit_strings(pattern)
entry["digit_count"] = count
if unbounded:
entry["unbounded"] = True
total_count = None
if capped:
entry["count_capped"] = True
truncated = True
if total_count is not None and count is not None:
total_count += count
elif expand_wildcards == "enumerate":
digits, reason = _enumerate_pattern_digit_strings(pattern)
entry["enumerated_digits"] = digits
if digits is None:
entry["enumeration_skipped"] = reason
patterns.append(entry)
out = {
"device_name": device_name,
"destination_class": rows[0]["destination_class"] if rows else None,
"pattern_count": len(patterns),
"patterns": patterns,
"_note": (
"Direct-target patterns only. For transitive reachability "
"(translation patterns, route-list intermediaries), compose "
"with route_lists_and_groups + route_translation_chain."
),
}
if expand_wildcards == "count":
out["total_digit_count"] = total_count
if truncated:
out["truncated"] = True
return out
# Wildcard expansion bounds — per cucx-docs Q2 reply (msg 003 in
# axl/agent-threads/cucx-prompt-suggestions/). 'count' caps per-pattern
# estimates at _COUNT_CAP to avoid runaway computation on patterns like
# X{8} (1e8); 'enumerate' caps total digit strings at _ENUMERATE_CAP and
# refuses unbounded or `.`-containing patterns entirely.
_COUNT_CAP = 10_000
_ENUMERATE_CAP = 1_000
def _count_pattern_digit_strings(pattern: str) -> tuple[int | None, bool, bool]:
"""Estimate the number of distinct digit strings a pattern can match.
Returns (count, unbounded, capped):
- count: estimate, or None if unbounded
- unbounded: True if pattern contains `!` or `@` (treated as
infinite the @ NANP/international dial plan is bounded but
too large to enumerate meaningfully)
- capped: True if the estimate hit _COUNT_CAP and was truncated
"""
if not pattern:
return 0, False, False
# `+` is a literal in CUCM (E.164 prefix); strip for analysis but it
# doesn't change the count. Same for `.` (purely visual separator).
p = pattern.replace("+", "").replace(".", "")
if "!" in p or "@" in p:
return None, True, False
count = 1
i = 0
while i < len(p):
ch = p[i]
if ch == "X":
count *= 10
elif ch == "[":
close = p.find("]", i)
if close == -1:
# Malformed — be conservative
return None, False, False
spec = p[i + 1 : close]
count *= _count_charclass(spec)
i = close
# other characters are literals contributing factor 1
if count > _COUNT_CAP:
return _COUNT_CAP, False, True
i += 1
return count, False, False
def _count_charclass(spec: str) -> int:
"""Count the size of a CUCM character class like '0-9' or 'abc' or '2-9a-c'.
Negation (`^`) is technically supported in CUCM but rare; treat as
its set size (the negated set has size 10 - len(set)).
"""
if not spec:
return 1
negated = spec.startswith("^")
body = spec[1:] if negated else spec
chars: set[str] = set()
i = 0
while i < len(body):
if i + 2 < len(body) and body[i + 1] == "-":
lo, hi = body[i], body[i + 2]
if lo.isdigit() and hi.isdigit() and lo <= hi:
chars.update(chr(c) for c in range(ord(lo), ord(hi) + 1))
i += 3
else:
chars.add(body[i])
i += 1
size = len(chars) or 1
return (10 - size) if negated else size
def _enumerate_pattern_digit_strings(pattern: str) -> tuple[list[str] | None, str | None]:
"""Actually enumerate the digit strings a pattern matches.
Returns (digits, skip_reason):
- digits: list of digit strings, or None if skipped
- skip_reason: human-readable explanation when skipped
Only safe for tightly-bounded patterns. Per cucx-docs:
no `!`, no `@`, no `.`, total _ENUMERATE_CAP.
"""
if not pattern:
return [""], None
if "!" in pattern:
return None, "contains '!' (unbounded)"
if "@" in pattern:
return None, "contains '@' (NANP/international dial plan; too large)"
if "." in pattern:
return None, "contains '.' (separator semantics; not enumerable per spec)"
p = pattern.replace("+", "")
count, _, _ = _count_pattern_digit_strings(p)
if count is None:
return None, "estimated count is unbounded"
if count > _ENUMERATE_CAP:
return None, f"estimated count {count} exceeds cap {_ENUMERATE_CAP}; use 'count' mode"
out = [""]
i = 0
while i < len(p):
ch = p[i]
if ch == "X":
out = [s + d for s in out for d in "0123456789"]
i += 1
elif ch == "[":
close = p.find("]", i)
if close == -1:
return None, "malformed character class"
spec = p[i + 1 : close]
chars = _expand_charclass(spec)
out = [s + c for s in out for c in chars]
i = close + 1
else:
out = [s + ch for s in out]
i += 1
return out, None
def _expand_charclass(spec: str) -> list[str]:
"""Expand a character class like '0-9' or 'abc' to its member list."""
if not spec:
return [""]
negated = spec.startswith("^")
body = spec[1:] if negated else spec
chars: set[str] = set()
i = 0
while i < len(body):
if i + 2 < len(body) and body[i + 1] == "-":
lo, hi = body[i], body[i + 2]
if lo.isdigit() and hi.isdigit() and lo <= hi:
chars.update(chr(c) for c in range(ord(lo), ord(hi) + 1))
i += 3
else:
chars.add(body[i])
i += 1
if negated:
chars = set("0123456789") - chars
return sorted(chars)
def list_route_lists_and_groups(client: "AxlClient", name: str | None = None) -> dict:
"""Route lists with their ordered route groups and member gateways/trunks.

View File

@ -239,6 +239,40 @@ def route_inspect_pattern(pattern: str, partition: str | None = None) -> dict:
return route_plan.inspect_pattern(_client(), pattern, partition)
@mcp.tool
def route_patterns_targeting(
device_name: str,
expand_wildcards: str | bool = False,
) -> dict:
"""Inverse of `route_lists_and_groups`: given a destination device name,
return every numplan whose direct target is that device.
Answers questions like *"which DIDs route to this fax server?"* in a
single call instead of a multi-attempt schema-discovery walk through
`numplan` `devicenumplanmap` `device`.
Direct-target only does NOT walk through translation patterns or
route-list intermediaries. For full transitive reachability, compose
with `route_lists_and_groups` and `route_translation_chain`.
Args:
device_name: exact device name (case-sensitive). Route Lists, SIP
trunks, CTI Route Points, Phones any device that can be a
direct numplan target.
expand_wildcards: how to handle wildcard patterns:
False (default) patterns returned as-is.
"count" annotate each pattern with a digit-string count
estimate; total surface across all patterns also returned.
Patterns containing `!` or `@` are flagged unbounded.
"enumerate" actually list digit strings. Only safe for
tightly-bounded patterns; returns a skip reason for any
pattern with `!`, `@`, `.`, or estimated >1000 expansions.
"""
return route_plan.patterns_targeting_device(
_client(), device_name, expand_wildcards
)
@mcp.tool
def route_lists_and_groups(name: str | None = None) -> dict:
"""Route lists with their ordered route groups and member devices.

View File

@ -0,0 +1,247 @@
"""Tests for patterns_targeting_device + wildcard count/enumerate helpers.
Single-call tool that answers "what numplan rows directly target this
device" — the inverse of `list_route_lists_and_groups`. Direct-target
only per cucx-docs Q1 (msg 003 in axl/agent-threads/cucx-prompt-suggestions/).
Three wildcard-expansion modes per cucx-docs Q2:
- False: patterns returned as-is
- "count": digit-string estimates per pattern + total
- "enumerate": actual digit-string list, only for tightly-bounded patterns
"""
import pytest
from mcaxl.route_plan import (
_COUNT_CAP,
_ENUMERATE_CAP,
_count_pattern_digit_strings,
_enumerate_pattern_digit_strings,
patterns_targeting_device,
)
class FakeAxlClient:
"""Returns canned SQL results — no network, no zeep."""
def __init__(self, rows: list[dict]):
self._rows = rows
self.queries: list[str] = []
def execute_sql_query(self, sql: str) -> dict:
self.queries.append(sql)
return {"row_count": len(self._rows), "rows": self._rows}
# ─── Wildcard counting (the math layer) ────────────────────────────────
class TestCountPatternDigitStrings:
"""The estimator for how many distinct digit strings a pattern matches."""
@pytest.mark.parametrize("pattern,expected", [
("1234", 1), # all literals
("X", 10), # one wildcard digit
("XX", 100),
("XXX", 1_000),
("20878594XX", 100), # the real BMH RightFax block
("9.20878594XX", 100), # `.` is a separator, ignored
("+12085243000", 1), # E.164 literal
("[2-9]XXXXXX", 7_000_000), # NANP-ish, hits cap → returns _COUNT_CAP
("[2-9]", 8), # range
("[abc]", 3), # set
("[0-9]", 10), # equiv to X
("[02468]", 5), # explicit even digits
])
def test_bounded_patterns(self, pattern, expected):
count, unbounded, capped = _count_pattern_digit_strings(pattern)
assert unbounded is False
if expected > _COUNT_CAP:
assert count == _COUNT_CAP
assert capped is True
else:
assert count == expected
assert capped is False
@pytest.mark.parametrize("pattern", ["!", "9.!", "9.@", "@", "1!"])
def test_unbounded_patterns_flagged(self, pattern):
count, unbounded, capped = _count_pattern_digit_strings(pattern)
assert count is None
assert unbounded is True
assert capped is False
def test_empty_pattern_is_zero(self):
count, unbounded, capped = _count_pattern_digit_strings("")
assert count == 0
assert unbounded is False
assert capped is False
def test_malformed_charclass_is_unbounded(self):
# `[abc` with no closing bracket — be conservative
count, unbounded, capped = _count_pattern_digit_strings("12[abc")
assert count is None
# ─── Wildcard enumeration (the strict-bounds layer) ────────────────────
class TestEnumeratePatternDigitStrings:
"""The strict enumerator — only safe for tightly-bounded patterns."""
def test_literals_only_returns_self(self):
digits, reason = _enumerate_pattern_digit_strings("1234")
assert digits == ["1234"]
assert reason is None
def test_single_X_expands_to_ten(self):
digits, reason = _enumerate_pattern_digit_strings("12X")
assert digits == [f"12{d}" for d in "0123456789"]
assert reason is None
def test_charclass_expands(self):
digits, _ = _enumerate_pattern_digit_strings("[2-4]")
assert digits == ["2", "3", "4"]
def test_block_expansion(self):
# 100 DIDs in a /XX block
digits, _ = _enumerate_pattern_digit_strings("20878594XX")
assert len(digits) == 100
assert "2087859400" in digits
assert "2087859499" in digits
@pytest.mark.parametrize("pattern,expected_reason_fragment", [
("9.20878594XX", "'.'"), # cucx-docs explicit: no `.`
("9!", "'!'"), # unbounded
("9.@", "'@'"), # `@` is checked before `.` — either reason is valid
("@", "'@'"),
])
def test_unsafe_patterns_skipped(self, pattern, expected_reason_fragment):
digits, reason = _enumerate_pattern_digit_strings(pattern)
assert digits is None
assert expected_reason_fragment in reason
def test_oversized_pattern_skipped(self):
# 4 X's = 10,000 expansions; cap is 1,000
digits, reason = _enumerate_pattern_digit_strings("XXXX")
assert digits is None
assert "exceeds cap" in reason
# ─── The tool itself (integration with FakeAxlClient) ──────────────────
class TestPatternsTargetingDevice:
"""End-to-end through the FakeAxlClient layer."""
@pytest.fixture
def rightfax_rows(self):
"""Realistic-shape rows: 1 wildcard block + 2 internal carveouts."""
base = {
"destination_device": "RightFax-RL",
"destination_class": "Route List",
"xform_mask": None,
"prefix": None,
"pattern_type": "Route Pattern",
}
return [
{**base, "pattern": "20878594XX", "partition": "External-PT",
"description": "RightFax inbound block (100 DIDs)"},
{**base, "pattern": "5550100", "partition": "Internal-PT",
"description": "Reception → fax internal hop"},
{**base, "pattern": "5550101", "partition": "Internal-PT",
"description": "Front desk → fax internal hop"},
]
def test_default_returns_patterns_intact(self, rightfax_rows):
client = FakeAxlClient(rightfax_rows)
result = patterns_targeting_device(client, "RightFax-RL")
assert result["device_name"] == "RightFax-RL"
assert result["destination_class"] == "Route List"
assert result["pattern_count"] == 3
# No expansion fields present on default
assert "total_digit_count" not in result
for p in result["patterns"]:
assert "digit_count" not in p
assert "enumerated_digits" not in p
def test_count_mode_per_pattern_and_total(self, rightfax_rows):
client = FakeAxlClient(rightfax_rows)
result = patterns_targeting_device(
client, "RightFax-RL", expand_wildcards="count"
)
# Per-pattern counts inline (cucx-docs Q2 refinement)
counts = {p["pattern"]: p["digit_count"] for p in result["patterns"]}
assert counts == {
"20878594XX": 100,
"5550100": 1,
"5550101": 1,
}
assert result["total_digit_count"] == 102
def test_count_mode_with_unbounded_pattern(self):
rows = [
{"pattern": "9.@", "partition": "PSTN-PT", "description": "PSTN catchall",
"destination_device": "Gateway-1", "destination_class": "Gateway",
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern"},
{"pattern": "5550100", "partition": "Internal-PT", "description": "extension",
"destination_device": "Gateway-1", "destination_class": "Gateway",
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern"},
]
client = FakeAxlClient(rows)
result = patterns_targeting_device(client, "Gateway-1", expand_wildcards="count")
# Total goes None once any pattern is unbounded — auditor must see this
assert result["total_digit_count"] is None
unbounded = next(p for p in result["patterns"] if p["pattern"] == "9.@")
assert unbounded["unbounded"] is True
assert unbounded["digit_count"] is None
def test_enumerate_mode_for_safe_patterns(self, rightfax_rows):
client = FakeAxlClient(rightfax_rows)
result = patterns_targeting_device(
client, "RightFax-RL", expand_wildcards="enumerate"
)
block = next(p for p in result["patterns"] if p["pattern"] == "20878594XX")
assert len(block["enumerated_digits"]) == 100
# The internal-extension carveouts are 1:1 enumerations of themselves
carveout = next(p for p in result["patterns"] if p["pattern"] == "5550100")
assert carveout["enumerated_digits"] == ["5550100"]
def test_enumerate_mode_skips_unsafe_with_reason(self):
rows = [{
"pattern": "9.@", "partition": "PSTN", "description": "PSTN catchall",
"destination_device": "GW", "destination_class": "Gateway",
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern",
}]
client = FakeAxlClient(rows)
result = patterns_targeting_device(client, "GW", expand_wildcards="enumerate")
skipped = result["patterns"][0]
assert skipped["enumerated_digits"] is None
# `@` is checked before `.` in the impl; either is a valid reason
# for `9.@` since both render the pattern non-enumerable.
assert "'@'" in skipped["enumeration_skipped"]
def test_empty_result_returns_zero_patterns(self):
client = FakeAxlClient([])
result = patterns_targeting_device(client, "Nonexistent-Device")
assert result["pattern_count"] == 0
assert result["destination_class"] is None
assert result["patterns"] == []
def test_invalid_expand_wildcards_raises(self):
client = FakeAxlClient([])
with pytest.raises(ValueError, match="expand_wildcards"):
patterns_targeting_device(client, "Foo", expand_wildcards="enumarate") # typo
def test_device_name_is_quote_escaped(self):
# SQL injection attempt via device name; _esc should handle it
client = FakeAxlClient([])
patterns_targeting_device(client, "fake'; DROP TABLE device --")
# The query was issued; double-single-quote indicates proper escaping
assert "fake''" in client.queries[0]
def test_docstring_compose_note_present_in_response(self, rightfax_rows):
"""The transitive-reachability composition note (Q1 docstring requirement)
must be in the response, so an operator browsing JSON sees it without
reading the docstring."""
client = FakeAxlClient(rightfax_rows)
result = patterns_targeting_device(client, "RightFax-RL")
assert "_note" in result
assert "translation" in result["_note"].lower()
assert "route_translation_chain" in result["_note"]