mcaxl/tests/test_risport.py
Ryan Malloy 39d4b29392 Add RisPort70 for real-time registration state + rate-limit backoff
Two ideas borrowed from cisco-cucm-mcp (calltelemetry/cisco-cucm-mcp,
MIT licensed): real-time device registration via RisPort70, and
exponential-backoff retry on transient HTTP 5xx errors. Both are
purpose-built for the audit use case rather than general-purpose
ports — RisPort tools exist to inform audit findings, not as a
standalone "look at my devices" interface.

Rate limit / 503 backoff (~30 lines + 3 tests):
  AxlClient now mounts an HTTPAdapter with a urllib3 Retry policy
  (3 retries, exponential backoff, status_forcelist=[502,503,504]).
  Configurable via AXL_RATE_LIMIT_RETRIES (default 3, 0 disables).
  Surfaces in connection_status() so operators can see the policy.
  Closes a real reliability gap: CUCM SOAP rate-limits under load
  during change windows or with multiple concurrent admins; pre-fix
  any 503 was a hard failure.

RisPort70 (new src/risport.py + 2 tools + prompt update):
  Hand-coded SOAP client for /realtimeservice2/services/RISService70
  (avoids dragging in another zeep instance for one operation).
  Reuses AXL_URL/USER/PASS env vars — RisPort lives on the same host.

  New tools:
    device_registration_status(device_class, status, name_filter, page_size)
    device_registration_summary()  — cluster-wide breakdown by class

  Live-cluster verification (cucm-pub.binghammemorial.org):
    Phone:    803  registered=679  unregistered=123  rejected=1
    Gateway:   85  registered=41   rejected=44   ← real audit finding
    SIPTrunk:  22  registered=18   unregistered=4
    HuntList:  28  registered=28
    H323/CTI:  0   (cluster doesn't use these)

  Discovered while live-verifying: CUCM 15 wraps the RisPort response
  in an extra <SelectCmDeviceResult> element inside <selectCmDeviceReturn>.
  Older CUCM versions exposed the fields directly. The parser falls
  back to either shape; tests cover both (test_legacy_response_shape_still_parses
  asserts the older shape still works).

phone_inventory_report prompt updated:
  New Step 3 — "Cross-reference with real-time registration" — recommends
  device_registration_summary() + device_registration_status(status="UnRegistered")
  to surface configured-but-never-registered phones (strongest orphan signal),
  PartiallyRegistered phones (firewall/cert/version mismatch indicator),
  and registration-state vs config-state mismatches.

Tooling delta worth noting:
  AXL device count:    1,377 phones
  RisPort device count:   803 phones
  Delta (~574)         likely templates, hidden phones, or stale config —
                       itself an audit finding the new tool will surface
                       to anyone running phone_inventory_report.

README updated:
  - Added health(), device_registration_status, device_registration_summary
  - Added "Scope and complement" section recommending @calltelemetry/cisco-cucm-mcp
    alongside for operational debugging (logs, perfmon, packet capture,
    service control). The two servers answer different questions; the LLM
    with both can compose audit findings with operational state.
  - Listed all 10 prompts (was 4 outdated entries).

Tests: 134 → 155 (+21).
2026-04-26 10:28:04 -06:00

223 lines
7.7 KiB
Python

"""Unit tests for the RisPort70 SOAP envelope construction and parser.
Live-cluster integration is verified separately via the smoke-test
script — these tests are pure: envelope shape, response-XML parsing,
edge cases. No network.
"""
import xml.etree.ElementTree as ET
import pytest
from mcp_cucm_axl.risport import (
DEVICE_STATUS_VALUES,
RisPortClient,
_build_select_envelope,
_escape_xml,
_parse_response,
)
class TestEscapeXml:
def test_basic_escapes(self):
assert _escape_xml("<bad>") == "&lt;bad&gt;"
assert _escape_xml("a&b") == "a&amp;b"
assert _escape_xml('"x"') == "&quot;x&quot;"
assert _escape_xml("'y'") == "&apos;y&apos;"
def test_passthrough_safe_text(self):
assert _escape_xml("Phone-1234") == "Phone-1234"
class TestSelectEnvelope:
def test_envelope_is_well_formed_xml(self):
env = _build_select_envelope()
# Must parse — if not, RisPort will reject it
root = ET.fromstring(env)
assert root is not None
def test_default_envelope_includes_required_fields(self):
env = _build_select_envelope()
# Cisco's WSDL requires every CmSelectionCriteria child
for required in [
"MaxReturnedDevices",
"DeviceClass",
"Model",
"Status",
"NodeName",
"SelectBy",
"SelectItems",
"Protocol",
"DownloadStatus",
]:
assert f"<soap:{required}>" in env, (
f"missing required CmSelectionCriteria field <soap:{required}>"
)
def test_state_info_threaded_into_envelope(self):
env = _build_select_envelope(state_info="cursor-abc-123")
assert "cursor-abc-123" in env
assert "<soap:StateInfo>cursor-abc-123</soap:StateInfo>" in env
def test_select_items_default_wildcard(self):
env = _build_select_envelope()
# Default: ['*'] — all devices
assert "<soap:Item>*</soap:Item>" in env
def test_select_items_explicit_list(self):
env = _build_select_envelope(select_items=["SEP1", "SEP2"])
assert "<soap:Item>SEP1</soap:Item>" in env
assert "<soap:Item>SEP2</soap:Item>" in env
def test_max_devices_int_coerced(self):
env = _build_select_envelope(max_devices=500)
assert "<soap:MaxReturnedDevices>500</soap:MaxReturnedDevices>" in env
def test_xml_special_chars_in_filter_escaped(self):
# SOAP injection defense — single quote and angle bracket must be escaped
env = _build_select_envelope(select_items=["O'Brien <test>"])
assert "&apos;" in env
assert "&lt;" in env
assert "&gt;" in env
# The raw chars must NOT appear
assert "'Brien <test>" not in env
class TestParseResponse:
# Match CUCM 15's actual response shape: an extra <SelectCmDeviceResult>
# wrapper inside <selectCmDeviceReturn>. Discovered while live-verifying
# against cucm-pub.binghammemorial.org 2026-04-26 — the parser must
# descend through this wrapper.
SAMPLE_RESPONSE = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<ns:selectCmDeviceResponse xmlns:ns="http://schemas.cisco.com/ast/soap">
<selectCmDeviceReturn>
<SelectCmDeviceResult>
<TotalDevicesFound>2</TotalDevicesFound>
<StateInfo></StateInfo>
<CmNodes>
<item>
<Name>cucm-pub</Name>
<ReturnCode>Ok</ReturnCode>
<CmDevices>
<item>
<Name>SEP1234567890AB</Name>
<IpAddress><item><IP>10.0.0.5</IP><IPAddrType>ipv4</IPAddrType></item></IpAddress>
<DirNumber>2001</DirNumber>
<Status>Registered</Status>
<StatusReason>0</StatusReason>
<Protocol>SIP</Protocol>
<Description>Patient Room 101</Description>
<Model>Cisco 7841</Model>
<ActiveLoadID>sip78xx.12-5-1SR4-3</ActiveLoadID>
<TimeStamp>1700000000</TimeStamp>
</item>
<item>
<Name>SEPABCDEF123456</Name>
<IpAddress></IpAddress>
<DirNumber></DirNumber>
<Status>UnRegistered</Status>
<StatusReason>1</StatusReason>
<Description>Decommissioned phone</Description>
</item>
</CmDevices>
</item>
</CmNodes>
</SelectCmDeviceResult>
</selectCmDeviceReturn>
</ns:selectCmDeviceResponse>
</soapenv:Body>
</soapenv:Envelope>"""
# Pre-CUCM-15 shape (no SelectCmDeviceResult wrapper). The parser falls
# back to fields directly under selectCmDeviceReturn for backward compat.
LEGACY_RESPONSE = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<ns:selectCmDeviceResponse xmlns:ns="http://schemas.cisco.com/ast/soap">
<selectCmDeviceReturn>
<TotalDevicesFound>1</TotalDevicesFound>
<StateInfo></StateInfo>
<CmNodes>
<item>
<Name>cucm-old</Name>
<ReturnCode>Ok</ReturnCode>
<CmDevices>
<item>
<Name>SEPLEGACY</Name>
<Status>Registered</Status>
</item>
</CmDevices>
</item>
</CmNodes>
</selectCmDeviceReturn>
</ns:selectCmDeviceResponse>
</soapenv:Body>
</soapenv:Envelope>"""
def test_legacy_response_shape_still_parses(self):
"""Backward compat with pre-CUCM-15 RisPort responses."""
result = _parse_response(self.LEGACY_RESPONSE)
assert result["total_devices_found"] == 1
assert result["cm_nodes"][0]["devices"][0]["name"] == "SEPLEGACY"
def test_parse_total_count(self):
result = _parse_response(self.SAMPLE_RESPONSE)
assert result["total_devices_found"] == 2
def test_parse_node_count(self):
result = _parse_response(self.SAMPLE_RESPONSE)
assert len(result["cm_nodes"]) == 1
assert result["cm_nodes"][0]["name"] == "cucm-pub"
def test_parse_device_fields(self):
result = _parse_response(self.SAMPLE_RESPONSE)
devices = result["cm_nodes"][0]["devices"]
assert len(devices) == 2
first = devices[0]
assert first["name"] == "SEP1234567890AB"
assert first["status"] == "Registered"
assert first["dir_number"] == "2001"
assert first["description"] == "Patient Room 101"
assert first["protocol"] == "SIP"
# Nested IP extraction
assert first["ip_address"] == "10.0.0.5"
def test_parse_unregistered_device(self):
result = _parse_response(self.SAMPLE_RESPONSE)
second = result["cm_nodes"][0]["devices"][1]
assert second["status"] == "UnRegistered"
assert second["ip_address"] == "" # missing IP renders empty
def test_parse_state_info_for_pagination(self):
result = _parse_response(self.SAMPLE_RESPONSE)
assert result["state_info"] == "" # last page
def test_parse_soap_fault_raises(self):
fault_response = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<soapenv:Fault>
<faultstring>Authentication failed</faultstring>
</soapenv:Fault>
</soapenv:Body>
</soapenv:Envelope>"""
with pytest.raises(RuntimeError, match="Authentication failed"):
_parse_response(fault_response)
class TestStatusValidation:
def test_known_status_values(self):
assert "Registered" in DEVICE_STATUS_VALUES
assert "UnRegistered" in DEVICE_STATUS_VALUES
assert "PartiallyRegistered" in DEVICE_STATUS_VALUES
def test_select_cm_device_rejects_invalid_status(self):
client = RisPortClient()
# No env vars set, so _ensure_session would fail first;
# but the validation should run BEFORE that on bad input.
with pytest.raises(ValueError, match="status must be"):
client.select_cm_device(status="not-a-real-status")