diff --git a/src/dosbox_mcp/server.py b/src/dosbox_mcp/server.py index ab75546..9e88c26 100644 --- a/src/dosbox_mcp/server.py +++ b/src/dosbox_mcp/server.py @@ -9,14 +9,10 @@ reading memory, and tracing execution. import logging from importlib.metadata import version -from typing import Literal from fastmcp import FastMCP -from .dosbox import DOSBoxConfig, DOSBoxManager -from .gdb_client import GDBClient, GDBError -from .types import DOSBoxStatus -from .utils import format_address, hexdump, parse_address +from . import tools # Configure logging logging.basicConfig( @@ -59,612 +55,34 @@ Address formats supported: """ ) -# Global state -_manager = DOSBoxManager() -_client = GDBClient() - - # ============================================================================= -# P0 Tools - MVP for Bezier tracing +# Register tools from modules # ============================================================================= - -@mcp.tool() -def launch( - binary_path: str | None = None, - gdb_port: int = 1234, - use_docker: bool = False, - cycles: str = "auto", - memsize: int = 16, -) -> dict: - """Launch DOSBox-X with GDB debugging enabled. - - Args: - binary_path: Path to DOS binary to run (optional) - gdb_port: Port for GDB stub (default: 1234) - use_docker: Use Docker container instead of native DOSBox-X - cycles: CPU cycles setting (auto, max, or number) - memsize: Conventional memory in MB (default: 16) - - Returns: - Status dict with connection details - - Example: - launch("/path/to/GAME.EXE", gdb_port=1234) - """ - config = DOSBoxConfig( - gdb_port=gdb_port, - gdb_enabled=True, - cycles=cycles, - memsize=memsize, - ) - - try: - if use_docker: - _manager.launch_docker(binary_path=binary_path, config=config) - else: - _manager.launch_native(binary_path=binary_path, config=config) - - return { - "success": True, - "message": "DOSBox-X launched successfully", - "gdb_host": "localhost", - "gdb_port": gdb_port, - "pid": _manager.pid, - "hint": f"Use attach() to connect to the debugger on port {gdb_port}", - } - except Exception as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def attach(host: str = "localhost", port: int = 1234) -> dict: - """Connect to a running DOSBox-X GDB stub. - - Args: - host: Hostname or IP (default: localhost) - port: GDB port (default: 1234) - - Returns: - Connection status and initial register state - - Example: - attach("localhost", 1234) - """ - try: - _client.connect(host, port) - - # Get initial state - regs = _client.read_registers() - stop = _client.get_stop_reason() - - return { - "success": True, - "message": f"Connected to {host}:{port}", - "stop_reason": stop.reason.name.lower(), - "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", - "physical_address": f"{regs.cs_ip:05x}", - } - except GDBError as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def breakpoint_set(address: str) -> dict: - """Set a software breakpoint at the specified address. - - Args: - address: Memory address (segment:offset or flat hex) - - Returns: - Breakpoint info - - Examples: - breakpoint_set("1234:0100") # segment:offset - breakpoint_set("0x12340") # flat address - """ - try: - addr = parse_address(address) - bp = _client.set_breakpoint(addr) - return { - "success": True, - "breakpoint_id": bp.id, - "address": format_address(bp.address, "both"), - "original": address, - } - except (GDBError, ValueError) as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def breakpoint_list() -> dict: - """List all active breakpoints. - - Returns: - List of breakpoint info - """ - bps = _client.list_breakpoints() - return { - "count": len(bps), - "breakpoints": [bp.to_dict() for bp in bps], - } - - -@mcp.tool() -def breakpoint_delete(id: int | None = None, all: bool = False) -> dict: - """Delete breakpoint(s). - - Args: - id: Specific breakpoint ID to delete - all: If True, delete all breakpoints - - Returns: - Deletion result - """ - try: - if all: - count = _client.delete_all_breakpoints() - return { - "success": True, - "deleted": count, - } - elif id is not None: - _client.delete_breakpoint(id) - return { - "success": True, - "deleted_id": id, - } - else: - return { - "success": False, - "error": "Specify either 'id' or 'all=True'", - } - except GDBError as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def continue_execution(timeout: float | None = None) -> dict: - """Continue execution until breakpoint or signal. - - Args: - timeout: Optional timeout in seconds - - Returns: - Stop event info (reason, address, breakpoint hit) - """ - try: - event = _client.continue_execution(timeout=timeout) - regs = _client.read_registers() - - return { - "success": True, - "stop_reason": event.reason.name.lower(), - "address": format_address(event.address, "both"), - "breakpoint_id": event.breakpoint_id, - "signal": event.signal, - "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", - } - except GDBError as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def step(count: int = 1) -> dict: - """Step one or more instructions. - - Args: - count: Number of instructions to step (default: 1) - - Returns: - New register state after stepping - """ - try: - event = _client.step(count) - regs = _client.read_registers() - - return { - "success": True, - "stop_reason": event.reason.name.lower(), - "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", - "physical_address": f"{regs.cs_ip:05x}", - "stepped": count, - } - except GDBError as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def step_over() -> dict: - """Step over a CALL instruction (execute subroutine and stop after return). - - Returns: - New register state after step-over - """ - try: - event = _client.step_over() - regs = _client.read_registers() - - return { - "success": True, - "stop_reason": event.reason.name.lower(), - "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", - "physical_address": f"{regs.cs_ip:05x}", - } - except GDBError as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def registers() -> dict: - """Read all CPU registers. - - Returns: - Complete register state including: - - 32-bit registers (EAX, EBX, etc.) - - 16-bit aliases (AX, BX, etc.) - - Segment registers (CS, DS, ES, SS, FS, GS) - - Instruction pointer (CS:IP) - - Stack pointer (SS:SP) - - Flags - """ - try: - regs = _client.read_registers() - return { - "success": True, - **regs.to_dict(), - } - except GDBError as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def memory_read( - address: str, - length: int = 16, - format: Literal["hex", "ascii", "dump"] = "dump", -) -> dict: - """Read memory from target. - - Args: - address: Memory address (segment:offset or flat hex) - length: Number of bytes to read (default: 16, max: 4096) - format: Output format - "hex", "ascii", or "dump" (default) - - Returns: - Memory contents in requested format - - Examples: - memory_read("DS:0100", 64) - memory_read("0x12340", 256, format="hex") - """ - try: - # Handle register-based addresses like "DS:SI" - addr_str = address.upper() - if ':' in addr_str: - seg_part, off_part = addr_str.split(':') - # Check if parts are register names - regs = None - seg_regs = {'CS', 'DS', 'ES', 'SS', 'FS', 'GS'} - off_regs = {'IP', 'SP', 'BP', 'SI', 'DI', 'BX', 'AX', 'CX', 'DX'} - - if seg_part in seg_regs or off_part in off_regs: - regs = _client.read_registers() - seg_val = getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16) - off_val = getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16) - addr = (seg_val << 4) + off_val - else: - addr = parse_address(address) - else: - addr = parse_address(address) - - # Limit read size - length = min(length, 4096) - - mem = _client.read_memory(addr, length) - - result = { - "success": True, - "address": format_address(addr, "both"), - "length": len(mem.data), - } - - if format == "hex": - result["data"] = mem.to_hex() - elif format == "ascii": - result["data"] = mem.to_ascii() - else: # dump - result["dump"] = hexdump(mem.data, addr) - - return result - - except (GDBError, ValueError) as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def memory_write( - address: str, - data: str, - format: Literal["hex", "ascii"] = "hex", -) -> dict: - """Write memory to target. - - Args: - address: Memory address (segment:offset or flat hex) - data: Data to write (hex string or ASCII) - format: Input format - "hex" or "ascii" - - Returns: - Write result - - Examples: - memory_write("1234:0100", "90909090", format="hex") # NOP sled - memory_write("DS:0100", "Hello", format="ascii") - """ - try: - addr = parse_address(address) - - if format == "hex": - bytes_data = bytes.fromhex(data) - else: - bytes_data = data.encode('latin-1') - - _client.write_memory(addr, bytes_data) - - return { - "success": True, - "address": format_address(addr, "both"), - "bytes_written": len(bytes_data), - } - except (GDBError, ValueError) as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def disassemble(address: str | None = None, count: int = 10) -> dict: - """Disassemble instructions at address. - - Note: This is a simplified disassembler. For complex analysis, - use a dedicated tool like Ghidra. - - Args: - address: Start address (default: current CS:IP) - count: Number of bytes to read for disassembly (default: 10) - - Returns: - Raw bytes and simple instruction hints - """ - try: - if address: - addr = parse_address(address) - else: - regs = _client.read_registers() - addr = regs.cs_ip - - # Read memory for disassembly - mem = _client.read_memory(addr, count * 4) # Rough estimate - - # Simple x86 opcode hints (not a full disassembler) - # This is just to give Claude some context - opcodes = { - 0x90: "NOP", - 0xCC: "INT 3", - 0xCD: "INT", - 0xC3: "RET", - 0xCB: "RETF", - 0xE8: "CALL", - 0xE9: "JMP", - 0xEB: "JMP short", - 0x74: "JZ", - 0x75: "JNZ", - 0x50: "PUSH AX", 0x51: "PUSH CX", 0x52: "PUSH DX", 0x53: "PUSH BX", - 0x54: "PUSH SP", 0x55: "PUSH BP", 0x56: "PUSH SI", 0x57: "PUSH DI", - 0x58: "POP AX", 0x59: "POP CX", 0x5A: "POP DX", 0x5B: "POP BX", - 0x5C: "POP SP", 0x5D: "POP BP", 0x5E: "POP SI", 0x5F: "POP DI", - 0xB8: "MOV AX,imm", 0xB9: "MOV CX,imm", 0xBA: "MOV DX,imm", 0xBB: "MOV BX,imm", - 0x89: "MOV r/m,r", 0x8B: "MOV r,r/m", - 0x01: "ADD r/m,r", 0x03: "ADD r,r/m", - 0x29: "SUB r/m,r", 0x2B: "SUB r,r/m", - 0x31: "XOR r/m,r", 0x33: "XOR r,r/m", - 0x39: "CMP r/m,r", 0x3B: "CMP r,r/m", - } - - lines = [] - offset = 0 - for i, b in enumerate(mem.data[:count]): - hint = opcodes.get(b, f"?? ({b:02x})") - lines.append({ - "address": format_address(addr + i), - "byte": f"{b:02x}", - "hint": hint, - }) - - return { - "success": True, - "start_address": format_address(addr, "both"), - "raw_bytes": mem.data[:count].hex(), - "instructions": lines, - "note": "This is a simplified view. Use Ghidra for full disassembly.", - } - - except (GDBError, ValueError) as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def stack(count: int = 16) -> dict: - """Dump stack contents. - - Args: - count: Number of words to dump (default: 16) - - Returns: - Stack contents with SS:SP and values - """ - try: - regs = _client.read_registers() - sp_addr = regs.ss_sp - - # Read stack (2 bytes per word in real mode) - mem = _client.read_memory(sp_addr, count * 2) - - words = [] - for i in range(0, len(mem.data), 2): - if i + 1 < len(mem.data): - word = int.from_bytes(mem.data[i:i+2], 'little') - words.append({ - "offset": f"+{i:02x}", - "address": format_address(sp_addr + i), - "value": f"{word:04x}", - }) - - return { - "success": True, - "ss_sp": f"{regs.ss:04x}:{regs.sp:04x}", - "physical_address": format_address(sp_addr, "both"), - "words": words, - } - - except GDBError as e: - return { - "success": False, - "error": str(e), - } - - -@mcp.tool() -def status() -> dict: - """Get current debugger and emulator status. - - Returns: - Complete status including connection state, breakpoints, etc. - """ - status = DOSBoxStatus( - running=_manager.running, - connected=_client.connected, - host=_client.host, - port=_client.port, - pid=_manager.pid, - breakpoints=_client.list_breakpoints() if _client.connected else [], - ) - - result = status.to_dict() - - # Add register state if connected - if _client.connected: - try: - regs = _client.read_registers() - result["cs_ip"] = f"{regs.cs:04x}:{regs.ip:04x}" - result["ss_sp"] = f"{regs.ss:04x}:{regs.sp:04x}" - except GDBError: - pass - - return result - - -@mcp.tool() -def quit() -> dict: - """Stop DOSBox-X and clean up. - - Returns: - Shutdown status - """ - try: - if _client.connected: - try: - _client.kill() - except GDBError: - _client.disconnect() - - if _manager.running: - _manager.stop() - - return { - "success": True, - "message": "DOSBox-X stopped and cleaned up", - } - except Exception as e: - return { - "success": False, - "error": str(e), - } - - -# ============================================================================= -# P2 Tools - Nice to have -# ============================================================================= - - -@mcp.tool() -def screenshot(filename: str | None = None) -> dict: - """Capture DOSBox-X display. - - Args: - filename: Optional output filename - - Returns: - Screenshot info or error - """ - # Placeholder - requires X11 or DOSBox-X specific integration - return { - "success": False, - "error": "Screenshot not yet implemented. Use DOSBox-X hotkey F12.", - } - - -@mcp.tool() -def serial_send(data: str, port: int = 1) -> dict: - """Send data to DOSBox-X serial port. - - This is useful for RIPscrip testing - send graphics commands - to a program listening on COM1. - - Args: - data: Data to send (text or hex with \\x prefix) - port: COM port number (1 or 2) - - Returns: - Send result - """ - # Placeholder - requires serial port configuration - return { - "success": False, - "error": "Serial port communication not yet implemented.", - } +# Execution control +mcp.tool()(tools.launch) +mcp.tool()(tools.attach) +mcp.tool()(tools.continue_execution) +mcp.tool()(tools.step) +mcp.tool()(tools.step_over) +mcp.tool()(tools.quit) + +# Breakpoints +mcp.tool()(tools.breakpoint_set) +mcp.tool()(tools.breakpoint_list) +mcp.tool()(tools.breakpoint_delete) + +# Inspection +mcp.tool()(tools.registers) +mcp.tool()(tools.memory_read) +mcp.tool()(tools.memory_write) +mcp.tool()(tools.disassemble) +mcp.tool()(tools.stack) +mcp.tool()(tools.status) + +# Peripheral +mcp.tool()(tools.screenshot) +mcp.tool()(tools.serial_send) # ============================================================================= diff --git a/src/dosbox_mcp/state.py b/src/dosbox_mcp/state.py new file mode 100644 index 0000000..d27447b --- /dev/null +++ b/src/dosbox_mcp/state.py @@ -0,0 +1,14 @@ +"""Shared global state for DOSBox-X MCP Server. + +This module holds the singleton instances that are shared across all tools. +Centralizing state here avoids circular imports when tools are split into modules. +""" + +from .dosbox import DOSBoxManager +from .gdb_client import GDBClient + +# Global DOSBox-X process/container manager +manager = DOSBoxManager() + +# Global GDB client connection +client = GDBClient() diff --git a/src/dosbox_mcp/tools/__init__.py b/src/dosbox_mcp/tools/__init__.py new file mode 100644 index 0000000..f8ed067 --- /dev/null +++ b/src/dosbox_mcp/tools/__init__.py @@ -0,0 +1,37 @@ +"""MCP tools for DOSBox-X debugging. + +Tools are organized by function: +- execution: launch, attach, continue, step, quit +- breakpoints: breakpoint_set, breakpoint_list, breakpoint_delete +- inspection: registers, memory_read, memory_write, disassemble, stack, status +- peripheral: screenshot, serial_send +""" + +from .breakpoints import breakpoint_delete, breakpoint_list, breakpoint_set +from .execution import attach, continue_execution, launch, quit, step, step_over +from .inspection import disassemble, memory_read, memory_write, registers, stack, status +from .peripheral import screenshot, serial_send + +__all__ = [ + # Execution + "launch", + "attach", + "continue_execution", + "step", + "step_over", + "quit", + # Breakpoints + "breakpoint_set", + "breakpoint_list", + "breakpoint_delete", + # Inspection + "registers", + "memory_read", + "memory_write", + "disassemble", + "stack", + "status", + # Peripheral + "screenshot", + "serial_send", +] diff --git a/src/dosbox_mcp/tools/breakpoints.py b/src/dosbox_mcp/tools/breakpoints.py new file mode 100644 index 0000000..b18929b --- /dev/null +++ b/src/dosbox_mcp/tools/breakpoints.py @@ -0,0 +1,82 @@ +"""Breakpoint management tools.""" + +from ..gdb_client import GDBError +from ..state import client +from ..utils import format_address, parse_address + + +def breakpoint_set(address: str) -> dict: + """Set a software breakpoint at the specified address. + + Args: + address: Memory address (segment:offset or flat hex) + + Returns: + Breakpoint info + + Examples: + breakpoint_set("1234:0100") # segment:offset + breakpoint_set("0x12340") # flat address + """ + try: + addr = parse_address(address) + bp = client.set_breakpoint(addr) + return { + "success": True, + "breakpoint_id": bp.id, + "address": format_address(bp.address, "both"), + "original": address, + } + except (GDBError, ValueError) as e: + return { + "success": False, + "error": str(e), + } + + +def breakpoint_list() -> dict: + """List all active breakpoints. + + Returns: + List of breakpoint info + """ + bps = client.list_breakpoints() + return { + "count": len(bps), + "breakpoints": [bp.to_dict() for bp in bps], + } + + +def breakpoint_delete(id: int | None = None, all: bool = False) -> dict: + """Delete breakpoint(s). + + Args: + id: Specific breakpoint ID to delete + all: If True, delete all breakpoints + + Returns: + Deletion result + """ + try: + if all: + count = client.delete_all_breakpoints() + return { + "success": True, + "deleted": count, + } + elif id is not None: + client.delete_breakpoint(id) + return { + "success": True, + "deleted_id": id, + } + else: + return { + "success": False, + "error": "Specify either 'id' or 'all=True'", + } + except GDBError as e: + return { + "success": False, + "error": str(e), + } diff --git a/src/dosbox_mcp/tools/execution.py b/src/dosbox_mcp/tools/execution.py new file mode 100644 index 0000000..1ebce66 --- /dev/null +++ b/src/dosbox_mcp/tools/execution.py @@ -0,0 +1,195 @@ +"""Execution control tools: launch, attach, continue, step, quit.""" + +from ..dosbox import DOSBoxConfig +from ..gdb_client import GDBError +from ..state import client, manager +from ..utils import format_address + + +def launch( + binary_path: str | None = None, + gdb_port: int = 1234, + use_docker: bool = False, + cycles: str = "auto", + memsize: int = 16, +) -> dict: + """Launch DOSBox-X with GDB debugging enabled. + + Args: + binary_path: Path to DOS binary to run (optional) + gdb_port: Port for GDB stub (default: 1234) + use_docker: Use Docker container instead of native DOSBox-X + cycles: CPU cycles setting (auto, max, or number) + memsize: Conventional memory in MB (default: 16) + + Returns: + Status dict with connection details + + Example: + launch("/path/to/GAME.EXE", gdb_port=1234) + """ + config = DOSBoxConfig( + gdb_port=gdb_port, + gdb_enabled=True, + cycles=cycles, + memsize=memsize, + ) + + try: + if use_docker: + manager.launch_docker(binary_path=binary_path, config=config) + else: + manager.launch_native(binary_path=binary_path, config=config) + + return { + "success": True, + "message": "DOSBox-X launched successfully", + "gdb_host": "localhost", + "gdb_port": gdb_port, + "pid": manager.pid, + "hint": f"Use attach() to connect to the debugger on port {gdb_port}", + } + except Exception as e: + return { + "success": False, + "error": str(e), + } + + +def attach(host: str = "localhost", port: int = 1234) -> dict: + """Connect to a running DOSBox-X GDB stub. + + Args: + host: Hostname or IP (default: localhost) + port: GDB port (default: 1234) + + Returns: + Connection status and initial register state + + Example: + attach("localhost", 1234) + """ + try: + client.connect(host, port) + + # Get initial state + regs = client.read_registers() + stop = client.get_stop_reason() + + return { + "success": True, + "message": f"Connected to {host}:{port}", + "stop_reason": stop.reason.name.lower(), + "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", + "physical_address": f"{regs.cs_ip:05x}", + } + except GDBError as e: + return { + "success": False, + "error": str(e), + } + + +def continue_execution(timeout: float | None = None) -> dict: + """Continue execution until breakpoint or signal. + + Args: + timeout: Optional timeout in seconds + + Returns: + Stop event info (reason, address, breakpoint hit) + """ + try: + event = client.continue_execution(timeout=timeout) + regs = client.read_registers() + + return { + "success": True, + "stop_reason": event.reason.name.lower(), + "address": format_address(event.address, "both"), + "breakpoint_id": event.breakpoint_id, + "signal": event.signal, + "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", + } + except GDBError as e: + return { + "success": False, + "error": str(e), + } + + +def step(count: int = 1) -> dict: + """Step one or more instructions. + + Args: + count: Number of instructions to step (default: 1) + + Returns: + New register state after stepping + """ + try: + event = client.step(count) + regs = client.read_registers() + + return { + "success": True, + "stop_reason": event.reason.name.lower(), + "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", + "physical_address": f"{regs.cs_ip:05x}", + "stepped": count, + } + except GDBError as e: + return { + "success": False, + "error": str(e), + } + + +def step_over() -> dict: + """Step over a CALL instruction (execute subroutine and stop after return). + + Returns: + New register state after step-over + """ + try: + event = client.step_over() + regs = client.read_registers() + + return { + "success": True, + "stop_reason": event.reason.name.lower(), + "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", + "physical_address": f"{regs.cs_ip:05x}", + } + except GDBError as e: + return { + "success": False, + "error": str(e), + } + + +def quit() -> dict: + """Stop DOSBox-X and clean up. + + Returns: + Shutdown status + """ + try: + if client.connected: + try: + client.kill() + except GDBError: + client.disconnect() + + if manager.running: + manager.stop() + + return { + "success": True, + "message": "DOSBox-X stopped and cleaned up", + } + except Exception as e: + return { + "success": False, + "error": str(e), + } diff --git a/src/dosbox_mcp/tools/inspection.py b/src/dosbox_mcp/tools/inspection.py new file mode 100644 index 0000000..9b02622 --- /dev/null +++ b/src/dosbox_mcp/tools/inspection.py @@ -0,0 +1,281 @@ +"""Inspection tools: registers, memory, disassemble, stack, status.""" + +from typing import Literal + +from ..gdb_client import GDBError +from ..state import client, manager +from ..types import DOSBoxStatus +from ..utils import format_address, hexdump, parse_address + + +def registers() -> dict: + """Read all CPU registers. + + Returns: + Complete register state including: + - 32-bit registers (EAX, EBX, etc.) + - 16-bit aliases (AX, BX, etc.) + - Segment registers (CS, DS, ES, SS, FS, GS) + - Instruction pointer (CS:IP) + - Stack pointer (SS:SP) + - Flags + """ + try: + regs = client.read_registers() + return { + "success": True, + **regs.to_dict(), + } + except GDBError as e: + return { + "success": False, + "error": str(e), + } + + +def memory_read( + address: str, + length: int = 16, + format: Literal["hex", "ascii", "dump"] = "dump", +) -> dict: + """Read memory from target. + + Args: + address: Memory address (segment:offset or flat hex) + length: Number of bytes to read (default: 16, max: 4096) + format: Output format - "hex", "ascii", or "dump" (default) + + Returns: + Memory contents in requested format + + Examples: + memory_read("DS:0100", 64) + memory_read("0x12340", 256, format="hex") + """ + try: + # Handle register-based addresses like "DS:SI" + addr_str = address.upper() + if ':' in addr_str: + seg_part, off_part = addr_str.split(':') + # Check if parts are register names + regs = None + seg_regs = {'CS', 'DS', 'ES', 'SS', 'FS', 'GS'} + off_regs = {'IP', 'SP', 'BP', 'SI', 'DI', 'BX', 'AX', 'CX', 'DX'} + + if seg_part in seg_regs or off_part in off_regs: + regs = client.read_registers() + seg_val = getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16) + off_val = getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16) + addr = (seg_val << 4) + off_val + else: + addr = parse_address(address) + else: + addr = parse_address(address) + + # Limit read size + length = min(length, 4096) + + mem = client.read_memory(addr, length) + + result = { + "success": True, + "address": format_address(addr, "both"), + "length": len(mem.data), + } + + if format == "hex": + result["data"] = mem.to_hex() + elif format == "ascii": + result["data"] = mem.to_ascii() + else: # dump + result["dump"] = hexdump(mem.data, addr) + + return result + + except (GDBError, ValueError) as e: + return { + "success": False, + "error": str(e), + } + + +def memory_write( + address: str, + data: str, + format: Literal["hex", "ascii"] = "hex", +) -> dict: + """Write memory to target. + + Args: + address: Memory address (segment:offset or flat hex) + data: Data to write (hex string or ASCII) + format: Input format - "hex" or "ascii" + + Returns: + Write result + + Examples: + memory_write("1234:0100", "90909090", format="hex") # NOP sled + memory_write("DS:0100", "Hello", format="ascii") + """ + try: + addr = parse_address(address) + + if format == "hex": + bytes_data = bytes.fromhex(data) + else: + bytes_data = data.encode('latin-1') + + client.write_memory(addr, bytes_data) + + return { + "success": True, + "address": format_address(addr, "both"), + "bytes_written": len(bytes_data), + } + except (GDBError, ValueError) as e: + return { + "success": False, + "error": str(e), + } + + +def disassemble(address: str | None = None, count: int = 10) -> dict: + """Disassemble instructions at address. + + Note: This is a simplified disassembler. For complex analysis, + use a dedicated tool like Ghidra. + + Args: + address: Start address (default: current CS:IP) + count: Number of bytes to read for disassembly (default: 10) + + Returns: + Raw bytes and simple instruction hints + """ + try: + if address: + addr = parse_address(address) + else: + regs = client.read_registers() + addr = regs.cs_ip + + # Read memory for disassembly + mem = client.read_memory(addr, count * 4) # Rough estimate + + # Simple x86 opcode hints (not a full disassembler) + # This is just to give Claude some context + opcodes = { + 0x90: "NOP", + 0xCC: "INT 3", + 0xCD: "INT", + 0xC3: "RET", + 0xCB: "RETF", + 0xE8: "CALL", + 0xE9: "JMP", + 0xEB: "JMP short", + 0x74: "JZ", + 0x75: "JNZ", + 0x50: "PUSH AX", 0x51: "PUSH CX", 0x52: "PUSH DX", 0x53: "PUSH BX", + 0x54: "PUSH SP", 0x55: "PUSH BP", 0x56: "PUSH SI", 0x57: "PUSH DI", + 0x58: "POP AX", 0x59: "POP CX", 0x5A: "POP DX", 0x5B: "POP BX", + 0x5C: "POP SP", 0x5D: "POP BP", 0x5E: "POP SI", 0x5F: "POP DI", + 0xB8: "MOV AX,imm", 0xB9: "MOV CX,imm", 0xBA: "MOV DX,imm", 0xBB: "MOV BX,imm", + 0x89: "MOV r/m,r", 0x8B: "MOV r,r/m", + 0x01: "ADD r/m,r", 0x03: "ADD r,r/m", + 0x29: "SUB r/m,r", 0x2B: "SUB r,r/m", + 0x31: "XOR r/m,r", 0x33: "XOR r,r/m", + 0x39: "CMP r/m,r", 0x3B: "CMP r,r/m", + } + + lines = [] + for i, b in enumerate(mem.data[:count]): + hint = opcodes.get(b, f"?? ({b:02x})") + lines.append({ + "address": format_address(addr + i), + "byte": f"{b:02x}", + "hint": hint, + }) + + return { + "success": True, + "start_address": format_address(addr, "both"), + "raw_bytes": mem.data[:count].hex(), + "instructions": lines, + "note": "This is a simplified view. Use Ghidra for full disassembly.", + } + + except (GDBError, ValueError) as e: + return { + "success": False, + "error": str(e), + } + + +def stack(count: int = 16) -> dict: + """Dump stack contents. + + Args: + count: Number of words to dump (default: 16) + + Returns: + Stack contents with SS:SP and values + """ + try: + regs = client.read_registers() + sp_addr = regs.ss_sp + + # Read stack (2 bytes per word in real mode) + mem = client.read_memory(sp_addr, count * 2) + + words = [] + for i in range(0, len(mem.data), 2): + if i + 1 < len(mem.data): + word = int.from_bytes(mem.data[i:i+2], 'little') + words.append({ + "offset": f"+{i:02x}", + "address": format_address(sp_addr + i), + "value": f"{word:04x}", + }) + + return { + "success": True, + "ss_sp": f"{regs.ss:04x}:{regs.sp:04x}", + "physical_address": format_address(sp_addr, "both"), + "words": words, + } + + except GDBError as e: + return { + "success": False, + "error": str(e), + } + + +def status() -> dict: + """Get current debugger and emulator status. + + Returns: + Complete status including connection state, breakpoints, etc. + """ + dosbox_status = DOSBoxStatus( + running=manager.running, + connected=client.connected, + host=client.host, + port=client.port, + pid=manager.pid, + breakpoints=client.list_breakpoints() if client.connected else [], + ) + + result = dosbox_status.to_dict() + + # Add register state if connected + if client.connected: + try: + regs = client.read_registers() + result["cs_ip"] = f"{regs.cs:04x}:{regs.ip:04x}" + result["ss_sp"] = f"{regs.ss:04x}:{regs.sp:04x}" + except GDBError: + pass + + return result diff --git a/src/dosbox_mcp/tools/peripheral.py b/src/dosbox_mcp/tools/peripheral.py new file mode 100644 index 0000000..df8c0c1 --- /dev/null +++ b/src/dosbox_mcp/tools/peripheral.py @@ -0,0 +1,37 @@ +"""Peripheral tools: screenshot, serial communication.""" + + +def screenshot(filename: str | None = None) -> dict: + """Capture DOSBox-X display. + + Args: + filename: Optional output filename + + Returns: + Screenshot info or error + """ + # Placeholder - requires X11 or DOSBox-X specific integration + return { + "success": False, + "error": "Screenshot not yet implemented. Use DOSBox-X hotkey F12.", + } + + +def serial_send(data: str, port: int = 1) -> dict: + """Send data to DOSBox-X serial port. + + This is useful for RIPscrip testing - send graphics commands + to a program listening on COM1. + + Args: + data: Data to send (text or hex with \\x prefix) + port: COM port number (1 or 2) + + Returns: + Send result + """ + # Placeholder - requires serial port configuration + return { + "success": False, + "error": "Serial port communication not yet implemented.", + }