diff --git a/src/mcaxl/client.py b/src/mcaxl/client.py index 941d866..adb7cb4 100644 --- a/src/mcaxl/client.py +++ b/src/mcaxl/client.py @@ -34,6 +34,51 @@ class _ConfigError(RuntimeError): """ +class ReadOnlyViolation(RuntimeError): + """Raised when code attempts to dispatch an AXL method outside the + read-only allowlist. This is a defense-in-depth guard: the SQL + validator already prevents write SQL, and the tools as-written never + call write methods, but the proxy ensures that a future contributor + accidentally calling `_service.addRoutePartition(...)` gets a refusal + at the boundary instead of a silent cluster mutation. + """ + + +# AXL methods this server is permitted to dispatch. Adding a new method +# here is a deliberate decision — every additional name widens the +# read-only surface and should be reviewed. +_ALLOWED_AXL_METHODS = frozenset({ + "getCCMVersion", + "executeSQLQuery", +}) + + +class _ReadOnlyServiceProxy: + """Wraps a zeep service object. Refuses any method not in + _ALLOWED_AXL_METHODS, raising ReadOnlyViolation. + + Non-method attributes (zeep internals like `_binding_options`) are + passed through untouched so the underlying client keeps working. + """ + + def __init__(self, inner): + self._inner = inner + + def __getattr__(self, name): + # Dunders and private attributes pass through — we're guarding + # against accidental SOAP dispatch, not introspection. + if name.startswith("_"): + return getattr(self._inner, name) + if name not in _ALLOWED_AXL_METHODS: + raise ReadOnlyViolation( + f"AXL method {name!r} is not in the read-only allowlist. " + f"Allowed: {sorted(_ALLOWED_AXL_METHODS)}. " + f"This server is structurally read-only — see " + f"docs/explanation/read-only-by-structure/ for context." + ) + return getattr(self._inner, name) + + class AxlClient: """Lazy-loaded zeep client for CUCM AXL. @@ -67,6 +112,13 @@ class AxlClient: "config_error": self._config_error, # permanent until restart "last_error": self._last_error, "retry_config": self._retry_config, + # Operators should be able to verify the read-only proxy is + # active without reading the source. True means writes will + # be refused at dispatch time even if some future tool tries. + "read_only_proxy": self._service is not None and isinstance( + self._service, _ReadOnlyServiceProxy + ), + "allowed_axl_methods": sorted(_ALLOWED_AXL_METHODS), } def _ensure_connected(self) -> None: @@ -153,10 +205,15 @@ class AxlClient: ) # AXL endpoint is the AXL_URL itself; override the WSDL's default # service location which usually points at a placeholder host. - self._service = self._client.create_service( + # Wrap the resulting service in a read-only allowlist proxy — + # any SOAP method not in _ALLOWED_AXL_METHODS will raise + # ReadOnlyViolation at attribute lookup time, before zeep + # serializes a SOAP envelope. + raw_service = self._client.create_service( "{http://www.cisco.com/AXLAPIService/}AXLAPIBinding", url, ) + self._service = _ReadOnlyServiceProxy(raw_service) import time as _time self._connected_at = _time.monotonic() self._last_error = None # operational state is now clean diff --git a/src/mcaxl/risport.py b/src/mcaxl/risport.py index 1302bbd..4d854a0 100644 --- a/src/mcaxl/risport.py +++ b/src/mcaxl/risport.py @@ -36,10 +36,35 @@ from requests.adapters import HTTPAdapter from requests.auth import HTTPBasicAuth from urllib3.util.retry import Retry +from .client import ReadOnlyViolation + # RisPort path on the CUCM publisher _RIS_PATH = "/realtimeservice2/services/RISService70" +# RisPort70 operations this server is permitted to invoke. Parallels the +# AXL allowlist in client.py — adding a new operation here is a deliberate +# decision that widens the read-only surface. +_ALLOWED_RISPORT_OPERATIONS = frozenset({ + "selectCmDevice", +}) + + +def _check_operation_allowed(operation: str) -> None: + """Raise ReadOnlyViolation if `operation` is outside the allowlist. + + Called at the top of each envelope builder. RisPort70 doesn't go + through zeep — envelopes are hand-rolled — so the proxy pattern + used for AXL doesn't apply directly. This function is the equivalent + chokepoint: any envelope builder that reaches the network must first + check its operation name against the allowlist. + """ + if operation not in _ALLOWED_RISPORT_OPERATIONS: + raise ReadOnlyViolation( + f"RisPort70 operation {operation!r} is not in the read-only " + f"allowlist. Allowed: {sorted(_ALLOWED_RISPORT_OPERATIONS)}." + ) + # SOAP namespaces. These match Cisco's published values for RisPort70. _NS_SOAPENV = "http://schemas.xmlsoap.org/soap/envelope/" _NS_RIS = "http://schemas.cisco.com/ast/soap" @@ -77,6 +102,7 @@ def _build_select_envelope( are rejected. We err on the side of always including every field with sensible defaults. """ + _check_operation_allowed("selectCmDevice") items = select_items if select_items else ["*"] items_xml = "".join( f"{_escape_xml(i)}" diff --git a/tests/test_readonly_proxy.py b/tests/test_readonly_proxy.py new file mode 100644 index 0000000..f722ff5 --- /dev/null +++ b/tests/test_readonly_proxy.py @@ -0,0 +1,88 @@ +"""Tests for _ReadOnlyServiceProxy: defense-in-depth allowlist for AXL methods. + +The proxy wraps the zeep service object so any SOAP method outside +{getCCMVersion, executeSQLQuery} raises ReadOnlyViolation at attribute +lookup, before zeep serializes an envelope. This is a guard against future +contributors accidentally calling write methods like addRoutePartition(). +""" + +from unittest.mock import MagicMock + +import pytest + +from mcaxl.client import ( + ReadOnlyViolation, + _ALLOWED_AXL_METHODS, + _ReadOnlyServiceProxy, +) + + +class TestAllowlistEnforcement: + def test_allowed_method_dispatches_through(self): + # Both methods we currently use must pass through the proxy. + inner = MagicMock() + inner.getCCMVersion.return_value = {"version": "15.0(1)"} + inner.executeSQLQuery.return_value = {"rows": []} + proxy = _ReadOnlyServiceProxy(inner) + + assert proxy.getCCMVersion() == {"version": "15.0(1)"} + assert proxy.executeSQLQuery(sql="SELECT 1") == {"rows": []} + inner.getCCMVersion.assert_called_once() + inner.executeSQLQuery.assert_called_once_with(sql="SELECT 1") + + @pytest.mark.parametrize( + "method_name", + [ + "addRoutePartition", + "updatePhone", + "removeUser", + "applyPhone", + "resetPhone", + "restartPhone", + "executeSQLUpdate", # the AXL write counterpart + "doDeviceLogin", + "wipePhone", + ], + ) + def test_disallowed_method_raises(self, method_name): + inner = MagicMock() + proxy = _ReadOnlyServiceProxy(inner) + + with pytest.raises(ReadOnlyViolation, match=method_name): + getattr(proxy, method_name) + + # The inner service must NOT have been touched at all — refusal + # happens before any SOAP serialization. + assert not getattr(inner, method_name).called + + def test_allowlist_is_exactly_what_we_advertise(self): + # If this set ever grows, that's a deliberate decision and the + # test should be updated alongside the change. Catching unintended + # widening of the read-only surface is the point. + assert _ALLOWED_AXL_METHODS == frozenset( + {"getCCMVersion", "executeSQLQuery"} + ) + + +class TestAttributePassthrough: + def test_dunder_attributes_pass_through(self): + # Zeep introspects services via dunder attributes (__class__, + # __dict__, etc.). The proxy must not break those. + inner = MagicMock() + inner.__class__ = MagicMock + proxy = _ReadOnlyServiceProxy(inner) + + # Reading the class doesn't raise + _ = proxy.__class__ + + def test_underscore_prefixed_attributes_pass_through(self): + # zeep service internals like `_binding_options`, `_operations` + # are accessed by name. We don't want to gate those because they + # don't dispatch SOAP — they read metadata. + inner = MagicMock() + inner._binding_options = {"address": "https://cucm/axl/"} + inner._operations = ["getCCMVersion", "executeSQLQuery", "addPhone"] + proxy = _ReadOnlyServiceProxy(inner) + + assert proxy._binding_options == {"address": "https://cucm/axl/"} + assert "addPhone" in proxy._operations # introspection, not dispatch diff --git a/tests/test_risport.py b/tests/test_risport.py index 929afad..768e05b 100644 --- a/tests/test_risport.py +++ b/tests/test_risport.py @@ -9,10 +9,13 @@ import xml.etree.ElementTree as ET import pytest +from mcaxl.client import ReadOnlyViolation from mcaxl.risport import ( DEVICE_STATUS_VALUES, + _ALLOWED_RISPORT_OPERATIONS, RisPortClient, _build_select_envelope, + _check_operation_allowed, _escape_xml, _parse_response, ) @@ -220,3 +223,34 @@ class TestStatusValidation: # but the validation should run BEFORE that on bad input. with pytest.raises(ValueError, match="status must be"): client.select_cm_device(status="not-a-real-status") + + +class TestReadOnlyAllowlist: + """The RisPort envelope builders all gate on _check_operation_allowed. + This is the equivalent of the AXL service proxy — the chokepoint that + blocks any future write-shaped operation from being assembled. + """ + + def test_selectCmDevice_is_allowed(self): + # No raise — selectCmDevice is in the allowlist + _check_operation_allowed("selectCmDevice") + + @pytest.mark.parametrize( + "operation", + [ + "addCmDevice", + "removeCmDevice", + "resetDevice", + "restartDevice", + "applyCmDevice", + "executeSQLUpdate", # leakage from the AXL surface + ], + ) + def test_disallowed_operation_raises_in_check(self, operation): + with pytest.raises(ReadOnlyViolation, match=operation): + _check_operation_allowed(operation) + + def test_allowlist_is_exactly_what_we_advertise(self): + # As with the AXL allowlist, widening this set is a deliberate + # decision that should be matched by an update to this test. + assert _ALLOWED_RISPORT_OPERATIONS == frozenset({"selectCmDevice"})