mcaxl/tests/test_risport.py
Ryan Malloy 639d706200 client/risport: add read-only allowlist proxies (defense-in-depth)
Today, mcaxl is read-only against CUCM by *absence* — the tools
never call write methods. But absence isn't enforced: a future
contributor adding a tool could write
self._service.addRoutePartition(...) and zeep would happily
dispatch it. There's no positive guard.

Two new chokepoints close that gap:

AXL side — _ReadOnlyServiceProxy wraps the zeep service object.
__getattr__ refuses any method outside _ALLOWED_AXL_METHODS
(currently {getCCMVersion, executeSQLQuery}) with a new
ReadOnlyViolation exception, raised at attribute lookup BEFORE
zeep serializes a SOAP envelope. Underscore-prefixed and dunder
attributes pass through (zeep introspects via _binding_options,
__class__, etc., and those don't dispatch SOAP).

RisPort side — RisPort70 envelopes are hand-rolled, so the proxy
pattern doesn't apply directly. The equivalent chokepoint lives in
the envelope builders: _check_operation_allowed(name) is the first
line of every builder, and _ALLOWED_RISPORT_OPERATIONS is the
allowlist (currently {selectCmDevice}).

Operators can verify the proxy is active via the health tool —
connection_status() now reports read_only_proxy: true and
allowed_axl_methods: [...].

Tests:
- new tests/test_readonly_proxy.py (13 tests):
  * allowed methods dispatch through to inner service
  * 9 parameterized refusals (addRoutePartition, updatePhone,
    removeUser, applyPhone, resetPhone, restartPhone,
    executeSQLUpdate, doDeviceLogin, wipePhone)
  * allowlist drift detection (set must be exactly what we
    advertise — accidental widening fails red)
  * dunder + underscore-prefixed passthrough
- tests/test_risport.py: +TestReadOnlyAllowlist (7 allow/refuse tests,
  plus 1 drift check counted separately in the total below):
  * selectCmDevice passes _check_operation_allowed
  * 6 parameterized refusals (addCmDevice, removeCmDevice,
    resetDevice, restartDevice, applyCmDevice, executeSQLUpdate)
  * allowlist drift detection

182 tests pass total (was 161; +13 proxy + 7 risport + 1 allowlist
drift catch).
2026-04-29 06:38:41 -06:00

257 lines
8.9 KiB
Python

"""Unit tests for the RisPort70 SOAP envelope construction and parser.
Live-cluster integration is verified separately via the smoke-test
script — these tests are pure: envelope shape, response-XML parsing,
edge cases. No network.
"""
import xml.etree.ElementTree as ET
import pytest
from mcaxl.client import ReadOnlyViolation
from mcaxl.risport import (
DEVICE_STATUS_VALUES,
_ALLOWED_RISPORT_OPERATIONS,
RisPortClient,
_build_select_envelope,
_check_operation_allowed,
_escape_xml,
_parse_response,
)
class TestEscapeXml:
    """Checks for the ``_escape_xml`` helper."""

    def test_basic_escapes(self):
        # Each XML-special character must come back as its named entity.
        expectations = {
            "<bad>": "&lt;bad&gt;",
            "a&b": "a&amp;b",
            '"x"': "&quot;x&quot;",
            "'y'": "&apos;y&apos;",
        }
        for raw, escaped in expectations.items():
            assert _escape_xml(raw) == escaped

    def test_passthrough_safe_text(self):
        # Text containing nothing special is returned unchanged.
        harmless = "Phone-1234"
        assert _escape_xml(harmless) == harmless
class TestSelectEnvelope:
def test_envelope_is_well_formed_xml(self):
env = _build_select_envelope()
# Must parse — if not, RisPort will reject it
root = ET.fromstring(env)
assert root is not None
def test_default_envelope_includes_required_fields(self):
env = _build_select_envelope()
# Cisco's WSDL requires every CmSelectionCriteria child
for required in [
"MaxReturnedDevices",
"DeviceClass",
"Model",
"Status",
"NodeName",
"SelectBy",
"SelectItems",
"Protocol",
"DownloadStatus",
]:
assert f"<soap:{required}>" in env, (
f"missing required CmSelectionCriteria field <soap:{required}>"
)
def test_state_info_threaded_into_envelope(self):
env = _build_select_envelope(state_info="cursor-abc-123")
assert "cursor-abc-123" in env
assert "<soap:StateInfo>cursor-abc-123</soap:StateInfo>" in env
def test_select_items_default_wildcard(self):
env = _build_select_envelope()
# Default: ['*'] — all devices
assert "<soap:Item>*</soap:Item>" in env
def test_select_items_explicit_list(self):
env = _build_select_envelope(select_items=["SEP1", "SEP2"])
assert "<soap:Item>SEP1</soap:Item>" in env
assert "<soap:Item>SEP2</soap:Item>" in env
def test_max_devices_int_coerced(self):
env = _build_select_envelope(max_devices=500)
assert "<soap:MaxReturnedDevices>500</soap:MaxReturnedDevices>" in env
def test_xml_special_chars_in_filter_escaped(self):
# SOAP injection defense — single quote and angle bracket must be escaped
env = _build_select_envelope(select_items=["O'Brien <test>"])
assert "&apos;" in env
assert "&lt;" in env
assert "&gt;" in env
# The raw chars must NOT appear
assert "'Brien <test>" not in env
class TestParseResponse:
    """Parsing checks for ``_parse_response`` against canned SOAP bodies."""

    # CUCM 15 response shape: <selectCmDeviceReturn> contains an extra
    # <SelectCmDeviceResult> wrapper that the parser has to descend through.
    # Found during live verification against cucm-pub.binghammemorial.org
    # on 2026-04-26.
    SAMPLE_RESPONSE = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<ns:selectCmDeviceResponse xmlns:ns="http://schemas.cisco.com/ast/soap">
<selectCmDeviceReturn>
<SelectCmDeviceResult>
<TotalDevicesFound>2</TotalDevicesFound>
<StateInfo></StateInfo>
<CmNodes>
<item>
<Name>cucm-pub</Name>
<ReturnCode>Ok</ReturnCode>
<CmDevices>
<item>
<Name>SEP1234567890AB</Name>
<IpAddress><item><IP>10.0.0.5</IP><IPAddrType>ipv4</IPAddrType></item></IpAddress>
<DirNumber>2001</DirNumber>
<Status>Registered</Status>
<StatusReason>0</StatusReason>
<Protocol>SIP</Protocol>
<Description>Patient Room 101</Description>
<Model>Cisco 7841</Model>
<ActiveLoadID>sip78xx.12-5-1SR4-3</ActiveLoadID>
<TimeStamp>1700000000</TimeStamp>
</item>
<item>
<Name>SEPABCDEF123456</Name>
<IpAddress></IpAddress>
<DirNumber></DirNumber>
<Status>UnRegistered</Status>
<StatusReason>1</StatusReason>
<Description>Decommissioned phone</Description>
</item>
</CmDevices>
</item>
</CmNodes>
</SelectCmDeviceResult>
</selectCmDeviceReturn>
</ns:selectCmDeviceResponse>
</soapenv:Body>
</soapenv:Envelope>"""

    # Pre-CUCM-15 shape: no SelectCmDeviceResult wrapper, so the fields sit
    # directly under selectCmDeviceReturn. Kept to cover the parser's
    # backward-compatible fallback.
    LEGACY_RESPONSE = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<ns:selectCmDeviceResponse xmlns:ns="http://schemas.cisco.com/ast/soap">
<selectCmDeviceReturn>
<TotalDevicesFound>1</TotalDevicesFound>
<StateInfo></StateInfo>
<CmNodes>
<item>
<Name>cucm-old</Name>
<ReturnCode>Ok</ReturnCode>
<CmDevices>
<item>
<Name>SEPLEGACY</Name>
<Status>Registered</Status>
</item>
</CmDevices>
</item>
</CmNodes>
</selectCmDeviceReturn>
</ns:selectCmDeviceResponse>
</soapenv:Body>
</soapenv:Envelope>"""

    def test_legacy_response_shape_still_parses(self):
        """Backward compat with pre-CUCM-15 RisPort responses."""
        parsed = _parse_response(self.LEGACY_RESPONSE)
        assert parsed["total_devices_found"] == 1
        only_node = parsed["cm_nodes"][0]
        assert only_node["devices"][0]["name"] == "SEPLEGACY"

    def test_parse_total_count(self):
        # TotalDevicesFound is surfaced as an int under total_devices_found.
        parsed = _parse_response(self.SAMPLE_RESPONSE)
        assert parsed["total_devices_found"] == 2

    def test_parse_node_count(self):
        # The sample carries exactly one CmNodes item, named cucm-pub.
        nodes = _parse_response(self.SAMPLE_RESPONSE)["cm_nodes"]
        assert len(nodes) == 1
        assert nodes[0]["name"] == "cucm-pub"

    def test_parse_device_fields(self):
        parsed = _parse_response(self.SAMPLE_RESPONSE)
        devices = parsed["cm_nodes"][0]["devices"]
        assert len(devices) == 2
        registered = devices[0]
        expected = {
            "name": "SEP1234567890AB",
            "status": "Registered",
            "dir_number": "2001",
            "description": "Patient Room 101",
            "protocol": "SIP",
            # Extracted from the nested <IpAddress><item><IP> structure.
            "ip_address": "10.0.0.5",
        }
        for key, value in expected.items():
            assert registered[key] == value

    def test_parse_unregistered_device(self):
        parsed = _parse_response(self.SAMPLE_RESPONSE)
        unregistered = parsed["cm_nodes"][0]["devices"][1]
        assert unregistered["status"] == "UnRegistered"
        # An empty <IpAddress> element renders as an empty string.
        assert unregistered["ip_address"] == ""

    def test_parse_state_info_for_pagination(self):
        # Empty StateInfo in the sample — last page.
        parsed = _parse_response(self.SAMPLE_RESPONSE)
        assert parsed["state_info"] == ""

    def test_parse_soap_fault_raises(self):
        # A SOAP Fault body must surface as RuntimeError carrying the
        # faultstring text.
        fault_xml = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<soapenv:Fault>
<faultstring>Authentication failed</faultstring>
</soapenv:Fault>
</soapenv:Body>
</soapenv:Envelope>"""
        with pytest.raises(RuntimeError, match="Authentication failed"):
            _parse_response(fault_xml)
class TestStatusValidation:
    """Checks on the status values the RisPort client accepts."""

    def test_known_status_values(self):
        for status in ("Registered", "UnRegistered", "PartiallyRegistered"):
            assert status in DEVICE_STATUS_VALUES

    def test_select_cm_device_rejects_invalid_status(self):
        risport = RisPortClient()
        # With no env vars set, _ensure_session would fail on its own;
        # the point here is that input validation runs BEFORE that and
        # rejects the bad status with a ValueError.
        with pytest.raises(ValueError, match="status must be"):
            risport.select_cm_device(status="not-a-real-status")
class TestReadOnlyAllowlist:
    """Every RisPort envelope builder gates on _check_operation_allowed.

    That check plays the same role as the AXL service proxy: it is the
    chokepoint that stops any future write-shaped operation from being
    assembled.
    """

    # Operation names that must be refused by the allowlist check.
    _REFUSED_OPERATIONS = (
        "addCmDevice",
        "removeCmDevice",
        "resetDevice",
        "restartDevice",
        "applyCmDevice",
        "executeSQLUpdate",  # leakage from the AXL surface
    )

    def test_selectCmDevice_is_allowed(self):
        # Passing means the call returns without raising — selectCmDevice
        # is in the allowlist.
        _check_operation_allowed("selectCmDevice")

    @pytest.mark.parametrize("operation", _REFUSED_OPERATIONS)
    def test_disallowed_operation_raises_in_check(self, operation):
        # The refusal must name the offending operation in its message.
        with pytest.raises(ReadOnlyViolation, match=operation):
            _check_operation_allowed(operation)

    def test_allowlist_is_exactly_what_we_advertise(self):
        # As with the AXL allowlist, widening this set is a deliberate
        # decision and must come with a matching update to this test.
        advertised = frozenset({"selectCmDevice"})
        assert _ALLOWED_RISPORT_OPERATIONS == advertised