Initial mcp-cucm-axl

Read-only MCP server for Cisco Unified CM 15 AXL — built for LLM-driven
cluster auditing, with a particular focus on the Route Plan Report:
partitions, calling search spaces, route patterns, translation patterns,
called/calling party transformations, and digit-discard instructions.

Pairs intentionally with the sibling mcp-cisco-docs server (live
cluster state + vendor docs in one LLM context).

Architecture:
  - zeep SOAP client to CUCM AXL
  - WSDL bootstrap from Cisco's axlsqltoolkit.zip (auto-extract on
    first launch; zip is gitignored, vendor-licensed)
  - SQLite response cache at ~/.cache/mcp-cucm-axl/responses/
  - Schema-grounded prompts that pull chunks from the sibling
    cisco-docs index (docs_loader.py)

Read-only by structural guarantee — never registers AXL write methods
(no executeSQLUpdate, no add*/update*/remove*/apply*/reset*/restart*
tools). SQL queries also client-side validated (sql_validator.py) to
begin with SELECT or WITH.

Tools exposed:
  Foundational: axl_version, axl_sql, axl_list_tables,
                axl_describe_table, cache_stats, cache_clear
  Route plan:   route_partitions, route_calling_search_spaces,
                route_patterns, route_inspect_pattern,
                route_lists_and_groups, route_translation_chain,
                route_digit_discard_instructions

Prompts (schema-grounded):
  route_plan_overview, investigate_pattern, audit_routing,
  cucm_sql_help

Tests cover cache, docs_loader, normalize, sql_validator, wildcard.
This commit is contained in:
Ryan Malloy 2026-04-25 20:29:18 -06:00
commit 8b3da9d729
21 changed files with 4734 additions and 0 deletions

3
.env.example Normal file
View File

@ -0,0 +1,3 @@
AXL_URL=https://cucm-pub:8443/axl
AXL_USER=AxlUser
AXL_PASS=TopSecretPasswordNoSpecialCharacters

15
.gitignore vendored Normal file
View File

@ -0,0 +1,15 @@
__pycache__/
*.py[cod]
*.egg-info/
.venv/
.pytest_cache/
.ruff_cache/
dist/
build/
*.sqlite
*.sqlite-journal
.env.local
# Cisco AXL Toolkit — vendor-licensed, do not redistribute
axlsqltoolkit.zip
schema/

13
.mcp.json Normal file
View File

@ -0,0 +1,13 @@
{
"mcpServers": {
"cucm-axl": {
"command": "uv",
"args": [
"run",
"--directory",
"/home/rpm/bingham/axl",
"mcp-cucm-axl"
]
}
}
}

130
README.md Normal file
View File

@ -0,0 +1,130 @@
# mcp-cucm-axl
Read-only MCP server for **Cisco Unified CM 15** AXL — built for LLM-driven
cluster auditing, with a particular focus on the **Route Plan Report**:
partitions, calling search spaces, route patterns, translation patterns,
called/calling party transformations, and digit-discard instructions.
## Why this exists
CUCM's admin UI is great for one-config-at-a-time work but painful for
audit/discovery questions like:
- "Which translation patterns rewrite the calling party number, and why?"
- "Which CSSs include the `Internal_PT` partition, in what order?"
- "Show me every route pattern targeting the SIP trunk to the carrier."
- "Are there partitions defined but unreachable from any CSS?"
This server gives an LLM SQL access to CUCM's Informix data dictionary,
plus focused tools that bake in the right joins for routing-audit work.
Pair it with the sibling [`mcp-cisco-docs`](../docs/) server and the LLM
gets vendor documentation alongside live cluster state — answering
"is our config consistent with Cisco's recommended baseline?" in a single
conversation.
## Read-only by structural guarantee
The server **never registers** AXL write methods. There is no
`executeSQLUpdate`, no `add*`/`update*`/`remove*`/`apply*`/`reset*`/
`restart*` tool. Read-only is enforced by *absence* of write operations,
not by runtime sanitization. Defense-in-depth: SQL queries are also
client-side validated to begin with `SELECT` or `WITH`.
## Setup
### 1. Configure environment
Edit `.env` (already gitignored):
```env
AXL_URL=https://cucm-pub:8443/axl
AXL_USER=AxlUser
AXL_PASS=...
AXL_VERIFY_TLS=false # CUCM ships self-signed certs; default off
AXL_CACHE_TTL=3600 # 1 hour; 0 disables caching
AXL_WSDL_PATH= # optional explicit WSDL location
CISCO_DOCS_INDEX_PATH= # optional override for prompt enrichment
```
### 2. Bootstrap the AXL WSDL
Download the **Cisco AXL Toolkit** from your CUCM admin UI:
> Application → Plugins → Find → "Cisco AXL Toolkit" → Download
Drop the resulting `axlsqltoolkit.zip` into the project directory. On first
launch, the server auto-extracts `schema/15.0/` into `~/.cache/mcp-cucm-axl/wsdl/15.0/`.
The zip is gitignored (Cisco-licensed; not redistributable).
Alternatives (in resolution order):
```bash
# A: explicit zip elsewhere
export AXL_WSDL_ZIP=/path/to/axlsqltoolkit.zip
# B: explicit WSDL file
export AXL_WSDL_PATH=/path/to/schema/15.0/AXLAPI.wsdl
# C: pre-populated cache directory
mkdir -p ~/.cache/mcp-cucm-axl/wsdl/15.0/
cp /path/to/schema/15.0/* ~/.cache/mcp-cucm-axl/wsdl/15.0/
```
### 3. Install + run
```bash
uv sync
uv run mcp-cucm-axl
```
Or via the bundled `.mcp.json`, automatically registered when Claude Code
opens this directory.
## Tool surface
### Foundational
| Tool | Purpose |
|---|---|
| `axl_version()` | Cluster version sanity check |
| `axl_sql(query)` | Execute a SELECT against Informix data dictionary |
| `axl_list_tables(pattern=None)` | Discover Informix tables |
| `axl_describe_table(name)` | Column metadata for one table |
| `cache_stats()`, `cache_clear(pattern=None)` | Cache plumbing |
### Route plan
| Tool | Purpose |
|---|---|
| `route_partitions()` | All partitions with member counts |
| `route_calling_search_spaces(name=None)` | CSS list with ordered partitions |
| `route_patterns(kind=None, partition=None, filter=None)` | Route Plan Report |
| `route_inspect_pattern(pattern, partition=None)` | One-pattern deep dive + reverse CSS lookup |
| `route_lists_and_groups(name=None)` | Route list → route group → device chain |
| `route_translation_chain(number, css_name=None)` | "What patterns might match this number?" (literal/prefix only) |
| `route_digit_discard_instructions()` | DDI catalog |
## Prompts
Schema-grounded conversation seeds. They pull relevant chunks from the
sibling `cisco-docs` index and embed them inline:
- `route_plan_overview` — fresh audit conversation seed
- `investigate_pattern(pattern, partition=None)` — deep-dive a specific pattern
- `audit_routing(focus="full")` — comprehensive audit walkthrough
- `cucm_sql_help(question)` — catch-all for arbitrary SQL questions
## Cache
Responses are cached in SQLite at `~/.cache/mcp-cucm-axl/responses/axl_responses.sqlite`.
Cache survives restarts. Clear with `cache_clear()` after a known config change.
## Notes
- `route_translation_chain` does literal/prefix matching only. CUCM's actual
matcher evaluates wildcards (`X`, `!`, `[0-9]`, etc.) and selects the
longest match. Treat results as "patterns to investigate" rather than
"definitive route."
- Pattern type codes (`tkpatternusage`) used by `route_patterns(kind=...)` are
stable across CUCM versions but enumerated against the `typepatternusage`
table at query time, so any cluster-specific custom types still work.

40
pyproject.toml Normal file
View File

@ -0,0 +1,40 @@
[project]
name = "mcp-cucm-axl"
version = "0.1.0"
description = "Read-only MCP server for CUCM 15 AXL — exposes executeSQLQuery + Informix data dictionary introspection, with schema-grounded prompts that pull from the sibling cisco-docs index. Built for LLM-driven cluster auditing."
authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}]
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.11"
dependencies = [
"fastmcp>=3.2",
"zeep>=4.3",
"platformdirs>=4.9",
"numpy>=1.26",
"python-dotenv>=1.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.0",
"pytest-asyncio>=0.24",
]
[project.scripts]
mcp-cucm-axl = "mcp_cucm_axl.server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_cucm_axl"]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

View File

@ -0,0 +1,5 @@
"""mcp-cucm-axl — read-only MCP server for CUCM 15 AXL."""
from .server import main
__all__ = ["main"]

129
src/mcp_cucm_axl/cache.py Normal file
View File

@ -0,0 +1,129 @@
"""SQLite-backed TTL cache for AXL responses.
Keyed on (method_name, sorted_kwargs_json). Cache survives server restarts,
which makes exploratory audit sessions dramatically faster the LLM can
re-run the same `listPhone` queries across conversations without paying
the SOAP round-trip every time.
"""
from __future__ import annotations
import json
import sqlite3
import time
from pathlib import Path
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS axl_cache (
cache_key TEXT PRIMARY KEY,
method TEXT NOT NULL,
args_json TEXT NOT NULL,
result_json TEXT NOT NULL,
created_at REAL NOT NULL,
expires_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS axl_cache_method_idx ON axl_cache(method);
CREATE INDEX IF NOT EXISTS axl_cache_expires_idx ON axl_cache(expires_at);
"""
class AxlCache:
"""SQLite TTL cache. Thread-safe via per-call connections."""
def __init__(self, db_path: Path, default_ttl: int):
self.db_path = db_path
self.default_ttl = default_ttl
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self._conn() as c:
c.executescript(SCHEMA)
def _conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path, isolation_level=None)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
return conn
@staticmethod
def _make_key(method: str, kwargs: dict) -> str:
# sort_keys gives us a deterministic key regardless of dict order
return f"{method}::{json.dumps(kwargs, sort_keys=True, default=str)}"
def get(self, method: str, kwargs: dict) -> Any | None:
if self.default_ttl <= 0:
return None
key = self._make_key(method, kwargs)
now = time.time()
with self._conn() as c:
row = c.execute(
"SELECT result_json FROM axl_cache WHERE cache_key = ? AND expires_at > ?",
(key, now),
).fetchone()
return json.loads(row[0]) if row else None
def set(self, method: str, kwargs: dict, result: Any, ttl: int | None = None) -> None:
if self.default_ttl <= 0 and ttl is None:
return
ttl = ttl if ttl is not None else self.default_ttl
if ttl <= 0:
return
key = self._make_key(method, kwargs)
now = time.time()
with self._conn() as c:
c.execute(
"""
INSERT OR REPLACE INTO axl_cache
(cache_key, method, args_json, result_json, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
key,
method,
json.dumps(kwargs, sort_keys=True, default=str),
json.dumps(result, default=str),
now,
now + ttl,
),
)
def stats(self) -> dict:
now = time.time()
with self._conn() as c:
total = c.execute("SELECT COUNT(*) FROM axl_cache").fetchone()[0]
live = c.execute(
"SELECT COUNT(*) FROM axl_cache WHERE expires_at > ?", (now,)
).fetchone()[0]
by_method = {
row[0]: row[1]
for row in c.execute(
"SELECT method, COUNT(*) FROM axl_cache "
"WHERE expires_at > ? GROUP BY method ORDER BY 2 DESC",
(now,),
).fetchall()
}
return {
"db_path": str(self.db_path),
"default_ttl_seconds": self.default_ttl,
"total_entries": total,
"live_entries": live,
"expired_entries": total - live,
"by_method": by_method,
}
def clear(self, method_pattern: str | None = None) -> int:
with self._conn() as c:
if method_pattern:
cursor = c.execute(
"DELETE FROM axl_cache WHERE method LIKE ?",
(method_pattern.replace("*", "%"),),
)
else:
cursor = c.execute("DELETE FROM axl_cache")
return cursor.rowcount
def purge_expired(self) -> int:
with self._conn() as c:
cursor = c.execute("DELETE FROM axl_cache WHERE expires_at <= ?", (time.time(),))
return cursor.rowcount

299
src/mcp_cucm_axl/client.py Normal file
View File

@ -0,0 +1,299 @@
"""AXL SOAP client wrapper.
Lazy connection instantiated on first tool call, not at server boot.
This means the FastMCP server registers tools and prompts immediately,
even if the cluster is unreachable, and the user gets a clear error
only when they actually invoke a tool that needs CUCM.
"""
from __future__ import annotations
import os
import sys
import urllib3
from pathlib import Path
from typing import Any
from requests import Session
from requests.auth import HTTPBasicAuth
from zeep import Client, Settings
from zeep.cache import SqliteCache
from zeep.transports import Transport
from .cache import AxlCache
from .sql_validator import validate_select
from .wsdl_loader import resolve_wsdl_path
class AxlClient:
"""Lazy-loaded zeep client for CUCM AXL."""
def __init__(self, response_cache: AxlCache):
self._client: Client | None = None
self._service: Any = None
self._response_cache = response_cache
self._connection_error: str | None = None
def _ensure_connected(self) -> None:
if self._service is not None:
return
if self._connection_error is not None:
raise RuntimeError(self._connection_error)
try:
url = os.environ["AXL_URL"]
user = os.environ["AXL_USER"]
password = os.environ["AXL_PASS"]
except KeyError as e:
raise RuntimeError(
f"Missing required env var {e.args[0]}. "
f"Set AXL_URL, AXL_USER, AXL_PASS in .env or the environment."
) from None
# CUCM's AXL endpoint 302-redirects /axl to /axl/. The redirect
# converts POST to GET (standard HTTP/1.1 behavior for 302), which
# makes the SOAP request silently fail with an HTML status page.
# Normalize the trailing slash so users don't need to remember.
if not url.rstrip().endswith("/"):
url = url.rstrip() + "/"
verify_tls = os.environ.get("AXL_VERIFY_TLS", "false").lower() in ("1", "true", "yes")
if not verify_tls:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
wsdl_path = resolve_wsdl_path()
session = Session()
session.verify = verify_tls
session.auth = HTTPBasicAuth(user, password)
# zeep's own WSDL cache (separate from our response cache) keeps
# repeat startups fast — it parses the WSDL once and reuses
from platformdirs import user_cache_dir
zeep_cache_path = Path(user_cache_dir("mcp-cucm-axl")) / "zeep_wsdl.db"
zeep_cache_path.parent.mkdir(parents=True, exist_ok=True)
transport = Transport(
session=session,
cache=SqliteCache(path=str(zeep_cache_path), timeout=86400),
timeout=30,
)
try:
self._client = Client(
wsdl=str(wsdl_path),
settings=Settings(strict=False, xml_huge_tree=True),
transport=transport,
)
# AXL endpoint is the AXL_URL itself; override the WSDL's default
# service location which usually points at a placeholder host.
self._service = self._client.create_service(
"{http://www.cisco.com/AXLAPIService/}AXLAPIBinding",
url,
)
print(
f"[mcp-cucm-axl] connected to {url} (TLS verify={verify_tls})",
file=sys.stderr,
flush=True,
)
except Exception as e:
self._connection_error = f"AXL connection failed: {e}"
raise RuntimeError(self._connection_error) from e
# ---- read-only operations ----
def get_ccm_version(self) -> dict:
cached = self._response_cache.get("getCCMVersion", {})
if cached is not None:
return cached
self._ensure_connected()
resp = self._service.getCCMVersion()
# zeep CompoundValue → dict; the actual payload is under "return"
full = _zeep_to_dict(resp)
result = full.get("return", full) if isinstance(full, dict) else full
self._response_cache.set("getCCMVersion", {}, result, ttl=3600)
return result
def execute_sql_query(self, query: str) -> dict:
cleaned = validate_select(query)
cached = self._response_cache.get("executeSQLQuery", {"sql": cleaned})
if cached is not None:
return {**cached, "_cache": "hit"}
self._ensure_connected()
resp = self._service.executeSQLQuery(sql=cleaned)
rows = _parse_sql_rows(resp)
result = {"row_count": len(rows), "rows": rows, "query": cleaned}
self._response_cache.set("executeSQLQuery", {"sql": cleaned}, result)
return {**result, "_cache": "miss"}
def list_informix_tables(self, pattern: str | None = None) -> dict:
# systables is the Informix system catalog. tabid > 99 filters out
# internal/system tables and leaves CUCM's data dictionary tables.
if pattern:
safe_pattern = pattern.replace("'", "''")
sql = (
"SELECT tabname FROM systables "
f"WHERE tabid > 99 AND tabname LIKE '{safe_pattern}' "
"ORDER BY tabname"
)
else:
sql = "SELECT tabname FROM systables WHERE tabid > 99 ORDER BY tabname"
result = self.execute_sql_query(sql)
names = [row.get("tabname") for row in result.get("rows", []) if row.get("tabname")]
return {"table_count": len(names), "tables": names, "pattern": pattern}
def describe_informix_table(self, table_name: str) -> dict:
# Join syscolumns to systables to get column metadata for one table.
# coltype encoding: low byte = type code, high bit = NOT NULL flag.
safe = table_name.replace("'", "''")
sql = (
"SELECT c.colname, c.coltype, c.collength "
"FROM syscolumns c, systables t "
f"WHERE t.tabname = '{safe}' AND c.tabid = t.tabid "
"ORDER BY c.colno"
)
result = self.execute_sql_query(sql)
columns = []
for row in result.get("rows", []):
coltype_raw = int(row.get("coltype", 0))
type_code = coltype_raw & 0xFF
not_null = bool(coltype_raw & 0x100)
columns.append({
"name": row.get("colname"),
"informix_type_code": type_code,
"type": _INFORMIX_TYPE_NAMES.get(type_code, f"type_{type_code}"),
"length": int(row.get("collength", 0)),
"not_null": not_null,
})
if not columns:
return {"table": table_name, "error": "Table not found or has no columns."}
return {"table": table_name, "column_count": len(columns), "columns": columns}
# Informix type codes — partial list, enough for CUCM's data dictionary.
# Full list: https://www.ibm.com/docs/en/informix-servers/14.10?topic=tables-syscolumns
_INFORMIX_TYPE_NAMES = {
0: "CHAR",
1: "SMALLINT",
2: "INTEGER",
3: "FLOAT",
4: "SMALLFLOAT",
5: "DECIMAL",
6: "SERIAL",
7: "DATE",
8: "MONEY",
10: "DATETIME",
11: "BYTE",
12: "TEXT",
13: "VARCHAR",
14: "INTERVAL",
15: "NCHAR",
16: "NVARCHAR",
17: "INT8",
18: "SERIAL8",
19: "SET",
20: "MULTISET",
21: "LIST",
22: "ROW",
23: "COLLECTION",
41: "LVARCHAR",
43: "LVARCHAR",
45: "BOOLEAN",
}
def _zeep_to_dict(obj: Any) -> Any:
"""Recursively convert zeep CompoundValue objects to plain dicts/lists."""
if obj is None:
return None
if hasattr(obj, "__values__"):
return {k: _zeep_to_dict(v) for k, v in obj.__values__.items()}
if isinstance(obj, list):
return [_zeep_to_dict(item) for item in obj]
if isinstance(obj, dict):
return {k: _zeep_to_dict(v) for k, v in obj.items()}
return obj
def _parse_sql_rows(resp: Any) -> list[dict]:
"""Pull the row list out of an executeSQLQuery response.
AXL's executeSQLQuery returns rows as raw lxml elements wrapped in
`<return><row><colname>val</colname>...</row></return>`. Zeep doesn't
schema-bind these because the columns vary per query they come
through as a list of `lxml.etree._Element` row objects with column
children.
When the query matches zero rows, the response is `<return/>` (empty),
which arrives as a CompoundValue with .return = None. In that case we
must return [] NOT fall back to parsing the response envelope itself,
which would yield a phantom row of `{"return": None, "sequence": None}`.
"""
if resp is None:
return []
# Find the row container at .return / ["return"] / __values__["return"]
container = None
for accessor in (
lambda: getattr(resp, "return", None) if hasattr(resp, "return") else None,
lambda: resp.__values__.get("return") if hasattr(resp, "__values__") else None,
lambda: resp.get("return") if isinstance(resp, dict) else None,
):
try:
v = accessor()
except Exception:
v = None
if v is not None:
container = v
break
# No `return` member, or it's None → zero rows. Critical: do NOT fall
# back to parsing `resp` itself, which would produce a phantom row.
if container is None:
return []
# If the container is itself the rows list, use it; else look for .row
if isinstance(container, list):
row_iter = container
elif hasattr(container, "row"):
row_iter = container.row or []
elif isinstance(container, dict) and "row" in container:
row_iter = container["row"] or []
else:
# Container present but no obvious row collection — try iterating it
row_iter = list(container) if hasattr(container, "__iter__") else [container]
if not isinstance(row_iter, list):
row_iter = [row_iter]
out = []
for r in row_iter:
# AXL's executeSQLQuery wraps each row as a list of lxml column
# elements: [<Element colname1>, <Element colname2>, ...].
if isinstance(r, list):
out.append({
child.tag: child.text
for child in r
if hasattr(child, "tag")
})
continue
# Single lxml element with children (some response shapes)
if hasattr(r, "tag") and not isinstance(r, str):
try:
out.append({child.tag: child.text for child in r})
continue
except TypeError:
pass
if hasattr(r, "__values__"):
out.append({k: _stringify(v) for k, v in r.__values__.items()})
elif isinstance(r, dict):
out.append({k: _stringify(v) for k, v in r.items()})
else:
out.append({"value": str(r)})
return out
def _stringify(v: Any) -> Any:
if v is None or isinstance(v, (str, int, float, bool)):
return v
return str(v)

View File

@ -0,0 +1,152 @@
"""Read the sibling cisco-docs index and surface chunks for prompt enrichment.
We deliberately do NOT load sentence-transformers here (would add ~500MB to
the dep tree). Prompt parameters are well-bounded (topic strings, audit-type
enums, table names), so substring-and-keyword matching against chunk text
and heading_path gets us most of the value.
For free-text semantic queries, the prompt instructs the LLM to invoke the
sibling cisco-docs MCP server's `search_docs` tool — composition over
duplication.
Doc-name weighting: the cisco-docs index for CUCM is dominated by CLI
reference chunks (~475 of 511) where most chunks are command syntax with
no conceptual content. We bias toward conceptual docs (system-config,
feature-config, admin) and penalize cli-reference for topical questions.
The bias only matters for ranking every doc still gets matched.
"""
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
# Default to the sibling docs index in this monorepo. Override with env var
# if mcp-cucm-axl gets used outside this layout.
_DEFAULT_INDEX_DIR = Path("/home/rpm/bingham/docs/src/assets/.cisco-docs-index")
# Doc-name multipliers — higher = preferred for conceptual prompts.
# Keys match the `doc` field in indexed chunks.
_DOC_WEIGHTS: dict[str, float] = {
"system-config-guide": 3.0,
"feature-config-guide": 2.5,
"admin-guide": 2.0,
"interop-sip-trunking-guide": 1.5,
"security-guide": 1.2,
"recording-use-cases": 1.0,
"rtmt-guide": 0.8,
"cli-reference": 0.3, # mostly command syntax, low conceptual signal
"release-notes": 0.5,
"hardware-compat": 0.2,
"server-os-compat": 0.2,
}
class DocsIndex:
"""In-memory chunk store with keyword filtering. Light, fast, no torch."""
def __init__(self, chunks: list[dict], meta: dict):
self.chunks = chunks
self.meta = meta
@classmethod
def load(cls, index_dir: Path | None = None) -> "DocsIndex | None":
index_dir = index_dir or Path(
os.environ.get("CISCO_DOCS_INDEX_PATH", _DEFAULT_INDEX_DIR)
)
chunks_path = index_dir / "chunks.jsonl"
meta_path = index_dir / "index_meta.json"
if not chunks_path.exists() or not meta_path.exists():
print(
f"[mcp-cucm-axl] cisco-docs index not found at {index_dir}; "
f"prompts will run without schema enrichment.",
file=sys.stderr,
flush=True,
)
return None
meta = json.loads(meta_path.read_text())
chunks = [
json.loads(line)
for line in chunks_path.read_text(encoding="utf-8").splitlines()
if line.strip()
]
print(
f"[mcp-cucm-axl] loaded {len(chunks)} doc chunks from {index_dir}",
file=sys.stderr,
flush=True,
)
return cls(chunks, meta)
def cucm_chunks(self) -> list[dict]:
return [c for c in self.chunks if c.get("product") == "cucm"]
def find(
self,
keywords: list[str],
product: str = "cucm",
max_chunks: int = 6,
max_chars_per_chunk: int = 800,
) -> list[dict]:
"""Score chunks by keyword hits in heading_path + text. Lowercase-insensitive.
Heading hits weight 3x text hits heading paths are a much better
topical signal than incidental text mentions.
"""
if not keywords:
return []
kws = [k.lower() for k in keywords if k]
scored: list[tuple[float, dict]] = []
for chunk in self.chunks:
if product and chunk.get("product") != product:
continue
heading = " ".join(chunk.get("heading_path") or []).lower()
text = (chunk.get("text") or "").lower()
doc = chunk.get("doc") or ""
doc_lower = doc.lower()
raw = 0
for k in kws:
raw += heading.count(k) * 3
raw += doc_lower.count(k) * 2
raw += text.count(k)
if raw > 0:
weight = _DOC_WEIGHTS.get(doc, 1.0)
scored.append((raw * weight, chunk))
scored.sort(key=lambda t: t[0], reverse=True)
out = []
for score, chunk in scored[:max_chunks]:
text = chunk.get("text", "")
if len(text) > max_chars_per_chunk:
text = text[:max_chars_per_chunk] + ""
out.append({
"score": round(score, 1),
"heading_path": chunk.get("heading_path"),
"doc": chunk.get("doc"),
"version": chunk.get("version"),
"source_path": chunk.get("source_path"),
"text": text,
"chunk_id": chunk.get("id"),
})
return out
def format_chunks_for_prompt(self, chunks: list[dict]) -> str:
"""Render chunks as a markdown reference block for embedding in prompt seeds."""
if not chunks:
return "_No matching schema documentation found in the local index._"
lines = []
for c in chunks:
heading = " > ".join(c.get("heading_path") or []) or "(no heading)"
doc = c.get("doc", "")
version = c.get("version", "")
lines.append(f"### {heading} \n_source: {doc} ({version}) — score {c['score']}_")
lines.append("")
lines.append(c["text"])
lines.append("")
return "\n".join(lines)

View File

@ -0,0 +1,125 @@
"""Normalize Informix-flavored result values for LLM consumption.
CUCM's Informix database returns several encodings that are awkward for
an LLM to interpret correctly without help:
- Booleans as 't'/'f' strings (not native booleans).
- Foreign-key codes like `tkreleasecausevalue='0'` that need a separate
join to a `type*` table to get a human-readable name.
This module:
1. Converts 't'/'f' to True/False for known boolean columns.
2. Provides `TypeDecoder` a small lazy cache that looks up tk* enum
codes against their CUCM `type*` lookup tables on first use, so the
output gets the human-readable name without us hardcoding any mappings.
The decoder reads from CUCM's own `typereleasecausevalue` etc. tables, so
it stays correct across CUCM versions without maintenance.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .client import AxlClient
# Columns we know to be boolean from the CUCM data dictionary. Adding a
# column here is safe — if it's not actually a 't'/'f' boolean, normalization
# is a no-op (only literal 't' and 'f' strings get coerced).
BOOLEAN_COLUMNS: frozenset[str] = frozenset({
"block_enabled",
"blockenable",
"isemergencyservicenumber",
"isanonymous",
"iscallable",
"ismessagewaitingon",
"supportoverlapsending",
"outsidedialtone",
"rejectanonymouscall",
"routethisdialplan",
"usecallercss",
"useoriginatorcss",
"withtag",
"withvalueclause",
"personalroutingenabled",
})
def normalize_bool(value: object) -> object:
"""'t'/'f' → True/False; passthrough everything else."""
if value == "t":
return True
if value == "f":
return False
return value
def normalize_row(row: dict) -> dict:
"""Apply boolean normalization to known boolean columns in a row dict."""
out = {}
for k, v in row.items():
out[k] = normalize_bool(v) if k in BOOLEAN_COLUMNS else v
return out
def normalize_rows(rows: list[dict]) -> list[dict]:
return [normalize_row(r) for r in rows]
class TypeDecoder:
"""Lazily resolve `tk*` foreign-key codes to their human-readable names.
Usage:
decoder = TypeDecoder(client)
name = decoder.decode("tkreleasecausevalue", "0") # → "No Error"
On first call for a given tk* code, queries the matching `type*` table
and caches the {enum: name} mapping. Subsequent calls hit the cache.
"""
# Mapping from tk* column → type* lookup table. Add as we encounter
# new codes that benefit from decoding. Codes not in this map remain
# as the raw integer string (still queryable, just not decoded).
TK_TO_TYPE: dict[str, str] = {
"tkpatternusage": "typepatternusage",
"tkreleasecausevalue": "typereleasecausevalue",
"tkpatternprecedence": "typepatternprecedence",
"tkpatternrouteclass": "typepatternrouteclass",
"tkclass": "typeclass",
"tkdeviceprotocol": "typedeviceprotocol",
"tkmodel": "typemodel",
"tkproduct": "typeproduct",
"tkstatus_partyentrancetone": "typestatus_partyentrancetone",
}
def __init__(self, client: "AxlClient"):
self.client = client
self._cache: dict[str, dict[str, str]] = {}
def _load_table(self, type_table: str) -> dict[str, str]:
if type_table in self._cache:
return self._cache[type_table]
try:
result = self.client.execute_sql_query(
f"SELECT enum, name FROM {type_table}"
)
mapping = {
str(r.get("enum")): r.get("name", "")
for r in result.get("rows", [])
if r.get("enum") is not None
}
except Exception:
mapping = {}
self._cache[type_table] = mapping
return mapping
def decode(self, tk_column: str, code: object) -> object:
"""Return the human-readable name for code, or the original code if unknown."""
if code is None or code == "":
return code
type_table = self.TK_TO_TYPE.get(tk_column)
if type_table is None:
return code
mapping = self._load_table(type_table)
return mapping.get(str(code), code)

View File

@ -0,0 +1,803 @@
"""Route plan tools — focused on CUCM call routing audit.
The CUCM dial plan is centered on one table: `numplan`. Every "pattern"
(directory number, route pattern, translation pattern, hunt pilot, etc.) is
a row there, distinguished by `tkpatternusage`. Calling/called party
transformations and digit-discard instructions live as columns on the same
row, which is why the Route Plan Report is essentially a single query.
Access control is in `callingsearchspace` + `callingsearchspacemember`
the latter is an ordered list (sortorder) of partitions inside each CSS.
Routing destinations: numplan devicenumplanmap device(tkclass='Route List')
routelist routegroup routegroupdevicemap device
Local Route Group resolution: route groups with no static members resolve
through devicepoolroutegroupmap, which maps a calling device's device pool
to the actual gateway-bearing route group. fkroutegroup_local is the named
placeholder; fkroutegroup is the actual destination group.
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from .normalize import normalize_rows, normalize_row, normalize_bool
if TYPE_CHECKING:
from .client import AxlClient
# Pattern type codes (numplan.tkpatternusage), verified against the
# typepatternusage table in CUCM 15.0.1.12900. To inspect on a different
# cluster: SELECT enum, name FROM typepatternusage ORDER BY enum.
PATTERN_KINDS: dict[str, int] = {
"call_park": 0,
"conference": 1,
"directory_number": 2,
"translation": 3,
"call_pickup_group": 4,
"route": 5,
"message_waiting": 6,
"hunt_pilot": 7,
"voicemail_port": 8,
"device_template": 11,
"directed_call_park": 12,
"device_intercom": 13,
"translation_intercom": 14,
"translation_calling_party": 15,
"called_party_xform": 20,
"ils_learned_enterprise": 23,
"ils_learned_e164": 24,
"ils_learned_uri": 28,
}
def _esc(s: str) -> str:
return s.replace("'", "''")
def list_partitions(client: "AxlClient") -> dict:
"""All route partitions with how many patterns and CSS members each has."""
sql = """
SELECT
rp.name AS name,
rp.description AS description,
(SELECT COUNT(*) FROM numplan np WHERE np.fkroutepartition = rp.pkid) AS pattern_count,
(SELECT COUNT(*) FROM callingsearchspacemember csm WHERE csm.fkroutepartition = rp.pkid) AS css_member_count
FROM routepartition rp
ORDER BY rp.name
"""
result = client.execute_sql_query(sql)
return {
"partition_count": result["row_count"],
"partitions": normalize_rows(result["rows"]),
}
def list_calling_search_spaces(client: "AxlClient", name: str | None = None) -> dict:
"""Calling search spaces with their ordered partition list.
If `name` is given, return only that CSS. Otherwise return all CSS
grouped, with each one's partitions in sortorder.
"""
where = f"WHERE css.name = '{_esc(name)}'" if name else ""
sql = f"""
SELECT
css.name AS css_name,
css.description AS css_description,
rp.name AS partition_name,
csm.sortorder AS sortorder
FROM callingsearchspace css
LEFT OUTER JOIN callingsearchspacemember csm ON csm.fkcallingsearchspace = css.pkid
LEFT OUTER JOIN routepartition rp ON csm.fkroutepartition = rp.pkid
{where}
ORDER BY css.name, csm.sortorder
"""
result = client.execute_sql_query(sql)
grouped: dict[str, dict] = {}
for row in result["rows"]:
css = row.get("css_name")
if not css:
continue
if css not in grouped:
grouped[css] = {
"name": css,
"description": row.get("css_description"),
"partitions": [],
}
if row.get("partition_name"):
grouped[css]["partitions"].append({
"name": row["partition_name"],
"sortorder": row.get("sortorder"),
})
css_list = list(grouped.values())
return {"css_count": len(css_list), "calling_search_spaces": css_list}
def list_patterns(
client: "AxlClient",
kind: str | None = None,
partition: str | None = None,
filter_substring: str | None = None,
limit: int = 500,
) -> dict:
"""The Route Plan Report — patterns with their transformations.
Args:
kind: One of PATTERN_KINDS keys, or None for all. e.g. "route", "translation".
partition: Filter by partition name.
filter_substring: Substring match against pattern text (LIKE '%X%').
limit: Max rows. Defaults to 500.
"""
where_clauses = []
if kind is not None:
if kind not in PATTERN_KINDS:
return {
"error": f"Unknown kind {kind!r}. Valid: {sorted(PATTERN_KINDS)}",
}
where_clauses.append(f"np.tkpatternusage = {PATTERN_KINDS[kind]}")
if partition:
where_clauses.append(f"rp.name = '{_esc(partition)}'")
if filter_substring:
where_clauses.append(f"np.dnorpattern LIKE '%{_esc(filter_substring)}%'")
where = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
sql = f"""
SELECT FIRST {int(limit)}
np.dnorpattern AS pattern,
np.description AS description,
tpu.name AS pattern_type,
np.tkpatternusage AS pattern_type_code,
rp.name AS partition_name,
np.callingpartytransformationmask AS calling_party_xform_mask,
np.calledpartytransformationmask AS called_party_xform_mask,
np.prefixdigitsout AS prefix_digits_out,
np.callingpartyprefixdigits AS calling_prefix_digits,
ddi.name AS digit_discard_instructions,
np.blockenable AS block_enabled,
np.pkid AS pkid
FROM numplan np
LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
LEFT OUTER JOIN typepatternusage tpu ON np.tkpatternusage = tpu.enum
LEFT OUTER JOIN digitdiscardinstruction ddi ON np.fkdigitdiscardinstruction = ddi.pkid
{where}
ORDER BY rp.name, np.dnorpattern
"""
result = client.execute_sql_query(sql)
return {
"pattern_count": result["row_count"],
"filter": {"kind": kind, "partition": partition, "substring": filter_substring},
"patterns": normalize_rows(result["rows"]),
}
def inspect_pattern(
client: "AxlClient",
pattern: str,
partition: str | None = None,
) -> dict:
"""Deep dive on one pattern — transforms, target, reverse CSS lookup."""
where = f"np.dnorpattern = '{_esc(pattern)}'"
if partition:
where += f" AND rp.name = '{_esc(partition)}'"
detail_sql = f"""
SELECT
np.dnorpattern AS pattern,
np.description AS description,
tpu.name AS pattern_type,
rp.name AS partition_name,
np.callingpartytransformationmask AS calling_party_xform_mask,
np.calledpartytransformationmask AS called_party_xform_mask,
np.prefixdigitsout AS prefix_digits_out,
np.callingpartyprefixdigits AS calling_prefix_digits,
ddi.name AS digit_discard_instructions,
cssxlate.name AS css_for_translation,
rf.name AS route_filter_name,
rf.clause AS route_filter_clause,
np.blockenable AS block_enabled,
tprc.name AS release_cause,
np.pkid AS pkid
FROM numplan np
LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
LEFT OUTER JOIN typepatternusage tpu ON np.tkpatternusage = tpu.enum
LEFT OUTER JOIN digitdiscardinstruction ddi ON np.fkdigitdiscardinstruction = ddi.pkid
LEFT OUTER JOIN callingsearchspace cssxlate ON np.fkcallingsearchspace_translation = cssxlate.pkid
LEFT OUTER JOIN routefilter rf ON np.fkroutefilter = rf.pkid
LEFT OUTER JOIN typereleasecausevalue tprc ON np.tkreleasecausevalue = tprc.enum
WHERE {where}
"""
detail = client.execute_sql_query(detail_sql)
if not detail["rows"]:
return {"error": f"Pattern {pattern!r} not found" + (f" in partition {partition!r}" if partition else "")}
if len(detail["rows"]) > 1 and not partition:
return {
"error": f"Multiple patterns named {pattern!r} in different partitions; specify partition.",
"matches": [
{"pattern": r["pattern"], "partition": r["partition_name"]}
for r in detail["rows"]
],
}
row = normalize_row(detail["rows"][0])
partition_name = row.get("partition_name")
pattern_pkid = row.get("pkid")
# Reverse CSS lookup: which calling search spaces include this pattern's partition?
reachable_from = []
if partition_name:
css_sql = f"""
SELECT css.name AS css_name, csm.sortorder AS sortorder
FROM callingsearchspace css
JOIN callingsearchspacemember csm ON csm.fkcallingsearchspace = css.pkid
JOIN routepartition rp ON csm.fkroutepartition = rp.pkid
WHERE rp.name = '{_esc(partition_name)}'
ORDER BY css.name
"""
css_result = client.execute_sql_query(css_sql)
reachable_from = css_result["rows"]
# Forward routing: where does this pattern actually route?
destination = None
route_list_chain = None
if pattern_pkid:
destination = resolve_pattern_destination(client, pattern_pkid)
# If destination is a Route List, expand the route group / device chain
if destination and destination.get("destination_class") == "Route List":
chain = list_route_lists_and_groups(client, name=destination["destination_name"])
if chain.get("route_lists"):
route_list_chain = chain["route_lists"][0]
return {
"pattern": row,
"reachable_from_css": reachable_from,
"destination": destination,
"route_list_chain": route_list_chain,
}
def list_route_lists_and_groups(client: "AxlClient", name: str | None = None) -> dict:
"""Route lists with their ordered route groups and member gateways/trunks.
Schema chain:
device (tkclass='Route List') the "Route List" header
routelist (fkdevice = ) list of route group steps (selectionorder)
routegroup (fkroutegroup) the route group at this step
routegroupdevicemap ordered (deviceselectionorder) gateways/trunks
device (fkdevice = ) the actual gateway/trunk
"""
where = f"AND rl_dev.name = '{_esc(name)}'" if name else ""
sql = f"""
SELECT
rl_dev.name AS route_list_name,
rl_dev.description AS route_list_description,
rg.name AS route_group_name,
rl.selectionorder AS group_order,
gw.name AS device_name,
rgdm.deviceselectionorder AS device_order,
tc_gw.name AS device_class
FROM device rl_dev
JOIN typeclass tc_rl ON rl_dev.tkclass = tc_rl.enum
LEFT OUTER JOIN routelist rl ON rl.fkdevice = rl_dev.pkid
LEFT OUTER JOIN routegroup rg ON rl.fkroutegroup = rg.pkid
LEFT OUTER JOIN routegroupdevicemap rgdm ON rgdm.fkroutegroup = rg.pkid
LEFT OUTER JOIN device gw ON rgdm.fkdevice = gw.pkid
LEFT OUTER JOIN typeclass tc_gw ON gw.tkclass = tc_gw.enum
WHERE tc_rl.name = 'Route List' {where}
ORDER BY rl_dev.name, rl.selectionorder, rgdm.deviceselectionorder
"""
result = client.execute_sql_query(sql)
grouped: dict[str, dict] = {}
for row in result["rows"]:
rl_name = row.get("route_list_name")
if not rl_name:
continue
if rl_name not in grouped:
grouped[rl_name] = {
"name": rl_name,
"description": row.get("route_list_description"),
"route_groups": {},
}
rg_name = row.get("route_group_name")
if rg_name:
if rg_name not in grouped[rl_name]["route_groups"]:
grouped[rl_name]["route_groups"][rg_name] = {
"name": rg_name,
"order_within_route_list": _to_int(row.get("group_order")),
"devices": [],
}
if row.get("device_name"):
grouped[rl_name]["route_groups"][rg_name]["devices"].append({
"name": row["device_name"],
"class": row.get("device_class"),
"order_within_group": _to_int(row.get("device_order")),
})
out = []
for rl in grouped.values():
# Sort route groups by their order, and annotate empty groups —
# which on CUCM usually means "uses device-pool-assigned local
# route group" (CUCM's Standard Local Route Group feature)
ordered_groups = sorted(
rl["route_groups"].values(),
key=lambda rg: rg.get("order_within_route_list") or 0,
)
for g in ordered_groups:
if not g["devices"]:
g["_note"] = (
"No static device members. Likely resolves to a Local "
"Route Group via the calling device's device pool at "
"call-time (CUCM Standard Local Route Group feature)."
)
rl["route_groups"] = ordered_groups
out.append(rl)
return {"route_list_count": len(out), "route_lists": out}
def _to_int(v: object) -> int | None:
"""Cast Informix string-encoded integers to int; passthrough None/non-numeric."""
if v is None:
return None
try:
return int(v)
except (TypeError, ValueError):
return None
# ====================================================================
# Device Pool → Route Group resolution (Local Route Group feature)
# ====================================================================
def list_device_pool_route_groups(
client: "AxlClient",
device_pool_name: str | None = None,
) -> dict:
"""Show how each device pool resolves Local Route Group placeholders.
CUCM's Standard Local Route Group feature lets a route list step reference
a *named* route group (the "local placeholder" e.g., "Primary PSTN Route
Group"), and at call-time the calling phone's device pool determines which
*actual* route group (with real gateways/trunks) gets used.
Schema:
devicepoolroutegroupmap.fkdevicepool which DP
devicepoolroutegroupmap.fkroutegroup_local named placeholder ref'd by route lists
devicepoolroutegroupmap.fkroutegroup actual route group with gateways
Args:
device_pool_name: If given, return only that DP's mappings.
"""
where = ""
if device_pool_name:
where = f"WHERE dp.name = '{_esc(device_pool_name)}'"
sql = f"""
SELECT
dp.name AS device_pool,
placeholder.name AS local_placeholder,
target.name AS resolves_to_route_group
FROM devicepoolroutegroupmap m
JOIN devicepool dp ON m.fkdevicepool = dp.pkid
LEFT OUTER JOIN routegroup placeholder ON m.fkroutegroup_local = placeholder.pkid
LEFT OUTER JOIN routegroup target ON m.fkroutegroup = target.pkid
{where}
ORDER BY dp.name, placeholder.name
"""
result = client.execute_sql_query(sql)
grouped: dict[str, dict] = {}
for row in result["rows"]:
dp = row.get("device_pool")
if not dp:
continue
if dp not in grouped:
grouped[dp] = {"name": dp, "local_route_group_resolutions": []}
grouped[dp]["local_route_group_resolutions"].append({
"placeholder": row.get("local_placeholder"),
"resolves_to": row.get("resolves_to_route_group"),
})
return {
"device_pool_count": len(grouped),
"device_pools": list(grouped.values()),
}
# ====================================================================
# CSS impact analysis: which devices/lines/patterns reference this CSS
# ====================================================================
# CSS reference points: for each, the SQL is hand-written because the
# identifier column varies per table. Each entry returns rows with a
# common shape: name, context (e.g. partition), table, column.
_CSS_REFERENCE_QUERIES: dict[str, dict] = {
# Line-level forwarding CSSs (call-forward variants on a DN)
"line_call_forward_all_css": {
"table": "numplan", "column": "fkcallingsearchspace_cfapt",
"sql": """
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
FROM numplan np LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
WHERE np.fkcallingsearchspace_cfapt = '{pkid}'
ORDER BY rp.name, np.dnorpattern
""",
},
"line_call_forward_busy_css": {
"table": "numplan", "column": "fkcallingsearchspace_cfb",
"sql": """
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
FROM numplan np LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
WHERE np.fkcallingsearchspace_cfb = '{pkid}'
""",
},
"line_call_forward_no_answer_css": {
"table": "numplan", "column": "fkcallingsearchspace_cfna",
"sql": """
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
FROM numplan np LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
WHERE np.fkcallingsearchspace_cfna = '{pkid}'
""",
},
"line_call_forward_unregistered_css": {
"table": "numplan", "column": "fkcallingsearchspace_cfur",
"sql": """
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
FROM numplan np LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
WHERE np.fkcallingsearchspace_cfur = '{pkid}'
""",
},
"translation_pattern_css": {
"table": "numplan", "column": "fkcallingsearchspace_translation",
"sql": """
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
FROM numplan np LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
WHERE np.fkcallingsearchspace_translation = '{pkid}'
""",
},
"line_shared_appearance_css": {
"table": "numplan", "column": "fkcallingsearchspace_sharedlineappear",
"sql": """
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
FROM numplan np LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
WHERE np.fkcallingsearchspace_sharedlineappear = '{pkid}'
""",
},
# Device-level CSS variants
"device_reroute_css": {
"table": "device", "column": "fkcallingsearchspace_reroute",
"sql": """
SELECT d.name AS name, tc.name AS context, d.description AS description
FROM device d LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
WHERE d.fkcallingsearchspace_reroute = '{pkid}'
""",
},
"device_restrict_css": {
"table": "device", "column": "fkcallingsearchspace_restrict",
"sql": """
SELECT d.name AS name, tc.name AS context, d.description AS description
FROM device d LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
WHERE d.fkcallingsearchspace_restrict = '{pkid}'
""",
},
"device_refer_css": {
"table": "device", "column": "fkcallingsearchspace_refer",
"sql": """
SELECT d.name AS name, tc.name AS context, d.description AS description
FROM device d LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
WHERE d.fkcallingsearchspace_refer = '{pkid}'
""",
},
"device_rdn_transform_css": {
"table": "device", "column": "fkcallingsearchspace_rdntransform",
"sql": """
SELECT d.name AS name, tc.name AS context, d.description AS description
FROM device d LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
WHERE d.fkcallingsearchspace_rdntransform = '{pkid}'
""",
},
# Voicemail pilot — has directorynumber instead of name
"voicemail_pilot_css": {
"table": "voicemessagingpilot", "column": "fkcallingsearchspace",
"sql": """
SELECT directorynumber AS name, NULL AS context, description
FROM voicemessagingpilot
WHERE fkcallingsearchspace = '{pkid}'
""",
},
# Route lists — routelist is a join table; identify by the route-list device name
"route_list_css": {
"table": "routelist", "column": "fkcallingsearchspace",
"sql": """
SELECT d.name AS name, tc.name AS context, d.description AS description
FROM routelist rl JOIN device d ON rl.fkdevice = d.pkid
LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
WHERE rl.fkcallingsearchspace = '{pkid}'
""",
},
}
def find_devices_using_css(client: "AxlClient", css_name: str) -> dict:
"""Impact analysis: enumerate every reference to this CSS across the schema.
Useful before changing or removing a CSS answers "what would break if
I touched this CSS?" Cross-references the major schema tables that hold
fkcallingsearchspace* columns.
Returns groups of references by category. Each reference has:
- name: the object identifier (pattern text, device name, or DN)
- context: partition (for patterns) or device class (for devices)
- description: free-text description
- _table, _column: which fk-column referenced this CSS
"""
safe = _esc(css_name)
css_lookup = client.execute_sql_query(
f"SELECT pkid FROM callingsearchspace WHERE name = '{safe}'"
)
if not css_lookup["rows"]:
return {"error": f"CSS {css_name!r} not found", "css_name": css_name}
css_pkid = css_lookup["rows"][0]["pkid"]
safe_pkid = _esc(css_pkid)
grouped: dict[str, list[dict]] = {}
for label, spec in _CSS_REFERENCE_QUERIES.items():
sql = spec["sql"].format(pkid=safe_pkid)
try:
result = client.execute_sql_query(sql)
if result["rows"]:
grouped[label] = [
{**r, "_table": spec["table"], "_column": spec["column"]}
for r in result["rows"]
]
except Exception as e:
# Don't let one failed reference point block the whole audit
grouped[label] = [{
"_error": str(e)[:200],
"_table": spec["table"],
"_column": spec["column"],
}]
total = sum(len(v) for v in grouped.values())
return {
"css_name": css_name,
"css_pkid": css_pkid,
"total_references": total,
"references_by_category": grouped,
}
# ====================================================================
# Route Filters
# ====================================================================
def list_route_filters(client: "AxlClient", name: str | None = None) -> dict:
"""List route filters with their member clauses.
Route filters compose with @-pattern (NANPA) route patterns to constrain
matches e.g., a filter that matches only when AREA-CODE == 208 narrows
a `9.@` pattern to in-state calls. Members specify (digit, operator, tag)
triples; the clause column shows the assembled expression.
"""
where = f"WHERE rf.name = '{_esc(name)}'" if name else ""
sql = f"""
SELECT
rf.name AS filter_name,
rf.clause AS clause,
dp.name AS dial_plan,
rf.pkid AS pkid
FROM routefilter rf
LEFT OUTER JOIN dialplan dp ON rf.fkdialplan = dp.pkid
{where}
ORDER BY rf.name
"""
result = client.execute_sql_query(sql)
if not result["rows"]:
return {"filter_count": 0, "route_filters": []}
# Fetch members per filter
out = []
for filt in result["rows"]:
member_sql = f"""
SELECT
dpt.tag AS tag,
op.name AS operator,
rfm.digits AS digits,
rfm.precedence AS precedence
FROM routefiltermember rfm
LEFT OUTER JOIN dialplantag dpt ON rfm.fkdialplantag = dpt.pkid
LEFT OUTER JOIN typeoperator op ON rfm.tkoperator = op.enum
WHERE rfm.fkroutefilter = '{_esc(filt["pkid"])}'
ORDER BY rfm.precedence
"""
members = client.execute_sql_query(member_sql)
out.append({
"name": filt.get("filter_name"),
"clause": filt.get("clause"),
"dial_plan": filt.get("dial_plan"),
"members": members["rows"],
})
return {"filter_count": len(out), "route_filters": out}
# ====================================================================
# Wildcard pattern matcher (better translation_chain)
# ====================================================================
def _wildcard_to_regex(pattern: str) -> str:
r"""Convert a CUCM dial-plan pattern to a Python regex.
CUCM wildcards:
X any single digit (0-9)
! one or more digits
. terminator separator (after-dot digits get discarded by PreDot DDI)
[0-9] character class (passes through to regex unchanged)
*, # literal special-keypad symbols
\+ literal + (escaped in CUCM)
@ NANPA route filter represented as `\d+` here (we don't model the filter)
We escape regex metachars except those CUCM uses literally as wildcards.
"""
out = []
i = 0
while i < len(pattern):
c = pattern[i]
if c == "X":
out.append(r"\d")
elif c == "!":
out.append(r"\d+")
elif c == "@":
# NANPA — would normally apply a route filter; treat as "any digits"
out.append(r"\d+")
elif c == ".":
# Terminator separator — matches a literal dot if used in test;
# but in pattern matching against a dialed number it has no
# effect on what's matched. Treat as zero-width.
pass
elif c == "[":
# Character class — copy through up to ]
j = pattern.find("]", i)
if j == -1:
out.append(re.escape(c))
i += 1
continue
out.append(pattern[i:j + 1])
i = j
elif c == "\\" and i + 1 < len(pattern):
# Escaped literal — keep as literal
out.append(re.escape(pattern[i + 1]))
i += 1
else:
out.append(re.escape(c))
i += 1
return "^" + "".join(out) + "$"
def _pattern_matches_number(pattern: str, number: str) -> bool:
"""Test whether a CUCM dial pattern matches a number string."""
try:
regex = _wildcard_to_regex(pattern)
return re.match(regex, number) is not None
except re.error:
return False
def resolve_pattern_destination(client: "AxlClient", pattern_pkid: str) -> dict | None:
"""Given a route pattern pkid, return its destination route list / device.
Route patterns connect to their destination via `devicenumplanmap`:
the route-list header device (device.tkclass='Route List') maps to the
pattern (numplan with tkpatternusage=5) through this join table. Same
table also connects phone lines, so we filter by tkclass to disambiguate.
"""
sql = f"""
SELECT
d.name AS destination_name,
d.description AS destination_description,
tc.name AS destination_class,
d.pkid AS destination_pkid
FROM devicenumplanmap dnm
JOIN device d ON dnm.fkdevice = d.pkid
JOIN typeclass tc ON d.tkclass = tc.enum
WHERE dnm.fknumplan = '{_esc(pattern_pkid)}'
AND tc.name IN ('Route List', 'Gateway', 'SIP Trunk', 'H323 Gateway',
'Hunt List', 'CTI Route Point', 'Voicemail Port')
"""
result = client.execute_sql_query(sql)
return result["rows"][0] if result["rows"] else None
def list_digit_discard_instructions(client: "AxlClient") -> dict:
"""Digit Discard Instructions catalog."""
sql = """
SELECT
ddi.name AS name,
ddi.description AS description,
ddi.pkid AS pkid
FROM digitdiscardinstruction ddi
ORDER BY ddi.name
"""
result = client.execute_sql_query(sql)
return {"ddi_count": result["row_count"], "digit_discard_instructions": result["rows"]}
def translation_chain(client: "AxlClient", number: str, css_name: str | None = None) -> dict:
"""Find dial patterns that match a number, with CUCM wildcard evaluation.
Two-stage approach:
1. SQL fetches all patterns reachable from the given CSS (or all
patterns if no CSS specified). This filters by partition membership.
2. Python evaluates each pattern's CUCM wildcards (X, !, [0-9], @, etc.)
against the number to find true matches, then sorts by pattern length
(longest match wins in CUCM).
Caveats:
- The @-wildcard matches "any digits" rather than applying a route filter.
For accurate filter-aware matching, inspect the route filter via
list_route_filters() and reason about it manually.
- We don't model the dial-plan's interdigit timeout or T302 timer
every pattern is evaluated against the complete number.
- For overlapping matches, longest-match-wins is approximated by sorting
result by pattern length descending. CUCM's actual matcher uses pattern
specificity (a sub-rule based on wildcard depth); for unambiguous
patterns the two agree.
"""
css_filter = ""
if css_name:
css_filter = f"""
AND rp.pkid IN (
SELECT csm.fkroutepartition
FROM callingsearchspacemember csm
JOIN callingsearchspace css ON csm.fkcallingsearchspace = css.pkid
WHERE css.name = '{_esc(css_name)}'
)
"""
# Pull every pattern in scope; we filter in Python with wildcard logic.
sql = f"""
SELECT
np.dnorpattern AS pattern,
tpu.name AS pattern_type,
rp.name AS partition_name,
np.callingpartytransformationmask AS calling_party_xform_mask,
np.calledpartytransformationmask AS called_party_xform_mask,
np.prefixdigitsout AS prefix_digits_out,
ddi.name AS digit_discard_instructions,
rf.name AS route_filter,
np.description AS description
FROM numplan np
LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
LEFT OUTER JOIN typepatternusage tpu ON np.tkpatternusage = tpu.enum
LEFT OUTER JOIN digitdiscardinstruction ddi ON np.fkdigitdiscardinstruction = ddi.pkid
LEFT OUTER JOIN routefilter rf ON np.fkroutefilter = rf.pkid
WHERE np.tkpatternusage IN (3, 5, 7)
AND np.dnorpattern IS NOT NULL
{css_filter}
"""
result = client.execute_sql_query(sql)
matches = []
for row in result["rows"]:
pattern = row.get("pattern") or ""
if _pattern_matches_number(pattern, number):
matches.append({**row, "_match_specificity": len(pattern)})
# Longest-match-first (CUCM's specificity heuristic, simplified)
matches.sort(key=lambda m: m["_match_specificity"], reverse=True)
return {
"number": number,
"css_name": css_name,
"candidates_evaluated": result["row_count"],
"match_count": len(matches),
"matches": matches,
"_note": (
"Wildcards evaluated: X, !, [0-9], @, \\+. "
"@-pattern matches any digit string (route filter constraints not applied). "
"Longest-match-wins is approximated by pattern length; CUCM uses pattern "
"specificity (wildcard depth). For most clusters these agree."
),
}

569
src/mcp_cucm_axl/server.py Normal file
View File

@ -0,0 +1,569 @@
"""FastMCP server: read-only CUCM 15 AXL with route-plan focus.
Tool surface:
- Foundational: axl_sql, axl_describe_table, axl_list_tables, axl_version,
cache_stats, cache_clear
- Route plan: route_partitions, route_calling_search_spaces, route_patterns,
route_inspect_pattern, route_lists_and_groups, route_translation_chain,
route_digit_discard_instructions
Prompts (schema-grounded via the sibling cisco-docs index):
- route_plan_overview
- investigate_pattern
- audit_routing
- cucm_sql_help
"""
from __future__ import annotations
import os
import sys
from importlib.metadata import version as _pkg_version
from pathlib import Path
from dotenv import load_dotenv
from fastmcp import FastMCP
from platformdirs import user_cache_dir
from . import route_plan
from .cache import AxlCache
from .client import AxlClient
from .docs_loader import DocsIndex
# ---- Module-level singletons, initialized in main() ----
_cache: AxlCache | None = None
_axl: AxlClient | None = None
_docs: DocsIndex | None = None
mcp = FastMCP("CUCM AXL (read-only)")
def _client() -> AxlClient:
if _axl is None:
raise RuntimeError("AXL client not initialized — server bootstrap failed.")
return _axl
def _docs_or_empty_msg() -> str:
return (
"_The cisco-docs index is not loaded. Set CISCO_DOCS_INDEX_PATH or "
"ensure /home/rpm/bingham/docs/src/assets/.cisco-docs-index/ exists. "
"You can also use the sibling `cisco-docs` MCP server's `search_docs` "
"tool for live semantic search._"
)
# ====================================================================
# Foundational tools
# ====================================================================
@mcp.tool
def axl_version() -> dict:
"""Return CUCM cluster version. Sanity check that AXL is reachable.
Cached for 1 hour version doesn't change between cluster upgrades.
"""
return _client().get_ccm_version()
@mcp.tool
def axl_sql(query: str) -> dict:
"""Execute a SELECT (or WITH CTE) against the CUCM Informix data dictionary.
Read-only by structural guarantee: the server never exposes executeSQLUpdate
or any add/update/remove AXL methods. Queries are also client-side validated
to start with SELECT or WITH and to contain no write keywords.
Args:
query: A single SQL SELECT statement. Trailing semicolon optional.
Returns:
dict with row_count, rows (list of columnvalue dicts), and the
cleaned query as it was sent. Includes _cache: hit|miss for diagnostics.
"""
return _client().execute_sql_query(query)
@mcp.tool
def axl_list_tables(pattern: str | None = None) -> dict:
"""List Informix tables in the CUCM database.
Args:
pattern: Optional LIKE pattern (% wildcards). e.g. "route%" finds
routelist, routegroup, routepartition, routefilter, etc.
"""
return _client().list_informix_tables(pattern)
@mcp.tool
def axl_describe_table(table_name: str) -> dict:
"""Describe an Informix table's columns: name, type, length, nullability.
Use this BEFORE writing axl_sql queries against an unfamiliar table.
Combine with the cisco-docs MCP's search_docs("data dictionary <table>")
for human-authored descriptions of what each column means.
"""
return _client().describe_informix_table(table_name)
@mcp.tool
def cache_stats() -> dict:
"""Cache statistics: total entries, live entries, breakdown by method."""
if _cache is None:
return {"error": "Cache not initialized"}
return _cache.stats()
@mcp.tool
def cache_clear(method_pattern: str | None = None) -> dict:
"""Clear cache entries.
Args:
method_pattern: Optional method-name pattern (% wildcards). If omitted,
clears the entire cache. Use after a known config change to force
fresh queries.
"""
if _cache is None:
return {"error": "Cache not initialized"}
deleted = _cache.clear(method_pattern)
return {"deleted_entries": deleted, "method_pattern": method_pattern}
# ====================================================================
# Route plan tools
# ====================================================================
@mcp.tool
def route_partitions() -> dict:
"""All route partitions, with pattern count and CSS member count per partition.
A partition groups together patterns (DNs, route patterns, translations) for
access control. CSSs include partitions in a specific order; longest match
in the first reachable partition wins.
"""
return route_plan.list_partitions(_client())
@mcp.tool
def route_calling_search_spaces(name: str | None = None) -> dict:
"""Calling Search Spaces with their ordered partition lists.
Args:
name: Optional CSS name to fetch one specific CSS. If None, returns all.
The order of partitions inside a CSS is the search order CUCM walks the
partitions top-down, longest-match within each, and routes via the first
matching pattern.
"""
return route_plan.list_calling_search_spaces(_client(), name)
@mcp.tool
def route_patterns(
kind: str | None = None,
partition: str | None = None,
filter: str | None = None,
limit: int = 500,
) -> dict:
"""The Route Plan Report — patterns with their transformations.
Args:
kind: Pattern kind filter. One of: directory_number, translation, route,
conference, voicemail, hunt_pilot, call_pickup_group, park_code,
directed_pickup, message_waiting, device_template. Default: all kinds.
partition: Filter by partition name (exact match).
filter: Substring match against the pattern text (e.g. "9.@" or "+1").
limit: Max rows. Default 500.
Each row includes calling/called party transformation masks, prefix digits,
digit discard instructions the full transformation profile.
"""
return route_plan.list_patterns(
_client(),
kind=kind,
partition=partition,
filter_substring=filter,
limit=limit,
)
@mcp.tool
def route_inspect_pattern(pattern: str, partition: str | None = None) -> dict:
"""Deep dive on a single pattern.
Returns:
- The pattern row itself with all transformations
- Reverse CSS lookup: which calling search spaces include this pattern's
partition (i.e., which phones/devices can reach this pattern)
- Routing target: route list or device this pattern points to
Args:
pattern: The pattern text (e.g. "9.@", "+1.[2-9]XX[2-9]XXXXXX", "1001").
partition: Disambiguate when the same pattern exists in multiple partitions.
"""
return route_plan.inspect_pattern(_client(), pattern, partition)
@mcp.tool
def route_lists_and_groups(name: str | None = None) -> dict:
"""Route lists with their ordered route groups and member devices.
Route lists are how route patterns reach the outside world: a route pattern
points to a route list, the route list contains an ordered list of route
groups, and each route group contains an ordered list of devices (gateways,
SIP trunks, etc.). Top-down with failover.
Args:
name: Optional route list name to fetch one specific list.
"""
return route_plan.list_route_lists_and_groups(_client(), name)
@mcp.tool
def route_translation_chain(number: str, css_name: str | None = None) -> dict:
"""Find candidate patterns that might match a given number from a given CSS.
Best-effort: literal/prefix matching only. CUCM's actual matcher evaluates
wildcards (X, !, [0-9], etc.) and selects longest match. Use as a starting
point for "where could this number be coming from?" investigations.
Args:
number: The number to test (e.g. "9123456789", "+15551234567", "1001").
css_name: Restrict candidates to patterns in partitions reachable from
this CSS. If None, considers all patterns.
"""
return route_plan.translation_chain(_client(), number, css_name)
@mcp.tool
def route_digit_discard_instructions() -> dict:
"""List all Digit Discard Instructions (DDIs).
DDIs are named rules like "PreDot" or "10-10-Dialing" that strip digits
from called-party numbers before they leave the cluster. Patterns reference
DDIs via fkdigitdiscardinstruction.
"""
return route_plan.list_digit_discard_instructions(_client())
@mcp.tool
def route_device_pool_route_groups(device_pool_name: str | None = None) -> dict:
"""How each device pool resolves Local Route Group placeholders to actual gateways.
Closes the loop on Local Route Group routing: when a route list step references
a "named placeholder" route group with no static devices, this is where the
actual gateway is resolved per-device-pool at call time.
Args:
device_pool_name: Optional. If given, return only that DP's mappings.
"""
return route_plan.list_device_pool_route_groups(_client(), device_pool_name)
@mcp.tool
def route_devices_using_css(css_name: str) -> dict:
"""Impact analysis: every reference to a CSS across the schema.
Use before changing or removing a CSS to find all dependent devices, lines,
translation patterns, route lists, voicemail pilots, etc. Cross-references
the major fkcallingsearchspace* columns across the data dictionary.
Args:
css_name: The exact CSS name (case-sensitive).
"""
return route_plan.find_devices_using_css(_client(), css_name)
@mcp.tool
def route_filters(name: str | None = None) -> dict:
"""List route filters with their composition rules.
Route filters compose with @-pattern (NANPA) route patterns to constrain
which calls match e.g., "AREA-CODE == 208" narrows a `9.@` pattern to
in-state calls. Each filter has an ordered list of (digit, operator, tag)
member clauses.
Args:
name: Optional. If given, return only the named filter.
"""
return route_plan.list_route_filters(_client(), name)
# ====================================================================
# Prompts — schema-grounded conversation seeds
# ====================================================================
_ROUTE_KEYWORDS = [
"route plan", "route pattern", "translation pattern",
"calling search space", "partition", "transformation",
"digit discard", "numplan", "routepartition",
]
_AUDIT_PROMPTS = {
"route_plan_overview": [
"route plan", "route pattern", "calling search space", "partition",
],
"translations": [
"translation pattern", "called party transformation",
"calling party transformation", "digit discard",
],
"css_partitions": [
"calling search space", "partition", "css",
],
"transformations": [
"called party transformation", "calling party transformation",
"transformation mask", "prefix digits",
],
}
@mcp.prompt
def route_plan_overview() -> str:
"""Snapshot of the cluster's routing setup, with schema reference embedded.
Use this when you want to start a fresh route-plan audit conversation.
"""
chunks = []
if _docs is not None:
chunks = _docs.find(_ROUTE_KEYWORDS, max_chunks=5, max_chars_per_chunk=1000)
schema_block = (
_docs.format_chunks_for_prompt(chunks) if _docs else _docs_or_empty_msg()
)
return f"""# CUCM Route Plan Overview
You are auditing the routing configuration of a CUCM 15 cluster via the
`mcp-cucm-axl` MCP server (read-only). Begin by gathering a high-level
snapshot, then drill in where anything looks wrong or surprising.
## Suggested first calls (in order)
1. `axl_version()` confirm cluster reachability + version
2. `route_partitions()` partition catalog with member counts
3. `route_calling_search_spaces()` CSS list with ordered partitions
4. `route_patterns(kind="route")` outbound route patterns
5. `route_patterns(kind="translation")` translation patterns
6. `route_lists_and_groups()` route list route group device chain
7. `route_digit_discard_instructions()` DDI catalog
## What to look for in your initial summary
- **Partition sprawl**: > ~30 partitions usually indicates accumulated
legacy config. Note any with zero patterns or zero CSS membership.
- **CSS-partition asymmetry**: partitions not reachable from any CSS are
effectively dead.
- **Pattern density**: which partitions hold the bulk of route/translation
patterns? That's where the dial plan logic lives.
- **Transformation patterns**: cgpn/cdpn masks and DDIs that look unusual
or undocumented.
- **Route list depth**: route lists with one route group are fine; with many,
understand the failover order.
## Reference: CUCM data dictionary (route plan)
{schema_block}
Now run the calls above and produce a written audit summary.
"""
@mcp.prompt
def investigate_pattern(pattern: str, partition: str | None = None) -> str:
"""Deep-dive seed for one specific pattern. Schema chunks embedded."""
chunks = []
if _docs is not None:
chunks = _docs.find(
["numplan", "transformation", "translation pattern", "route pattern"],
max_chunks=4,
max_chars_per_chunk=900,
)
schema_block = (
_docs.format_chunks_for_prompt(chunks) if _docs else _docs_or_empty_msg()
)
partition_clause = f" in partition `{partition}`" if partition else ""
return f"""# Investigate Pattern: `{pattern}`{partition_clause}
Walk the user through this pattern in detail.
## Suggested calls
1. `route_inspect_pattern(pattern={pattern!r}{f', partition={partition!r}' if partition else ''})`
pattern detail, transformations, target route list/device, reverse CSS lookup
2. `route_translation_chain(number=<sample number>)` what other patterns
would compete for matches if this pattern matched a real call
3. If it's a route pattern with a route list target, follow with
`route_lists_and_groups(name=<route list name>)`
## What to report
- **Type**: directory number / route / translation / hunt pilot / etc.
- **Transformations applied**:
- Called party transformation mask
- Calling party transformation mask
- Prefix digits
- Digit discard instructions
- **Routing target**: where does the call ultimately go?
- **Who can reach it**: which CSSs include this pattern's partition? Which
device-pool/phone classes use those CSSs?
- **Anything anomalous**: missing description, undocumented transformations,
patterns that shadow each other, etc.
## Reference: CUCM data dictionary
{schema_block}
"""
@mcp.prompt
def audit_routing(focus: str = "full") -> str:
"""Comprehensive routing audit walkthrough.
Args:
focus: One of "full", "translations", "css_partitions", "transformations",
"route_lists". Tunes which schema chunks get embedded.
"""
keyword_set = _AUDIT_PROMPTS.get(focus, _ROUTE_KEYWORDS)
chunks = []
if _docs is not None:
chunks = _docs.find(keyword_set, max_chunks=6, max_chars_per_chunk=900)
schema_block = (
_docs.format_chunks_for_prompt(chunks) if _docs else _docs_or_empty_msg()
)
return f"""# CUCM Routing Audit — focus: `{focus}`
Conduct a focused audit of the cluster's routing configuration. Goal: produce
an actionable findings report not just a description of the config.
## Audit checklist
### Partitions and access control
- [ ] Are there partitions with zero patterns? (legacy/orphaned)
- [ ] Are there partitions not referenced by any CSS? (unreachable)
- [ ] Does the partition naming convention reflect actual scope?
### Calling Search Spaces
- [ ] CSS-to-CSS comparison: any near-duplicates that differ only in order?
- [ ] CSSs that include partitions in surprising orders (longest-match implications)
- [ ] CSSs that are referenced by zero devices (use axl_sql against device table)
### Translation patterns
- [ ] What does each translation pattern actually transform? Any with no
transformation that exist purely for partition routing?
- [ ] Calling-party transformations applied at translation: are they
documented? Why is the calling number being rewritten?
- [ ] Translation chains: do any translations route into partitions where
another translation will match again? (chains can be intentional but
obscure caller-ID and routing logic)
### Route patterns
- [ ] Wildcard breadth: any `9.@` (NANPA at-sign) patterns? Are they intentional?
- [ ] Block patterns: which patterns have `block_enabled` set? What are they
blocking and why?
- [ ] Patterns with no description flag for documentation.
### Transformations (called party / calling party)
- [ ] Digit discard instructions: which DDIs are referenced? Are any DDIs
defined but never used?
- [ ] Prefix-digits-out: any unusual prefixes (international, special service)?
- [ ] Calling-party masks that hide internal extensions on outbound calls.
### Route lists and groups
- [ ] Route lists with only one route group: simple, fine.
- [ ] Route lists with many: walk the failover order, confirm it's intentional.
- [ ] Route groups containing devices that are unregistered or disabled.
## Reference: CUCM data dictionary
{schema_block}
Run the relevant tool calls now and produce a structured findings report
with category headers, observation, severity (info/warning/error), and
recommended action where applicable.
"""
@mcp.prompt
def cucm_sql_help(question: str) -> str:
"""Catch-all prompt for arbitrary CUCM SQL questions. Includes schema chunks."""
keywords = [w for w in question.lower().split() if len(w) > 3][:8]
chunks = []
if _docs is not None and keywords:
chunks = _docs.find(keywords, max_chunks=5, max_chars_per_chunk=900)
schema_block = (
_docs.format_chunks_for_prompt(chunks) if _docs else _docs_or_empty_msg()
)
return f"""# CUCM SQL Question
The user asks: **{question}**
## How to approach this
1. If you don't recognize the relevant tables, call `axl_list_tables(pattern=...)`
with a substring guess (e.g., "route%", "device%", "user%").
2. Run `axl_describe_table(<table_name>)` for the candidate tables to see
exact column names and types.
3. If the schema chunks below already answer the question, draft the SQL
directly. If not, also invoke the `cisco-docs` MCP server's `search_docs`
tool with a relevant query (e.g., search_docs("data dictionary table for X")).
4. Compose the SELECT, run it via `axl_sql(query=...)`.
5. Summarize the result for the user counts, anomalies, and what you'd
recommend doing about them.
## Possibly relevant schema chunks
{schema_block}
Now answer the question.
"""
# ====================================================================
# Bootstrap
# ====================================================================
def _banner() -> None:
try:
v = _pkg_version("mcp-cucm-axl")
except Exception:
v = "0.1.0"
axl_url = os.environ.get("AXL_URL", "(unset)")
print(f"[mcp-cucm-axl] v{v} starting", file=sys.stderr, flush=True)
print(f"[mcp-cucm-axl] AXL_URL={axl_url}", file=sys.stderr, flush=True)
def main() -> None:
global _cache, _axl, _docs
# Load .env from the project directory (where the user runs uv from)
cwd_env = Path.cwd() / ".env"
if cwd_env.exists():
load_dotenv(cwd_env)
_banner()
cache_dir = Path(
os.environ.get("AXL_CACHE_DIR")
or (Path(user_cache_dir("mcp-cucm-axl")) / "responses")
)
cache_dir.mkdir(parents=True, exist_ok=True)
ttl = int(os.environ.get("AXL_CACHE_TTL", "3600"))
_cache = AxlCache(cache_dir / "axl_responses.sqlite", default_ttl=ttl)
print(
f"[mcp-cucm-axl] cache: {_cache.db_path} (ttl={ttl}s)",
file=sys.stderr,
flush=True,
)
_axl = AxlClient(_cache)
_docs = DocsIndex.load() # may be None; prompts handle gracefully
mcp.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,61 @@
"""Defense-in-depth: ensure queries we send to executeSQLQuery are SELECT-only.
CUCM's AXL service rejects writes via executeSQLQuery server-side (writes go
through a separate executeSQLUpdate method that we never expose). This client-
side check exists to:
1. Give the LLM a fast, clear error before a SOAP round-trip.
2. Make read-only intent visible at the boundary, not implicit.
"""
from __future__ import annotations
import re
_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
_COMMENT_LINE = re.compile(r"--[^\n]*")
_FORBIDDEN = {
"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
"TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME",
"EXEC", "EXECUTE", "CALL", "ATTACH", "DETACH",
}
_WORD_RE = re.compile(r"\b([A-Za-z_]+)\b")
class SqlValidationError(ValueError):
"""Raised when a query is not a safe read-only SELECT/WITH."""
def validate_select(query: str) -> str:
"""Return the cleaned query, or raise SqlValidationError.
Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects
anything else, and any query containing forbidden keywords as standalone
tokens.
"""
if not query or not query.strip():
raise SqlValidationError("Query is empty.")
stripped = _COMMENT_BLOCK.sub(" ", query)
stripped = _COMMENT_LINE.sub(" ", stripped).strip().rstrip(";").strip()
if not stripped:
raise SqlValidationError("Query is empty after stripping comments.")
upper_tokens = [t.upper() for t in _WORD_RE.findall(stripped)]
if not upper_tokens:
raise SqlValidationError("Query contains no SQL keywords.")
first = upper_tokens[0]
if first not in {"SELECT", "WITH"}:
raise SqlValidationError(
f"Only SELECT and WITH are permitted; query starts with {first!r}."
)
forbidden_hits = sorted(set(upper_tokens) & _FORBIDDEN)
if forbidden_hits:
raise SqlValidationError(
f"Forbidden keyword(s) present: {', '.join(forbidden_hits)}. "
f"This server is read-only."
)
return stripped

View File

@ -0,0 +1,155 @@
"""Resolve the AXLAPI WSDL for CUCM 15.
Resolution chain:
1. AXL_WSDL_PATH env var (explicit override) use it
2. ~/.cache/mcp-cucm-axl/wsdl/15.0/AXLAPI.wsdl use cached copy
3. Auto-extract `schema/15.0/` from a Cisco AXL Toolkit zip:
- $AXL_WSDL_ZIP if set
- ./axlsqltoolkit.zip in the current working directory
4. Raise with a clear pointer to the AXL toolkit download in the CUCM admin UI
The full AXL toolkit ships as three files:
AXLAPI.wsdl, AXLEnums.xsd, AXLSoap.xsd
all in the same directory. Zeep follows the schema imports from the WSDL.
"""
from __future__ import annotations
import os
import sys
import zipfile
from pathlib import Path
from platformdirs import user_cache_dir
WSDL_FILES = ("AXLAPI.wsdl", "AXLEnums.xsd", "AXLSoap.xsd")
WSDL_VERSION = "15.0"
def cache_wsdl_dir(version: str = WSDL_VERSION) -> Path:
"""Return ~/.cache/mcp-cucm-axl/wsdl/<version>/."""
return Path(user_cache_dir("mcp-cucm-axl")) / "wsdl" / version
def _has_complete_wsdl(directory: Path) -> bool:
return all((directory / fname).exists() for fname in WSDL_FILES)
def _try_extract_from_zip(version: str = WSDL_VERSION) -> Path | None:
"""Look for a Cisco AXL Toolkit zip and extract `schema/<version>/` from it.
Search order:
1. $AXL_WSDL_ZIP env var
2. ./axlsqltoolkit.zip in current working directory
Returns the cache directory path if successful, None if no zip found
or extraction failed.
"""
candidates: list[Path] = []
if env_zip := os.environ.get("AXL_WSDL_ZIP"):
candidates.append(Path(env_zip).expanduser())
candidates.append(Path.cwd() / "axlsqltoolkit.zip")
zip_path = next((p for p in candidates if p.exists() and p.is_file()), None)
if zip_path is None:
return None
target = cache_wsdl_dir(version)
target.mkdir(parents=True, exist_ok=True)
print(
f"[mcp-cucm-axl] extracting AXL schema {version} from {zip_path}",
file=sys.stderr,
flush=True,
)
try:
with zipfile.ZipFile(zip_path) as zf:
extracted = []
for fname in WSDL_FILES:
member = f"schema/{version}/{fname}"
if member not in zf.namelist():
print(
f"[mcp-cucm-axl] zip missing member: {member}",
file=sys.stderr,
flush=True,
)
return None
data = zf.read(member)
(target / fname).write_bytes(data)
extracted.append(fname)
print(
f"[mcp-cucm-axl] extracted {len(extracted)} files into {target}",
file=sys.stderr,
flush=True,
)
return target
except (zipfile.BadZipFile, OSError) as e:
print(f"[mcp-cucm-axl] zip extraction failed: {e}", file=sys.stderr, flush=True)
return None
def resolve_wsdl_path() -> Path:
"""Return the path to AXLAPI.wsdl, or raise SystemExit with setup help."""
explicit = os.environ.get("AXL_WSDL_PATH")
if explicit:
p = Path(explicit).expanduser().resolve()
if not p.exists():
raise SystemExit(f"AXL_WSDL_PATH points to nonexistent file: {p}")
if p.is_dir():
p = p / "AXLAPI.wsdl"
if not p.exists():
raise SystemExit(
f"AXL_WSDL_PATH is a directory but AXLAPI.wsdl is not inside it: {p.parent}"
)
if not _has_complete_wsdl(p.parent):
missing = [f for f in WSDL_FILES if not (p.parent / f).exists()]
print(
f"[mcp-cucm-axl] warning: WSDL dir missing {missing} alongside {p.name}; "
f"zeep may fail to resolve schema imports.",
file=sys.stderr,
flush=True,
)
return p
cache_dir = cache_wsdl_dir()
if _has_complete_wsdl(cache_dir):
return cache_dir / "AXLAPI.wsdl"
# Auto-extract from a Cisco AXL Toolkit zip if present
extracted = _try_extract_from_zip()
if extracted and _has_complete_wsdl(extracted):
return extracted / "AXLAPI.wsdl"
_bootstrap_help(cache_dir)
raise SystemExit(2) # never reached; _bootstrap_help raises
def _bootstrap_help(cache_dir: Path) -> None:
"""Print setup instructions and exit. Never returns."""
msg = [
"",
"AXL WSDL not found. To bootstrap, do one of:",
"",
" Option A — drop axlsqltoolkit.zip into the project directory:",
f" cp /path/to/axlsqltoolkit.zip {Path.cwd()}/",
" # Auto-extracts schema/15.0/ on next launch",
"",
" Option B — point at a zip elsewhere:",
" export AXL_WSDL_ZIP=/path/to/axlsqltoolkit.zip",
"",
" Option C — explicit WSDL file path:",
" export AXL_WSDL_PATH=/path/to/schema/15.0/AXLAPI.wsdl",
"",
" Option D — drop the three schema files into the cache:",
f" mkdir -p {cache_dir}",
" # Copy AXLAPI.wsdl, AXLEnums.xsd, AXLSoap.xsd into that directory",
"",
"To obtain the AXL toolkit:",
" 1. Sign in to CUCM admin",
" 2. Application > Plugins > Find > download 'Cisco AXL Toolkit'",
" 3. Use the resulting axlsqltoolkit.zip with any option above",
"",
]
raise SystemExit("\n".join(msg))

0
tests/__init__.py Normal file
View File

87
tests/test_cache.py Normal file
View File

@ -0,0 +1,87 @@
"""Tests for the SQLite TTL cache."""
import time
from pathlib import Path
import pytest
from mcp_cucm_axl.cache import AxlCache
@pytest.fixture
def cache(tmp_path: Path) -> AxlCache:
return AxlCache(tmp_path / "test.sqlite", default_ttl=60)
def test_set_and_get(cache: AxlCache):
cache.set("getCCMVersion", {}, {"version": "15.0.1"})
assert cache.get("getCCMVersion", {}) == {"version": "15.0.1"}
def test_miss_returns_none(cache: AxlCache):
assert cache.get("nonexistent", {"x": 1}) is None
def test_kwargs_order_independent(cache: AxlCache):
cache.set("listPhone", {"name": "SEP1", "limit": 10}, {"rows": ["p1"]})
# Different order should still hit
assert cache.get("listPhone", {"limit": 10, "name": "SEP1"}) == {"rows": ["p1"]}
def test_different_args_different_keys(cache: AxlCache):
cache.set("listPhone", {"name": "SEP1"}, {"rows": ["a"]})
cache.set("listPhone", {"name": "SEP2"}, {"rows": ["b"]})
assert cache.get("listPhone", {"name": "SEP1"}) == {"rows": ["a"]}
assert cache.get("listPhone", {"name": "SEP2"}) == {"rows": ["b"]}
def test_expired_entries_not_returned(tmp_path: Path):
c = AxlCache(tmp_path / "ttl.sqlite", default_ttl=60)
c.set("foo", {}, {"x": 1}, ttl=1)
time.sleep(1.1)
assert c.get("foo", {}) is None
def test_ttl_zero_disables_caching(tmp_path: Path):
c = AxlCache(tmp_path / "off.sqlite", default_ttl=0)
c.set("foo", {}, {"x": 1})
# default_ttl=0 means writes are no-ops
assert c.get("foo", {}) is None
def test_stats_reports_breakdown(cache: AxlCache):
cache.set("listPhone", {}, {"x": 1})
cache.set("listPhone", {"a": 1}, {"x": 2})
cache.set("getCCMVersion", {}, {"v": "15"})
stats = cache.stats()
assert stats["live_entries"] == 3
assert stats["by_method"]["listPhone"] == 2
assert stats["by_method"]["getCCMVersion"] == 1
def test_clear_all(cache: AxlCache):
cache.set("a", {}, "x")
cache.set("b", {}, "y")
deleted = cache.clear()
assert deleted == 2
assert cache.stats()["live_entries"] == 0
def test_clear_by_pattern(cache: AxlCache):
cache.set("listPhone", {}, "p")
cache.set("listLine", {}, "l")
cache.set("getCCMVersion", {}, "v")
deleted = cache.clear("list*")
assert deleted == 2
assert cache.get("getCCMVersion", {}) == "v"
def test_purge_expired(tmp_path: Path):
c = AxlCache(tmp_path / "p.sqlite", default_ttl=60)
c.set("a", {}, "x", ttl=1)
c.set("b", {}, "y", ttl=60)
time.sleep(1.1)
purged = c.purge_expired()
assert purged == 1
assert c.stats()["live_entries"] == 1

99
tests/test_docs_loader.py Normal file
View File

@ -0,0 +1,99 @@
"""Tests for the docs index loader (chunk filtering for prompt enrichment)."""
import json
from pathlib import Path
import pytest
from mcp_cucm_axl.docs_loader import DocsIndex
@pytest.fixture
def fake_index(tmp_path: Path) -> Path:
chunks = [
{
"id": "cucm::v15::admin::Route-Plan-Overview::0",
"text": "The route plan defines how calls are routed through the cluster.",
"heading_path": ["Route Plan Overview"],
"source_path": str(tmp_path / "fake.md"),
"product": "cucm",
"version": "v15",
"doc": "admin",
},
{
"id": "cucm::v15::admin::Translation-Patterns::0",
"text": "Translation patterns rewrite digits before routing.",
"heading_path": ["Call Routing", "Translation Patterns"],
"source_path": str(tmp_path / "fake.md"),
"product": "cucm",
"version": "v15",
"doc": "admin",
},
{
"id": "cer::v15::admin::Caller-ID::0",
"text": "Caller ID handling for emergency calls.",
"heading_path": ["Caller ID"],
"source_path": str(tmp_path / "fake.md"),
"product": "cer",
"version": "v15",
"doc": "admin",
},
]
(tmp_path / "chunks.jsonl").write_text(
"\n".join(json.dumps(c) for c in chunks)
)
(tmp_path / "index_meta.json").write_text(
json.dumps({"model_name": "test", "embedding_dim": 384, "products": ["cucm", "cer"]})
)
return tmp_path
def test_load_index(fake_index: Path):
idx = DocsIndex.load(fake_index)
assert idx is not None
assert len(idx.chunks) == 3
def test_load_missing_returns_none(tmp_path: Path):
assert DocsIndex.load(tmp_path / "nope") is None
def test_find_filters_by_product(fake_index: Path):
idx = DocsIndex.load(fake_index)
assert idx is not None
cucm_only = idx.find(["caller"], product="cucm")
assert all(c.get("doc") for c in cucm_only)
cer_only = idx.find(["caller"], product="cer")
assert any("Caller" in (c["heading_path"] or [""])[0] for c in cer_only)
def test_find_scores_heading_higher_than_text(fake_index: Path):
idx = DocsIndex.load(fake_index)
assert idx is not None
results = idx.find(["translation"], product="cucm")
assert results
# The chunk with "Translation Patterns" in heading should rank above
# any other chunk that just mentions translation incidentally
assert "Translation" in " ".join(results[0]["heading_path"] or [])
def test_find_no_matches(fake_index: Path):
idx = DocsIndex.load(fake_index)
assert idx is not None
assert idx.find(["xyzzyplugh"]) == []
def test_format_for_prompt_includes_heading_and_text(fake_index: Path):
idx = DocsIndex.load(fake_index)
assert idx is not None
chunks = idx.find(["route plan"], product="cucm")
rendered = idx.format_chunks_for_prompt(chunks)
assert "Route Plan Overview" in rendered
assert "route plan defines" in rendered.lower()
def test_format_empty_chunks(fake_index: Path):
idx = DocsIndex.load(fake_index)
assert idx is not None
rendered = idx.format_chunks_for_prompt([])
assert "No matching" in rendered

70
tests/test_normalize.py Normal file
View File

@ -0,0 +1,70 @@
"""Tests for value normalization (Informix t/f → bool, etc.)."""
import pytest
from mcp_cucm_axl.normalize import (
BOOLEAN_COLUMNS,
normalize_bool,
normalize_row,
normalize_rows,
)
class TestNormalizeBool:
def test_t_becomes_true(self):
assert normalize_bool("t") is True
def test_f_becomes_false(self):
assert normalize_bool("f") is False
def test_passthrough_other_strings(self):
assert normalize_bool("True") == "True"
assert normalize_bool("yes") == "yes"
def test_passthrough_native_types(self):
assert normalize_bool(None) is None
assert normalize_bool(0) == 0
assert normalize_bool(True) is True
assert normalize_bool([]) == []
class TestNormalizeRow:
def test_normalizes_known_boolean_columns(self):
row = {"block_enabled": "t", "name": "RP-1", "ismessagewaitingon": "f"}
result = normalize_row(row)
assert result["block_enabled"] is True
assert result["ismessagewaitingon"] is False
assert result["name"] == "RP-1" # not a boolean column, untouched
def test_unknown_column_with_t_value_unchanged(self):
# Conservative: only normalize columns we know are boolean.
# Avoids false positives on columns where 't' is meaningful (e.g.
# a single-character device class code).
row = {"unknown_field": "t"}
result = normalize_row(row)
assert result["unknown_field"] == "t"
def test_empty_row(self):
assert normalize_row({}) == {}
class TestNormalizeRows:
def test_processes_each_row(self):
rows = [
{"block_enabled": "t", "name": "RP-1"},
{"block_enabled": "f", "name": "RP-2"},
]
result = normalize_rows(rows)
assert result[0]["block_enabled"] is True
assert result[1]["block_enabled"] is False
def test_empty_list(self):
assert normalize_rows([]) == []
class TestBooleanColumnsSet:
def test_known_columns_present(self):
# Smoke test that the canonical columns are in the set
assert "block_enabled" in BOOLEAN_COLUMNS
assert "blockenable" in BOOLEAN_COLUMNS
assert "isemergencyservicenumber" in BOOLEAN_COLUMNS

View File

@ -0,0 +1,84 @@
"""Tests for the SELECT-only SQL guardrail."""
import pytest
from mcp_cucm_axl.sql_validator import validate_select, SqlValidationError
class TestSelectAccepted:
def test_simple_select(self):
assert validate_select("SELECT * FROM device") == "SELECT * FROM device"
def test_with_cte(self):
q = "WITH x AS (SELECT 1 FROM systables) SELECT * FROM x"
assert validate_select(q) == q
def test_lowercase_select(self):
assert validate_select("select * from numplan") == "select * from numplan"
def test_trailing_semicolon_stripped(self):
assert validate_select("SELECT 1 FROM device;") == "SELECT 1 FROM device"
def test_block_comments_stripped(self):
q = "/* comment */ SELECT 1 FROM device"
cleaned = validate_select(q)
assert "SELECT 1 FROM device" in cleaned
def test_line_comments_stripped(self):
q = "-- a comment\nSELECT 1 FROM device"
cleaned = validate_select(q)
assert "SELECT 1 FROM device" in cleaned
class TestRejected:
def test_empty(self):
with pytest.raises(SqlValidationError, match="empty"):
validate_select("")
def test_whitespace_only(self):
with pytest.raises(SqlValidationError, match="empty"):
validate_select(" \n ")
def test_only_comments(self):
with pytest.raises(SqlValidationError, match="empty"):
validate_select("-- just a comment\n/* and another */")
def test_insert_rejected(self):
with pytest.raises(SqlValidationError, match="INSERT"):
validate_select("INSERT INTO device VALUES (1)")
def test_update_rejected(self):
with pytest.raises(SqlValidationError, match="UPDATE"):
validate_select("UPDATE device SET name='x' WHERE pkid='y'")
def test_delete_rejected(self):
with pytest.raises(SqlValidationError, match="DELETE"):
validate_select("DELETE FROM device WHERE pkid='y'")
def test_drop_rejected(self):
with pytest.raises(SqlValidationError, match="DROP"):
validate_select("DROP TABLE device")
def test_select_with_embedded_drop_rejected(self):
# Belt-and-suspenders: even if "DROP" appears in a quoted string-ish
# position our keyword filter still catches it. AXL would also reject
# this, but failing fast on the client saves a SOAP round-trip.
with pytest.raises(SqlValidationError, match="DROP"):
validate_select("SELECT 1 FROM device; DROP TABLE device")
def test_truncate_rejected(self):
with pytest.raises(SqlValidationError, match="TRUNCATE"):
validate_select("TRUNCATE TABLE device")
class TestEdgeCases:
def test_keyword_as_column_name_blocked(self):
# A column named "delete" would be blocked. This is acceptable —
# the data dictionary doesn't use SQL keywords as column names,
# and conservative blocking is the right call for v1.
with pytest.raises(SqlValidationError):
validate_select("SELECT delete FROM device")
def test_select_with_subquery(self):
q = "SELECT name FROM device WHERE pkid IN (SELECT fkdevice FROM numplan)"
assert "SELECT name FROM device" in validate_select(q)

97
tests/test_wildcard.py Normal file
View File

@ -0,0 +1,97 @@
"""Tests for CUCM dial-plan wildcard pattern matching."""
import pytest
from mcp_cucm_axl.route_plan import _pattern_matches_number, _wildcard_to_regex
class TestLiteralPatterns:
def test_exact_match(self):
assert _pattern_matches_number("1001", "1001")
def test_no_match(self):
assert not _pattern_matches_number("1001", "1002")
def test_escaped_plus(self):
assert _pattern_matches_number(r"\+15551234567", "+15551234567")
assert not _pattern_matches_number(r"\+15551234567", "15551234567")
class TestXWildcard:
def test_X_matches_any_digit(self):
assert _pattern_matches_number("XXXX", "1234")
assert _pattern_matches_number("XXXX", "9999")
def test_X_only_matches_digits(self):
assert not _pattern_matches_number("XXXX", "abc1")
assert not _pattern_matches_number("XXXX", "12") # too short
assert not _pattern_matches_number("XXXX", "12345") # too long
def test_X_mixed_with_literal(self):
assert _pattern_matches_number("9XXXX", "91234")
assert not _pattern_matches_number("9XXXX", "81234")
class TestBangWildcard:
def test_bang_matches_one_or_more(self):
assert _pattern_matches_number("9!", "91")
assert _pattern_matches_number("9!", "915551234567")
def test_bang_requires_at_least_one(self):
assert not _pattern_matches_number("9!", "9")
class TestCharacterClass:
def test_class_matches_any_in_set(self):
assert _pattern_matches_number("[2-9]XXX", "2000")
assert _pattern_matches_number("[2-9]XXX", "9999")
def test_class_excludes_outside_set(self):
assert not _pattern_matches_number("[2-9]XXX", "1000")
assert not _pattern_matches_number("[2-9]XXX", "0000")
class TestDotTerminator:
def test_dot_is_zero_width(self):
# CUCM's '.' is a marker for digit-discard, not a regex char
assert _pattern_matches_number("9.911", "9911")
assert _pattern_matches_number("10.911", "10911")
def test_dot_with_X_after(self):
assert _pattern_matches_number("9.[2-9]XXXXXXXXX", "92085551234")
class TestAtPattern:
def test_at_matches_any_digits(self):
# @ would normally apply a route filter; we treat it as "any digits"
assert _pattern_matches_number("9.@", "915551234567")
assert _pattern_matches_number("\\+.@", "+15551234567")
class TestEdgeCases:
def test_invalid_regex_returns_false(self):
# An unbalanced bracket should not raise
result = _pattern_matches_number("[", "1")
assert result is False
def test_empty_pattern(self):
assert _pattern_matches_number("", "")
assert not _pattern_matches_number("", "1")
def test_regex_anchors(self):
# Make sure we don't match a substring
assert not _pattern_matches_number("911", "1911")
assert not _pattern_matches_number("911", "9111")
class TestRegexConversion:
def test_X_to_digit_class(self):
assert _wildcard_to_regex("X") == r"^\d$"
def test_bang_to_one_or_more_digits(self):
assert _wildcard_to_regex("!") == r"^\d+$"
def test_anchored(self):
regex = _wildcard_to_regex("9XXX")
assert regex.startswith("^")
assert regex.endswith("$")

1798
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff