"""Memory read/write operations via OpenOCD TCL API. Uses the ``read_memory`` and ``write_memory`` TCL commands for reliable structured I/O, falling back to ``mdb``/``mdw`` style commands only where the TCL API is unavailable. """ from __future__ import annotations import asyncio import logging from pathlib import Path from openocd.connection.tcl_rpc import TclRpcConnection from openocd.errors import TargetError log = logging.getLogger(__name__) # Width constants for read_memory / write_memory _WIDTH_8 = 8 _WIDTH_16 = 16 _WIDTH_32 = 32 _WIDTH_64 = 64 # Hexdump formatting _HEXDUMP_BYTES_PER_LINE = 16 class Memory: """Read and write target memory.""" def __init__(self, conn: TclRpcConnection) -> None: self._conn = conn # ------------------------------------------------------------------ # Typed reads # ------------------------------------------------------------------ async def read_u8(self, addr: int, count: int = 1) -> list[int]: """Read 8-bit values starting at *addr*.""" return await self._read(addr, _WIDTH_8, count) async def read_u16(self, addr: int, count: int = 1) -> list[int]: """Read 16-bit values starting at *addr*.""" return await self._read(addr, _WIDTH_16, count) async def read_u32(self, addr: int, count: int = 1) -> list[int]: """Read 32-bit values starting at *addr*.""" return await self._read(addr, _WIDTH_32, count) async def read_u64(self, addr: int, count: int = 1) -> list[int]: """Read 64-bit values starting at *addr*.""" return await self._read(addr, _WIDTH_64, count) async def read_bytes(self, addr: int, size: int) -> bytes: """Read *size* bytes starting at *addr* and return as a bytes object.""" values = await self._read(addr, _WIDTH_8, size) return bytes(values) # ------------------------------------------------------------------ # Typed writes # ------------------------------------------------------------------ async def write_u8(self, addr: int, values: int | list[int]) -> None: """Write one or more 8-bit values starting at *addr*.""" await self._write(addr, _WIDTH_8, values) async def write_u16(self, addr: int, values: int | list[int]) -> None: """Write one or more 16-bit values starting at *addr*.""" await self._write(addr, _WIDTH_16, values) async def write_u32(self, addr: int, values: int | list[int]) -> None: """Write one or more 32-bit values starting at *addr*.""" await self._write(addr, _WIDTH_32, values) async def write_bytes(self, addr: int, data: bytes) -> None: """Write raw bytes to memory starting at *addr*.""" await self._write(addr, _WIDTH_8, list(data)) # ------------------------------------------------------------------ # Utilities # ------------------------------------------------------------------ async def search(self, pattern: bytes, start: int, end: int) -> list[int]: """Search for *pattern* in memory between *start* and *end*. Reads the region in chunks and returns a list of addresses where the pattern was found. This is done client-side since OpenOCD has no native memory-search command. """ if not pattern: return [] region_size = end - start if region_size <= 0: return [] chunk_size = 4096 overlap = len(pattern) - 1 results: list[int] = [] offset = 0 while offset < region_size: read_len = min(chunk_size + overlap, region_size - offset) data = await self.read_bytes(start + offset, read_len) # Scan for the pattern within this chunk search_start = 0 while True: idx = data.find(pattern, search_start) if idx == -1: break results.append(start + offset + idx) search_start = idx + 1 # Advance past the non-overlapping portion offset += chunk_size return results async def dump(self, addr: int, size: int, path: Path) -> None: """Read *size* bytes from *addr* and write them to a file.""" data = await self.read_bytes(addr, size) path.write_bytes(data) log.info("Dumped %d bytes from 0x%08X to %s", size, addr, path) async def hexdump(self, addr: int, size: int) -> str: """Read *size* bytes and return a formatted hex+ASCII dump. Output format (16 bytes per line):: 08000000: 00 50 00 20 A1 01 00 08 AB 01 00 08 AD 01 00 08 |.P. ............| """ data = await self.read_bytes(addr, size) lines: list[str] = [] for offset in range(0, len(data), _HEXDUMP_BYTES_PER_LINE): chunk = data[offset : offset + _HEXDUMP_BYTES_PER_LINE] line_addr = addr + offset # Hex portion — two groups of 8 bytes separated by an extra space hex_parts: list[str] = [] for i, b in enumerate(chunk): hex_parts.append(f"{b:02X}") if i == 7: hex_parts.append("") # extra gap between byte 7 and 8 hex_str = " ".join(hex_parts) # Pad to consistent width (3 chars * 16 bytes + 1 extra gap = 49 chars) # 16 hex pairs = 16*2=32 hex chars, 15 spaces + 1 gap space = 16 = 49 hex_str = hex_str.ljust(49) # ASCII portion ascii_str = "".join( chr(b) if 0x20 <= b < 0x7F else "." for b in chunk ) lines.append(f"{line_addr:08X}: {hex_str} |{ascii_str}|") return "\n".join(lines) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ async def _read(self, addr: int, width: int, count: int) -> list[int]: """Read *count* values of *width* bits using the TCL ``read_memory`` API. Command: ``read_memory `` Response: space-separated hex values. """ cmd = f"read_memory 0x{addr:x} {width} {count}" resp = await self._conn.send(cmd) if "error" in resp.lower(): raise TargetError(f"read_memory failed: {resp}") tokens = resp.strip().split() try: return [int(t, 16) for t in tokens] except ValueError as exc: raise TargetError( f"Cannot parse read_memory response: {resp!r}" ) from exc async def _write(self, addr: int, width: int, values: int | list[int]) -> None: """Write values of *width* bits using the TCL ``write_memory`` API. Command: ``write_memory {val1 val2 ...}`` """ if isinstance(values, int): values = [values] val_str = " ".join(f"0x{v:x}" for v in values) cmd = f"write_memory 0x{addr:x} {width} {{{val_str}}}" resp = await self._conn.send(cmd) if "error" in resp.lower(): raise TargetError(f"write_memory failed: {resp}") class SyncMemory: """Synchronous wrapper around Memory.""" def __init__(self, memory: Memory, loop: asyncio.AbstractEventLoop) -> None: self._memory = memory self._loop = loop def read_u8(self, addr: int, count: int = 1) -> list[int]: return self._loop.run_until_complete(self._memory.read_u8(addr, count)) def read_u16(self, addr: int, count: int = 1) -> list[int]: return self._loop.run_until_complete(self._memory.read_u16(addr, count)) def read_u32(self, addr: int, count: int = 1) -> list[int]: return self._loop.run_until_complete(self._memory.read_u32(addr, count)) def read_u64(self, addr: int, count: int = 1) -> list[int]: return self._loop.run_until_complete(self._memory.read_u64(addr, count)) def read_bytes(self, addr: int, size: int) -> bytes: return self._loop.run_until_complete(self._memory.read_bytes(addr, size)) def write_u8(self, addr: int, values: int | list[int]) -> None: self._loop.run_until_complete(self._memory.write_u8(addr, values)) def write_u16(self, addr: int, values: int | list[int]) -> None: self._loop.run_until_complete(self._memory.write_u16(addr, values)) def write_u32(self, addr: int, values: int | list[int]) -> None: self._loop.run_until_complete(self._memory.write_u32(addr, values)) def write_bytes(self, addr: int, data: bytes) -> None: self._loop.run_until_complete(self._memory.write_bytes(addr, data)) def search(self, pattern: bytes, start: int, end: int) -> list[int]: return self._loop.run_until_complete(self._memory.search(pattern, start, end)) def dump(self, addr: int, size: int, path: Path) -> None: self._loop.run_until_complete(self._memory.dump(addr, size, path)) def hexdump(self, addr: int, size: int) -> str: return self._loop.run_until_complete(self._memory.hexdump(addr, size))