10th subsystem: session.swd provides DAP discovery, DP/AP register read/write, AP enumeration, and convenience methods (dpidr, target_id). Includes SWDError, DAPInfo, APInfo types, input validation, ADIv5/v6 AP classification, and 24 mock-only tests covering happy and error paths.
287 lines
9.5 KiB
Python
287 lines
9.5 KiB
Python
"""Tests for the SWD/DAP subsystem."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from openocd.errors import SWDError
|
|
from openocd.types import APInfo, DAPInfo
|
|
|
|
|
|
async def test_dap_info(session):
|
|
"""info() should return a DAPInfo with parsed DPIDR and AP count."""
|
|
info = await session.swd.info()
|
|
assert isinstance(info, DAPInfo)
|
|
assert info.name == "stm32f1x.dap"
|
|
assert info.dpidr == 0x2BA01477
|
|
assert info.ap_count == 1
|
|
assert "MEM-AP" in info.raw_info
|
|
|
|
|
|
async def test_dap_info_frozen(session):
|
|
"""DAPInfo should be immutable (frozen dataclass)."""
|
|
info = await session.swd.info()
|
|
with pytest.raises(AttributeError):
|
|
info.name = "something_else" # type: ignore[misc]
|
|
|
|
|
|
async def test_dpreg_read(session):
|
|
"""dpreg() without a value should read and return a DP register."""
|
|
result = await session.swd.dpreg(0x0)
|
|
assert isinstance(result, int)
|
|
assert result == 0x2BA01477
|
|
|
|
|
|
async def test_dpreg_write(session, mock_ocd):
|
|
"""dpreg() with a value should write and return the written value."""
|
|
result = await session.swd.dpreg(0x0, value=0x12345678)
|
|
assert result == 0x12345678
|
|
# Verify the mock received the write command
|
|
_, _, server = mock_ocd
|
|
write_cmds = [c for c in server.received_commands if "dpreg 0x0 0x" in c]
|
|
assert len(write_cmds) >= 1
|
|
|
|
|
|
async def test_apreg_read(session):
|
|
"""apreg() without a value should read an AP register."""
|
|
result = await session.swd.apreg(0, 0xFC)
|
|
assert isinstance(result, int)
|
|
assert result == 0x04770031
|
|
|
|
|
|
async def test_apreg_write(session, mock_ocd):
|
|
"""apreg() with a value should write and return the written value."""
|
|
result = await session.swd.apreg(0, 0x0, value=0xAABBCCDD)
|
|
assert result == 0xAABBCCDD
|
|
_, _, server = mock_ocd
|
|
write_cmds = [c for c in server.received_commands if "apreg 0 0x0 0x" in c]
|
|
assert len(write_cmds) >= 1
|
|
|
|
|
|
async def test_enumerate_aps(session):
|
|
"""list_aps() should discover APs by probing IDR until zero."""
|
|
aps = await session.swd.list_aps()
|
|
assert isinstance(aps, list)
|
|
assert len(aps) == 1
|
|
|
|
ap = aps[0]
|
|
assert isinstance(ap, APInfo)
|
|
assert ap.index == 0
|
|
assert ap.idr == 0x04770031
|
|
assert ap.base == 0xE00FF003
|
|
assert ap.ap_type == "MEM-AP"
|
|
|
|
|
|
async def test_ap_info_frozen(session):
|
|
"""APInfo should be immutable (frozen dataclass)."""
|
|
aps = await session.swd.list_aps()
|
|
with pytest.raises(AttributeError):
|
|
aps[0].index = 99 # type: ignore[misc]
|
|
|
|
|
|
async def test_dpidr_convenience(session):
|
|
"""dpidr() should read DP address 0x0."""
|
|
result = await session.swd.dpidr()
|
|
assert result == 0x2BA01477
|
|
|
|
|
|
async def test_target_id(session):
|
|
"""target_id() should read DP address 0x24."""
|
|
result = await session.swd.target_id()
|
|
assert result == 0x00000477
|
|
|
|
|
|
async def test_auto_resolve_dap(session, mock_ocd):
|
|
"""With no explicit dap name, the controller should auto-discover."""
|
|
# First call triggers dap names lookup
|
|
await session.swd.dpidr()
|
|
_, _, server = mock_ocd
|
|
assert "dap names" in server.received_commands
|
|
|
|
# Second call should use the cached name (no extra dap names)
|
|
count_before = server.received_commands.count("dap names")
|
|
await session.swd.dpidr()
|
|
count_after = server.received_commands.count("dap names")
|
|
assert count_after == count_before
|
|
|
|
|
|
async def test_explicit_dap_name(session, mock_ocd):
|
|
"""Passing dap= explicitly should skip auto-discovery."""
|
|
result = await session.swd.dpreg(0x0, dap="stm32f1x.dap")
|
|
assert result == 0x2BA01477
|
|
# Should NOT have called "dap names"
|
|
_, _, server = mock_ocd
|
|
assert "dap names" not in server.received_commands
|
|
|
|
|
|
async def test_swd_error_on_bad_response(mock_ocd):
|
|
"""SWDError should be raised when response matches OpenOCD error patterns."""
|
|
from openocd.swd.dap import _check_error
|
|
|
|
with pytest.raises(SWDError):
|
|
_check_error("Error: invalid DAP", "test")
|
|
|
|
with pytest.raises(SWDError):
|
|
_check_error("invalid command name", "test")
|
|
|
|
with pytest.raises(SWDError):
|
|
_check_error("command not found", "test")
|
|
|
|
# Clean responses should not raise
|
|
_check_error("0x2ba01477", "test")
|
|
_check_error("", "test")
|
|
|
|
# Legitimate output containing "error" as a substring should NOT raise.
|
|
# This is the false-positive prevention fix (C1 from code review).
|
|
_check_error("error detection enabled in CTRL register", "test")
|
|
_check_error("AP ID register 0x04770031", "test")
|
|
|
|
|
|
async def test_swd_error_no_hex_value(mock_ocd):
|
|
"""SWDError should be raised when no hex value found in read response."""
|
|
from openocd.swd.dap import _parse_hex
|
|
|
|
with pytest.raises(SWDError, match="no hex value"):
|
|
_parse_hex("no numbers here", "test read")
|
|
|
|
|
|
def test_sync_wrapper():
|
|
"""SyncSWDController should expose the same API synchronously.
|
|
|
|
The sync API blocks with run_until_complete, so the mock server must
|
|
run on a separate thread to accept connections concurrently.
|
|
"""
|
|
import asyncio
|
|
import threading
|
|
|
|
from openocd.session import Session
|
|
from tests.mock_server import MockOpenOCDServer
|
|
|
|
# Run mock server in a background thread with its own event loop.
|
|
bg_loop = asyncio.new_event_loop()
|
|
server = MockOpenOCDServer()
|
|
bg_loop.run_until_complete(server.start())
|
|
host, port = server.address
|
|
|
|
thread = threading.Thread(target=bg_loop.run_forever, daemon=True)
|
|
thread.start()
|
|
|
|
try:
|
|
with Session.connect_sync(host, port, timeout=5.0) as sync_sess:
|
|
result = sync_sess.swd.dpidr()
|
|
assert result == 0x2BA01477
|
|
|
|
info = sync_sess.swd.info()
|
|
assert isinstance(info, DAPInfo)
|
|
assert info.name == "stm32f1x.dap"
|
|
|
|
aps = sync_sess.swd.list_aps()
|
|
assert len(aps) == 1
|
|
finally:
|
|
bg_loop.call_soon_threadsafe(bg_loop.stop)
|
|
thread.join(timeout=5)
|
|
bg_loop.run_until_complete(server.stop())
|
|
bg_loop.close()
|
|
|
|
|
|
def test_classify_ap():
|
|
"""AP classification should identify MEM-AP, JTAG-AP, and unknown types."""
|
|
from openocd.swd.dap import _classify_ap
|
|
|
|
# MEM-AP ADIv5 (class field 0x8)
|
|
assert _classify_ap(0x04770031) == "MEM-AP"
|
|
# Zero IDR = unknown
|
|
assert _classify_ap(0x00000000) == "unknown"
|
|
# Class field 0x1 (COM-AP / legacy MEM-AP)
|
|
assert _classify_ap(0x00002000) == "MEM-AP"
|
|
# MEM-AP ADIv6 (class field 0x9)
|
|
assert _classify_ap(0x00012000) == "MEM-AP"
|
|
# JTAG-AP: non-zero IDR, class not MEM-AP, type field 0x0
|
|
assert _classify_ap(0x00000010) == "JTAG-AP" # bits[3:0]=0x0, class=0
|
|
# Unknown: non-zero IDR, class not MEM-AP, type field != 0
|
|
assert _classify_ap(0x00000001) == "unknown" # bits[3:0]=0x1, class=0
|
|
|
|
|
|
# ======================================================================
|
|
# Error-path tests (from code review findings I6)
|
|
# ======================================================================
|
|
|
|
|
|
async def test_no_dap_found(mock_ocd):
|
|
"""SWDError should be raised when dap names returns empty."""
|
|
from openocd.session import Session
|
|
|
|
host, port, server = mock_ocd
|
|
# Override dap names to return empty
|
|
server.add_response(r"^dap names$", "")
|
|
|
|
sess = await Session.connect(host, port, timeout=5.0)
|
|
try:
|
|
with pytest.raises(SWDError, match="No DAP instances found"):
|
|
await sess.swd.dpidr()
|
|
finally:
|
|
await sess.close()
|
|
|
|
|
|
async def test_invalidate_cache(session, mock_ocd):
|
|
"""invalidate_cache() should force re-discovery on next call."""
|
|
_, _, server = mock_ocd
|
|
|
|
# First call populates the cache
|
|
await session.swd.dpidr()
|
|
count_after_first = server.received_commands.count("dap names")
|
|
assert count_after_first == 1
|
|
|
|
# Invalidate and call again
|
|
session.swd.invalidate_cache()
|
|
await session.swd.dpidr()
|
|
count_after_invalidate = server.received_commands.count("dap names")
|
|
assert count_after_invalidate == 2
|
|
|
|
|
|
async def test_dpreg_negative_address_rejected(session):
|
|
"""Negative addresses should be rejected before reaching OpenOCD."""
|
|
with pytest.raises(SWDError, match="must be 0"):
|
|
await session.swd.dpreg(-1)
|
|
|
|
|
|
async def test_dpreg_overflow_address_rejected(session):
|
|
"""Addresses > 0xFFFFFFFF should be rejected."""
|
|
with pytest.raises(SWDError, match="must be 0"):
|
|
await session.swd.dpreg(0x1_0000_0000)
|
|
|
|
|
|
async def test_apreg_negative_ap_rejected(session):
|
|
"""Negative AP numbers should be rejected."""
|
|
with pytest.raises(SWDError, match="AP number must be"):
|
|
await session.swd.apreg(-1, 0xFC)
|
|
|
|
|
|
async def test_apreg_ap_over_255_rejected(session):
|
|
"""AP numbers > 255 should be rejected per ARM ADI spec."""
|
|
with pytest.raises(SWDError, match="AP number must be"):
|
|
await session.swd.apreg(256, 0xFC)
|
|
|
|
|
|
async def test_dpreg_write_negative_value_rejected(session):
|
|
"""Negative values should be rejected for DP register writes."""
|
|
with pytest.raises(SWDError, match="must be 0"):
|
|
await session.swd.dpreg(0x0, value=-1)
|
|
|
|
|
|
async def test_dap_info_unparseable_dpidr(mock_ocd):
|
|
"""When dap info output has no DPIDR line, dpidr should be 0 with warning."""
|
|
from openocd.session import Session
|
|
|
|
host, port, server = mock_ocd
|
|
# Override dap info to return output with no DPIDR line
|
|
server.add_response(r"^stm32f1x\.dap info$", "AP # 0\n Some AP info\n No DPIDR here")
|
|
|
|
sess = await Session.connect(host, port, timeout=5.0)
|
|
try:
|
|
info = await sess.swd.info()
|
|
assert info.dpidr == 0 # Falls back to 0 with a logged warning
|
|
assert info.ap_count == 1 # AP # 0 is still counted
|
|
finally:
|
|
await sess.close()
|