OpenOCD uses two different output formats for breakpoint listings within the same session: `Breakpoint(IVA): ...` for hardware (FPB) breakpoints and `IVA breakpoint: ...` for software (patched) breakpoints. The old regex matched only the first format, so breakpoint_list() always returned an empty result when software breakpoints were present. The third field is a comparator index or breakpoint number, not a hw/sw type flag — the format variant itself indicates the breakpoint type. Verified against live STM32F103C6T6 hardware: 8/9 lifecycle tests pass, 154/154 mock tests pass.
277 lines
9.7 KiB
Python
277 lines
9.7 KiB
Python
"""Fake OpenOCD TCL RPC server for testing.
|
|
|
|
An asyncio TCP server that speaks the OpenOCD TCL RPC framing protocol:
|
|
client sends: command_string + \\x1a
|
|
server replies: response_string + \\x1a
|
|
|
|
Supports exact-match and regex-based command routing with pre-loaded
|
|
responses that mirror real OpenOCD output.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import re
|
|
from collections.abc import Callable
|
|
|
|
# Framing byte (0x1a) that terminates every command sent by the client and
# every response written back by the server.
SEPARATOR = b"\x1a"
|
|
|
|
|
|
# -- Canned OpenOCD responses ------------------------------------------------
# Each constant is the reply text served for one command by the routing table
# built in _build_default_responses().

# Reply to the bare "targets" command.
TARGETS_RESPONSE = """\
TargetName Type Endian TapName State
-- ------------------ ---------- ------ ------------------ ------------
0* stm32f1x.cpu cortex_m little stm32f1x.cpu halted"""

# Replies to single-register reads: "reg pc", "reg sp", "reg lr", "reg xPSR".
REG_PC_RESPONSE = "pc (/32): 0x08001234"
REG_SP_RESPONSE = "sp (/32): 0x20005000"
REG_LR_RESPONSE = "lr (/32): 0x08000100"
REG_XPSR_RESPONSE = "xPSR (/32): 0x61000000"

# Reply to the bare "reg" command (full register listing). The pc/sp/lr/xPSR
# values agree with the single-register replies above.
REG_ALL_RESPONSE = """\
===== ARM registers
(0) r0 (/32): 0x00000000
(1) r1 (/32): 0x00000001
(2) r2 (/32): 0x20001000
(3) r3 (/32): 0x00000003
(4) r4 (/32): 0x00000000
(5) r5 (/32): 0x00000000
(6) r6 (/32): 0x00000000
(7) r7 (/32): 0x20004FF0
(8) r8 (/32): 0x00000000
(9) r9 (/32): 0x00000000
(10) r10 (/32): 0x00000000
(11) r11 (/32): 0x00000000
(12) r12 (/32): 0x00000000
(13) sp (/32): 0x20005000
(14) lr (/32): 0x08000100
(15) pc (/32): 0x08001234
(16) xPSR (/32): 0x61000000
(17) msp (/32): 0x20005000
(18) psp (/32): 0x00000000
(19) primask (/1): 0x00
(20) basepri (/8): 0x00
(21) faultmask (/1): 0x00
(22) control (/3): 0x00 (dirty)"""

# Reply served only for the exact command "read_memory 0x8000000 32 4";
# every other read_memory command goes through _generic_read_memory().
READ_MEMORY_RESPONSE = "20005000 080001a1 080001ab 080001ad"

# Reply to "flash banks".
FLASH_BANKS_RESPONSE = (
    "#0 : stm32f1x.flash (stm32f1x) at 0x08000000, size 0x00020000, buswidth 0, chipwidth 0"
)

# Reply to "scan_chain".
SCAN_CHAIN_RESPONSE = """\
TapName Enabled IdCode Expected IrLen IrCap IrMask
-- ------------------- -------- ---------- ---------- ----- ----- ------
0 stm32f1x.cpu Y 0x3ba00477 0x3ba00477 4 0x01 0x0f"""

# Reply to the bare "bp" command. Deliberately mixes the two listing formats
# ("Breakpoint(IVA): ..." and "IVA breakpoint: ...") so that a client parser
# has to cope with both appearing in one listing.
BP_LIST_RESPONSE = """\
Breakpoint(IVA): 0x08001234, 0x2, 0
IVA breakpoint: 0x08001300, 0x2, 0x03
Breakpoint(IVA): 0x08001400, 0x4, 1"""

# Reply to "rtt channels".
RTT_CHANNELS_RESPONSE = """\
Up-channels:
0: Terminal 1024
1: Log 512
Down-channels:
0: Terminal 16"""

# Replies to "transport select", "transport list" and "adapter speed"
# (the latter with or without a numeric argument).
TRANSPORT_SELECT_RESPONSE = "swd"
TRANSPORT_LIST_RESPONSE = "jtag swd"
ADAPTER_SPEED_RESPONSE = "4000"
|
|
|
|
# -- SWD/DAP ---------------------------------------------------------------
# Reply to "dap names".
DAP_NAMES_RESPONSE = "stm32f1x.dap"

# Reply to "stm32f1x.dap info".
DAP_INFO_RESPONSE = """\
AP # 0
AP ID register 0x04770031
Type is MEM-AP AHB3
MEM-AP BASE 0xe00ff003
Valid ROM table present
Component base address 0xe00ff000
Peripheral ID 0x04c0010471
Designer is 0x4bb, ST Microelectronics
DPIDR: 0x2ba01477"""

# Replies to "stm32f1x.dap dpreg 0x0" and "stm32f1x.dap dpreg 0x24".
DPREG_0_RESPONSE = "0x2ba01477"
DPREG_24_RESPONSE = "0x00000477"
# Replies to "stm32f1x.dap apreg <ap> <reg>" for (0, 0xfc), (0, 0xf8), (1, 0xfc).
# The AP 0 values repeat the AP ID register and MEM-AP BASE shown in DAP_INFO_RESPONSE.
APREG_0_FC_RESPONSE = "0x04770031"
APREG_0_F8_RESPONSE = "0xe00ff003"
APREG_1_FC_RESPONSE = "0x00000000"
|
|
|
|
|
|
def _build_default_responses() -> list[tuple[re.Pattern[str], str | Callable[[str], str]]]:
    """Assemble the default command routing table.

    The table is an ordered list of ``(compiled_regex, reply)`` pairs that is
    scanned top to bottom, so more specific patterns are listed before the
    broader ones they overlap with. A reply is either the literal response
    text or a callable that is handed the full command string and returns
    the response text.
    """
    table: tuple[tuple[str, str | Callable[[str], str]], ...] = (
        # target state control
        (r"^targets$", TARGETS_RESPONSE),
        (r"^halt$", ""),
        (r"^resume", ""),
        (r"^step", ""),
        (r"^reset\s+", ""),
        (r"^wait_halt", ""),
        # individual register reads (must come before bare "reg")
        (r"^reg\s+pc$", REG_PC_RESPONSE),
        (r"^reg\s+sp$", REG_SP_RESPONSE),
        (r"^reg\s+lr$", REG_LR_RESPONSE),
        (r"^reg\s+xPSR$", REG_XPSR_RESPONSE),
        # register write (reg <name> <value>)
        (r"^reg\s+\S+\s+0x", ""),
        # bare "reg" -> full listing
        (r"^reg$", REG_ALL_RESPONSE),
        # memory: one exact read is canned, everything else is synthesized
        (r"^read_memory\s+0x8000000\s+32\s+4$", READ_MEMORY_RESPONSE),
        (r"^read_memory\s+", _generic_read_memory),
        (r"^write_memory\s+", ""),
        # flash
        (r"^flash banks$", FLASH_BANKS_RESPONSE),
        (r"^flash\s+", ""),
        # SWD/DAP
        (r"^dap names$", DAP_NAMES_RESPONSE),
        (r"^stm32f1x\.dap info$", DAP_INFO_RESPONSE),
        (r"^stm32f1x\.dap dpreg 0x0$", DPREG_0_RESPONSE),
        (r"^stm32f1x\.dap dpreg 0x24$", DPREG_24_RESPONSE),
        (r"^stm32f1x\.dap dpreg 0x0 0x", ""),
        (r"^stm32f1x\.dap apreg 0 0xfc$", APREG_0_FC_RESPONSE),
        (r"^stm32f1x\.dap apreg 0 0xf8$", APREG_0_F8_RESPONSE),
        (r"^stm32f1x\.dap apreg 1 0xfc$", APREG_1_FC_RESPONSE),
        (r"^stm32f1x\.dap apreg 0 0x0 0x", ""),
        # JTAG
        (r"^scan_chain$", SCAN_CHAIN_RESPONSE),
        (r"^irscan\s+", "0x01"),
        (r"^drscan\s+", "0xDEADBEEF"),
        (r"^runtest\s+", ""),
        (r"^pathmove\s+", ""),
        # breakpoints / watchpoints (set, list, remove)
        (r"^bp\s+0x", ""),
        (r"^bp$", BP_LIST_RESPONSE),
        (r"^rbp\s+", ""),
        (r"^wp\s+0x", ""),
        (r"^wp$", ""),
        (r"^rwp\s+", ""),
        # transport / adapter
        (r"^transport\s+select$", TRANSPORT_SELECT_RESPONSE),
        (r"^transport\s+list$", TRANSPORT_LIST_RESPONSE),
        (r"^adapter\s+speed$", ADAPTER_SPEED_RESPONSE),
        (r"^adapter\s+speed\s+\d+", ADAPTER_SPEED_RESPONSE),
        (r"^adapter\s+name$", "cmsis-dap"),
        # RTT
        (r"^rtt\s+channels$", RTT_CHANNELS_RESPONSE),
        (r"^rtt\s+setup\s+", ""),
        (r"^rtt\s+start$", ""),
        (r"^rtt\s+stop$", ""),
        (r"^rtt\s+channelread\s+", "hello from target"),
        (r"^rtt\s+channelwrite\s+", ""),
        # notifications
        (r"^tcl_notifications\s+", ""),
    )
    # Compile once, preserving the declaration order that defines priority.
    return [(re.compile(regex), reply) for regex, reply in table]
|
|
|
|
|
|
def _generic_read_memory(cmd: str) -> str:
    """Generate a plausible response for an arbitrary read_memory command.

    The command is expected to look like ``read_memory <addr> <width> <count>``.
    Only the count is used: the reply is that many ``"00"`` words joined by
    single spaces.

    Args:
        cmd: The full command string as received from the client.

    Returns:
        ``count`` space-separated ``"00"`` words. The count defaults to 1 when
        it is missing or cannot be parsed; a count of zero or less yields an
        empty string.
    """
    fields = cmd.split()
    count = 1
    if len(fields) >= 4:
        token = fields[3]
        # Plain decimal first so "010" still means 10; base 0 then accepts
        # prefixed literals such as "0x10", which previously fell back to 1.
        for base in (10, 0):
            try:
                count = int(token, base)
            except ValueError:
                continue
            break
    return " ".join(["00"] * count)
|
|
|
|
|
|
class MockOpenOCDServer:
    """Asyncio TCP server that fakes OpenOCD TCL RPC responses.

    Each command received from a client (terminated by ``SEPARATOR``) is
    recorded in ``received_commands`` and answered from an ordered routing
    table: the first pattern that matches wins, and a command that matches
    nothing is answered with an empty string.

    Usage::

        server = MockOpenOCDServer()
        await server.start()
        host, port = server.address
        # ... connect and test ...
        await server.stop()
    """

    def __init__(self) -> None:
        self._server: asyncio.Server | None = None
        self._routes = _build_default_responses()
        self._host = "127.0.0.1"
        self._port = 0  # OS picks a free port
        # Writers of currently connected clients, so stop() can drop them.
        self._writers: set[asyncio.StreamWriter] = set()
        # Track raw commands received, useful for assertions
        self.received_commands: list[str] = []

    @property
    def address(self) -> tuple[str, int]:
        """Return (host, port) the server is listening on.

        Raises:
            RuntimeError: If the server has not been started.
        """
        if self._server is None:
            raise RuntimeError("Server not started")
        sock = self._server.sockets[0]
        return sock.getsockname()[:2]

    def add_response(self, pattern: str, response: str | Callable[[str], str]) -> None:
        """Prepend a custom response rule (takes priority over defaults).

        Args:
            pattern: Regular expression source. It is applied with
                ``re.search``, so it matches anywhere in the command unless
                it is anchored.
            response: Literal reply text, or a callable that receives the
                full command and returns the reply text.
        """
        self._routes.insert(0, (re.compile(pattern), response))

    async def start(self) -> None:
        """Bind the listening socket and begin accepting clients."""
        # asyncio.start_server() begins serving by default, so no separate
        # start_serving() call is needed.
        self._server = await asyncio.start_server(self._handle_client, self._host, self._port)

    async def stop(self) -> None:
        """Stop listening, drop connected clients, and wait for shutdown.

        Does nothing if the server is not running.
        """
        if self._server is None:
            return
        self._server.close()
        # Server.close() only stops accepting new connections. On Python
        # 3.12+ wait_closed() also waits for existing connections to end, so
        # close them here or stop() hangs while a client is still attached.
        for writer in list(self._writers):
            writer.close()
        await self._server.wait_closed()
        self._server = None

    async def _handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Handle one client connection, reading commands and sending responses.

        Bytes are accumulated until a ``SEPARATOR`` is seen; each complete
        command is decoded, recorded, resolved and answered in order. A
        command may arrive split across reads, and several commands may
        arrive in a single read.
        """
        self._writers.add(writer)
        buf = bytearray()
        try:
            while True:
                chunk = await reader.read(4096)
                if not chunk:
                    break  # EOF: peer closed the connection
                buf.extend(chunk)

                # Process all complete commands in the buffer
                while (idx := buf.find(SEPARATOR)) != -1:
                    command = bytes(buf[:idx]).decode("utf-8", errors="replace")
                    # Drop the consumed command and its separator in place.
                    del buf[: idx + 1]

                    self.received_commands.append(command)
                    response = self._resolve(command)

                    writer.write(response.encode("utf-8") + SEPARATOR)
                    await writer.drain()
        except (asyncio.CancelledError, ConnectionResetError, BrokenPipeError):
            # Best-effort mock: cancellation and a vanished peer are treated
            # as a normal end of the connection rather than an error.
            pass
        finally:
            self._writers.discard(writer)
            writer.close()
            with contextlib.suppress(OSError):
                await writer.wait_closed()

    def _resolve(self, command: str) -> str:
        """Find the first matching route and return its response.

        Callable responses are invoked with the full command; a command with
        no matching route resolves to an empty string.
        """
        for pattern, response in self._routes:
            if pattern.search(command):
                return response(command) if callable(response) else response
        # Unrecognized command returns empty (success)
        return ""
|