diff --git a/README.md b/README.md
index e9deb55..26f66ba 100644
--- a/README.md
+++ b/README.md
@@ -91,6 +91,18 @@ opens this directory.
| `axl_list_tables(pattern=None)` | Discover Informix tables |
| `axl_describe_table(name)` | Column metadata for one table |
| `cache_stats()`, `cache_clear(pattern=None)` | Cache plumbing |
+| `health()` | Subsystem self-check (cache / AXL / docs / RisPort init state) |
+
+### Real-time device registration (RisPort70)
+
+Complementary to AXL — AXL tells you what's *configured*, RisPort tells you
+what's *currently registered*. The audit-relevant cross-reference is
+"configured but unregistered" (orphan signal).
+
+| Tool | Purpose |
+|---|---|
+| `device_registration_status(device_class, status, name_filter, page_size)` | Page through CUCM's RisPort `selectCmDevice` for live registration state |
+| `device_registration_summary()` | Cluster-wide breakdown: registered / unregistered / rejected counts across Phone, Gateway, SIPTrunk, HuntList, etc. |
### Route plan
@@ -116,6 +128,34 @@ sibling `cisco-docs` index and embed them inline:
- `investigate_pattern(pattern, partition=None)` — deep-dive a specific pattern
- `audit_routing(focus="full")` — comprehensive audit walkthrough
- `cucm_sql_help(question)` — catch-all for arbitrary SQL questions
+- `sip_trunk_report(name_filter=None)` — SIP trunk inventory + findings
+- `phone_inventory_report(filter=None)` — phone fleet aggregates with anomaly findings; cross-references RisPort registration state
+- `user_audit(focus="full")` — end users + application users + role assignments
+- `inbound_did_audit()` — XFORM-Inbound-DNIS inventory + screening pipeline
+- `hunt_pilot_audit()` — hunt pilots, queue settings, line group membership
+- `whoami(userid=None)` — single-user role chain (defaults to AXL service account)
+
+## Scope and complement
+
+This server is **audit-focused**: read-only queries against AXL plus
+RisPort cross-reference for registration state. It does *not* cover
+operational debugging (logs, packet capture, perfmon counters,
+service control, certificates, backups).
+
+For those, install [`@calltelemetry/cisco-cucm-mcp`](https://github.com/calltelemetry/cisco-cucm-mcp)
+alongside this server:
+
+```bash
+claude mcp add cucm-ops -- npx -y @calltelemetry/cisco-cucm-mcp@latest
+```
+
+The two servers are **complementary**, not competing — they answer
+different questions and use different CUCM APIs (AXL + RisPort here;
+DIME + RisPort + PerfMon + ControlCenter + SSH there). An LLM with
+both servers can compose audit findings (this server) with operational
+state (theirs) — e.g., *"audit found CSS X has 0 references AND
+RisPort shows zero phones currently registered against any device pool
+that inherits it → confirmed safe to delete."*
## Cache
diff --git a/src/mcp_cucm_axl/client.py b/src/mcp_cucm_axl/client.py
index 6639e8e..f13e573 100644
--- a/src/mcp_cucm_axl/client.py
+++ b/src/mcp_cucm_axl/client.py
@@ -50,20 +50,23 @@ class AxlClient:
self._config_error: str | None = None # permanent, pinned
self._last_error: str | None = None # last seen, may be transient
self._connected_at: float | None = None # monotonic time of last success
+ self._retry_config: dict | None = None # populated when session is built
def connection_status(self) -> dict:
"""Diagnostic snapshot — what's the state of the connection?
Useful for the `health` MCP tool and for operators trying to
figure out why a tool call failed. Reports whether we're
- currently connected, when we last successfully connected, and
- the last error (config or operational).
+ currently connected, when we last successfully connected, the
+ last error (config or operational), and the rate-limit retry
+ policy in effect.
"""
return {
"connected": self._service is not None,
"connected_at_monotonic": self._connected_at,
"config_error": self._config_error, # permanent until restart
"last_error": self._last_error,
+ "retry_config": self._retry_config,
}
def _ensure_connected(self) -> None:
@@ -103,6 +106,33 @@ class AxlClient:
session.verify = verify_tls
session.auth = HTTPBasicAuth(user, password)
+ # Rate-limit / transient-error retry. CUCM's SOAP layer returns 503
+ # under load (multiple admins running AXL queries during a change
+ # window, etc). 502/504 occur when the publisher is restarting or
+ # a load balancer is between us and CUCM. Pre-fix, any of these
+ # was a hard failure to the caller; now they're retried with
+ # exponential backoff.
+ from requests.adapters import HTTPAdapter
+ from urllib3.util.retry import Retry
+ max_retries = int(os.environ.get("AXL_RATE_LIMIT_RETRIES", "3"))
+ if max_retries > 0:
+ retry = Retry(
+ total=max_retries,
+ backoff_factor=1.0, # 1s, 2s, 4s between retries
+ status_forcelist=(502, 503, 504),
+ allowed_methods=frozenset(["POST", "GET"]),
+ raise_on_status=False, # let zeep see the final response
+ respect_retry_after_header=True,
+ )
+ adapter = HTTPAdapter(max_retries=retry)
+ session.mount("https://", adapter)
+ session.mount("http://", adapter)
+ self._retry_config = {
+ "max_retries": max_retries,
+ "backoff_factor": 1.0,
+ "status_forcelist": [502, 503, 504],
+ }
+
# zeep's own WSDL cache (separate from our response cache) keeps
# repeat startups fast — it parses the WSDL once and reuses
from platformdirs import user_cache_dir
diff --git a/src/mcp_cucm_axl/prompts/phone_inventory_report.py b/src/mcp_cucm_axl/prompts/phone_inventory_report.py
index d948ec3..9679d2b 100644
--- a/src/mcp_cucm_axl/prompts/phone_inventory_report.py
+++ b/src/mcp_cucm_axl/prompts/phone_inventory_report.py
@@ -145,7 +145,46 @@ ORDER BY d.name;
worth confirming the count matches expectations — a sudden increase
indicates config drift.)
-## Step 3 — Cross-reference with users (if owners exist)
+## Step 3 — Cross-reference with real-time registration (highest-leverage)
+
+`device_registration_summary()` returns a cluster-wide breakdown by status
+across all device classes. The audit-relevant question is: what fraction
+of *configured* phones are *actually registered*?
+
+```
+device_registration_summary()
+```
+
+For drill-down on the unregistered population:
+
+```
+device_registration_status(device_class="Phone", status="UnRegistered")
+```
+
+This gives you the actual list of phones that have a config in CUCM but
+are not currently registered. **A phone that's been "UnRegistered" for
+weeks is the strongest orphan signal we can produce** — cleaner than
+"no description" or "no owner" because registration state reflects
+real-world usage, not just operator hygiene.
+
+**Findings to surface from this cross-reference:**
+
+- **Configured-but-never-registered phones**: the cluster has them
+ configured, they've never come online. Often abandoned conference
+ phones, decommissioned analog gateways, or templates that were
+ cloned but never deployed. Strong candidates for deletion.
+- **Registered but no description / no owner**: actively-used phones
+ whose operator hygiene is weak. Hospitals accumulate these as
+ "patient room 3142" or shared cordless phones; verify they're
+ intentional shared-use endpoints rather than just untracked.
+- **PartiallyRegistered**: a phone that's communicating with one CM
+ node but not another. Indicates a real registration problem
+ (firewall, version mismatch, certificate); flag for IT.
+- **Status mismatch**: phone class says "Cisco 7841" but `model` field
+ on the registered device says something different — could indicate a
+ spoofing attempt or hardware swap without config update.
+
+## Step 4 — Cross-reference with users (if owners exist)
For phones with an owner, list owner identities to spot stale
assignments (employee left, phone still tagged to them):
diff --git a/src/mcp_cucm_axl/risport.py b/src/mcp_cucm_axl/risport.py
new file mode 100644
index 0000000..8603ae7
--- /dev/null
+++ b/src/mcp_cucm_axl/risport.py
@@ -0,0 +1,407 @@
+"""RisPort70 — real-time device registration status.
+
+CUCM's RisPort (Real-time Information Server Port) lives at
+`/realtimeservice2/services/RISService70` and exposes a SOAP API for
+querying the *runtime* state of devices: are phones currently
+registered, what IP did they get, what's their status, etc.
+
+This is **complementary to AXL**: AXL tells us what's CONFIGURED;
+RisPort tells us what's HAPPENING right now. For audit purposes the
+difference between "configured but never registered" (orphan) and
+"actively registered" (live) is the highest-value cross-reference.
+
+Why we ship our own RisPort wrapper alongside the AXL one rather than
+deferring to a separate operations MCP server (`@calltelemetry/cisco-cucm-mcp`
+covers operational debugging more thoroughly): the audit-narrative is
+the use case here. `phone_inventory_report` becomes substantively more
+valuable when it can join configured-phones with currently-registered
+state in a single prompt, and that join lives most naturally in this
+codebase.
+
+SOAP envelope structure cribbed from cisco-cucm-mcp's TypeScript
+implementation (MIT licensed) — namespaces and field names verified
+against CUCM 15.0.1.12900 documentation.
+"""
+
+from __future__ import annotations
+
+import os
+import re
+import sys
+import xml.etree.ElementTree as ET
+from urllib.parse import urlparse
+
+from requests import Session
+from requests.adapters import HTTPAdapter
+from requests.auth import HTTPBasicAuth
+from urllib3.util.retry import Retry
+
+
+# RisPort path on the CUCM publisher
+_RIS_PATH = "/realtimeservice2/services/RISService70"
+
+# SOAP namespaces. These match Cisco's published values for RisPort70.
+_NS_SOAPENV = "http://schemas.xmlsoap.org/soap/envelope/"
+_NS_RIS = "http://schemas.cisco.com/ast/soap"
+
+# Status values RisPort returns for devices
+DEVICE_STATUS_VALUES = (
+ "Any", "Registered", "UnRegistered", "Rejected",
+ "PartiallyRegistered", "Unknown",
+)
+
+
+def _escape_xml(s: str) -> str:
+ """Minimal XML entity escape for values we inject into SOAP envelopes."""
+ return (
+ s.replace("&", "&")
+ .replace("<", "<")
+ .replace(">", ">")
+ .replace('"', """)
+ .replace("'", "'")
+ )
+
+
+def _build_select_envelope(
+ state_info: str = "",
+ max_devices: int = 200,
+ device_class: str = "Phone",
+ status: str = "Any",
+ select_items: list[str] | None = None,
+ select_by: str = "Name",
+) -> str:
+ """Build a `selectCmDevice` SOAP envelope.
+
+ The structure is fragile — the CmSelectionCriteria child elements
+ must appear in the order Cisco's WSDL expects, and missing fields
+ are rejected. We err on the side of always including every field
+ with sensible defaults.
+ """
+ items = select_items if select_items else ["*"]
+ items_xml = "".join(
+ f"{_escape_xml(i)}"
+ for i in items
+ )
+ return (
+ ''
+ f''
+ ""
+ ""
+ ""
+ f"{_escape_xml(state_info)}"
+ ""
+ f"{int(max_devices)}"
+ f"{_escape_xml(device_class)}"
+ "255"
+ f"{_escape_xml(status)}"
+ ""
+ f"{_escape_xml(select_by)}"
+ f"{items_xml}"
+ "Any"
+ "Any"
+ ""
+ ""
+ ""
+ ""
+ )
+
+
+def _extract_text(elem: ET.Element | None, tag: str) -> str:
+ """Return the text of child of elem, or '' if missing.
+
+ RisPort responses don't use namespaces consistently across CUCM
+ versions; matching on local-name only is more robust than xpath
+ against a fixed namespace.
+ """
+ if elem is None:
+ return ""
+ for child in elem:
+ if child.tag.split("}")[-1] == tag:
+ return (child.text or "").strip()
+ return ""
+
+
+def _extract_ip(elem: ET.Element | None) -> str:
+ """Pull the IP string out of CUCM's nested IPAddress structure.
+
+ CUCM 15 returns IPAddress as a nested struct:
+ - x.x.x.xipv4
+ Older versions may return a flat string. We handle both.
+ """
+ if elem is None:
+ return ""
+ if elem.text and elem.text.strip():
+ return elem.text.strip()
+ for item_elem in elem:
+ if item_elem.tag.split("}")[-1].lower() == "item":
+ for ip_elem in item_elem:
+ if ip_elem.tag.split("}")[-1] == "IP":
+ return (ip_elem.text or "").strip()
+ return ""
+
+
+def _parse_device(elem: ET.Element) -> dict:
+ """Pull the audit-relevant fields out of a single CmDevice element."""
+ ip_elem = None
+ for child in elem:
+ local = child.tag.split("}")[-1]
+ if local in ("IPAddress", "IpAddress"):
+ ip_elem = child
+ break
+ return {
+ "name": _extract_text(elem, "Name"),
+ "ip_address": _extract_ip(ip_elem),
+ "description": _extract_text(elem, "Description"),
+ "dir_number": _extract_text(elem, "DirNumber"),
+ "status": _extract_text(elem, "Status"),
+ "status_reason": _extract_text(elem, "StatusReason"),
+ "protocol": _extract_text(elem, "Protocol"),
+ "model": _extract_text(elem, "Model"),
+ "active_load_id": _extract_text(elem, "ActiveLoadID"),
+ "timestamp": _extract_text(elem, "TimeStamp"),
+ }
+
+
+def _parse_response(xml_text: str) -> dict:
+ """Parse the selectCmDevice SOAP response into structured form.
+
+ Returns:
+ {
+ "total_devices_found": int,
+ "state_info": str (cursor for next page; empty = last),
+ "cm_nodes": [
+ {"name": str, "return_code": str, "devices": [...]}
+ ],
+ }
+ """
+ root = ET.fromstring(xml_text)
+ # Walk to selectCmDeviceReturn regardless of namespacing quirks
+ select_return = None
+ for elem in root.iter():
+ if elem.tag.split("}")[-1] == "selectCmDeviceReturn":
+ select_return = elem
+ break
+ if select_return is None:
+ # Check for SOAP fault
+ for elem in root.iter():
+ if elem.tag.split("}")[-1] == "Fault":
+ fault_string = _extract_text(elem, "faultstring")
+ raise RuntimeError(f"RisPort SOAP fault: {fault_string or 'unknown'}")
+ raise RuntimeError("RisPort response missing selectCmDeviceReturn")
+
+ # CUCM 15 wraps the data in an extra element
+ # inside . Older versions exposed the fields
+ # directly. Probe for the wrapper and descend if found.
+ for child in select_return:
+ if child.tag.split("}")[-1] == "SelectCmDeviceResult":
+ select_return = child
+ break
+
+ total = _extract_text(select_return, "TotalDevicesFound")
+ state_info = _extract_text(select_return, "StateInfo")
+
+ nodes_out = []
+ cm_nodes = None
+ for child in select_return:
+ if child.tag.split("}")[-1] == "CmNodes":
+ cm_nodes = child
+ break
+
+ if cm_nodes is not None:
+ for node_elem in cm_nodes:
+ if node_elem.tag.split("}")[-1] != "item":
+ continue
+ node_name = _extract_text(node_elem, "Name")
+ return_code = _extract_text(node_elem, "ReturnCode")
+ devices: list[dict] = []
+ for cm_devices_elem in node_elem:
+ if cm_devices_elem.tag.split("}")[-1] != "CmDevices":
+ continue
+ for dev_item in cm_devices_elem:
+ if dev_item.tag.split("}")[-1] == "item":
+ devices.append(_parse_device(dev_item))
+ nodes_out.append({
+ "name": node_name,
+ "return_code": return_code,
+ "devices": devices,
+ })
+
+ try:
+ total_int = int(total)
+ except (TypeError, ValueError):
+ total_int = 0
+
+ return {
+ "total_devices_found": total_int,
+ "state_info": state_info,
+ "cm_nodes": nodes_out,
+ }
+
+
+class RisPortClient:
+ """Lazy SOAP client for CUCM's RisPort70 service.
+
+ Reuses AXL_URL / AXL_USER / AXL_PASS env vars (RisPort lives on the
+ same host as AXL on standard CUCM deployments). Builds a separate
+ `requests.Session` so the retry policy and TLS settings can be tuned
+ independently if needed.
+ """
+
+ def __init__(self):
+ self._session: Session | None = None
+ self._url: str | None = None
+ self._config_error: str | None = None
+ self._last_error: str | None = None
+
+ def _ensure_session(self) -> None:
+ if self._session is not None:
+ return
+ if self._config_error is not None:
+ raise RuntimeError(self._config_error)
+ try:
+ axl_url = os.environ["AXL_URL"]
+ user = os.environ["AXL_USER"]
+ password = os.environ["AXL_PASS"]
+ except KeyError as e:
+ self._config_error = (
+ f"Missing required env var {e.args[0]} for RisPort. "
+ f"Reuses AXL_URL/USER/PASS."
+ )
+ raise RuntimeError(self._config_error) from None
+
+ verify_tls = os.environ.get("AXL_VERIFY_TLS", "false").lower() in (
+ "1", "true", "yes"
+ )
+
+ # Derive RisPort URL from AXL host
+ parsed = urlparse(axl_url)
+ if not parsed.hostname:
+ self._config_error = f"Could not parse AXL_URL host: {axl_url!r}"
+ raise RuntimeError(self._config_error)
+ port = parsed.port or 8443
+ scheme = parsed.scheme or "https"
+ self._url = f"{scheme}://{parsed.hostname}:{port}{_RIS_PATH}"
+
+ session = Session()
+ session.verify = verify_tls
+ session.auth = HTTPBasicAuth(user, password)
+
+ # Same retry policy as AXL — 503/502/504 with backoff
+ max_retries = int(os.environ.get("AXL_RATE_LIMIT_RETRIES", "3"))
+ if max_retries > 0:
+ retry = Retry(
+ total=max_retries,
+ backoff_factor=1.0,
+ status_forcelist=(502, 503, 504),
+ allowed_methods=frozenset(["POST", "GET"]),
+ raise_on_status=False,
+ respect_retry_after_header=True,
+ )
+ adapter = HTTPAdapter(max_retries=retry)
+ session.mount("https://", adapter)
+ session.mount("http://", adapter)
+
+ self._session = session
+ print(
+ f"[mcp-cucm-axl] RisPort client ready: {self._url}",
+ file=sys.stderr,
+ flush=True,
+ )
+
+ def select_cm_device(
+ self,
+ max_devices: int = 200,
+ device_class: str = "Phone",
+ status: str = "Any",
+ name_filter: str | None = None,
+ state_info: str = "",
+ ) -> dict:
+ """Single-page selectCmDevice call. Returns up to max_devices rows.
+
+ For full inventory, call `select_all` which auto-paginates via
+ the `state_info` cursor.
+ """
+ # Validate before connecting — we want a clear error from bad input
+ # whether or not env vars are set.
+ if status not in DEVICE_STATUS_VALUES:
+ raise ValueError(
+ f"status must be one of {DEVICE_STATUS_VALUES}; got {status!r}"
+ )
+ self._ensure_session()
+
+ select_items = [name_filter] if name_filter else ["*"]
+ envelope = _build_select_envelope(
+ state_info=state_info,
+ max_devices=max_devices,
+ device_class=device_class,
+ status=status,
+ select_items=select_items,
+ )
+ try:
+ resp = self._session.post(
+ self._url,
+ data=envelope,
+ headers={
+ "Content-Type": "text/xml; charset=utf-8",
+ "SOAPAction": '"selectCmDevice"',
+ },
+ timeout=30,
+ )
+ resp.raise_for_status()
+ self._last_error = None
+ return _parse_response(resp.text)
+ except Exception as e:
+ self._last_error = f"RisPort request failed: {e}"
+ raise RuntimeError(self._last_error) from e
+
+ def select_all(
+ self,
+ device_class: str = "Phone",
+ status: str = "Any",
+ page_size: int = 200,
+ max_pages: int = 20,
+ ) -> dict:
+ """Auto-paginate through selectCmDevice using the StateInfo cursor.
+
+ Walks pages until the cursor is empty OR max_pages reached. Returns
+ a single dict with all devices flattened across pages, plus
+ per-status counts and a `pages_walked` field for diagnostics.
+ """
+ all_devices: list[dict] = []
+ nodes_seen: set[str] = set()
+ state_info = ""
+ pages = 0
+ last_total = 0
+ while pages < max_pages:
+ page = self.select_cm_device(
+ max_devices=page_size,
+ device_class=device_class,
+ status=status,
+ state_info=state_info,
+ )
+ pages += 1
+ last_total = page["total_devices_found"]
+ for node in page["cm_nodes"]:
+ nodes_seen.add(node["name"])
+ all_devices.extend(node["devices"])
+ next_cursor = page.get("state_info") or ""
+ if not next_cursor or next_cursor == state_info:
+ break
+ state_info = next_cursor
+
+ # Per-status breakdown
+ status_counts: dict[str, int] = {}
+ for d in all_devices:
+ s = d.get("status") or "Unknown"
+ status_counts[s] = status_counts.get(s, 0) + 1
+
+ return {
+ "device_class": device_class,
+ "status_filter": status,
+ "total_devices_found": last_total,
+ "devices_returned": len(all_devices),
+ "pages_walked": pages,
+ "cm_nodes_seen": sorted(nodes_seen),
+ "status_counts": status_counts,
+ "devices": all_devices,
+ }
diff --git a/src/mcp_cucm_axl/server.py b/src/mcp_cucm_axl/server.py
index 2904652..70e40e0 100644
--- a/src/mcp_cucm_axl/server.py
+++ b/src/mcp_cucm_axl/server.py
@@ -29,12 +29,14 @@ from . import route_plan
from .cache import AxlCache
from .client import AxlClient
from .docs_loader import DocsIndex
+from .risport import RisPortClient
# ---- Module-level singletons, initialized in main() ----
_cache: AxlCache | None = None
_axl: AxlClient | None = None
_docs: DocsIndex | None = None
+_ris: RisPortClient | None = None
mcp = FastMCP("CUCM AXL (read-only)")
@@ -152,6 +154,7 @@ def health() -> dict:
"cache": _cache is not None,
"axl": _axl is not None,
"docs": _docs is not None,
+ "risport": _ris is not None,
}
if _axl is not None:
info["axl_connection"] = _axl.connection_status()
@@ -311,6 +314,69 @@ def route_devices_using_css(css_name: str, max_per_category: int = 50) -> dict:
return route_plan.find_devices_using_css(_client(), css_name, max_per_category)
+@mcp.tool
+def device_registration_status(
+ device_class: str = "Phone",
+ status: str = "Any",
+ name_filter: str | None = None,
+ page_size: int = 200,
+) -> dict:
+ """Real-time device registration status from CUCM RisPort70.
+
+ Complementary to AXL: AXL tells us what's *configured*; RisPort tells
+ us what's *currently registered*. The most audit-relevant cross-
+ reference is "configured but unregistered" (likely orphan).
+
+ Args:
+ device_class: One of "Phone" (default), "Gateway", "H323", "CTI",
+ "VoiceMail", "MediaResources", "HuntList", "SIPTrunk", "Any".
+ status: One of "Any" (default), "Registered", "UnRegistered",
+ "Rejected", "PartiallyRegistered", "Unknown".
+ name_filter: Optional name (or wildcard `*` substring) to narrow
+ the result. Maps to RisPort's SelectItems.
+ page_size: Max devices per RisPort call. RisPort caps at 1000.
+ """
+ if _ris is None:
+ raise RuntimeError("RisPort client not initialized — server bootstrap failed.")
+ if name_filter:
+ return _ris.select_cm_device(
+ max_devices=page_size,
+ device_class=device_class,
+ status=status,
+ name_filter=name_filter,
+ )
+ return _ris.select_all(
+ device_class=device_class,
+ status=status,
+ page_size=page_size,
+ )
+
+
+@mcp.tool
+def device_registration_summary() -> dict:
+ """High-level cluster registration health: counts by status across
+ all device classes that matter for audit work.
+
+ Useful as a once-per-conversation orientation: "are most phones
+ actually registered, or is something broken?"
+ """
+ if _ris is None:
+ raise RuntimeError("RisPort client not initialized — server bootstrap failed.")
+ summary = {}
+ for cls in ("Phone", "Gateway", "H323", "SIPTrunk", "HuntList", "CTI"):
+ try:
+ r = _ris.select_all(device_class=cls, status="Any")
+ summary[cls] = {
+ "total_devices_found": r["total_devices_found"],
+ "devices_returned": r["devices_returned"],
+ "status_counts": r["status_counts"],
+ "cm_nodes_seen": r["cm_nodes_seen"],
+ }
+ except Exception as e:
+ summary[cls] = {"error": str(e)[:200]}
+ return summary
+
+
@mcp.tool
def route_filters(name: str | None = None, include_members: bool = False) -> dict:
"""List route filters with their composition rules.
@@ -455,7 +521,7 @@ def _banner() -> None:
def main() -> None:
- global _cache, _axl, _docs
+ global _cache, _axl, _docs, _ris
# Load .env from the project directory (where the user runs uv from)
cwd_env = Path.cwd() / ".env"
@@ -490,6 +556,7 @@ def main() -> None:
_axl = AxlClient(_cache)
_docs = DocsIndex.load() # may be None; prompts handle gracefully
+ _ris = RisPortClient()
mcp.run()
diff --git a/tests/test_client_recovery.py b/tests/test_client_recovery.py
index d411277..3c31517 100644
--- a/tests/test_client_recovery.py
+++ b/tests/test_client_recovery.py
@@ -81,3 +81,75 @@ def test_health_diagnostic_includes_connection_state(cache: AxlCache):
assert "connected" in info
assert info["connected"] is False # never tried yet
assert "last_error" in info
+
+
+# ---- Rate limit / 503 retry --------------------------------------------------
+# Inspired by cisco-cucm-mcp's exponential-backoff approach. CUCM's SOAP
+# layer returns 503 under load (concurrent AXL admins, change window). Without
+# retries, we'd fail loudly; with them, transient rate limiting becomes
+# invisible to the caller.
+
+def test_retry_config_default_three_retries(cache: AxlCache, monkeypatch):
+ """By default, the session is configured for 3 retries with backoff."""
+ monkeypatch.setenv("AXL_URL", "https://example.invalid:8443/axl")
+ monkeypatch.setenv("AXL_USER", "test")
+ monkeypatch.setenv("AXL_PASS", "test")
+ monkeypatch.setenv("AXL_VERIFY_TLS", "false")
+ # Stub Client construction so we exercise only the session/retry setup
+ from mcp_cucm_axl import client as client_mod
+
+ constructed = {}
+
+ def stub_client(*args, **kwargs):
+ constructed["transport"] = kwargs.get("transport")
+ # Raise to short-circuit before service creation
+ raise ConnectionError("stub: don't actually connect")
+
+ monkeypatch.setattr(client_mod, "Client", stub_client)
+
+ client = AxlClient(cache)
+ with pytest.raises(RuntimeError):
+ client._ensure_connected()
+
+ info = client.connection_status()
+ assert info["retry_config"] is not None
+ assert info["retry_config"]["max_retries"] == 3
+ assert 503 in info["retry_config"]["status_forcelist"]
+ assert 502 in info["retry_config"]["status_forcelist"]
+ assert 504 in info["retry_config"]["status_forcelist"]
+
+
+def test_retry_config_overridable_via_env(cache: AxlCache, monkeypatch):
+ """Operators can tune the retry count via AXL_RATE_LIMIT_RETRIES."""
+ monkeypatch.setenv("AXL_URL", "https://example.invalid:8443/axl")
+ monkeypatch.setenv("AXL_USER", "test")
+ monkeypatch.setenv("AXL_PASS", "test")
+ monkeypatch.setenv("AXL_RATE_LIMIT_RETRIES", "7")
+
+ from mcp_cucm_axl import client as client_mod
+ monkeypatch.setattr(client_mod, "Client", lambda *a, **kw: (_ for _ in ()).throw(ConnectionError("stub")))
+
+ client = AxlClient(cache)
+ with pytest.raises(RuntimeError):
+ client._ensure_connected()
+
+ assert client.connection_status()["retry_config"]["max_retries"] == 7
+
+
+def test_retry_config_zero_disables(cache: AxlCache, monkeypatch):
+ """AXL_RATE_LIMIT_RETRIES=0 disables the retry adapter entirely.
+ Useful for test environments or when an operator wants raw failures."""
+ monkeypatch.setenv("AXL_URL", "https://example.invalid:8443/axl")
+ monkeypatch.setenv("AXL_USER", "test")
+ monkeypatch.setenv("AXL_PASS", "test")
+ monkeypatch.setenv("AXL_RATE_LIMIT_RETRIES", "0")
+
+ from mcp_cucm_axl import client as client_mod
+ monkeypatch.setattr(client_mod, "Client", lambda *a, **kw: (_ for _ in ()).throw(ConnectionError("stub")))
+
+ client = AxlClient(cache)
+ with pytest.raises(RuntimeError):
+ client._ensure_connected()
+
+ cfg = client.connection_status()["retry_config"]
+ assert cfg["max_retries"] == 0
diff --git a/tests/test_risport.py b/tests/test_risport.py
new file mode 100644
index 0000000..663b0ed
--- /dev/null
+++ b/tests/test_risport.py
@@ -0,0 +1,222 @@
+"""Unit tests for the RisPort70 SOAP envelope construction and parser.
+
+Live-cluster integration is verified separately via the smoke-test
+script — these tests are pure: envelope shape, response-XML parsing,
+edge cases. No network.
+"""
+
+import xml.etree.ElementTree as ET
+
+import pytest
+
+from mcp_cucm_axl.risport import (
+ DEVICE_STATUS_VALUES,
+ RisPortClient,
+ _build_select_envelope,
+ _escape_xml,
+ _parse_response,
+)
+
+
+class TestEscapeXml:
+ def test_basic_escapes(self):
+ assert _escape_xml("") == "<bad>"
+ assert _escape_xml("a&b") == "a&b"
+ assert _escape_xml('"x"') == ""x""
+ assert _escape_xml("'y'") == "'y'"
+
+ def test_passthrough_safe_text(self):
+ assert _escape_xml("Phone-1234") == "Phone-1234"
+
+
+class TestSelectEnvelope:
+ def test_envelope_is_well_formed_xml(self):
+ env = _build_select_envelope()
+ # Must parse — if not, RisPort will reject it
+ root = ET.fromstring(env)
+ assert root is not None
+
+ def test_default_envelope_includes_required_fields(self):
+ env = _build_select_envelope()
+ # Cisco's WSDL requires every CmSelectionCriteria child
+ for required in [
+ "MaxReturnedDevices",
+ "DeviceClass",
+ "Model",
+ "Status",
+ "NodeName",
+ "SelectBy",
+ "SelectItems",
+ "Protocol",
+ "DownloadStatus",
+ ]:
+ assert f"" in env, (
+ f"missing required CmSelectionCriteria field "
+ )
+
+ def test_state_info_threaded_into_envelope(self):
+ env = _build_select_envelope(state_info="cursor-abc-123")
+ assert "cursor-abc-123" in env
+ assert "cursor-abc-123" in env
+
+ def test_select_items_default_wildcard(self):
+ env = _build_select_envelope()
+ # Default: ['*'] — all devices
+ assert "*" in env
+
+ def test_select_items_explicit_list(self):
+ env = _build_select_envelope(select_items=["SEP1", "SEP2"])
+ assert "SEP1" in env
+ assert "SEP2" in env
+
+ def test_max_devices_int_coerced(self):
+ env = _build_select_envelope(max_devices=500)
+ assert "500" in env
+
+ def test_xml_special_chars_in_filter_escaped(self):
+ # SOAP injection defense — single quote and angle bracket must be escaped
+ env = _build_select_envelope(select_items=["O'Brien "])
+ assert "'" in env
+ assert "<" in env
+ assert ">" in env
+ # The raw chars must NOT appear
+ assert "'Brien " not in env
+
+
+class TestParseResponse:
+ # Match CUCM 15's actual response shape: an extra
+ # wrapper inside . Discovered while live-verifying
+ # against cucm-pub.binghammemorial.org 2026-04-26 — the parser must
+ # descend through this wrapper.
+ SAMPLE_RESPONSE = """
+
+
+
+
+
+ 2
+
+
+ -
+ cucm-pub
+ Ok
+
+
-
+ SEP1234567890AB
+
- 10.0.0.5ipv4
+ 2001
+ Registered
+ 0
+ SIP
+ Patient Room 101
+ Cisco 7841
+ sip78xx.12-5-1SR4-3
+ 1700000000
+
+ -
+ SEPABCDEF123456
+
+
+ UnRegistered
+ 1
+ Decommissioned phone
+
+
+
+
+
+
+
+
+"""
+
+ # Pre-CUCM-15 shape (no SelectCmDeviceResult wrapper). The parser falls
+ # back to fields directly under selectCmDeviceReturn for backward compat.
+ LEGACY_RESPONSE = """
+
+
+
+
+ 1
+
+
+ -
+ cucm-old
+ Ok
+
+
-
+ SEPLEGACY
+ Registered
+
+
+
+
+
+
+
+"""
+
+ def test_legacy_response_shape_still_parses(self):
+ """Backward compat with pre-CUCM-15 RisPort responses."""
+ result = _parse_response(self.LEGACY_RESPONSE)
+ assert result["total_devices_found"] == 1
+ assert result["cm_nodes"][0]["devices"][0]["name"] == "SEPLEGACY"
+
+ def test_parse_total_count(self):
+ result = _parse_response(self.SAMPLE_RESPONSE)
+ assert result["total_devices_found"] == 2
+
+ def test_parse_node_count(self):
+ result = _parse_response(self.SAMPLE_RESPONSE)
+ assert len(result["cm_nodes"]) == 1
+ assert result["cm_nodes"][0]["name"] == "cucm-pub"
+
+ def test_parse_device_fields(self):
+ result = _parse_response(self.SAMPLE_RESPONSE)
+ devices = result["cm_nodes"][0]["devices"]
+ assert len(devices) == 2
+
+ first = devices[0]
+ assert first["name"] == "SEP1234567890AB"
+ assert first["status"] == "Registered"
+ assert first["dir_number"] == "2001"
+ assert first["description"] == "Patient Room 101"
+ assert first["protocol"] == "SIP"
+ # Nested IP extraction
+ assert first["ip_address"] == "10.0.0.5"
+
+ def test_parse_unregistered_device(self):
+ result = _parse_response(self.SAMPLE_RESPONSE)
+ second = result["cm_nodes"][0]["devices"][1]
+ assert second["status"] == "UnRegistered"
+ assert second["ip_address"] == "" # missing IP renders empty
+
+ def test_parse_state_info_for_pagination(self):
+ result = _parse_response(self.SAMPLE_RESPONSE)
+ assert result["state_info"] == "" # last page
+
+ def test_parse_soap_fault_raises(self):
+ fault_response = """
+
+
+
+Authentication failed
+
+
+"""
+ with pytest.raises(RuntimeError, match="Authentication failed"):
+ _parse_response(fault_response)
+
+
+class TestStatusValidation:
+ def test_known_status_values(self):
+ assert "Registered" in DEVICE_STATUS_VALUES
+ assert "UnRegistered" in DEVICE_STATUS_VALUES
+ assert "PartiallyRegistered" in DEVICE_STATUS_VALUES
+
+ def test_select_cm_device_rejects_invalid_status(self):
+ client = RisPortClient()
+ # No env vars set, so _ensure_session would fail first;
+ # but the validation should run BEFORE that on bad input.
+ with pytest.raises(ValueError, match="status must be"):
+ client.select_cm_device(status="not-a-real-status")