Refactor server.py into modular tools structure

Split 684-line server.py into focused modules:
- state.py: Shared global instances (manager, client)
- tools/execution.py: launch, attach, continue, step, quit
- tools/breakpoints.py: breakpoint_set, breakpoint_list, breakpoint_delete
- tools/inspection.py: registers, memory_*, disassemble, stack, status
- tools/peripheral.py: screenshot, serial_send

server.py now 102 lines (just FastMCP setup and tool registration)
This commit is contained in:
Ryan Malloy 2026-01-27 14:25:51 -07:00
parent 170eba0843
commit 6b5057c17a
7 changed files with 672 additions and 608 deletions

View File

@ -9,14 +9,10 @@ reading memory, and tracing execution.
import logging import logging
from importlib.metadata import version from importlib.metadata import version
from typing import Literal
from fastmcp import FastMCP from fastmcp import FastMCP
from .dosbox import DOSBoxConfig, DOSBoxManager from . import tools
from .gdb_client import GDBClient, GDBError
from .types import DOSBoxStatus
from .utils import format_address, hexdump, parse_address
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@ -59,612 +55,34 @@ Address formats supported:
""" """
) )
# Global state
_manager = DOSBoxManager()
_client = GDBClient()
# ============================================================================= # =============================================================================
# P0 Tools - MVP for Bezier tracing # Register tools from modules
# ============================================================================= # =============================================================================
# Execution control
@mcp.tool() mcp.tool()(tools.launch)
def launch( mcp.tool()(tools.attach)
binary_path: str | None = None, mcp.tool()(tools.continue_execution)
gdb_port: int = 1234, mcp.tool()(tools.step)
use_docker: bool = False, mcp.tool()(tools.step_over)
cycles: str = "auto", mcp.tool()(tools.quit)
memsize: int = 16,
) -> dict: # Breakpoints
"""Launch DOSBox-X with GDB debugging enabled. mcp.tool()(tools.breakpoint_set)
mcp.tool()(tools.breakpoint_list)
Args: mcp.tool()(tools.breakpoint_delete)
binary_path: Path to DOS binary to run (optional)
gdb_port: Port for GDB stub (default: 1234) # Inspection
use_docker: Use Docker container instead of native DOSBox-X mcp.tool()(tools.registers)
cycles: CPU cycles setting (auto, max, or number) mcp.tool()(tools.memory_read)
memsize: Conventional memory in MB (default: 16) mcp.tool()(tools.memory_write)
mcp.tool()(tools.disassemble)
Returns: mcp.tool()(tools.stack)
Status dict with connection details mcp.tool()(tools.status)
Example: # Peripheral
launch("/path/to/GAME.EXE", gdb_port=1234) mcp.tool()(tools.screenshot)
""" mcp.tool()(tools.serial_send)
config = DOSBoxConfig(
gdb_port=gdb_port,
gdb_enabled=True,
cycles=cycles,
memsize=memsize,
)
try:
if use_docker:
_manager.launch_docker(binary_path=binary_path, config=config)
else:
_manager.launch_native(binary_path=binary_path, config=config)
return {
"success": True,
"message": "DOSBox-X launched successfully",
"gdb_host": "localhost",
"gdb_port": gdb_port,
"pid": _manager.pid,
"hint": f"Use attach() to connect to the debugger on port {gdb_port}",
}
except Exception as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def attach(host: str = "localhost", port: int = 1234) -> dict:
"""Connect to a running DOSBox-X GDB stub.
Args:
host: Hostname or IP (default: localhost)
port: GDB port (default: 1234)
Returns:
Connection status and initial register state
Example:
attach("localhost", 1234)
"""
try:
_client.connect(host, port)
# Get initial state
regs = _client.read_registers()
stop = _client.get_stop_reason()
return {
"success": True,
"message": f"Connected to {host}:{port}",
"stop_reason": stop.reason.name.lower(),
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
"physical_address": f"{regs.cs_ip:05x}",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def breakpoint_set(address: str) -> dict:
"""Set a software breakpoint at the specified address.
Args:
address: Memory address (segment:offset or flat hex)
Returns:
Breakpoint info
Examples:
breakpoint_set("1234:0100") # segment:offset
breakpoint_set("0x12340") # flat address
"""
try:
addr = parse_address(address)
bp = _client.set_breakpoint(addr)
return {
"success": True,
"breakpoint_id": bp.id,
"address": format_address(bp.address, "both"),
"original": address,
}
except (GDBError, ValueError) as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def breakpoint_list() -> dict:
"""List all active breakpoints.
Returns:
List of breakpoint info
"""
bps = _client.list_breakpoints()
return {
"count": len(bps),
"breakpoints": [bp.to_dict() for bp in bps],
}
@mcp.tool()
def breakpoint_delete(id: int | None = None, all: bool = False) -> dict:
"""Delete breakpoint(s).
Args:
id: Specific breakpoint ID to delete
all: If True, delete all breakpoints
Returns:
Deletion result
"""
try:
if all:
count = _client.delete_all_breakpoints()
return {
"success": True,
"deleted": count,
}
elif id is not None:
_client.delete_breakpoint(id)
return {
"success": True,
"deleted_id": id,
}
else:
return {
"success": False,
"error": "Specify either 'id' or 'all=True'",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def continue_execution(timeout: float | None = None) -> dict:
"""Continue execution until breakpoint or signal.
Args:
timeout: Optional timeout in seconds
Returns:
Stop event info (reason, address, breakpoint hit)
"""
try:
event = _client.continue_execution(timeout=timeout)
regs = _client.read_registers()
return {
"success": True,
"stop_reason": event.reason.name.lower(),
"address": format_address(event.address, "both"),
"breakpoint_id": event.breakpoint_id,
"signal": event.signal,
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def step(count: int = 1) -> dict:
"""Step one or more instructions.
Args:
count: Number of instructions to step (default: 1)
Returns:
New register state after stepping
"""
try:
event = _client.step(count)
regs = _client.read_registers()
return {
"success": True,
"stop_reason": event.reason.name.lower(),
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
"physical_address": f"{regs.cs_ip:05x}",
"stepped": count,
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def step_over() -> dict:
"""Step over a CALL instruction (execute subroutine and stop after return).
Returns:
New register state after step-over
"""
try:
event = _client.step_over()
regs = _client.read_registers()
return {
"success": True,
"stop_reason": event.reason.name.lower(),
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
"physical_address": f"{regs.cs_ip:05x}",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def registers() -> dict:
"""Read all CPU registers.
Returns:
Complete register state including:
- 32-bit registers (EAX, EBX, etc.)
- 16-bit aliases (AX, BX, etc.)
- Segment registers (CS, DS, ES, SS, FS, GS)
- Instruction pointer (CS:IP)
- Stack pointer (SS:SP)
- Flags
"""
try:
regs = _client.read_registers()
return {
"success": True,
**regs.to_dict(),
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def memory_read(
address: str,
length: int = 16,
format: Literal["hex", "ascii", "dump"] = "dump",
) -> dict:
"""Read memory from target.
Args:
address: Memory address (segment:offset or flat hex)
length: Number of bytes to read (default: 16, max: 4096)
format: Output format - "hex", "ascii", or "dump" (default)
Returns:
Memory contents in requested format
Examples:
memory_read("DS:0100", 64)
memory_read("0x12340", 256, format="hex")
"""
try:
# Handle register-based addresses like "DS:SI"
addr_str = address.upper()
if ':' in addr_str:
seg_part, off_part = addr_str.split(':')
# Check if parts are register names
regs = None
seg_regs = {'CS', 'DS', 'ES', 'SS', 'FS', 'GS'}
off_regs = {'IP', 'SP', 'BP', 'SI', 'DI', 'BX', 'AX', 'CX', 'DX'}
if seg_part in seg_regs or off_part in off_regs:
regs = _client.read_registers()
seg_val = getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16)
off_val = getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16)
addr = (seg_val << 4) + off_val
else:
addr = parse_address(address)
else:
addr = parse_address(address)
# Limit read size
length = min(length, 4096)
mem = _client.read_memory(addr, length)
result = {
"success": True,
"address": format_address(addr, "both"),
"length": len(mem.data),
}
if format == "hex":
result["data"] = mem.to_hex()
elif format == "ascii":
result["data"] = mem.to_ascii()
else: # dump
result["dump"] = hexdump(mem.data, addr)
return result
except (GDBError, ValueError) as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def memory_write(
address: str,
data: str,
format: Literal["hex", "ascii"] = "hex",
) -> dict:
"""Write memory to target.
Args:
address: Memory address (segment:offset or flat hex)
data: Data to write (hex string or ASCII)
format: Input format - "hex" or "ascii"
Returns:
Write result
Examples:
memory_write("1234:0100", "90909090", format="hex") # NOP sled
memory_write("DS:0100", "Hello", format="ascii")
"""
try:
addr = parse_address(address)
if format == "hex":
bytes_data = bytes.fromhex(data)
else:
bytes_data = data.encode('latin-1')
_client.write_memory(addr, bytes_data)
return {
"success": True,
"address": format_address(addr, "both"),
"bytes_written": len(bytes_data),
}
except (GDBError, ValueError) as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def disassemble(address: str | None = None, count: int = 10) -> dict:
"""Disassemble instructions at address.
Note: This is a simplified disassembler. For complex analysis,
use a dedicated tool like Ghidra.
Args:
address: Start address (default: current CS:IP)
count: Number of bytes to read for disassembly (default: 10)
Returns:
Raw bytes and simple instruction hints
"""
try:
if address:
addr = parse_address(address)
else:
regs = _client.read_registers()
addr = regs.cs_ip
# Read memory for disassembly
mem = _client.read_memory(addr, count * 4) # Rough estimate
# Simple x86 opcode hints (not a full disassembler)
# This is just to give Claude some context
opcodes = {
0x90: "NOP",
0xCC: "INT 3",
0xCD: "INT",
0xC3: "RET",
0xCB: "RETF",
0xE8: "CALL",
0xE9: "JMP",
0xEB: "JMP short",
0x74: "JZ",
0x75: "JNZ",
0x50: "PUSH AX", 0x51: "PUSH CX", 0x52: "PUSH DX", 0x53: "PUSH BX",
0x54: "PUSH SP", 0x55: "PUSH BP", 0x56: "PUSH SI", 0x57: "PUSH DI",
0x58: "POP AX", 0x59: "POP CX", 0x5A: "POP DX", 0x5B: "POP BX",
0x5C: "POP SP", 0x5D: "POP BP", 0x5E: "POP SI", 0x5F: "POP DI",
0xB8: "MOV AX,imm", 0xB9: "MOV CX,imm", 0xBA: "MOV DX,imm", 0xBB: "MOV BX,imm",
0x89: "MOV r/m,r", 0x8B: "MOV r,r/m",
0x01: "ADD r/m,r", 0x03: "ADD r,r/m",
0x29: "SUB r/m,r", 0x2B: "SUB r,r/m",
0x31: "XOR r/m,r", 0x33: "XOR r,r/m",
0x39: "CMP r/m,r", 0x3B: "CMP r,r/m",
}
lines = []
offset = 0
for i, b in enumerate(mem.data[:count]):
hint = opcodes.get(b, f"?? ({b:02x})")
lines.append({
"address": format_address(addr + i),
"byte": f"{b:02x}",
"hint": hint,
})
return {
"success": True,
"start_address": format_address(addr, "both"),
"raw_bytes": mem.data[:count].hex(),
"instructions": lines,
"note": "This is a simplified view. Use Ghidra for full disassembly.",
}
except (GDBError, ValueError) as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def stack(count: int = 16) -> dict:
"""Dump stack contents.
Args:
count: Number of words to dump (default: 16)
Returns:
Stack contents with SS:SP and values
"""
try:
regs = _client.read_registers()
sp_addr = regs.ss_sp
# Read stack (2 bytes per word in real mode)
mem = _client.read_memory(sp_addr, count * 2)
words = []
for i in range(0, len(mem.data), 2):
if i + 1 < len(mem.data):
word = int.from_bytes(mem.data[i:i+2], 'little')
words.append({
"offset": f"+{i:02x}",
"address": format_address(sp_addr + i),
"value": f"{word:04x}",
})
return {
"success": True,
"ss_sp": f"{regs.ss:04x}:{regs.sp:04x}",
"physical_address": format_address(sp_addr, "both"),
"words": words,
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
@mcp.tool()
def status() -> dict:
"""Get current debugger and emulator status.
Returns:
Complete status including connection state, breakpoints, etc.
"""
status = DOSBoxStatus(
running=_manager.running,
connected=_client.connected,
host=_client.host,
port=_client.port,
pid=_manager.pid,
breakpoints=_client.list_breakpoints() if _client.connected else [],
)
result = status.to_dict()
# Add register state if connected
if _client.connected:
try:
regs = _client.read_registers()
result["cs_ip"] = f"{regs.cs:04x}:{regs.ip:04x}"
result["ss_sp"] = f"{regs.ss:04x}:{regs.sp:04x}"
except GDBError:
pass
return result
@mcp.tool()
def quit() -> dict:
"""Stop DOSBox-X and clean up.
Returns:
Shutdown status
"""
try:
if _client.connected:
try:
_client.kill()
except GDBError:
_client.disconnect()
if _manager.running:
_manager.stop()
return {
"success": True,
"message": "DOSBox-X stopped and cleaned up",
}
except Exception as e:
return {
"success": False,
"error": str(e),
}
# =============================================================================
# P2 Tools - Nice to have
# =============================================================================
@mcp.tool()
def screenshot(filename: str | None = None) -> dict:
"""Capture DOSBox-X display.
Args:
filename: Optional output filename
Returns:
Screenshot info or error
"""
# Placeholder - requires X11 or DOSBox-X specific integration
return {
"success": False,
"error": "Screenshot not yet implemented. Use DOSBox-X hotkey F12.",
}
@mcp.tool()
def serial_send(data: str, port: int = 1) -> dict:
"""Send data to DOSBox-X serial port.
This is useful for RIPscrip testing - send graphics commands
to a program listening on COM1.
Args:
data: Data to send (text or hex with \\x prefix)
port: COM port number (1 or 2)
Returns:
Send result
"""
# Placeholder - requires serial port configuration
return {
"success": False,
"error": "Serial port communication not yet implemented.",
}
# ============================================================================= # =============================================================================

14
src/dosbox_mcp/state.py Normal file
View File

@ -0,0 +1,14 @@
"""Shared global state for DOSBox-X MCP Server.
This module holds the singleton instances that are shared across all tools.
Centralizing state here avoids circular imports when tools are split into modules.
"""
from .dosbox import DOSBoxManager
from .gdb_client import GDBClient
# Global DOSBox-X process/container manager
manager = DOSBoxManager()
# Global GDB client connection
client = GDBClient()

View File

@ -0,0 +1,37 @@
"""MCP tools for DOSBox-X debugging.
Tools are organized by function:
- execution: launch, attach, continue, step, quit
- breakpoints: breakpoint_set, breakpoint_list, breakpoint_delete
- inspection: registers, memory_read, memory_write, disassemble, stack, status
- peripheral: screenshot, serial_send
"""
from .breakpoints import breakpoint_delete, breakpoint_list, breakpoint_set
from .execution import attach, continue_execution, launch, quit, step, step_over
from .inspection import disassemble, memory_read, memory_write, registers, stack, status
from .peripheral import screenshot, serial_send
__all__ = [
# Execution
"launch",
"attach",
"continue_execution",
"step",
"step_over",
"quit",
# Breakpoints
"breakpoint_set",
"breakpoint_list",
"breakpoint_delete",
# Inspection
"registers",
"memory_read",
"memory_write",
"disassemble",
"stack",
"status",
# Peripheral
"screenshot",
"serial_send",
]

View File

@ -0,0 +1,82 @@
"""Breakpoint management tools."""
from ..gdb_client import GDBError
from ..state import client
from ..utils import format_address, parse_address
def breakpoint_set(address: str) -> dict:
"""Set a software breakpoint at the specified address.
Args:
address: Memory address (segment:offset or flat hex)
Returns:
Breakpoint info
Examples:
breakpoint_set("1234:0100") # segment:offset
breakpoint_set("0x12340") # flat address
"""
try:
addr = parse_address(address)
bp = client.set_breakpoint(addr)
return {
"success": True,
"breakpoint_id": bp.id,
"address": format_address(bp.address, "both"),
"original": address,
}
except (GDBError, ValueError) as e:
return {
"success": False,
"error": str(e),
}
def breakpoint_list() -> dict:
"""List all active breakpoints.
Returns:
List of breakpoint info
"""
bps = client.list_breakpoints()
return {
"count": len(bps),
"breakpoints": [bp.to_dict() for bp in bps],
}
def breakpoint_delete(id: int | None = None, all: bool = False) -> dict:
"""Delete breakpoint(s).
Args:
id: Specific breakpoint ID to delete
all: If True, delete all breakpoints
Returns:
Deletion result
"""
try:
if all:
count = client.delete_all_breakpoints()
return {
"success": True,
"deleted": count,
}
elif id is not None:
client.delete_breakpoint(id)
return {
"success": True,
"deleted_id": id,
}
else:
return {
"success": False,
"error": "Specify either 'id' or 'all=True'",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}

View File

@ -0,0 +1,195 @@
"""Execution control tools: launch, attach, continue, step, quit."""
from ..dosbox import DOSBoxConfig
from ..gdb_client import GDBError
from ..state import client, manager
from ..utils import format_address
def launch(
binary_path: str | None = None,
gdb_port: int = 1234,
use_docker: bool = False,
cycles: str = "auto",
memsize: int = 16,
) -> dict:
"""Launch DOSBox-X with GDB debugging enabled.
Args:
binary_path: Path to DOS binary to run (optional)
gdb_port: Port for GDB stub (default: 1234)
use_docker: Use Docker container instead of native DOSBox-X
cycles: CPU cycles setting (auto, max, or number)
memsize: Conventional memory in MB (default: 16)
Returns:
Status dict with connection details
Example:
launch("/path/to/GAME.EXE", gdb_port=1234)
"""
config = DOSBoxConfig(
gdb_port=gdb_port,
gdb_enabled=True,
cycles=cycles,
memsize=memsize,
)
try:
if use_docker:
manager.launch_docker(binary_path=binary_path, config=config)
else:
manager.launch_native(binary_path=binary_path, config=config)
return {
"success": True,
"message": "DOSBox-X launched successfully",
"gdb_host": "localhost",
"gdb_port": gdb_port,
"pid": manager.pid,
"hint": f"Use attach() to connect to the debugger on port {gdb_port}",
}
except Exception as e:
return {
"success": False,
"error": str(e),
}
def attach(host: str = "localhost", port: int = 1234) -> dict:
"""Connect to a running DOSBox-X GDB stub.
Args:
host: Hostname or IP (default: localhost)
port: GDB port (default: 1234)
Returns:
Connection status and initial register state
Example:
attach("localhost", 1234)
"""
try:
client.connect(host, port)
# Get initial state
regs = client.read_registers()
stop = client.get_stop_reason()
return {
"success": True,
"message": f"Connected to {host}:{port}",
"stop_reason": stop.reason.name.lower(),
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
"physical_address": f"{regs.cs_ip:05x}",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
def continue_execution(timeout: float | None = None) -> dict:
"""Continue execution until breakpoint or signal.
Args:
timeout: Optional timeout in seconds
Returns:
Stop event info (reason, address, breakpoint hit)
"""
try:
event = client.continue_execution(timeout=timeout)
regs = client.read_registers()
return {
"success": True,
"stop_reason": event.reason.name.lower(),
"address": format_address(event.address, "both"),
"breakpoint_id": event.breakpoint_id,
"signal": event.signal,
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
def step(count: int = 1) -> dict:
"""Step one or more instructions.
Args:
count: Number of instructions to step (default: 1)
Returns:
New register state after stepping
"""
try:
event = client.step(count)
regs = client.read_registers()
return {
"success": True,
"stop_reason": event.reason.name.lower(),
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
"physical_address": f"{regs.cs_ip:05x}",
"stepped": count,
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
def step_over() -> dict:
"""Step over a CALL instruction (execute subroutine and stop after return).
Returns:
New register state after step-over
"""
try:
event = client.step_over()
regs = client.read_registers()
return {
"success": True,
"stop_reason": event.reason.name.lower(),
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
"physical_address": f"{regs.cs_ip:05x}",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
def quit() -> dict:
"""Stop DOSBox-X and clean up.
Returns:
Shutdown status
"""
try:
if client.connected:
try:
client.kill()
except GDBError:
client.disconnect()
if manager.running:
manager.stop()
return {
"success": True,
"message": "DOSBox-X stopped and cleaned up",
}
except Exception as e:
return {
"success": False,
"error": str(e),
}

View File

@ -0,0 +1,281 @@
"""Inspection tools: registers, memory, disassemble, stack, status."""
from typing import Literal
from ..gdb_client import GDBError
from ..state import client, manager
from ..types import DOSBoxStatus
from ..utils import format_address, hexdump, parse_address
def registers() -> dict:
"""Read all CPU registers.
Returns:
Complete register state including:
- 32-bit registers (EAX, EBX, etc.)
- 16-bit aliases (AX, BX, etc.)
- Segment registers (CS, DS, ES, SS, FS, GS)
- Instruction pointer (CS:IP)
- Stack pointer (SS:SP)
- Flags
"""
try:
regs = client.read_registers()
return {
"success": True,
**regs.to_dict(),
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
def memory_read(
address: str,
length: int = 16,
format: Literal["hex", "ascii", "dump"] = "dump",
) -> dict:
"""Read memory from target.
Args:
address: Memory address (segment:offset or flat hex)
length: Number of bytes to read (default: 16, max: 4096)
format: Output format - "hex", "ascii", or "dump" (default)
Returns:
Memory contents in requested format
Examples:
memory_read("DS:0100", 64)
memory_read("0x12340", 256, format="hex")
"""
try:
# Handle register-based addresses like "DS:SI"
addr_str = address.upper()
if ':' in addr_str:
seg_part, off_part = addr_str.split(':')
# Check if parts are register names
regs = None
seg_regs = {'CS', 'DS', 'ES', 'SS', 'FS', 'GS'}
off_regs = {'IP', 'SP', 'BP', 'SI', 'DI', 'BX', 'AX', 'CX', 'DX'}
if seg_part in seg_regs or off_part in off_regs:
regs = client.read_registers()
seg_val = getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16)
off_val = getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16)
addr = (seg_val << 4) + off_val
else:
addr = parse_address(address)
else:
addr = parse_address(address)
# Limit read size
length = min(length, 4096)
mem = client.read_memory(addr, length)
result = {
"success": True,
"address": format_address(addr, "both"),
"length": len(mem.data),
}
if format == "hex":
result["data"] = mem.to_hex()
elif format == "ascii":
result["data"] = mem.to_ascii()
else: # dump
result["dump"] = hexdump(mem.data, addr)
return result
except (GDBError, ValueError) as e:
return {
"success": False,
"error": str(e),
}
def memory_write(
address: str,
data: str,
format: Literal["hex", "ascii"] = "hex",
) -> dict:
"""Write memory to target.
Args:
address: Memory address (segment:offset or flat hex)
data: Data to write (hex string or ASCII)
format: Input format - "hex" or "ascii"
Returns:
Write result
Examples:
memory_write("1234:0100", "90909090", format="hex") # NOP sled
memory_write("DS:0100", "Hello", format="ascii")
"""
try:
addr = parse_address(address)
if format == "hex":
bytes_data = bytes.fromhex(data)
else:
bytes_data = data.encode('latin-1')
client.write_memory(addr, bytes_data)
return {
"success": True,
"address": format_address(addr, "both"),
"bytes_written": len(bytes_data),
}
except (GDBError, ValueError) as e:
return {
"success": False,
"error": str(e),
}
def disassemble(address: str | None = None, count: int = 10) -> dict:
"""Disassemble instructions at address.
Note: This is a simplified disassembler. For complex analysis,
use a dedicated tool like Ghidra.
Args:
address: Start address (default: current CS:IP)
count: Number of bytes to read for disassembly (default: 10)
Returns:
Raw bytes and simple instruction hints
"""
try:
if address:
addr = parse_address(address)
else:
regs = client.read_registers()
addr = regs.cs_ip
# Read memory for disassembly
mem = client.read_memory(addr, count * 4) # Rough estimate
# Simple x86 opcode hints (not a full disassembler)
# This is just to give Claude some context
opcodes = {
0x90: "NOP",
0xCC: "INT 3",
0xCD: "INT",
0xC3: "RET",
0xCB: "RETF",
0xE8: "CALL",
0xE9: "JMP",
0xEB: "JMP short",
0x74: "JZ",
0x75: "JNZ",
0x50: "PUSH AX", 0x51: "PUSH CX", 0x52: "PUSH DX", 0x53: "PUSH BX",
0x54: "PUSH SP", 0x55: "PUSH BP", 0x56: "PUSH SI", 0x57: "PUSH DI",
0x58: "POP AX", 0x59: "POP CX", 0x5A: "POP DX", 0x5B: "POP BX",
0x5C: "POP SP", 0x5D: "POP BP", 0x5E: "POP SI", 0x5F: "POP DI",
0xB8: "MOV AX,imm", 0xB9: "MOV CX,imm", 0xBA: "MOV DX,imm", 0xBB: "MOV BX,imm",
0x89: "MOV r/m,r", 0x8B: "MOV r,r/m",
0x01: "ADD r/m,r", 0x03: "ADD r,r/m",
0x29: "SUB r/m,r", 0x2B: "SUB r,r/m",
0x31: "XOR r/m,r", 0x33: "XOR r,r/m",
0x39: "CMP r/m,r", 0x3B: "CMP r,r/m",
}
lines = []
for i, b in enumerate(mem.data[:count]):
hint = opcodes.get(b, f"?? ({b:02x})")
lines.append({
"address": format_address(addr + i),
"byte": f"{b:02x}",
"hint": hint,
})
return {
"success": True,
"start_address": format_address(addr, "both"),
"raw_bytes": mem.data[:count].hex(),
"instructions": lines,
"note": "This is a simplified view. Use Ghidra for full disassembly.",
}
except (GDBError, ValueError) as e:
return {
"success": False,
"error": str(e),
}
def stack(count: int = 16) -> dict:
"""Dump stack contents.
Args:
count: Number of words to dump (default: 16)
Returns:
Stack contents with SS:SP and values
"""
try:
regs = client.read_registers()
sp_addr = regs.ss_sp
# Read stack (2 bytes per word in real mode)
mem = client.read_memory(sp_addr, count * 2)
words = []
for i in range(0, len(mem.data), 2):
if i + 1 < len(mem.data):
word = int.from_bytes(mem.data[i:i+2], 'little')
words.append({
"offset": f"+{i:02x}",
"address": format_address(sp_addr + i),
"value": f"{word:04x}",
})
return {
"success": True,
"ss_sp": f"{regs.ss:04x}:{regs.sp:04x}",
"physical_address": format_address(sp_addr, "both"),
"words": words,
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
def status() -> dict:
"""Get current debugger and emulator status.
Returns:
Complete status including connection state, breakpoints, etc.
"""
dosbox_status = DOSBoxStatus(
running=manager.running,
connected=client.connected,
host=client.host,
port=client.port,
pid=manager.pid,
breakpoints=client.list_breakpoints() if client.connected else [],
)
result = dosbox_status.to_dict()
# Add register state if connected
if client.connected:
try:
regs = client.read_registers()
result["cs_ip"] = f"{regs.cs:04x}:{regs.ip:04x}"
result["ss_sp"] = f"{regs.ss:04x}:{regs.sp:04x}"
except GDBError:
pass
return result

View File

@ -0,0 +1,37 @@
"""Peripheral tools: screenshot, serial communication."""
def screenshot(filename: str | None = None) -> dict:
"""Capture DOSBox-X display.
Args:
filename: Optional output filename
Returns:
Screenshot info or error
"""
# Placeholder - requires X11 or DOSBox-X specific integration
return {
"success": False,
"error": "Screenshot not yet implemented. Use DOSBox-X hotkey F12.",
}
def serial_send(data: str, port: int = 1) -> dict:
"""Send data to DOSBox-X serial port.
This is useful for RIPscrip testing - send graphics commands
to a program listening on COM1.
Args:
data: Data to send (text or hex with \\x prefix)
port: COM port number (1 or 2)
Returns:
Send result
"""
# Placeholder - requires serial port configuration
return {
"success": False,
"error": "Serial port communication not yet implemented.",
}