Initial implementation of DOSBox-X MCP Server

MCP server for AI-assisted debugging of DOS binaries via GDB protocol.

Features:
- GDB remote protocol client for DOSBox-X debugging
- 16 debugging tools: launch, attach, breakpoint management,
  registers, memory read/write, disassemble, step, continue, etc.
- Docker container with DOSBox-X for consistent environment
- Support for DOS segment:offset addressing
- Comprehensive test suite (49 tests)

Primary use case: Reverse engineering the unpublished Bezier algorithm
in RIPTERM.EXE for the RIPscrip graphics protocol project.
This commit is contained in:
Ryan Malloy 2026-01-27 13:07:51 -07:00
commit 170eba0843
17 changed files with 5393 additions and 0 deletions

72
.gitignore vendored Normal file
View File

@ -0,0 +1,72 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.venv/
venv/
ENV/
# uv
.uv/
# Testing
.pytest_cache/
.coverage
htmlcov/
# Linting
.ruff_cache/
.mypy_cache/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Local config
.env.local
.env.*.local
# DOS files (user-specific)
dos/
config/dosbox.conf
# Docker
.docker/
# Temp files
*.tmp
*.temp
*.log
# Screenshots and artifacts
screenshots/
*.png
*.bmp
# macOS
.DS_Store
# Ignore compiled DOSBox-X builds
dosbox-x-build/

120
Dockerfile Normal file
View File

@ -0,0 +1,120 @@
# DOSBox-X with GDB stub support
# Multi-stage build for minimal final image
# =============================================================================
# Stage 1: Build DOSBox-X with GDB support
# =============================================================================
FROM debian:bookworm-slim AS builder
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
build-essential \
automake \
autoconf \
libtool \
pkg-config \
libsdl2-dev \
libsdl2-net-dev \
libsdl2-image-dev \
libpng-dev \
libpcap-dev \
libslirp-dev \
libfluidsynth-dev \
libavcodec-dev \
libavformat-dev \
libavutil-dev \
libswscale-dev \
nasm \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
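# Clone upstream DOSBox-X (joncampbell123/dosbox-x).
# NOTE: the [debugger] gdbserver/gdbport settings written further down only take
# effect if this checkout actually contains a GDB server stub. If it does not,
# build a fork that provides one (e.g. hezi/dosbox-x-gdb) instead.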
WORKDIR /build
RUN git clone --depth 1 https://github.com/joncampbell123/dosbox-x.git
WORKDIR /build/dosbox-x
# Configure and build
# DOSBox-X has built-in debugger that can be enabled
RUN ./autogen.sh && \
./configure \
--prefix=/opt/dosbox-x \
--enable-debug \
--enable-sdl2 \
--disable-printer \
&& make -j$(nproc) \
&& make install
# =============================================================================
# Stage 2: Runtime image
# =============================================================================
FROM debian:bookworm-slim
# Install runtime dependencies only.
# netcat-openbsd provides `nc`, which the docker-compose healthcheck
# (`nc -z localhost 1234`) executes inside this container; without it the
# healthcheck can never succeed.
RUN apt-get update && apt-get install -y --no-install-recommends \
        libsdl2-2.0-0 \
        libsdl2-net-2.0-0 \
        libsdl2-image-2.0-0 \
        libpng16-16 \
        libpcap0.8 \
        libslirp0 \
        libfluidsynth3 \
        libavcodec59 \
        libavformat59 \
        libavutil57 \
        libswscale6 \
        netcat-openbsd \
    && rm -rf /var/lib/apt/lists/*
# Copy DOSBox-X from builder
COPY --from=builder /opt/dosbox-x /opt/dosbox-x
# Create symlink in PATH
RUN ln -s /opt/dosbox-x/bin/dosbox-x /usr/local/bin/dosbox-x
# Create directories for config and DOS files
RUN mkdir -p /config /dos
# Default configuration with GDB stub enabled
RUN cat > /config/dosbox.conf << 'EOF'
[sdl]
fullscreen=false
windowresolution=800x600
output=opengl
[cpu]
core=auto
cputype=auto
cycles=auto
[dosbox]
memsize=16
[debugger]
# Enable GDB server stub
gdbserver=true
gdbport=1234
[serial]
serial1=disabled
serial2=disabled
[autoexec]
# Mount /dos as C:
MOUNT C /dos
C:
EOF
# Expose GDB port
EXPOSE 1234
# Set working directory
WORKDIR /dos
# Environment for X11 forwarding
ENV DISPLAY=:0
# Entry point
ENTRYPOINT ["dosbox-x", "-conf", "/config/dosbox.conf"]
CMD []

109
Makefile Normal file
View File

@ -0,0 +1,109 @@
# DOSBox-X MCP Makefile
# Convenient commands for development

# Every target below is a command, not a file. Declaring all of them phony
# guarantees that a stray file or directory named e.g. "install" or "init"
# can never make a target look up to date. (The pattern rule test-% cannot be
# listed in .PHONY.)
.PHONY: all build up up-headless down logs shell test lint format clean \
	install register unregister init quicktest help

# Default target
all: help

# Build the Docker image
build:
	docker compose build

# Start DOSBox-X container
up:
	@echo "Starting DOSBox-X..."
	@echo "Note: Run 'xhost +local:docker' first for X11 display"
	docker compose up -d
	@echo "DOSBox-X started. GDB port: 1234"
	@echo "Register the MCP server with: make register"

# Start headless (no GUI)
up-headless:
	docker compose --profile headless up -d dosbox-headless
	@echo "DOSBox-X headless started. GDB port: 1235"

# Stop container
down:
	docker compose down

# View logs
logs:
	docker compose logs -f

# Shell into container
shell:
	docker compose exec dosbox /bin/bash

# Run tests
test:
	uv run pytest tests/ -v

# Run specific test (e.g. `make test-checksum` runs tests matching "checksum")
test-%:
	uv run pytest tests/ -v -k "$*"

# Lint code
lint:
	uv run ruff check src/

# Format code
format:
	uv run ruff format src/ tests/

# Clean up
clean:
	docker compose down -v --rmi local
	rm -rf __pycache__ .pytest_cache .ruff_cache
	find . -type d -name '__pycache__' -exec rm -rf {} + 2>/dev/null || true

# Install dependencies (development)
# pytest/ruff are declared under [project.optional-dependencies] "dev", i.e. as
# an extra, so they must be requested with --extra; `--dev` only selects uv
# dependency groups and would leave them uninstalled.
install:
	uv sync --extra dev

# Register MCP server with Claude Code
# $(CURDIR) is make's own absolute working directory; quoted so paths with
# spaces survive the shell.
register:
	claude mcp add dosbox-mcp -- uv run --directory "$(CURDIR)" dosbox-mcp

# Unregister MCP server
unregister:
	claude mcp remove dosbox-mcp

# Create DOS directory structure
init:
	mkdir -p dos config
	@echo "Created dos/ and config/ directories"
	@echo "Place DOS binaries in dos/ directory"

# Quick test - launch and attach
quicktest: up
	@sleep 3
	@echo "Testing GDB connection..."
	@nc -zv localhost 1234 && echo "GDB stub is listening!" || echo "GDB stub not responding"

# Help
help:
	@echo "DOSBox-X MCP Server"
	@echo ""
	@echo "Docker commands:"
	@echo "  make build        Build Docker image"
	@echo "  make up           Start DOSBox-X (GUI mode)"
	@echo "  make up-headless  Start DOSBox-X (headless mode)"
	@echo "  make down         Stop container"
	@echo "  make logs         View container logs"
	@echo "  make shell        Shell into container"
	@echo ""
	@echo "Development commands:"
	@echo "  make install      Install dependencies"
	@echo "  make test         Run tests"
	@echo "  make lint         Lint code"
	@echo "  make format       Format code"
	@echo "  make clean        Clean up"
	@echo ""
	@echo "MCP commands:"
	@echo "  make register     Register with Claude Code"
	@echo "  make unregister   Unregister from Claude Code"
	@echo ""
	@echo "Setup:"
	@echo "  make init         Create DOS directory structure"
	@echo "  make quicktest    Test GDB connection"

203
README.md Normal file
View File

@ -0,0 +1,203 @@
# DOSBox-X MCP Server
AI-assisted debugging of DOS binaries via the Model Context Protocol (MCP).
This MCP server enables Claude to programmatically debug DOS programs running in DOSBox-X by providing tools for:
- Setting breakpoints
- Reading/writing registers and memory
- Stepping through code
- Tracing execution
## Primary Use Case
**Reverse engineering classic DOS programs** - specifically, tracing the unpublished Bezier curve algorithm in RIPTERM.EXE for the RIPscrip graphics protocol research project.
## Quick Start
### Prerequisites
- Python 3.11+
- [uv](https://github.com/astral-sh/uv) package manager
- Docker (for DOSBox-X container)
- X11 display (for DOSBox GUI)
### Installation
```bash
# Clone the repository
git clone https://github.com/yourusername/dosbox-mcp.git
cd dosbox-mcp
# Install dependencies
uv sync
# Build Docker image
make build
# Create DOS directory
make init
```
### Running
```bash
# Allow X11 access for Docker
xhost +local:docker
# Start DOSBox-X
make up
# Register MCP server with Claude Code
make register
```
### Usage with Claude
Once registered, Claude can use these tools:
```
# Launch DOSBox-X with a binary
launch("/path/to/GAME.EXE")
# Connect to debugger
attach("localhost", 1234)
# Set a breakpoint
breakpoint_set("1234:0100")
# Run until breakpoint
continue_execution()
# Read registers
registers()
# Read memory
memory_read("DS:0100", 64)
# Step through code
step(10)
# Clean up
quit()
```
## Tools Reference
### Execution Control
| Tool | Description |
|------|-------------|
| `launch` | Start DOSBox-X with optional binary |
| `attach` | Connect to GDB stub |
| `continue_execution` | Run until breakpoint |
| `step` | Step N instructions |
| `step_over` | Step over CALL instructions |
| `quit` | Stop DOSBox-X |
### Breakpoints
| Tool | Description |
|------|-------------|
| `breakpoint_set` | Set breakpoint at address |
| `breakpoint_list` | List all breakpoints |
| `breakpoint_delete` | Remove breakpoint(s) |
### Inspection
| Tool | Description |
|------|-------------|
| `registers` | Read all CPU registers |
| `memory_read` | Read memory region |
| `memory_write` | Write to memory |
| `disassemble` | Simple disassembly view |
| `stack` | Dump stack contents |
| `status` | Get debugger status |
### Address Formats
The server supports multiple address formats:
- **Segment:offset**: `1234:5678` (standard DOS format)
- **Flat hex**: `0x12345` or `12345h`
- **Decimal**: `#65536`
- **Register-based**: `DS:SI`, `CS:IP`
## Architecture
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Claude Code │────▶│ DOSBox-X MCP │────▶│ DOSBox-X │
│ │ MCP │ Server │ GDB │ (GDB stub) │
└─────────────────┘ └──────────────────┘ └─────────────────┘
┌──────────────────┐
│ GDB Remote │
│ Protocol │
│ (TCP :1234) │
└──────────────────┘
```
## Development
```bash
# Run tests
make test
# Lint code
make lint
# Format code
make format
# View logs
make logs
```
## Project Structure
```
dosbox-mcp/
├── src/dosbox_mcp/
│ ├── server.py # FastMCP server (tools)
│ ├── gdb_client.py # GDB protocol client
│ ├── dosbox.py # DOSBox-X management
│ ├── types.py # Type definitions
│ └── utils.py # Utilities
├── tests/
├── examples/
├── Dockerfile # DOSBox-X with GDB
└── docker-compose.yml
```
## Technical Details
### GDB Remote Protocol
This server implements a client for the [GDB Remote Serial Protocol](https://sourceware.org/gdb/current/onlinedocs/gdb.html/Remote-Protocol.html), which provides:
- Register read/write (`g`, `G`, `p`, `P`)
- Memory read/write (`m`, `M`)
- Software breakpoints (`Z0`, `z0`)
- Execution control (`c`, `s`, `?`)
### Real Mode Addressing
DOS uses real mode with segment:offset addressing:
```
Physical Address = (Segment << 4) + Offset
```
This gives a 20-bit address space (1MB), though only 640KB is conventional memory.
## Related Projects
- [RIPscrip Research](../rpmesh/) - Parent project for RIPscrip graphics protocol
- [dosbox-x-gdb](https://github.com/hezi/dosbox-x-gdb) - DOSBox-X fork with GDB support
- [FastMCP](https://gofastmcp.com/) - MCP server framework
## License
MIT License

95
docker-compose.yml Normal file
View File

@ -0,0 +1,95 @@
# DOSBox-X MCP Docker Compose
#
# Usage:
# docker compose up -d # Start DOSBox-X
# docker compose logs -f # View logs
# docker compose down # Stop
#
# For GUI (X11 forwarding):
# xhost +local:docker # Allow Docker X11 access
# docker compose up -d
services:
dosbox:
build:
context: .
dockerfile: Dockerfile
container_name: dosbox-mcp
# Ports
ports:
- "${GDB_PORT:-1234}:1234" # GDB stub
- "${SERIAL_PORT:-5555}:5555" # Serial (optional)
# X11 forwarding for display
environment:
- DISPLAY=${DISPLAY:-:0}
volumes:
- /tmp/.X11-unix:/tmp/.X11-unix:rw
- ${XDG_RUNTIME_DIR:-/run/user/1000}:${XDG_RUNTIME_DIR:-/run/user/1000}:rw
# DOS files directory
- ${DOS_DIR:-./dos}:/dos:rw
# Custom config (optional)
- ${CONFIG_FILE:-./config/dosbox.conf}:/config/dosbox.conf:ro
# Audio (ALSA device passthrough; the host must expose /dev/snd)
devices:
- /dev/snd:/dev/snd
# For PulseAudio sound
# Uncomment if you want sound support:
# - ${XDG_RUNTIME_DIR}/pulse:/run/user/1000/pulse:rw
# Run options
stdin_open: true
tty: true
# Resource limits
deploy:
resources:
limits:
cpus: '2'
memory: 1G
# Healthcheck - verify GDB port is listening
healthcheck:
test: ["CMD-SHELL", "nc -z localhost 1234 || exit 1"]
interval: 10s
timeout: 5s
retries: 3
start_period: 30s
# Restart policy
restart: unless-stopped
# Optional: Separate container for headless operation
dosbox-headless:
build:
context: .
dockerfile: Dockerfile
container_name: dosbox-mcp-headless
profiles:
- headless
ports:
- "${GDB_PORT_HEADLESS:-1235}:1234"
environment:
# Use SDL's dummy video/audio drivers so no display or sound device is needed
- SDL_VIDEODRIVER=dummy
- SDL_AUDIODRIVER=dummy
volumes:
- ${DOS_DIR:-./dos}:/dos:rw
- ${CONFIG_FILE:-./config/dosbox.conf}:/config/dosbox.conf:ro
stdin_open: true
tty: true
deploy:
resources:
limits:
cpus: '1'
memory: 512M

156
examples/ripterm_bezier.py Normal file
View File

@ -0,0 +1,156 @@
#!/usr/bin/env python3
"""Example: Tracing the Bezier algorithm in RIPTERM.EXE
This script demonstrates how to use the DOSBox-X MCP server to trace
the unpublished Bezier curve algorithm in RIPTERM.EXE.
The goal is to:
1. Launch DOSBox-X with RIPTERM
2. Set breakpoints at suspected Bezier drawing code
3. Feed a test RIPscrip file with Bezier commands
4. Capture register/memory state at each point
5. Reconstruct the algorithm from the captured data
Prerequisites:
- RIPTERM.EXE in the ./dos directory
- A test RIP file with Bezier commands
- DOSBox-X MCP server running
Usage:
python examples/ripterm_bezier.py
Note: This is a conceptual example. The actual addresses would need to be
determined through static analysis (e.g., in Ghidra) first.
"""
import json
import time
from pathlib import Path
def trace_bezier():
    """Print a walkthrough of how the Bezier routine would be traced.

    Nothing is executed against a debugger: the function only prints the
    sequence of MCP tool calls one would make, followed by an illustrative
    (made-up) trace and the questions such a trace could answer.
    """
    # Placeholder segment:offset addresses; the real ones have to come from
    # static analysis of RIPTERM.EXE.
    entry_addr = "1234:0100"  # where Bezier processing starts
    plot_addr = "1234:0200"  # where individual points are drawn
    calc_addr = "1234:0300"  # the core calculation routine

    banner = "=" * 60

    # Shown verbatim to the reader as the "capture state" recipe.
    capture_recipe = """
# Read registers
regs = registers()
print(f"AX={regs['ax']} BX={regs['bx']} CX={regs['cx']} DX={regs['dx']}")
# Read stack (parameters often passed on stack)
stack_data = stack(16)
# Read data segment (for global variables)
mem = memory_read("DS:0000", 256)
# Step through to see calculation
for i in range(100):
step()
regs = registers()
# Log coordinate values
print(f"Step {i}: X={regs['cx']} Y={regs['dx']}")
"""

    # An empty string stands for a blank output line.
    intro = [
        banner,
        "RIPTERM Bezier Algorithm Tracer",
        banner,
        "",
        "Step 1: Launch DOSBox-X with RIPTERM",
        " launch('/path/to/dos/RIPTERM.EXE')",
        "",
        "Step 2: Attach to GDB stub",
        " attach('localhost', 1234)",
        "",
        "Step 3: Set breakpoints at key addresses",
        f" breakpoint_set('{entry_addr}') # Bezier entry",
        f" breakpoint_set('{plot_addr}') # Draw point",
        f" breakpoint_set('{calc_addr}') # Calculation",
        "",
        "Step 4: Continue execution until breakpoint",
        " continue_execution()",
        "",
        "Step 5: When breakpoint hit, capture state:",
        capture_recipe,
        "Step 6: Analyze captured data to reconstruct algorithm",
        "",
        "Example trace output:",
        "-" * 40,
    ]
    for text in intro:
        print(text)

    # Illustrative samples only: (step, x, y, note).
    samples = (
        (0, 100, 50, "Control point 1"),
        (10, 112, 58, "Interpolated"),
        (20, 125, 65, "Interpolated"),
        (30, 138, 71, "Interpolated"),
        (40, 150, 75, "Control point 2 region"),
        (50, 162, 71, "Curving back"),
        (60, 175, 65, "Interpolated"),
        (70, 188, 58, "Interpolated"),
        (80, 200, 50, "End point"),
    )
    for step_no, x, y, note in samples:
        print(f" Step {step_no:3d}: ({x:3d}, {y:3d}) - {note}")

    outro = [
        "",
        "From this data, we could determine:",
        " - Whether it uses De Casteljau's algorithm",
        " - The number of subdivisions",
        " - Fixed-point vs floating-point math",
        " - Any optimizations or approximations",
    ]
    for text in outro:
        print(text)
def _meganum(value, digits=2):
    """Encode a non-negative integer as a fixed-width RIPscrip MegaNum.

    MegaNums are base-36 numbers written with the digits 0-9 and A-Z, padded
    with leading zeros to ``digits`` characters.

    Raises:
        ValueError: If ``value`` is negative or does not fit in ``digits``.
    """
    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    if value < 0 or value >= 36 ** digits:
        raise ValueError(f"{value} does not fit in a {digits}-digit MegaNum")
    encoded = ""
    for _ in range(digits):
        value, remainder = divmod(value, 36)
        encoded = alphabet[remainder] + encoded
    return encoded


def create_test_rip():
    """Create a simple RIP file to test Bezier drawing.

    Writes ``dos/test-bezier.RIP`` (relative to the current working
    directory), creating the ``dos`` directory if needed, and prints the
    path that was written.

    The Bezier line is emitted as the RIPscrip level-0 ``Z`` command:
    ``!|Z<x1><y1><x2><y2><x3><y3><x4><y4><cnt>`` where every argument is a
    two-digit MegaNum and ``cnt`` is the number of line segments used to
    approximate the curve.

    Returns:
        Path of the file that was written.
    """
    # Curve geometry: start, two control points, end.
    points = [
        (50, 100),   # Start
        (150, 200),  # Control 1
        (200, 100),  # Control 2
        (100, 50),   # End
    ]
    segments = 20  # number of straight segments approximating the curve

    coords = "".join(_meganum(x) + _meganum(y) for x, y in points)
    bezier_cmd = f"!|Z{coords}{_meganum(segments)}"

    # Same preamble as before (erase view, select color 0F), followed by the
    # properly encoded Bezier command.
    test_rip = "\n".join(["!|", "!|E", "!|c0F", bezier_cmd]) + "\n"

    test_file = Path("dos/test-bezier.RIP")
    test_file.parent.mkdir(parents=True, exist_ok=True)
    test_file.write_text(test_rip, encoding="ascii")
    print(f"Created test file: {test_file}")
    return test_file
def main():
    """Run the demonstration and print follow-up instructions."""
    rule = "=" * 60

    print()
    trace_bezier()
    print()

    closing_notes = (
        rule,
        "To actually run this:",
        "1. Use Ghidra to find the real Bezier addresses in RIPTERM.EXE",
        "2. Start the DOSBox-X MCP server",
        "3. Use Claude to interactively debug with these tools",
        rule,
    )
    for note in closing_notes:
        print(note)
if __name__ == "__main__":
main()

60
pyproject.toml Normal file
View File

@ -0,0 +1,60 @@
[project]
name = "dosbox-mcp"
version = "2025.01.27"
description = "MCP server for debugging DOS binaries in DOSBox-X via GDB protocol"
readme = "README.md"
requires-python = ">=3.11"
authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}]
license = {text = "MIT"}
keywords = ["mcp", "dosbox", "gdb", "reverse-engineering", "dos", "debugging"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Debuggers",
"Topic :: System :: Emulators",
]
dependencies = [
"fastmcp>=2.0.0",
"pillow>=10.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.24.0",
"ruff>=0.8.0",
]
[project.scripts]
dosbox-mcp = "dosbox_mcp.server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/dosbox_mcp"]
[tool.hatch.build.targets.sdist]
include = ["src/dosbox_mcp"]
# This is the key setting for src-layout
[tool.hatch.build]
sources = ["src"]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP", "B", "C4", "SIM"]
ignore = ["E501"] # Line length handled separately
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

View File

@ -0,0 +1,3 @@
"""DOSBox-X MCP Server - AI-assisted DOS binary debugging."""
__version__ = "2025.01.27"

411
src/dosbox_mcp/dosbox.py Normal file
View File

@ -0,0 +1,411 @@
"""DOSBox-X process and container management.
This module handles:
- Launching DOSBox-X with GDB stub enabled
- Docker container management for dosbox-x-gdb
- Process lifecycle management
- Configuration file handling
"""
import logging
import os
import shutil
import subprocess
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class DOSBoxConfig:
    """Configuration for a DOSBox-X instance.

    The fields map onto sections of a DOSBox-X ``.conf`` file; ``to_conf``
    renders them as text.
    """

    # GDB settings
    gdb_port: int = 1234
    gdb_enabled: bool = True

    # Display settings
    fullscreen: bool = False
    windowresolution: str = "800x600"

    # CPU settings
    core: str = "auto"  # auto, dynamic, normal, simple
    cputype: str = "auto"
    cycles: str = "auto"

    # Memory: value written to ``memsize=`` in the [dosbox] section
    memsize: int = 16

    # Serial ports (for future RIPscrip work)
    serial1: str = "disabled"
    serial2: str = "disabled"

    # Mount points: drive letter -> directory, emitted as MOUNT commands
    mounts: dict[str, str] = field(default_factory=dict)

    # Extra [autoexec] lines, emitted after the MOUNT commands
    autoexec: list[str] = field(default_factory=list)

    def to_conf(self) -> str:
        """Generate DOSBox-X configuration file content.

        Returns:
            The configuration text, lines joined with ``\\n`` (no trailing
            newline). The ``[debugger]`` section is only present when
            ``gdb_enabled`` is true. Mount paths containing whitespace are
            wrapped in double quotes so MOUNT sees them as one argument.
        """
        lines = [
            "[sdl]",
            f"fullscreen={str(self.fullscreen).lower()}",
            f"windowresolution={self.windowresolution}",
            "",
            "[cpu]",
            f"core={self.core}",
            f"cputype={self.cputype}",
            f"cycles={self.cycles}",
            "",
            "[dosbox]",
            f"memsize={self.memsize}",
            "",
            "[serial]",
            f"serial1={self.serial1}",
            f"serial2={self.serial2}",
            "",
        ]

        # GDB stub configuration (DOSBox-X specific)
        if self.gdb_enabled:
            lines.extend([
                "[debugger]",
                "gdbserver=true",
                f"gdbport={self.gdb_port}",
                "",
            ])

        # Autoexec section: mounts first, then the caller's commands
        lines.append("[autoexec]")
        for drive, path in self.mounts.items():
            path_text = str(path)
            # An unquoted path with spaces would be split into several
            # arguments by the DOS command line.
            if any(ch.isspace() for ch in path_text) and not path_text.startswith('"'):
                path_text = f'"{path_text}"'
            lines.append(f"MOUNT {drive.upper()} {path_text}")
        lines.extend(self.autoexec)

        return '\n'.join(lines)
class DOSBoxManager:
    """Manage the lifecycle of a single DOSBox-X instance.

    The instance runs either as a native child process (``launch_native``)
    or inside a Docker container (``launch_docker``). A per-launch temporary
    directory holds the generated config file (and, for native launches, the
    captured output); it is removed by ``stop`` and on every failed launch.

    Usable as a context manager: ``stop`` is called on exit.
    """

    def __init__(self):
        self._process: subprocess.Popen | None = None
        self._container_id: str | None = None
        self._config_path: Path | None = None
        self._temp_dir: Path | None = None
        # File receiving stdout+stderr of a native DOSBox-X process.
        self._log_path: Path | None = None
        self._gdb_port: int = 1234

    @property
    def running(self) -> bool:
        """Check if DOSBox-X is running (native process or container)."""
        if self._process:
            return self._process.poll() is None
        if self._container_id:
            return self._check_container_running()
        return False

    @property
    def gdb_port(self) -> int:
        """Get the GDB port of the most recent launch (1234 before any launch)."""
        return self._gdb_port

    @property
    def pid(self) -> int | None:
        """Get process ID if running natively, else None."""
        if self._process:
            return self._process.pid
        return None

    def _check_container_running(self) -> bool:
        """Ask ``docker inspect`` whether the tracked container is running."""
        if not self._container_id:
            return False
        try:
            result = subprocess.run(
                ["docker", "inspect", "-f", "{{.State.Running}}", self._container_id],
                capture_output=True,
                text=True,
                timeout=5
            )
            return result.stdout.strip() == "true"
        except (subprocess.SubprocessError, FileNotFoundError):
            return False

    def _find_dosbox(self) -> str | None:
        """Find a DOSBox-X executable.

        Returns:
            The first candidate that ``shutil.which`` resolves, or None.
        """
        candidates = [
            "dosbox-x",
            "dosbox-x-gdb",
            "/usr/bin/dosbox-x",
            "/usr/local/bin/dosbox-x",
            "/opt/dosbox-x/bin/dosbox-x",
            "/opt/dosbox-x/dosbox-x",
        ]
        for candidate in candidates:
            if shutil.which(candidate):
                return candidate
        return None

    @staticmethod
    def _resolve_binary(binary_path: str | None) -> Path | None:
        """Resolve ``binary_path`` to an absolute path, or None if not given.

        Raises:
            FileNotFoundError: If a path was given but does not exist.
        """
        if not binary_path:
            return None
        binary = Path(binary_path).resolve()
        if not binary.exists():
            raise FileNotFoundError(f"Binary not found: {binary_path}")
        return binary

    @staticmethod
    def _private_config(config: DOSBoxConfig | None) -> DOSBoxConfig:
        """Return a config this manager may modify without touching the caller's."""
        import copy

        return copy.deepcopy(config) if config is not None else DOSBoxConfig()

    def _write_config(self, config: DOSBoxConfig) -> None:
        """Create the temp directory and write ``dosbox.conf`` into it."""
        self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-"))
        try:
            self._config_path = self._temp_dir / "dosbox.conf"
            self._config_path.write_text(config.to_conf())
        except OSError:
            self._discard_temp_dir()
            raise

    def _discard_temp_dir(self) -> None:
        """Remove the temp directory (best effort) and forget paths inside it."""
        if self._temp_dir is not None:
            shutil.rmtree(self._temp_dir, ignore_errors=True)
        self._temp_dir = None
        self._config_path = None
        self._log_path = None

    def _read_native_log(self, lines: int) -> str | None:
        """Return the last ``lines`` lines of the native log, or None if unreadable."""
        if self._log_path is None:
            return None
        try:
            text = self._log_path.read_text(errors="replace")
        except OSError:
            return None
        if lines <= 0:
            return ""
        return "\n".join(text.splitlines()[-lines:])

    def launch_native(
        self,
        binary_path: str | None = None,
        config: DOSBoxConfig | None = None,
        extra_args: list[str] | None = None,
    ) -> None:
        """Launch DOSBox-X natively.

        Args:
            binary_path: Optional DOS binary to run. Its directory is mounted
                as C: and the binary is started from [autoexec].
            config: Configuration (uses defaults if not provided). The object
                passed in is not modified.
            extra_args: Additional command-line arguments

        Raises:
            RuntimeError: If already running, if no DOSBox-X executable is
                found, or if the process exits within the start-up wait.
            FileNotFoundError: If ``binary_path`` does not exist.
        """
        if self.running:
            raise RuntimeError("DOSBox-X is already running")

        dosbox_exe = self._find_dosbox()
        if not dosbox_exe:
            raise RuntimeError(
                "DOSBox-X not found. Install dosbox-x or use Docker container."
            )

        # Validate inputs before creating anything that needs cleaning up.
        binary = self._resolve_binary(binary_path)

        # Reap leftovers of an earlier instance that exited on its own.
        self.stop()

        config = self._private_config(config)
        self._gdb_port = config.gdb_port

        if binary is not None:
            # Mount the directory containing the binary as C: and run it.
            config.mounts["C"] = str(binary.parent)
            config.autoexec.extend(["C:", binary.name])

        self._write_config(config)

        cmd = [dosbox_exe, "-conf", str(self._config_path)]
        if extra_args:
            cmd.extend(extra_args)

        logger.info(f"Launching: {' '.join(cmd)}")

        # Send output to a file rather than pipes: nobody drains pipes while
        # DOSBox-X runs, so a full pipe buffer would eventually block it.
        self._log_path = self._temp_dir / "dosbox.log"
        try:
            with open(self._log_path, "wb") as log_file:
                self._process = subprocess.Popen(
                    cmd,
                    stdout=log_file,
                    stderr=subprocess.STDOUT,
                    cwd=str(self._temp_dir),
                )
        except OSError:
            self._discard_temp_dir()
            raise

        # Wait a moment for GDB stub to start
        time.sleep(1.0)

        if not self.running:
            output = self._read_native_log(50) or ""
            self._process = None
            self._discard_temp_dir()
            raise RuntimeError(f"DOSBox-X failed to start: {output}")

        logger.info(f"DOSBox-X started (PID: {self._process.pid})")

    def launch_docker(
        self,
        binary_path: str | None = None,
        config: DOSBoxConfig | None = None,
        image: str = "dosbox-mcp:latest",
        display: str | None = None,
    ) -> None:
        """Launch DOSBox-X in Docker container.

        Args:
            binary_path: Optional DOS binary to run. Its directory is bind
                mounted read-only at /dos, mounted as C: inside DOSBox-X, and
                the binary is started from [autoexec].
            config: Configuration (uses defaults if not provided). The object
                passed in is not modified.
            image: Docker image name
            display: X11 display (default: $DISPLAY)

        Raises:
            RuntimeError: If already running, Docker is unavailable,
                ``docker run`` fails, or the container is not running after
                the start-up wait.
            FileNotFoundError: If ``binary_path`` does not exist.
        """
        if self.running:
            raise RuntimeError("DOSBox-X is already running")

        try:
            subprocess.run(["docker", "version"], capture_output=True, check=True)
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            raise RuntimeError("Docker not available") from e

        # Validate inputs before creating anything that needs cleaning up.
        binary = self._resolve_binary(binary_path)

        # Reap leftovers of an earlier instance that exited on its own.
        self.stop()

        config = self._private_config(config)
        self._gdb_port = config.gdb_port

        if binary is not None:
            # The generated config replaces the image's default one, so the
            # C: mount and the program start have to be written here.
            config.mounts["C"] = "/dos"
            config.autoexec.extend(["C:", binary.name])

        self._write_config(config)

        display = display or os.environ.get("DISPLAY", ":0")

        cmd = [
            "docker", "run",
            "--rm",
            "-d",  # Detached
            "--name", f"dosbox-mcp-{os.getpid()}",
            # Network
            "-p", f"{self._gdb_port}:{self._gdb_port}",
            # X11 forwarding
            "-e", f"DISPLAY={display}",
            "-v", "/tmp/.X11-unix:/tmp/.X11-unix",
            # Config mount
            "-v", f"{self._config_path}:/config/dosbox.conf:ro",
        ]

        if binary is not None:
            cmd.extend(["-v", f"{binary.parent}:/dos:ro"])

        cmd.append(image)

        logger.info(f"Launching Docker: {' '.join(cmd)}")

        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError:
            self._discard_temp_dir()
            raise
        if result.returncode != 0:
            self._discard_temp_dir()
            raise RuntimeError(f"Docker launch failed: {result.stderr}")

        self._container_id = result.stdout.strip()

        # Wait for container and GDB stub
        time.sleep(2.0)

        if not self.running:
            logs = self.get_logs()
            self._container_id = None
            self._discard_temp_dir()
            raise RuntimeError(f"Container failed to start: {logs}")

        logger.info(f"DOSBox-X container started: {self._container_id[:12]}")

    def stop(self, timeout: float = 5.0) -> None:
        """Stop DOSBox-X and remove the temporary directory.

        Safe to call when nothing is running.

        Args:
            timeout: Seconds to wait before force-killing
        """
        if self._process:
            self._process.terminate()
            try:
                self._process.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                self._process.kill()
                self._process.wait()
            self._process = None
            logger.info("DOSBox-X process stopped")

        if self._container_id:
            try:
                subprocess.run(
                    ["docker", "stop", "-t", str(int(timeout)), self._container_id],
                    capture_output=True,
                    timeout=timeout + 5
                )
            except (subprocess.SubprocessError, OSError):
                # Graceful stop failed or timed out: force remove, best effort.
                try:
                    subprocess.run(
                        ["docker", "rm", "-f", self._container_id],
                        capture_output=True
                    )
                except (subprocess.SubprocessError, OSError) as e:
                    logger.warning(f"Failed to remove container: {e}")
            self._container_id = None
            logger.info("DOSBox-X container stopped")

        self._discard_temp_dir()

    def get_logs(self, lines: int = 50) -> str:
        """Get recent logs.

        Args:
            lines: Number of lines to retrieve

        Returns:
            Log output string: the tail of the captured output for a native
            process, ``docker logs`` output for a container, or a
            parenthesised placeholder when nothing is available.
        """
        if self._process:
            tail = self._read_native_log(lines)
            if tail is None:
                return "(Native process - logs not captured)"
            return tail

        if self._container_id:
            try:
                result = subprocess.run(
                    ["docker", "logs", "--tail", str(lines), self._container_id],
                    capture_output=True,
                    text=True,
                    timeout=5
                )
                return result.stdout + result.stderr
            except (subprocess.SubprocessError, OSError):
                return "(Failed to get container logs)"

        return "(Not running)"

    def screenshot(self, output_path: str | None = None) -> bytes | None:
        """Capture screenshot (not implemented yet).

        Args:
            output_path: Optional path to save screenshot (currently unused)

        Returns:
            Always None for now; a warning is logged.
        """
        # A real implementation would have to trigger DOSBox-X's screenshot
        # hotkey, wait for the file to appear, then read and return it.
        logger.warning("Screenshot not yet implemented")
        return None

    def send_keys(self, keys: str) -> None:
        """Send keystrokes to DOSBox-X (not implemented yet).

        Args:
            keys: String of characters to send (currently unused)
        """
        # Would require X11 integration or DOSBox-X's mapper.
        logger.warning("Key sending not yet implemented")

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensure cleanup; exceptions are not suppressed."""
        self.stop()
        return False

View File

@ -0,0 +1,584 @@
"""GDB Remote Serial Protocol client for DOSBox-X debugging.
The GDB Remote Serial Protocol is a simple text-based protocol used for
debugger communication over serial lines or TCP sockets. Each packet has
the format:
$<command>#<checksum>
Where checksum is the sum of all command bytes modulo 256, as two hex digits.
The receiver acknowledges with '+' (success) or '-' (retry).
This implementation provides a synchronous client that connects to DOSBox-X's
GDB stub, typically running on localhost:1234.
"""
import logging
import socket
import time
from typing import Callable
from .types import Breakpoint, MemoryRegion, Registers, StopEvent, StopReason
from .utils import (
calculate_checksum,
decode_hex,
encode_hex,
parse_registers_x86,
parse_stop_reply,
signal_name,
)
logger = logging.getLogger(__name__)
class GDBError(Exception):
    """Raised when talking to the GDB stub fails or the stub reports an error."""
class GDBClient:
"""Client for GDB Remote Serial Protocol.
This client implements the subset of GDB protocol needed for debugging
DOS programs in DOSBox-X:
- Register read/write
- Memory read/write
- Breakpoints (software)
- Continue/step execution
Example:
client = GDBClient()
client.connect("localhost", 1234)
regs = client.read_registers()
print(f"CS:IP = {regs.cs:04x}:{regs.ip:04x}")
client.set_breakpoint(0x10100)
client.continue_execution()
"""
def __init__(self, timeout: float = 5.0):
"""Initialize GDB client.
Args:
timeout: Socket timeout in seconds
"""
self.timeout = timeout
self._socket: socket.socket | None = None
self._connected = False
self._host = ""
self._port = 0
self._breakpoints: dict[int, Breakpoint] = {}
self._next_bp_id = 1
self._stop_callback: Callable[[StopEvent], None] | None = None
@property
def connected(self) -> bool:
"""Check if connected to GDB stub."""
return self._connected
@property
def host(self) -> str:
"""Get connected host."""
return self._host
@property
def port(self) -> int:
"""Get connected port."""
return self._port
def connect(self, host: str = "localhost", port: int = 1234) -> None:
    """Connect to GDB stub.

    An existing connection is closed first. After the TCP connection is
    established, up to 0.5 s is spent waiting for an unsolicited initial
    packet from the stub; whatever arrives is logged and discarded.

    Args:
        host: Hostname or IP address
        port: Port number (default 1234 for DOSBox-X)

    Raises:
        GDBError: If connection fails. The client is left disconnected and
            the socket is closed.
    """
    if self._connected:
        self.disconnect()

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        sock.connect((host, port))

        # Some GDB stubs send an initial packet; try to read it
        sock.settimeout(0.5)
        try:
            initial = sock.recv(1024)
        except socket.timeout:
            initial = None  # stub stayed silent, which is fine
        finally:
            sock.settimeout(self.timeout)

        if initial:
            logger.debug(f"Initial data from stub: {initial!r}")
        elif initial is not None:
            # recv() returning b"" means the peer already closed the socket.
            raise ConnectionError("stub closed the connection immediately")
    except OSError as e:
        # Do not leak the half-open socket or leave stale state behind.
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
        self._socket = None
        self._connected = False
        raise GDBError(f"Failed to connect to {host}:{port}: {e}") from e

    self._socket = sock
    self._host = host
    self._port = port
    self._connected = True
    logger.info(f"Connected to GDB stub at {host}:{port}")
def disconnect(self) -> None:
    """Close the socket, if any, and reset the connection state."""
    sock = self._socket
    if sock:
        try:
            sock.close()
        except OSError:
            # Closing is best effort; the state is reset regardless.
            pass
        self._socket = None

    self._connected = False
    self._host, self._port = "", 0
    logger.info("Disconnected from GDB stub")
def _send_packet(self, command: str) -> None:
    """Frame ``command`` as ``$<command>#<checksum>`` and send it.

    Args:
        command: Command string (without $ and checksum)

    Raises:
        GDBError: If not connected or send fails (the client is marked
            disconnected when the send fails)
    """
    sock = self._socket
    if not (self._connected and sock):
        raise GDBError("Not connected to GDB stub")

    packet = "${}#{}".format(command, calculate_checksum(command))
    logger.debug(f"Sending: {packet}")

    try:
        sock.sendall(packet.encode('latin-1'))
    except OSError as exc:
        self._connected = False
        raise GDBError(f"Send failed: {exc}") from exc
def _recv_packet(self) -> str:
    """Receive a GDB packet.

    Reads from the socket until one complete ``$<data>#<xx>`` frame with a
    valid checksum has arrived, acknowledges it with ``+`` and returns the
    payload. A frame whose checksum does not match is answered with ``-``
    and discarded, so the retransmission (not the stale bad frame) is what
    gets examined next. Bytes before the first ``$`` (ACK/NAK characters)
    are ignored. Any bytes buffered after the returned frame are dropped.

    Returns:
        Response string (without $ and checksum)

    Raises:
        GDBError: If not connected, the peer closes the connection,
            the read times out, or the socket fails
    """
    if not self._connected or not self._socket:
        raise GDBError("Not connected to GDB stub")

    try:
        data = b""

        # Read until we get a complete packet
        while True:
            chunk = self._socket.recv(4096)
            if not chunk:
                # Peer closed the stream: the session is no longer usable
                self._connected = False
                raise GDBError("Connection closed by remote")
            data += chunk

            # Examine every frame currently sitting in the buffer
            while True:
                start = data.find(b'$')
                if start < 0:
                    # Only ACK/NAK noise so far; nothing worth keeping
                    data = b""
                    break

                end = data.find(b'#', start)
                # A frame is complete only once BOTH checksum digits
                # (at end+1 and end+2) have been received.
                if end < 0 or end + 3 > len(data):
                    data = data[start:]
                    break

                packet_data = data[start + 1:end].decode('latin-1')
                checksum = data[end + 1:end + 3].decode('latin-1')
                # Consume the frame so a bad one is never re-examined
                data = data[end + 3:]

                # Verify checksum
                expected = calculate_checksum(packet_data)
                if checksum.lower() != expected.lower():
                    logger.warning(
                        f"Checksum mismatch: got {checksum}, expected {expected}"
                    )
                    # Send NAK and wait for the retransmission
                    self._socket.sendall(b'-')
                    continue

                # Send ACK
                self._socket.sendall(b'+')
                logger.debug(f"Received: ${packet_data}#{checksum}")
                return packet_data

    except socket.timeout:
        raise GDBError("Receive timeout") from None
    except OSError as e:
        self._connected = False
        raise GDBError(f"Receive failed: {e}") from e
def _command(self, cmd: str) -> str:
"""Send command and receive response.
Args:
cmd: GDB command
Returns:
Response string
"""
self._send_packet(cmd)
return self._recv_packet()
# =========================================================================
# Register Operations
# =========================================================================
def read_registers(self) -> Registers:
    """Fetch the full register set with the ``g`` packet.

    Returns:
        Registers object built from the values parsed out of the reply

    Raises:
        GDBError: If the reply starts with "E"
    """
    response = self._command("g")
    if response[:1] == "E":
        raise GDBError(f"Failed to read registers: {response}")

    parsed = parse_registers_x86(response)
    return Registers(**parsed)
def write_registers(self, regs: Registers) -> None:
"""Write all CPU registers.
Args:
regs: Registers object with values to write
"""
# Build register string in GDB order (little-endian)
def le32(val: int) -> str:
return val.to_bytes(4, 'little').hex()
hex_data = (
le32(regs.eax) + le32(regs.ecx) + le32(regs.edx) + le32(regs.ebx) +
le32(regs.esp) + le32(regs.ebp) + le32(regs.esi) + le32(regs.edi) +
le32(regs.eip) + le32(regs.eflags) +
le32(regs.cs) + le32(regs.ss) + le32(regs.ds) +
le32(regs.es) + le32(regs.fs) + le32(regs.gs)
)
response = self._command(f"G{hex_data}")
if response.startswith("E"):
raise GDBError(f"Failed to write registers: {response}")
def read_register(self, reg_num: int) -> int:
"""Read a single register by number.
Args:
reg_num: Register number (0=EAX, 1=ECX, ..., 8=EIP, etc.)
Returns:
Register value
"""
response = self._command(f"p{reg_num:x}")
if response.startswith("E"):
raise GDBError(f"Failed to read register {reg_num}: {response}")
return int.from_bytes(bytes.fromhex(response), 'little')
# =========================================================================
# Memory Operations
# =========================================================================
def read_memory(self, address: int, length: int) -> MemoryRegion:
    """Read a span of target memory with the ``m`` packet.

    Args:
        address: Physical memory address
        length: Number of bytes to read

    Returns:
        MemoryRegion holding the requested address and the decoded bytes

    Raises:
        GDBError: If the reply starts with "E"
    """
    request = f"m{address:x},{length:x}"
    response = self._command(request)
    if response.startswith("E"):
        raise GDBError(f"Failed to read memory at {address:05x}: {response}")

    return MemoryRegion(address=address, data=decode_hex(response))
def write_memory(self, address: int, data: bytes) -> None:
    """Write memory to target.

    Sends the ``M`` packet with the data hex-encoded. An empty reply is the
    protocol's "packet not supported" answer; it is reported as an error
    rather than being mistaken for a successful write.

    Args:
        address: Physical memory address
        data: Bytes to write

    Raises:
        GDBError: If the stub reports an error or does not support the packet
    """
    hex_data = encode_hex(data)
    response = self._command(f"M{address:x},{len(data):x}:{hex_data}")
    if response.startswith("E"):
        raise GDBError(f"Failed to write memory at {address:05x}: {response}")
    if response == "":
        raise GDBError(
            f"Failed to write memory at {address:05x}: 'M' packet not supported by this GDB stub"
        )
# =========================================================================
# Breakpoint Operations
# =========================================================================
def set_breakpoint(self, address: int) -> Breakpoint:
    """Ask the stub for a software breakpoint and record it locally.

    Args:
        address: Physical memory address

    Returns:
        Breakpoint object registered under a freshly allocated ID

    Raises:
        GDBError: If the stub reports an error or gives an empty reply
    """
    # Z0 = software breakpoint, address, kind (1 byte for x86)
    reply = self._command(f"Z0,{address:x},1")
    if reply.startswith("E"):
        raise GDBError(f"Failed to set breakpoint at {address:05x}: {reply}")
    if reply == "":
        raise GDBError("Breakpoints not supported by this GDB stub")

    new_id = self._next_bp_id
    bp = Breakpoint(
        id=new_id,
        address=address,
        enabled=True,
        original=f"{address:05x}"
    )
    self._breakpoints[new_id] = bp
    self._next_bp_id = new_id + 1

    logger.info(f"Set breakpoint {bp.id} at {address:05x}")
    return bp
def delete_breakpoint(self, bp_id: int) -> None:
    """Remove one breakpoint from the stub and from the local table.

    Args:
        bp_id: Breakpoint ID

    Raises:
        GDBError: If the ID is unknown or the stub reports an error
    """
    try:
        bp = self._breakpoints[bp_id]
    except KeyError:
        raise GDBError(f"Breakpoint {bp_id} not found") from None

    reply = self._command(f"z0,{bp.address:x},1")
    if reply.startswith("E"):
        raise GDBError(f"Failed to delete breakpoint {bp_id}: {reply}")

    # Forget the breakpoint only after the stub accepted the removal
    del self._breakpoints[bp_id]
    logger.info(f"Deleted breakpoint {bp_id}")
def delete_all_breakpoints(self) -> int:
"""Delete all breakpoints.
Returns:
Number of breakpoints deleted
"""
count = 0
for bp_id in list(self._breakpoints.keys()):
try:
self.delete_breakpoint(bp_id)
count += 1
except GDBError as e:
logger.warning(f"Failed to delete breakpoint {bp_id}: {e}")
return count
def list_breakpoints(self) -> list[Breakpoint]:
"""List all breakpoints.
Returns:
List of Breakpoint objects
"""
return list(self._breakpoints.values())
# =========================================================================
# Execution Control
# =========================================================================
def continue_execution(self, timeout: float | None = None) -> StopEvent:
"""Continue execution until breakpoint or signal.
Args:
timeout: Optional timeout in seconds (None = use default)
Returns:
StopEvent describing why execution stopped
"""
old_timeout = self._socket.gettimeout() if self._socket else self.timeout
if timeout is not None and self._socket:
self._socket.settimeout(timeout)
try:
self._send_packet("c")
response = self._recv_packet()
finally:
if self._socket:
self._socket.settimeout(old_timeout)
return self._parse_stop(response)
def step(self, count: int = 1) -> StopEvent:
    """Single-step the target up to `count` times with the ``s`` packet.

    Stepping ends early as soon as a stop is classified as anything other
    than a plain step.

    Args:
        count: Number of instructions to step

    Returns:
        StopEvent for the last step performed, or an UNKNOWN event when no
        step was performed
    """
    last_event = None
    for _ in range(count):
        self._send_packet("s")
        reply = self._recv_packet()
        last_event = self._parse_stop(reply)
        if last_event.reason != StopReason.STEP:
            break

    if last_event:
        return last_event
    return StopEvent(reason=StopReason.UNKNOWN)
def step_over(self) -> StopEvent:
"""Step over a call instruction (step + continue if at call).
This is a higher-level operation that checks the current instruction
and sets a temporary breakpoint after it if it's a CALL.
Returns:
StopEvent describing current state
"""
# Read current instruction to see if it's a CALL
regs = self.read_registers()
addr = regs.cs_ip
mem = self.read_memory(addr, 8)
# Check for CALL instructions (E8 = near call, FF /2 = call r/m)
# This is simplified - a real implementation would need a disassembler
opcode = mem.data[0] if mem.data else 0
if opcode == 0xE8: # CALL rel16/rel32
# Set breakpoint after the call (5 bytes for E8 xx xx xx xx)
next_addr = addr + 5
tmp_bp = self.set_breakpoint(next_addr)
try:
event = self.continue_execution()
finally:
self.delete_breakpoint(tmp_bp.id)
return event
elif opcode == 0xFF:
# Check for FF /2 (CALL r/m)
modrm = mem.data[1] if len(mem.data) > 1 else 0
reg = (modrm >> 3) & 0x07
if reg == 2: # CALL
# Instruction length varies - this is simplified
next_addr = addr + 2 # Minimum size
tmp_bp = self.set_breakpoint(next_addr)
try:
event = self.continue_execution()
finally:
self.delete_breakpoint(tmp_bp.id)
return event
# Not a call, just step
return self.step()
def _parse_stop(self, response: str) -> StopEvent:
    """Parse a stop reply into a StopEvent.

    Classification:
    - "exit" replies become EXITED, with the exit code carried in `signal`.
    - Signal 5 (SIGTRAP) becomes BREAKPOINT when CS:IP (or CS:IP - 1, for
      stubs that report the address after an INT3) matches a known
      breakpoint, otherwise STEP.
    - Any other signal becomes SIGNAL. The current CS:IP is reported as the
      address when registers can be read; if that read fails the address
      falls back to 0 rather than failing the whole stop report.
    - Everything else is UNKNOWN.

    Args:
        response: Raw stop-reply payload from the stub

    Returns:
        StopEvent describing the stop
    """
    stop_type, info = parse_stop_reply(response)

    if stop_type == "exit":
        return StopEvent(
            reason=StopReason.EXITED,
            signal=info.get("code", 0)
        )
    if stop_type != "signal":
        return StopEvent(reason=StopReason.UNKNOWN)

    signal = info.get("signal", 0)

    if signal != 5:
        # Best effort: tell the caller where the target stopped
        try:
            addr = self.read_registers().cs_ip
        except GDBError:
            addr = 0
        return StopEvent(
            reason=StopReason.SIGNAL,
            address=addr,
            signal=signal
        )

    # SIGTRAP (5) usually indicates a breakpoint
    regs = self.read_registers()
    addr = regs.cs_ip

    # Check if we hit a known breakpoint
    for bp in self._breakpoints.values():
        if bp.address == addr or bp.address == addr - 1:
            bp.hit_count += 1
            return StopEvent(
                reason=StopReason.BREAKPOINT,
                address=addr,
                signal=signal,
                breakpoint_id=bp.id
            )

    return StopEvent(
        reason=StopReason.STEP,
        address=addr,
        signal=signal
    )
# =========================================================================
# Query Operations
# =========================================================================
def query_supported(self) -> list[str]:
"""Query supported features.
Returns:
List of supported feature strings
"""
response = self._command("qSupported")
if response.startswith("E"):
return []
return response.split(';')
def query_attached(self) -> bool:
"""Query if attached to existing process.
Returns:
True if attached to existing process
"""
response = self._command("qAttached")
return response == "1"
def detach(self) -> None:
"""Detach from target (allow it to continue running)."""
self._command("D")
self.disconnect()
def kill(self) -> None:
"""Kill the target process."""
try:
self._command("k")
except GDBError:
pass # Connection may close immediately
self.disconnect()
# =========================================================================
# Utility Methods
# =========================================================================
def interrupt(self) -> None:
    """Write a single 0x03 byte (Ctrl-C) to the stub to request a halt.

    No reply is read here.

    Raises:
        GDBError: If not connected or the send fails
    """
    if not (self._connected and self._socket):
        raise GDBError("Not connected to GDB stub")

    break_byte = b'\x03'
    try:
        self._socket.sendall(break_byte)
        logger.debug("Sent interrupt")
    except OSError as e:
        self._connected = False
        raise GDBError(f"Interrupt failed: {e}") from e
def get_stop_reason(self) -> StopEvent:
"""Query current stop reason.
Returns:
StopEvent describing current state
"""
response = self._command("?")
return self._parse_stop(response)

684
src/dosbox_mcp/server.py Normal file
View File

@ -0,0 +1,684 @@
"""DOSBox-X MCP Server - AI-assisted DOS binary debugging.
This server exposes DOSBox-X debugging capabilities through the Model Context
Protocol (MCP), enabling Claude to programmatically debug DOS binaries.
Primary use case: Reverse engineering classic DOS programs by setting breakpoints,
reading memory, and tracing execution.
"""
import logging
from importlib.metadata import version
from typing import Literal
from fastmcp import FastMCP
from .dosbox import DOSBoxConfig, DOSBoxManager
from .gdb_client import GDBClient, GDBError
from .types import DOSBoxStatus
from .utils import format_address, hexdump, parse_address
# Configure logging
# Root logger at INFO with timestamp, logger name and level on every record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-level logger named after this module
logger = logging.getLogger(__name__)

# Get package version
# Look up the installed distribution's version; if the lookup raises for any
# reason, fall back to a hard-coded version string.
try:
    PACKAGE_VERSION = version("dosbox-mcp")
except Exception:
    PACKAGE_VERSION = "2025.01.27"
# Initialize FastMCP server
# The instructions text names the tools exactly as they are registered below,
# so the workflow can be followed literally (the "continue" step is the
# `continue_execution` tool).
mcp = FastMCP(
    name="dosbox-mcp",
    instructions="""
DOSBox-X MCP Server for debugging DOS binaries.

This server provides tools to:
- Launch DOSBox-X with GDB debugging enabled
- Set breakpoints and trace execution
- Read/write CPU registers and memory
- Disassemble code at any address

Typical workflow:
1. launch() - Start DOSBox-X with a DOS binary
2. attach() - Connect to the GDB stub
3. breakpoint_set() - Set breakpoints at interesting addresses
4. continue_execution() - Run until breakpoint hit
5. registers() / memory_read() - Inspect state
6. step() - Step through code
7. quit() - Clean up

Address formats supported:
- Segment:offset: "1234:5678" (standard DOS format)
- Flat hex: "0x12345" or "12345"
- Decimal: "#12345"
"""
)

# Global state
# One emulator manager and one GDB client are shared by every tool below.
_manager = DOSBoxManager()
_client = GDBClient()
# =============================================================================
# P0 Tools - MVP for Bezier tracing
# =============================================================================
@mcp.tool()
def launch(
    binary_path: str | None = None,
    gdb_port: int = 1234,
    use_docker: bool = False,
    cycles: str = "auto",
    memsize: int = 16,
) -> dict:
    """Start DOSBox-X (natively or in Docker) with its GDB stub enabled.

    Args:
        binary_path: Path to DOS binary to run (optional)
        gdb_port: Port for GDB stub (default: 1234)
        use_docker: Use Docker container instead of native DOSBox-X
        cycles: CPU cycles setting (auto, max, or number)
        memsize: Value for the DOSBox-X memsize setting, in MB (default: 16)

    Returns:
        Status dict with connection details, or an error message

    Example:
        launch("/path/to/GAME.EXE", gdb_port=1234)
    """
    config = DOSBoxConfig(
        gdb_port=gdb_port,
        gdb_enabled=True,
        cycles=cycles,
        memsize=memsize,
    )

    try:
        if not use_docker:
            _manager.launch_native(binary_path=binary_path, config=config)
        else:
            _manager.launch_docker(binary_path=binary_path, config=config)

        outcome = {
            "success": True,
            "message": "DOSBox-X launched successfully",
            "gdb_host": "localhost",
            "gdb_port": gdb_port,
            "pid": _manager.pid,
            "hint": f"Use attach() to connect to the debugger on port {gdb_port}",
        }
    except Exception as e:
        outcome = {
            "success": False,
            "error": str(e),
        }
    return outcome
@mcp.tool()
def attach(host: str = "localhost", port: int = 1234) -> dict:
    """Connect the shared GDB client to a running DOSBox-X GDB stub.

    Args:
        host: Hostname or IP (default: localhost)
        port: GDB port (default: 1234)

    Returns:
        Connection status with the stop reason and current CS:IP

    Example:
        attach("localhost", 1234)
    """
    try:
        _client.connect(host, port)

        # Get initial state
        regs = _client.read_registers()
        stop = _client.get_stop_reason()

        outcome = {
            "success": True,
            "message": f"Connected to {host}:{port}",
            "stop_reason": stop.reason.name.lower(),
            "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
            "physical_address": f"{regs.cs_ip:05x}",
        }
    except GDBError as e:
        outcome = {
            "success": False,
            "error": str(e),
        }
    return outcome
@mcp.tool()
def breakpoint_set(address: str) -> dict:
    """Set a software breakpoint at the specified address.

    The address string exactly as supplied is stored on the breakpoint, so
    breakpoint_list() later reports it in the `original` field instead of a
    re-formatted flat address.

    Args:
        address: Memory address (segment:offset or flat hex)

    Returns:
        Breakpoint info

    Examples:
        breakpoint_set("1234:0100")  # segment:offset
        breakpoint_set("0x12340")    # flat address
    """
    try:
        addr = parse_address(address)
        bp = _client.set_breakpoint(addr)
        # Remember what the user actually typed
        bp.original = address

        return {
            "success": True,
            "breakpoint_id": bp.id,
            "address": format_address(bp.address, "both"),
            "original": address,
        }
    except (GDBError, ValueError) as e:
        return {
            "success": False,
            "error": str(e),
        }
@mcp.tool()
def breakpoint_list() -> dict:
    """Report every breakpoint the shared GDB client is tracking.

    Returns:
        Dict with the breakpoint count and one info dict per breakpoint
    """
    active = _client.list_breakpoints()
    summaries = [bp.to_dict() for bp in active]
    return {
        "count": len(active),
        "breakpoints": summaries,
    }
@mcp.tool()
def breakpoint_delete(id: int | None = None, all: bool = False) -> dict:
    """Delete one breakpoint by ID, or every breakpoint.

    `all` takes precedence over `id` when both are given.

    Args:
        id: Specific breakpoint ID to delete
        all: If True, delete all breakpoints

    Returns:
        Deletion result
    """
    try:
        if all:
            return {
                "success": True,
                "deleted": _client.delete_all_breakpoints(),
            }

        if id is None:
            # Neither selector was supplied
            return {
                "success": False,
                "error": "Specify either 'id' or 'all=True'",
            }

        _client.delete_breakpoint(id)
        return {
            "success": True,
            "deleted_id": id,
        }
    except GDBError as e:
        return {
            "success": False,
            "error": str(e),
        }
@mcp.tool()
def continue_execution(timeout: float | None = None) -> dict:
    """Resume the target and report where and why it stopped.

    Args:
        timeout: Optional timeout in seconds

    Returns:
        Stop event info (reason, address, breakpoint hit)
    """
    try:
        event = _client.continue_execution(timeout=timeout)
        regs = _client.read_registers()

        outcome = {
            "success": True,
            "stop_reason": event.reason.name.lower(),
            "address": format_address(event.address, "both"),
            "breakpoint_id": event.breakpoint_id,
            "signal": event.signal,
            "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
        }
    except GDBError as e:
        outcome = {
            "success": False,
            "error": str(e),
        }
    return outcome
@mcp.tool()
def step(count: int = 1) -> dict:
    """Single-step the target and report the resulting CS:IP.

    Args:
        count: Number of instructions to step (default: 1)

    Returns:
        Stop reason and instruction pointer after stepping; `stepped`
        echoes the requested count
    """
    try:
        event = _client.step(count)
        regs = _client.read_registers()

        outcome = {
            "success": True,
            "stop_reason": event.reason.name.lower(),
            "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
            "physical_address": f"{regs.cs_ip:05x}",
            "stepped": count,
        }
    except GDBError as e:
        outcome = {
            "success": False,
            "error": str(e),
        }
    return outcome
@mcp.tool()
def step_over() -> dict:
    """Step over the current instruction, running through a CALL if it is one.

    Returns:
        Stop reason and instruction pointer after the step-over
    """
    try:
        event = _client.step_over()
        regs = _client.read_registers()

        outcome = {
            "success": True,
            "stop_reason": event.reason.name.lower(),
            "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
            "physical_address": f"{regs.cs_ip:05x}",
        }
    except GDBError as e:
        outcome = {
            "success": False,
            "error": str(e),
        }
    return outcome
@mcp.tool()
def registers() -> dict:
    """Read all CPU registers.

    Returns:
        The register set as produced by Registers.to_dict(), plus a
        `success` flag:
        - 32-bit registers (EAX, EBX, etc.)
        - 16-bit aliases (AX, BX, etc.)
        - Segment registers (CS, DS, ES, SS, FS, GS)
        - Instruction pointer (CS:IP)
        - Stack pointer (SS:SP)
        - Flags
    """
    try:
        snapshot = _client.read_registers()
    except GDBError as e:
        return {
            "success": False,
            "error": str(e),
        }

    outcome = {"success": True}
    outcome.update(snapshot.to_dict())
    return outcome
@mcp.tool()
def memory_read(
    address: str,
    length: int = 16,
    format: Literal["hex", "ascii", "dump"] = "dump",
) -> dict:
    """Read memory from target.

    Either side of a segment:offset address may be a register name
    (e.g. "DS:SI", "DS:0100", "1234:BX"); register values are read from the
    target at call time. All other address forms go through parse_address().

    Args:
        address: Memory address (segment:offset or flat hex)
        length: Number of bytes to read (default: 16, min: 1, max: 4096)
        format: Output format - "hex", "ascii", or "dump" (default)

    Returns:
        Memory contents in requested format

    Examples:
        memory_read("DS:0100", 64)
        memory_read("0x12340", 256, format="hex")
    """
    try:
        # Reject sizes that would produce a malformed read request
        if length < 1:
            raise ValueError(f"length must be at least 1, got {length}")
        # Limit read size
        length = min(length, 4096)

        # Handle register-based addresses like "DS:SI"
        seg_regs = {'CS', 'DS', 'ES', 'SS', 'FS', 'GS'}
        off_regs = {'IP', 'SP', 'BP', 'SI', 'DI', 'BX', 'AX', 'CX', 'DX'}
        addr_str = address.strip().upper()
        addr = None

        if addr_str.count(':') == 1:
            seg_part, off_part = (part.strip() for part in addr_str.split(':'))
            if seg_part in seg_regs or off_part in off_regs:
                regs = _client.read_registers()
                seg_val = getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16)
                off_val = getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16)
                addr = (seg_val << 4) + off_val

        if addr is None:
            # No register names involved: plain numeric address
            addr = parse_address(address)

        mem = _client.read_memory(addr, length)

        result = {
            "success": True,
            "address": format_address(addr, "both"),
            "length": len(mem.data),
        }

        if format == "hex":
            result["data"] = mem.to_hex()
        elif format == "ascii":
            result["data"] = mem.to_ascii()
        else:  # dump
            result["dump"] = hexdump(mem.data, addr)

        return result

    except (GDBError, ValueError) as e:
        return {
            "success": False,
            "error": str(e),
        }
@mcp.tool()
def memory_write(
    address: str,
    data: str,
    format: Literal["hex", "ascii"] = "hex",
) -> dict:
    """Write memory to target.

    Accepts the same address forms as memory_read(), including register
    names on either side of segment:offset (e.g. "DS:0100", "ES:DI");
    register values are read from the target at call time.

    Args:
        address: Memory address (segment:offset or flat hex)
        data: Data to write (hex string or ASCII)
        format: Input format - "hex" or "ascii"

    Returns:
        Write result

    Examples:
        memory_write("1234:0100", "90909090", format="hex")  # NOP sled
        memory_write("DS:0100", "Hello", format="ascii")
    """
    try:
        # Convert the payload first so bad input never touches the target
        if format == "hex":
            bytes_data = bytes.fromhex(data)
        else:
            bytes_data = data.encode('latin-1')
        if not bytes_data:
            raise ValueError("No data to write")

        # Handle register-based addresses like "DS:SI"
        seg_regs = {'CS', 'DS', 'ES', 'SS', 'FS', 'GS'}
        off_regs = {'IP', 'SP', 'BP', 'SI', 'DI', 'BX', 'AX', 'CX', 'DX'}
        addr_str = address.strip().upper()
        addr = None

        if addr_str.count(':') == 1:
            seg_part, off_part = (part.strip() for part in addr_str.split(':'))
            if seg_part in seg_regs or off_part in off_regs:
                regs = _client.read_registers()
                seg_val = getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16)
                off_val = getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16)
                addr = (seg_val << 4) + off_val

        if addr is None:
            # No register names involved: plain numeric address
            addr = parse_address(address)

        _client.write_memory(addr, bytes_data)

        return {
            "success": True,
            "address": format_address(addr, "both"),
            "bytes_written": len(bytes_data),
        }
    except (GDBError, ValueError) as e:
        return {
            "success": False,
            "error": str(e),
        }
@mcp.tool()
def disassemble(address: str | None = None, count: int = 10) -> dict:
    """Disassemble instructions at address.

    Note: This is a simplified disassembler. For complex analysis,
    use a dedicated tool like Ghidra. Every byte is looked up on its own
    in a small opcode table, so operand bytes are also shown with a hint
    as if they were opcodes.

    Args:
        address: Start address (default: current CS:IP)
        count: Number of bytes to read for disassembly
            (default: 10, min: 1, max: 4096)

    Returns:
        Raw bytes and simple instruction hints
    """
    try:
        if count < 1:
            raise ValueError(f"count must be at least 1, got {count}")
        count = min(count, 4096)

        if address:
            addr = parse_address(address)
        else:
            regs = _client.read_registers()
            addr = regs.cs_ip

        # Only `count` bytes are ever reported, so only `count` are read
        mem = _client.read_memory(addr, count)
        code = mem.data[:count]

        # Simple x86 opcode hints (not a full disassembler)
        # This is just to give Claude some context
        opcodes = {
            0x90: "NOP",
            0xCC: "INT 3",
            0xCD: "INT",
            0xC3: "RET",
            0xCB: "RETF",
            0xE8: "CALL",
            0xE9: "JMP",
            0xEB: "JMP short",
            0x74: "JZ",
            0x75: "JNZ",
            0x50: "PUSH AX", 0x51: "PUSH CX", 0x52: "PUSH DX", 0x53: "PUSH BX",
            0x54: "PUSH SP", 0x55: "PUSH BP", 0x56: "PUSH SI", 0x57: "PUSH DI",
            0x58: "POP AX", 0x59: "POP CX", 0x5A: "POP DX", 0x5B: "POP BX",
            0x5C: "POP SP", 0x5D: "POP BP", 0x5E: "POP SI", 0x5F: "POP DI",
            0xB8: "MOV AX,imm", 0xB9: "MOV CX,imm", 0xBA: "MOV DX,imm", 0xBB: "MOV BX,imm",
            0x89: "MOV r/m,r", 0x8B: "MOV r,r/m",
            0x01: "ADD r/m,r", 0x03: "ADD r,r/m",
            0x29: "SUB r/m,r", 0x2B: "SUB r,r/m",
            0x31: "XOR r/m,r", 0x33: "XOR r,r/m",
            0x39: "CMP r/m,r", 0x3B: "CMP r,r/m",
        }

        lines = [
            {
                "address": format_address(addr + i),
                "byte": f"{b:02x}",
                "hint": opcodes.get(b, f"?? ({b:02x})"),
            }
            for i, b in enumerate(code)
        ]

        return {
            "success": True,
            "start_address": format_address(addr, "both"),
            "raw_bytes": code.hex(),
            "instructions": lines,
            "note": "This is a simplified view. Use Ghidra for full disassembly.",
        }

    except (GDBError, ValueError) as e:
        return {
            "success": False,
            "error": str(e),
        }
@mcp.tool()
def stack(count: int = 16) -> dict:
    """Read 16-bit words starting at SS:SP and list them.

    Args:
        count: Number of words to dump (default: 16)

    Returns:
        Stack contents with SS:SP and values
    """
    try:
        regs = _client.read_registers()
        base = regs.ss_sp

        # Read stack (2 bytes per word in real mode)
        raw = _client.read_memory(base, count * 2).data

        # Walk complete little-endian words; a trailing odd byte is ignored
        words = []
        for pos in range(0, len(raw) - 1, 2):
            value = int.from_bytes(raw[pos:pos + 2], 'little')
            words.append({
                "offset": f"+{pos:02x}",
                "address": format_address(base + pos),
                "value": f"{value:04x}",
            })

        outcome = {
            "success": True,
            "ss_sp": f"{regs.ss:04x}:{regs.sp:04x}",
            "physical_address": format_address(base, "both"),
            "words": words,
        }
    except GDBError as e:
        outcome = {
            "success": False,
            "error": str(e),
        }
    return outcome
@mcp.tool()
def status() -> dict:
    """Summarize emulator and debugger state.

    Returns:
        The DOSBoxStatus dictionary, extended with CS:IP and SS:SP when the
        client is connected and registers can be read
    """
    is_connected = _client.connected
    snapshot = DOSBoxStatus(
        running=_manager.running,
        connected=is_connected,
        host=_client.host,
        port=_client.port,
        pid=_manager.pid,
        breakpoints=_client.list_breakpoints() if _client.connected else [],
    )

    report = snapshot.to_dict()

    # Add register state if connected
    if _client.connected:
        try:
            regs = _client.read_registers()
            report["cs_ip"] = f"{regs.cs:04x}:{regs.ip:04x}"
            report["ss_sp"] = f"{regs.ss:04x}:{regs.sp:04x}"
        except GDBError:
            # Register info is optional; report the rest without it
            pass

    return report
@mcp.tool()
def quit() -> dict:
    """Kill the debug session (if connected) and stop DOSBox-X (if running).

    Returns:
        Shutdown status
    """
    try:
        if _client.connected:
            try:
                _client.kill()
            except GDBError:
                # Fall back to just dropping the connection
                _client.disconnect()

        if _manager.running:
            _manager.stop()

        outcome = {
            "success": True,
            "message": "DOSBox-X stopped and cleaned up",
        }
    except Exception as e:
        outcome = {
            "success": False,
            "error": str(e),
        }
    return outcome
# =============================================================================
# P2 Tools - Nice to have
# =============================================================================
@mcp.tool()
def screenshot(filename: str | None = None) -> dict:
    """Placeholder for display capture; always reports "not implemented".

    Args:
        filename: Optional output filename (currently unused)

    Returns:
        Error dict pointing at the DOSBox-X hotkey
    """
    # Placeholder - requires X11 or DOSBox-X specific integration
    outcome = {
        "success": False,
        "error": "Screenshot not yet implemented. Use DOSBox-X hotkey F12.",
    }
    return outcome
@mcp.tool()
def serial_send(data: str, port: int = 1) -> dict:
    """Placeholder for serial-port output; always reports "not implemented".

    Intended for RIPscrip testing: sending graphics commands to a program
    listening on a COM port.

    Args:
        data: Data to send (text or hex with \\x prefix) (currently unused)
        port: COM port number (1 or 2) (currently unused)

    Returns:
        Error dict
    """
    # Placeholder - requires serial port configuration
    outcome = {
        "success": False,
        "error": "Serial port communication not yet implemented.",
    }
    return outcome
# =============================================================================
# Entry Point
# =============================================================================
def main():
    """Entry point for the MCP server.

    The startup banner is written to stderr. mcp.run() is called without
    arguments, and with FastMCP's default stdio transport stdout carries the
    protocol messages, so anything else printed there would corrupt the
    stream seen by the client.
    """
    import sys

    print(f"🎮 DOSBox-X MCP Server v{PACKAGE_VERSION}", file=sys.stderr)
    print("AI-assisted DOS binary debugging", file=sys.stderr)
    print(file=sys.stderr)
    mcp.run()


if __name__ == "__main__":
    main()

313
src/dosbox_mcp/types.py Normal file
View File

@ -0,0 +1,313 @@
"""Type definitions for DOSBox-X MCP Server."""
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Literal
class StopReason(Enum):
    """Reasons why execution stopped.

    Member values are assigned by auto(); code in this package compares
    members and uses their names, not their numeric values.
    """
    # Stopped at an address matching a known breakpoint
    BREAKPOINT = auto()
    # Stopped with a trap that matched no known breakpoint (e.g. single step)
    STEP = auto()
    # Stopped by a signal other than the trap signal
    SIGNAL = auto()
    # Target reported that it exited
    EXITED = auto()
    # Stop reply could not be classified
    UNKNOWN = auto()
@dataclass
class Registers:
    """Snapshot of the x86 register set, with real-mode conveniences.

    Values are stored at 32-bit width. The 16-bit views (AX, IP, SP, ...)
    are the low halves of the corresponding fields, and physical addresses
    are formed the real-mode way: (segment << 4) + offset.

    Field order follows the order used on the GDB wire.
    """

    # General purpose registers (32-bit, but real mode uses 16-bit)
    eax: int = 0
    ecx: int = 0
    edx: int = 0
    ebx: int = 0
    esp: int = 0
    ebp: int = 0
    esi: int = 0
    edi: int = 0

    # Instruction pointer
    eip: int = 0

    # Flags register
    eflags: int = 0

    # Segment registers (16-bit)
    cs: int = 0
    ss: int = 0
    ds: int = 0
    es: int = 0
    fs: int = 0
    gs: int = 0

    @staticmethod
    def _lo16(value: int) -> int:
        """Keep only the low 16 bits of `value`."""
        return value & 0xFFFF

    @property
    def ax(self) -> int:
        """AX: low word of EAX."""
        return self._lo16(self.eax)

    @property
    def bx(self) -> int:
        """BX: low word of EBX."""
        return self._lo16(self.ebx)

    @property
    def cx(self) -> int:
        """CX: low word of ECX."""
        return self._lo16(self.ecx)

    @property
    def dx(self) -> int:
        """DX: low word of EDX."""
        return self._lo16(self.edx)

    @property
    def sp(self) -> int:
        """SP: low word of ESP."""
        return self._lo16(self.esp)

    @property
    def bp(self) -> int:
        """BP: low word of EBP."""
        return self._lo16(self.ebp)

    @property
    def si(self) -> int:
        """SI: low word of ESI."""
        return self._lo16(self.esi)

    @property
    def di(self) -> int:
        """DI: low word of EDI."""
        return self._lo16(self.edi)

    @property
    def ip(self) -> int:
        """IP: low word of EIP."""
        return self._lo16(self.eip)

    @property
    def flags(self) -> int:
        """FLAGS: low word of EFLAGS."""
        return self._lo16(self.eflags)

    def physical_address(self, segment: str, offset: str) -> int:
        """Combine a segment register and an offset register into an address.

        Args:
            segment: Segment register name ('cs', 'ds', 'es', 'ss')
            offset: Offset register name ('ip', 'sp', 'si', 'di', 'bx', etc.)

        Returns:
            (segment value << 4) + offset value
        """
        seg_val, off_val = (
            getattr(self, name.lower()) for name in (segment, offset)
        )
        return seg_val * 16 + off_val

    @property
    def cs_ip(self) -> int:
        """Physical address of CS:IP (current instruction)."""
        return self.physical_address('cs', 'ip')

    @property
    def ss_sp(self) -> int:
        """Physical address of SS:SP (current stack)."""
        return self.physical_address('ss', 'sp')

    def flag_set(self, flag: str) -> bool:
        """Test one EFLAGS bit by short or long name (case-insensitive).

        Recognized names and bit positions:
        - cf / carry: 0
        - pf / parity: 2
        - af / aux: 4
        - zf / zero: 6
        - sf / sign: 7
        - tf / trap: 8
        - if / interrupt: 9
        - df / direction: 10
        - of / overflow: 11

        Raises:
            ValueError: If the name is not recognized
        """
        positions = {
            'cf': 0, 'carry': 0,
            'pf': 2, 'parity': 2,
            'af': 4, 'aux': 4,
            'zf': 6, 'zero': 6,
            'sf': 7, 'sign': 7,
            'tf': 8, 'trap': 8,
            'if': 9, 'interrupt': 9,
            'df': 10, 'direction': 10,
            'of': 11, 'overflow': 11,
        }
        key = flag.lower()
        if key not in positions:
            raise ValueError(f"Unknown flag: {flag}")
        return (self.eflags >> positions[key]) & 1 == 1

    def to_dict(self) -> dict:
        """Render every register as hex text, plus decoded flags, for JSON."""
        out = {}

        # 32-bit registers
        for name in ('eax', 'ecx', 'edx', 'ebx', 'esp', 'ebp', 'esi', 'edi',
                     'eip', 'eflags'):
            out[name] = f'{getattr(self, name):08x}'

        # 16-bit aliases, then segment registers
        for name in ('ax', 'cx', 'dx', 'bx', 'sp', 'bp', 'si', 'di', 'ip',
                     'cs', 'ss', 'ds', 'es', 'fs', 'gs'):
            out[name] = f'{getattr(self, name):04x}'

        # Computed addresses
        out['cs:ip'] = f'{self.cs:04x}:{self.ip:04x}'
        out['ss:sp'] = f'{self.ss:04x}:{self.sp:04x}'

        # Flags
        out['flags'] = {
            label: self.flag_set(label)
            for label in ('carry', 'parity', 'aux', 'zero', 'sign', 'trap',
                          'interrupt', 'direction', 'overflow')
        }
        return out
@dataclass
class Breakpoint:
    """Local record of one breakpoint."""

    id: int
    address: int
    enabled: bool = True
    hit_count: int = 0
    # Original format provided by user
    original: str = ""

    def to_dict(self) -> dict:
        """Build a JSON-friendly dict; the address is 5-digit lowercase hex."""
        return dict(
            id=self.id,
            address=format(self.address, '05x'),
            enabled=self.enabled,
            hit_count=self.hit_count,
            original=self.original,
        )
@dataclass
class StopEvent:
    """Why and where the target stopped."""

    reason: StopReason
    address: int = 0
    signal: int | None = None
    breakpoint_id: int | None = None

    def to_dict(self) -> dict:
        """Build a JSON-friendly dict; reason is the lowercase member name."""
        reason_name = self.reason.name.lower()
        return dict(
            reason=reason_name,
            address=format(self.address, '05x'),
            signal=self.signal,
            breakpoint_id=self.breakpoint_id,
        )
@dataclass
class MemoryRegion:
    """Bytes read from the target together with their start address."""

    address: int
    data: bytes

    def to_hex(self) -> str:
        """Return the bytes as one lowercase hex string."""
        hex_text = self.data.hex()
        return hex_text

    def to_ascii(self) -> str:
        """Return the bytes as text, with '.' for anything outside 32..126."""
        chars = []
        for byte in self.data:
            printable = 32 <= byte < 127
            chars.append(chr(byte) if printable else '.')
        return ''.join(chars)

    def to_dict(self, format: Literal["hex", "ascii", "both"] = "both") -> dict:
        """Build a JSON-friendly dict with the requested representation(s)."""
        want_hex = format in ("hex", "both")
        want_ascii = format in ("ascii", "both")

        out = {
            'address': f'{self.address:05x}',
            'length': len(self.data),
        }
        if want_hex:
            out['hex'] = self.to_hex()
        if want_ascii:
            out['ascii'] = self.to_ascii()
        return out
@dataclass
class DisassemblyLine:
    """One disassembled instruction: address, raw bytes, mnemonic, operands."""

    address: int
    bytes_hex: str
    mnemonic: str
    operands: str = ""

    def to_dict(self) -> dict:
        """Build a JSON-friendly dict; mnemonic and operands are joined."""
        text = "{} {}".format(self.mnemonic, self.operands).strip()
        return dict(
            address=format(self.address, '05x'),
            bytes=self.bytes_hex,
            instruction=text,
        )
@dataclass
class DOSBoxStatus:
    """Combined emulator and debugger connection status."""

    running: bool = False
    connected: bool = False
    host: str = ""
    port: int = 0
    pid: int | None = None
    breakpoints: list[Breakpoint] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Build a JSON-friendly dict; breakpoints are reported as a count."""
        out = {
            name: getattr(self, name)
            for name in ('running', 'connected', 'host', 'port', 'pid')
        }
        out['breakpoint_count'] = len(self.breakpoints)
        return out

282
src/dosbox_mcp/utils.py Normal file
View File

@ -0,0 +1,282 @@
"""Utility functions for DOSBox-X MCP Server."""
import re
def parse_address(addr: str) -> int:
    """Parse a DOS address in various formats.

    Supports:
    - Segment:offset format: "1234:5678" -> physical address
    - Flat hex: "0x12345" or "12345h" or "12345"
    - Decimal: "#12345"

    In real mode, physical address = (segment << 4) + offset
    This gives a 20-bit address space (1MB).

    Empty input and negative values are rejected: a negative number is
    never a valid address and would otherwise be formatted into a
    malformed debugger request further down the line.

    Args:
        addr: Address string in any supported format

    Returns:
        Integer physical address

    Raises:
        ValueError: If the string is empty, malformed, or negative

    Examples:
        parse_address("1000:0100") -> 65792 (0x10100), segment:offset
        parse_address("0x10100") -> 65792, flat hex
        parse_address("F000:FFF0") -> 1048560 (0xFFFF0), BIOS reset vector
    """
    text = addr.strip().lower()
    if not text:
        raise ValueError("Empty address")

    if ':' in text:
        # Segment:offset format (e.g., "1234:5678")
        parts = text.split(':')
        if len(parts) != 2:
            raise ValueError(f"Invalid segment:offset format: {text}")
        segment = int(parts[0], 16)
        offset = int(parts[1], 16)
        # Check each half: a negative segment could hide behind a large offset
        if segment < 0 or offset < 0:
            raise ValueError(f"Address must not be negative: {text}")
        return (segment << 4) + offset

    if text.startswith('#'):
        # Decimal format (e.g., "#12345")
        value = int(text[1:], 10)
    elif text.endswith('h'):
        # Hex with suffix (e.g., "12345h")
        value = int(text[:-1], 16)
    else:
        # Hex, with or without a "0x" prefix (int(..., 16) accepts both)
        value = int(text, 16)

    if value < 0:
        raise ValueError(f"Address must not be negative: {text}")
    return value
def format_address(addr: int, style: str = "flat") -> str:
"""Format a physical address in the specified style.
Args:
addr: Physical address (20-bit)
style: "flat" (default), "segoff", or "both"
Returns:
Formatted address string
Examples:
>>> format_address(0x10100, "flat")
'10100'
>>> format_address(0x10100, "segoff")
'1010:0000'
>>> format_address(0x10100, "both")
'10100 (1010:0000)'
"""
if style == "segoff":
# Convert to segment:offset (canonical form with offset < 16)
segment = addr >> 4
offset = addr & 0x0F
return f'{segment:04x}:{offset:04x}'
elif style == "both":
segment = addr >> 4
offset = addr & 0x0F
return f'{addr:05x} ({segment:04x}:{offset:04x})'
else: # flat
return f'{addr:05x}'
def calculate_checksum(data: str) -> str:
    """Compute the checksum for a GDB remote-protocol packet.

    The character codes of the payload are added up and reduced
    modulo 256; the result is rendered as two lowercase hex digits.

    Args:
        data: Packet data (without $ prefix or # suffix)

    Returns:
        Two-character hex checksum
    """
    running = 0
    for ch in data:
        running += ord(ch)
    return format(running % 256, '02x')
def encode_hex(data: bytes) -> str:
    """Render bytes as a lowercase hex string for the GDB protocol.

    Each input byte becomes exactly two hex digits, with no separators.
    """
    hex_text = data.hex()
    return hex_text
def decode_hex(hex_str: str) -> bytes:
    """Turn a hex string received over the GDB protocol back into bytes.

    Raises:
        ValueError: If hex_str is not valid hexadecimal (propagated from
            bytes.fromhex).
    """
    raw = bytes.fromhex(hex_str)
    return raw
def escape_binary(data: bytes) -> bytes:
    """Apply GDB remote-protocol escaping to binary data.

    The bytes '$' (0x24), '#' (0x23), '}' (0x7d) and '*' (0x2a) are each
    replaced by the escape byte 0x7d followed by the original byte XORed
    with 0x20. All other bytes pass through unchanged.
    """
    reserved = frozenset(b'$#}*')
    escaped = bytearray()
    for byte in data:
        if byte in reserved:
            escaped += bytes((0x7d, byte ^ 0x20))
        else:
            escaped.append(byte)
    return bytes(escaped)
def unescape_binary(data: bytes) -> bytes:
    """Reverse GDB remote-protocol escaping.

    Every 0x7d byte that has a successor is dropped and the successor is
    XORed with 0x20. A 0x7d that is the very last byte has nothing to
    modify and is copied through unchanged.
    """
    decoded = bytearray()
    stream = iter(data)
    for byte in stream:
        if byte != 0x7d:
            decoded.append(byte)
            continue
        follower = next(stream, None)
        if follower is None:
            # Dangling escape at end of input: keep it literally
            decoded.append(byte)
        else:
            decoded.append(follower ^ 0x20)
    return bytes(decoded)
def parse_stop_reply(response: str) -> tuple[str, dict]:
    """Parse a GDB stop reply packet.

    Recognized replies start with S (signal), T (signal plus key:value
    info), W (process exited) or X (process terminated by a signal),
    followed by a two-hex-digit code. Anything else, including a reply
    that is truncated or whose code is not hexadecimal, is reported as
    "unknown" with the raw text rather than raising.

    Args:
        response: Packet payload (without $ prefix, # suffix or checksum)

    Returns:
        Tuple of (stop_type, info_dict) where stop_type is one of
        "signal", "exit", "terminated" or "unknown"

    Examples:
        "S05" -> ("signal", {"signal": 5})
        "T05thread:01;" -> ("signal", {"signal": 5, "thread": "01"})
        "W00" -> ("exit", {"code": 0})
        "X09" -> ("terminated", {"signal": 9})
        "S" -> ("unknown", {"raw": "S"})
    """
    if not response:
        return ("unknown", {})

    kind = response[0]
    if kind not in "STWX":
        return ("unknown", {"raw": response})

    # Every recognized reply carries exactly two hex digits after the type
    # letter; a short or garbled packet must not blow up inside int().
    code_text = response[1:3]
    if len(code_text) != 2 or any(c not in "0123456789abcdefABCDEF" for c in code_text):
        return ("unknown", {"raw": response})
    code = int(code_text, 16)

    if kind == 'S':
        return ("signal", {"signal": code})

    if kind == 'T':
        info: dict = {"signal": code}
        # The remainder is a ';'-separated list of key:value pairs; the value
        # keeps any further ':' characters, and entries without ':' are skipped.
        for pair in response[3:].rstrip(';').split(';'):
            key, sep, value = pair.partition(':')
            if sep:
                info[key] = value
        return ("signal", info)

    if kind == 'W':
        return ("exit", {"code": code})

    # kind == 'X'
    return ("terminated", {"signal": code})
def hexdump(data: bytes, address: int = 0, width: int = 16) -> str:
    """Render bytes as a classic address / hex / ASCII dump.

    Each output line shows the address of its first byte (at least five
    hex digits), the bytes as space-separated hex pairs padded to a fixed
    column width, and the same bytes as text between '|' markers, with
    anything outside printable ASCII (32..126) shown as '.'.

    Args:
        data: Bytes to dump
        address: Starting address for display
        width: Bytes per line

    Returns:
        Formatted hex dump string (lines joined with newlines, empty
        string for empty data)
    """
    # "xx " per byte, minus the trailing space of the last one
    hex_column_width = width * 3 - 1
    rows = []
    for start in range(0, len(data), width):
        row = data[start:start + width]
        hex_cells = ' '.join(f'{byte:02x}' for byte in row).ljust(hex_column_width)
        text_cells = ''.join(chr(byte) if 32 <= byte < 127 else '.' for byte in row)
        rows.append(f'{address + start:05x} {hex_cells} |{text_cells}|')
    return '\n'.join(rows)
def parse_registers_x86(hex_data: str) -> dict[str, int]:
    """Parse a GDB 'g' register dump for x86 (32-bit).

    GDB returns registers in a fixed order as concatenated hex values.
    For i386 the order is: EAX, ECX, EDX, EBX, ESP, EBP, ESI, EDI,
    EIP, EFLAGS, CS, SS, DS, ES, FS, GS.

    Every register, including the segment registers, occupies 8 hex
    characters (32 bits, little-endian). Only the low 16 bits of each
    segment register are kept.

    Lenient handling:
        - All whitespace in the dump is ignored.
        - A register that is cut off by a truncated dump reads as 0.
        - A register reported as unavailable (GDB fills its field with
          'x' characters) reads as 0.

    Args:
        hex_data: Payload of the reply to a 'g' packet

    Returns:
        Mapping of lowercase register name to integer value, in GDB order

    Raises:
        ValueError: If a register field contains characters that are
            neither hex digits nor the 'x' unavailable marker.
    """
    # Drop every kind of whitespace so field offsets stay aligned
    hex_data = ''.join(hex_data.split())

    def read_le32(offset: int) -> int:
        """Read 32-bit little-endian value from hex string."""
        chunk = hex_data[offset:offset + 8]
        if len(chunk) < 8:
            return 0
        # 'x' is never a hex digit; GDB uses it to mark unavailable data
        if 'x' in chunk.lower():
            return 0
        # GDB sends in target byte order (little-endian for x86)
        return int.from_bytes(bytes.fromhex(chunk), 'little')

    full_width = ['eax', 'ecx', 'edx', 'ebx', 'esp', 'ebp', 'esi', 'edi',
                  'eip', 'eflags']
    segments = ['cs', 'ss', 'ds', 'es', 'fs', 'gs']

    regs: dict[str, int] = {}
    pos = 0
    for name in full_width:
        regs[name] = read_le32(pos)
        pos += 8
    # Segment registers are 32-bit in the GDB response, but only 16 bits
    # are meaningful
    for name in segments:
        regs[name] = read_le32(pos) & 0xFFFF
        pos += 8
    return regs
# Signal numbers as used by the GDB remote protocol.
# The protocol has its own fixed numbering (GDB's signals.def) that does not
# follow any particular host OS: notably 7 is SIGEMT and 10 is SIGBUS,
# unlike Linux where 7 is SIGBUS and 10 is SIGUSR1.
SIGNALS = {
    0: "SIGNONE",
    1: "SIGHUP",
    2: "SIGINT",
    3: "SIGQUIT",
    4: "SIGILL",
    5: "SIGTRAP",  # Breakpoint
    6: "SIGABRT",
    7: "SIGEMT",
    8: "SIGFPE",
    9: "SIGKILL",
    10: "SIGBUS",
    11: "SIGSEGV",
    12: "SIGSYS",
    13: "SIGPIPE",
    14: "SIGALRM",
    15: "SIGTERM",
}


def signal_name(num: int) -> str:
    """Get the name of a GDB protocol signal number.

    Args:
        num: Signal number as reported in a stop reply

    Returns:
        The symbolic name from SIGNALS, or "SIG<num>" for numbers that
        are not in the table.
    """
    return SIGNALS.get(num, f"SIG{num}")

1
tests/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Tests for DOSBox-X MCP Server."""

191
tests/test_types.py Normal file
View File

@ -0,0 +1,191 @@
"""Tests for type definitions."""
import pytest
from dosbox_mcp.types import (
Breakpoint,
DisassemblyLine,
DOSBoxStatus,
MemoryRegion,
Registers,
StopEvent,
StopReason,
)
class TestRegisters:
    """Checks for the Registers type."""

    def test_16bit_aliases(self):
        """ax/bx return the low 16 bits of eax/ebx."""
        registers = Registers(eax=0x12345678, ebx=0xAABBCCDD)
        assert registers.ax == 0x5678
        assert registers.bx == 0xCCDD

    def test_physical_address_calculation(self):
        """cs_ip and ss_sp combine segment and offset as (seg << 4) + off."""
        registers = Registers(cs=0x1000, eip=0x0100, ss=0x2000, esp=0x0200)
        assert registers.cs_ip == 0x10100  # (0x1000 << 4) + 0x100
        assert registers.ss_sp == 0x20200  # (0x2000 << 4) + 0x200

    def test_flag_checking(self):
        """flag_set accepts short and long flag names."""
        # 0x41 sets carry (bit 0) and zero (bit 6); sign stays clear
        registers = Registers(eflags=0x41)
        for raised_flag in ('cf', 'carry', 'zf', 'zero'):
            assert registers.flag_set(raised_flag) is True
        for cleared_flag in ('sf', 'sign'):
            assert registers.flag_set(cleared_flag) is False

    def test_flag_unknown(self):
        """An unrecognized flag name raises ValueError."""
        registers = Registers()
        with pytest.raises(ValueError):
            registers.flag_set('unknown_flag')

    def test_to_dict(self):
        """to_dict renders registers as zero-padded hex strings."""
        registers = Registers(eax=0x1234, cs=0x100, eip=0x200, eflags=0x41)
        payload = registers.to_dict()
        assert payload['eax'] == '00001234'
        assert payload['ax'] == '1234'
        assert payload['cs'] == '0100'
        assert payload['cs:ip'] == '0100:0200'
        flags = payload['flags']
        assert flags['carry'] is True
        assert flags['zero'] is True
class TestBreakpoint:
    """Checks for the Breakpoint type."""

    def test_creation(self):
        """A new breakpoint keeps its id/address and starts enabled with no hits."""
        breakpoint_ = Breakpoint(id=1, address=0x10100, original="1000:0100")
        assert breakpoint_.id == 1
        assert breakpoint_.address == 0x10100
        assert breakpoint_.enabled is True
        assert breakpoint_.hit_count == 0

    def test_to_dict(self):
        """to_dict reports id, hex address and hit count."""
        breakpoint_ = Breakpoint(
            id=1,
            address=0x10100,
            hit_count=5,
            original="1000:0100",
        )
        payload = breakpoint_.to_dict()
        assert payload['id'] == 1
        assert payload['address'] == '10100'
        assert payload['hit_count'] == 5
class TestStopEvent:
    """Checks for the StopEvent type."""

    def test_breakpoint_event(self):
        """A breakpoint stop serializes its reason, address and breakpoint id."""
        stop = StopEvent(
            reason=StopReason.BREAKPOINT,
            address=0x10100,
            signal=5,
            breakpoint_id=1,
        )
        payload = stop.to_dict()
        assert payload['reason'] == 'breakpoint'
        assert payload['address'] == '10100'
        assert payload['breakpoint_id'] == 1

    def test_step_event(self):
        """A single-step stop serializes its reason as 'step'."""
        stop = StopEvent(reason=StopReason.STEP, address=0x10100)
        payload = stop.to_dict()
        assert payload['reason'] == 'step'
class TestMemoryRegion:
    """Checks for the MemoryRegion type."""

    def test_hex_format(self):
        """to_hex returns the data as one lowercase hex string."""
        region = MemoryRegion(address=0x100, data=b"\x90\x90\xcc")
        assert region.to_hex() == "9090cc"

    def test_ascii_format(self):
        """to_ascii replaces the NUL byte with '.'."""
        region = MemoryRegion(address=0x100, data=b"Hello\x00World")
        assert region.to_ascii() == "Hello.World"

    def test_to_dict(self):
        """to_dict(format="both") includes address, length, hex and ascii."""
        region = MemoryRegion(address=0x100, data=b"AB")
        payload = region.to_dict(format="both")
        assert payload['address'] == '00100'
        assert payload['length'] == 2
        assert payload['hex'] == '4142'
        assert payload['ascii'] == 'AB'
class TestDisassemblyLine:
    """Checks for the DisassemblyLine type."""

    def test_to_dict(self):
        """An instruction without operands serializes as the bare mnemonic."""
        nop = DisassemblyLine(
            address=0x10100,
            bytes_hex="90",
            mnemonic="NOP",
            operands="",
        )
        payload = nop.to_dict()
        assert payload['address'] == '10100'
        assert payload['bytes'] == '90'
        assert payload['instruction'] == 'NOP'

    def test_instruction_with_operands(self):
        """Mnemonic and operands are joined with a single space."""
        mov = DisassemblyLine(
            address=0x10100,
            bytes_hex="b80100",
            mnemonic="MOV",
            operands="AX, 0001",
        )
        payload = mov.to_dict()
        assert payload['instruction'] == 'MOV AX, 0001'
class TestDOSBoxStatus:
    """Checks for the DOSBoxStatus type."""

    def test_default_status(self):
        """A default status is neither running nor connected and has no breakpoints."""
        fresh = DOSBoxStatus()
        assert fresh.running is False
        assert fresh.connected is False
        assert fresh.breakpoints == []

    def test_to_dict(self):
        """to_dict copies the scalar fields and reports the breakpoint count."""
        only_breakpoint = Breakpoint(id=1, address=0x100)
        populated = DOSBoxStatus(
            running=True,
            connected=True,
            host="localhost",
            port=1234,
            pid=12345,
            breakpoints=[only_breakpoint],
        )
        payload = populated.to_dict()
        assert payload['running'] is True
        assert payload['connected'] is True
        assert payload['host'] == "localhost"
        assert payload['port'] == 1234
        assert payload['pid'] == 12345
        assert payload['breakpoint_count'] == 1

286
tests/test_utils.py Normal file
View File

@ -0,0 +1,286 @@
"""Tests for utility functions."""
import pytest
from dosbox_mcp.utils import (
calculate_checksum,
decode_hex,
encode_hex,
escape_binary,
format_address,
hexdump,
parse_address,
parse_registers_x86,
parse_stop_reply,
signal_name,
unescape_binary,
)
class TestParseAddress:
    """Checks for parse_address."""

    def test_segment_offset_format(self):
        """segment:offset input yields (segment << 4) + offset."""
        expectations = [
            ("1000:0100", 0x10100),
            ("F000:FFF0", 0xFFFF0),  # BIOS reset vector
            ("0000:0000", 0x00000),
        ]
        for text, physical in expectations:
            assert parse_address(text) == physical

    def test_segment_offset_lowercase(self):
        """Lowercase hex digits are accepted in segment:offset."""
        assert parse_address("a000:0000") == 0xA0000  # Video memory

    def test_flat_hex_with_prefix(self):
        """'0x'-prefixed input is read as hex."""
        for text, physical in [("0x12345", 0x12345), ("0xFFFF0", 0xFFFF0)]:
            assert parse_address(text) == physical

    def test_flat_hex_with_suffix(self):
        """'h'-suffixed input is read as hex."""
        for text, physical in [("12345h", 0x12345), ("FFFF0h", 0xFFFF0)]:
            assert parse_address(text) == physical

    def test_plain_hex(self):
        """Bare digits are assumed to be hex."""
        for text, physical in [("12345", 0x12345), ("100", 0x100)]:
            assert parse_address(text) == physical

    def test_decimal_format(self):
        """'#'-prefixed input is read as decimal."""
        assert parse_address("#65536") == 65536
        assert parse_address("#1048576") == 1048576  # 1MB

    def test_whitespace_handling(self):
        """Leading and trailing whitespace is ignored."""
        padded = " 1000:0100 "
        assert parse_address(padded) == 0x10100

    def test_invalid_segment_offset(self):
        """More than one ':' is rejected with ValueError."""
        with pytest.raises(ValueError):
            parse_address("1000:2000:3000")
class TestFormatAddress:
    """Tests for format_address function."""

    def test_flat_format(self):
        """Flat style is lowercase hex padded to five digits."""
        assert format_address(0x10100, "flat") == "10100"
        assert format_address(0x00100, "flat") == "00100"

    def test_default_style_is_flat(self):
        """Omitting the style gives the flat form."""
        assert format_address(0x10100) == "10100"

    def test_segoff_format(self):
        """segoff style uses the canonical form with offset < 16."""
        # Pin exact strings: merely checking for ':' would accept any
        # segment/offset split, including a wrong one.
        assert format_address(0x10100, "segoff") == "1010:0000"
        assert format_address(0x12345, "segoff") == "1234:0005"

    def test_both_format(self):
        """both style is the flat form followed by segoff in parentheses."""
        assert format_address(0x10100, "both") == "10100 (1010:0000)"
        assert format_address(0x12345, "both") == "12345 (1234:0005)"
class TestChecksum:
    """Tests for GDB checksum calculation."""

    def test_simple_checksum(self):
        """A single character checksums to its own code."""
        # 'g' = 0x67 = 103
        assert calculate_checksum("g") == "67"

    def test_command_checksum(self):
        """Test checksum of actual GDB commands."""
        # "?" = 0x3F = 63
        assert calculate_checksum("?") == "3f"
        # "c" = 0x63 = 99
        assert calculate_checksum("c") == "63"
        # "s" = 0x73 = 115
        assert calculate_checksum("s") == "73"

    def test_checksum_wrapping(self):
        """Sums above 255 wrap modulo 256."""
        # 65 * 300 = 19500; 19500 - 76 * 256 = 44 = 0x2c
        long_str = "A" * 300
        # Assert the literal so the test does not just mirror the
        # implementation's own formula.
        assert calculate_checksum(long_str) == "2c"
class TestHexEncoding:
    """Checks for encode_hex / decode_hex."""

    def test_encode_hex(self):
        """Bytes become two lowercase hex digits each."""
        for raw, text in [(b"\x00\x01\x02", "000102"), (b"ABC", "414243")]:
            assert encode_hex(raw) == text

    def test_decode_hex(self):
        """Hex text becomes the corresponding bytes."""
        for text, raw in [("000102", b"\x00\x01\x02"), ("414243", b"ABC")]:
            assert decode_hex(text) == raw

    def test_roundtrip(self):
        """Decoding an encoded value gives back the input."""
        machine_code = b"\x90\x90\xcc\xcd\x21"  # NOP NOP INT3 INT 21
        encoded = encode_hex(machine_code)
        assert decode_hex(encoded) == machine_code
class TestBinaryEscaping:
    """Checks for GDB binary escaping."""

    def test_escape_special_chars(self):
        """Each reserved byte becomes 0x7d followed by the byte XOR 0x20."""
        reserved = [0x24, 0x23, 0x7d, 0x2a]  # $, #, }, *
        expected = bytes(
            value
            for byte in reserved
            for value in (0x7d, byte ^ 0x20)
        )
        assert escape_binary(bytes(reserved)) == expected

    def test_escape_normal_chars(self):
        """Bytes outside the reserved set pass through untouched."""
        plain = b"ABC123"
        assert escape_binary(plain) == plain

    def test_unescape(self):
        """An escaped '$' is restored."""
        wire = bytes([0x7d, 0x04])  # Escaped 0x24 ($)
        assert unescape_binary(wire) == bytes([0x24])

    def test_escape_unescape_roundtrip(self):
        """Unescaping escaped data gives back the input."""
        payload = bytes([0x24, 0x23, 0x7d, 0x2a, 0x41, 0x42])
        wire = escape_binary(payload)
        assert unescape_binary(wire) == payload
class TestParseStopReply:
    """Checks for parse_stop_reply."""

    def test_signal_reply(self):
        """'S' replies carry only a signal number."""
        kind, details = parse_stop_reply("S05")
        assert kind == "signal"
        assert details["signal"] == 5  # SIGTRAP

    def test_signal_with_info(self):
        """'T' replies carry a signal number plus key:value pairs."""
        kind, details = parse_stop_reply("T05thread:01;")
        assert kind == "signal"
        assert details["signal"] == 5
        assert details["thread"] == "01"

    def test_exit_reply(self):
        """'W' replies report an exit code."""
        kind, details = parse_stop_reply("W00")
        assert kind == "exit"
        assert details["code"] == 0

    def test_terminated_reply(self):
        """'X' replies report the terminating signal."""
        kind, details = parse_stop_reply("X09")
        assert kind == "terminated"
        assert details["signal"] == 9  # SIGKILL

    def test_empty_reply(self):
        """An empty reply is classified as unknown."""
        kind, _details = parse_stop_reply("")
        assert kind == "unknown"

    def test_unknown_reply(self):
        """An unrecognized reply is unknown and keeps the raw text."""
        kind, details = parse_stop_reply("QQQ")
        assert kind == "unknown"
        assert "raw" in details
class TestParseRegisters:
    """Checks for parse_registers_x86."""

    def test_parse_registers(self):
        """A mock 16-register dump is decoded field by field."""
        # One 8-character little-endian field per register, in GDB order
        dump_fields = [
            "78563412",  # EAX = 0x12345678
            "00000000",  # ECX = 0
            "00000000",  # EDX = 0
            "00000000",  # EBX = 0
            "00100000",  # ESP = 0x1000
            "00000000",  # EBP = 0
            "00000000",  # ESI = 0
            "00000000",  # EDI = 0
            "00100000",  # EIP = 0x1000
            "02020000",  # EFLAGS = 0x202
            "00010000",  # CS = 0x100
            "00020000",  # SS = 0x200
            "00030000",  # DS = 0x300
            "00040000",  # ES = 0x400
            "00000000",  # FS = 0
            "00000000",  # GS = 0
        ]
        parsed = parse_registers_x86("".join(dump_fields))
        assert parsed["eax"] == 0x12345678
        assert parsed["ecx"] == 0
        assert parsed["esp"] == 0x1000
        assert parsed["eip"] == 0x1000
        assert parsed["eflags"] == 0x202
        assert parsed["cs"] == 0x100
        assert parsed["ds"] == 0x300
class TestSignalNames:
    """Checks for signal_name."""

    def test_known_signals(self):
        """Numbers in the table map to their symbolic names."""
        for number, label in [(5, "SIGTRAP"), (11, "SIGSEGV"), (9, "SIGKILL")]:
            assert signal_name(number) == label

    def test_unknown_signal(self):
        """Numbers outside the table fall back to 'SIG<n>'."""
        unlisted = 99
        assert signal_name(unlisted) == "SIG99"
class TestHexdump:
    """Checks for hexdump."""

    def test_simple_hexdump(self):
        """The dump shows the address, the hex bytes and the ASCII column."""
        rendered = hexdump(b"Hello, World!", address=0x100)
        for fragment in ("00100", "48 65 6c 6c", "|Hello, World!|"):
            assert fragment in rendered

    def test_hexdump_with_unprintable(self):
        """Non-printable bytes appear as '.' in the ASCII column."""
        rendered = hexdump(b"\x00\x01\x02ABC\xff", address=0)
        assert "00 01 02" in rendered
        assert "|...ABC.|" in rendered

    def test_hexdump_multiline(self):
        """32 bytes at width 16 produce exactly two lines."""
        payload = bytes(range(32))
        rendered = hexdump(payload, width=16)
        row_count = len(rendered.strip().split('\n'))
        assert row_count == 2

1823
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff