Add QMP control tools, clipboard support, and peripheral configuration

New tools and modules:
- control.py: pause, resume, reset, savestate, loadstate, memdump, query_status
- resources.py: Live screen capture, screenshot management resources
- clipboard_copy/clipboard_paste via DOSBox-X hotkeys (Ctrl+F5/F6)
- screen_text/screen_graphics for text buffer and VGA memory access

Configuration improvements:
- Dynamic QMP/GDB port handling (stored in session state from launch())
- Joystick config: joysticktype, timed options
- Parallel port config: parallel1, parallel2 settings
- Added -hostrun flag for host clipboard integration

Performance and UX:
- Reduced keyboard_send delay from 50ms to 10ms default
- Refactored server.py to use Tool.from_function() pattern
- Screenshot tool now returns MCP resource URIs only

MCP Resources:
- dosbox://screen - live screen capture (no tool call needed)
- dosbox://screenshots - list available screenshots
- dosbox://screenshots/{filename} - get specific screenshot
- dosbox://screenshots/latest - get most recent screenshot
This commit is contained in:
Ryan Malloy 2026-01-27 23:54:12 -07:00
parent 79c646cf87
commit 1d826254ba
12 changed files with 1633 additions and 81 deletions

View File

@ -25,30 +25,35 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libavformat-dev \ libavformat-dev \
libavutil-dev \ libavutil-dev \
libswscale-dev \ libswscale-dev \
libncurses-dev \
nasm \ nasm \
ca-certificates \ ca-certificates \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Clone DOSBox-X with remote debugging support (GDB server + QMP) # Clone DOSBox-X with remote debugging support (GDB server + QMP)
# This fork adds --enable-remotedebug which compiles in gdbserver.cpp and qmp.cpp # This fork adds --enable-remotedebug which compiles in gdbserver.cpp and qmp.cpp
# Using rsp2k fork with GDB breakpoint address fix (uses physical addresses correctly)
# See: https://github.com/joncampbell123/dosbox-x/issues/752 # See: https://github.com/joncampbell123/dosbox-x/issues/752
#
# IMPORTANT: Clone and build MUST be in the same RUN to prevent BuildKit from
# caching the build step separately from the git clone step.
ARG CACHE_BUST=2026-01-27-v19-add-ncurses-for-debug
WORKDIR /build WORKDIR /build
RUN git clone --branch remotedebug --depth 1 https://github.com/lokkju/dosbox-x-remotedebug.git dosbox-x
WORKDIR /build/dosbox-x
# Configure and build with GDB server support # Configure and build with GDB server support
# --enable-remotedebug: Enables C_REMOTEDEBUG flag for GDB/QMP servers # --enable-remotedebug: Enables C_REMOTEDEBUG flag for GDB/QMP servers
# --enable-debug: Enables internal debugger (Alt+Pause) # --enable-debug: Enables internal debugger (Alt+Pause)
RUN ./autogen.sh && \ RUN echo "Cache bust: ${CACHE_BUST}" && \
git clone --branch remotedebug --depth 1 https://github.com/rsp2k/dosbox-x-remotedebug.git dosbox-x && \
cd dosbox-x && \
./autogen.sh && \
./configure \ ./configure \
--prefix=/opt/dosbox-x \ --prefix=/opt/dosbox-x \
--enable-remotedebug \ --enable-remotedebug \
--enable-debug \ --enable-debug \
--enable-sdl2 \ --enable-sdl2 \
--disable-printer \ --disable-printer && \
&& make -j$(nproc) \ make -j$(nproc) && \
&& make install make install
# ============================================================================= # =============================================================================
# Stage 2: Runtime image # Stage 2: Runtime image
@ -74,6 +79,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libxkbcommon0 \ libxkbcommon0 \
libxrandr2 \ libxrandr2 \
libxi6 \ libxi6 \
netcat-openbsd \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Copy DOSBox-X from builder # Copy DOSBox-X from builder
@ -86,20 +92,31 @@ RUN ln -s /opt/dosbox-x/bin/dosbox-x /usr/local/bin/dosbox-x
RUN mkdir -p /config /dos RUN mkdir -p /config /dos
# Default configuration for DOSBox-X # Default configuration for DOSBox-X
# GDB server is enabled via --enable-remotedebug at compile time # Note: --enable-remotedebug compiles in the code, but these runtime settings enable the servers
RUN cat > /config/dosbox.conf << 'EOF' RUN cat > /config/dosbox.conf << 'EOF'
[log]
logfile = /tmp/dosbox.log
[sdl] [sdl]
fullscreen=false fullscreen=false
windowresolution=800x600 windowresolution=800x600
output=opengl output=opengl
[cpu] [cpu]
core=auto # CRITICAL: Must use "normal" core for GDB breakpoints to work!
# Dynamic cores (auto/dynamic/dynamic_x86) bypass DEBUG_Breakpoint()
core=normal
cputype=auto cputype=auto
cycles=auto cycles=auto
[dosbox] [dosbox]
memsize=16 memsize=16
# Enable GDB remote debug server
gdbserver = true
gdbserver port = 1234
# Enable QMP (QEMU Machine Protocol) server for control
qmpserver = true
qmpserver port = 4444
[serial] [serial]
serial1=disabled serial1=disabled

View File

@ -20,6 +20,7 @@ services:
ports: ports:
- "${GDB_PORT:-1234}:1234" # GDB server (standard GDB remote protocol) - "${GDB_PORT:-1234}:1234" # GDB server (standard GDB remote protocol)
- "${QMP_PORT:-4444}:4444" # QMP server (QEMU Machine Protocol) - "${QMP_PORT:-4444}:4444" # QMP server (QEMU Machine Protocol)
- "5555:5555" # Serial nullmodem for RIPscrip input
# X11 forwarding for display # X11 forwarding for display
environment: environment:

View File

@ -27,28 +27,54 @@ class DOSBoxConfig:
gdb_port: int = 1234 gdb_port: int = 1234
gdb_enabled: bool = True gdb_enabled: bool = True
# QMP settings (for screenshots, keyboard, mouse control)
qmp_port: int = 4444
qmp_enabled: bool = True
# Display settings # Display settings
fullscreen: bool = False fullscreen: bool = False
windowresolution: str = "800x600" windowresolution: str = "800x600"
# CPU settings # CPU settings
core: str = "auto" # auto, dynamic, normal, simple # CRITICAL: Must use "normal" core for GDB breakpoints to work!
# Dynamic cores (auto/dynamic/dynamic_x86) bypass DEBUG_Breakpoint()
core: str = "normal"
cputype: str = "auto" cputype: str = "auto"
cycles: str = "auto" cycles: str = "auto"
# Memory # Memory
memsize: int = 16 # MB of conventional memory memsize: int = 16 # MB of conventional memory
# Startup
startbanner: bool = False # Disable splash screen for cleaner automation
# Serial ports (for future RIPscrip work) # Serial ports (for future RIPscrip work)
serial1: str = "disabled" serial1: str = "disabled"
serial2: str = "disabled" serial2: str = "disabled"
# Parallel ports (LPT)
# Options: disabled, file, printer, reallpt, disney
# file: writes output to capture directory (useful for capturing print jobs)
# printer: virtual dot-matrix printer
parallel1: str = "disabled"
parallel2: str = "disabled"
# Joystick settings
# Options: auto, none, 2axis, 4axis, 4axis_2, fcs, ch
joysticktype: str = "auto"
joystick_timed: bool = True # Enable timed intervals for axis
# Mount points # Mount points
mounts: dict[str, str] = field(default_factory=dict) mounts: dict[str, str] = field(default_factory=dict)
# Autoexec commands # Autoexec commands (DOSBox-X internal [autoexec] section)
autoexec: list[str] = field(default_factory=list) autoexec: list[str] = field(default_factory=list)
# DOS startup files (written to mounted drive)
# These are actual DOS files, not DOSBox config sections
autoexec_bat: str | None = None # Content for AUTOEXEC.BAT
config_sys: str | None = None # Content for CONFIG.SYS
def to_conf(self) -> str: def to_conf(self) -> str:
"""Generate DOSBox-X configuration file content.""" """Generate DOSBox-X configuration file content."""
lines = [ lines = [
@ -63,21 +89,39 @@ class DOSBoxConfig:
"", "",
"[dosbox]", "[dosbox]",
f"memsize={self.memsize}", f"memsize={self.memsize}",
f"startbanner={str(self.startbanner).lower()}",
]
# GDB stub configuration (lokkju/dosbox-x-remotedebug fork)
# Must be in [dosbox] section with "gdbserver port=" format
if self.gdb_enabled:
lines.extend([
"gdbserver=true",
f"gdbserver port={self.gdb_port}",
])
# QMP server configuration (for screenshots, keyboard, mouse)
if self.qmp_enabled:
lines.extend([
"qmpserver=true",
f"qmpserver port={self.qmp_port}",
])
lines.extend([
"", "",
"[serial]", "[serial]",
f"serial1={self.serial1}", f"serial1={self.serial1}",
f"serial2={self.serial2}", f"serial2={self.serial2}",
"", "",
] "[parallel]",
f"parallel1={self.parallel1}",
# GDB stub configuration (DOSBox-X specific) f"parallel2={self.parallel2}",
if self.gdb_enabled: "",
lines.extend([ "[joystick]",
"[debugger]", f"joysticktype={self.joysticktype}",
f"gdbserver=true", f"timed={str(self.joystick_timed).lower()}",
f"gdbport={self.gdb_port}", "",
"", ])
])
# Autoexec section # Autoexec section
lines.append("[autoexec]") lines.append("[autoexec]")
@ -103,7 +147,62 @@ class DOSBoxManager:
self._container_id: str | None = None self._container_id: str | None = None
self._config_path: Path | None = None self._config_path: Path | None = None
self._temp_dir: Path | None = None self._temp_dir: Path | None = None
self._dos_files_written: list[Path] = [] # Track files we created
self._gdb_port: int = 1234 self._gdb_port: int = 1234
self._qmp_port: int = 4444
self._xhost_granted: bool = False
def _setup_x11_access(self) -> bool:
"""Grant X11 access for Docker containers.
Runs 'xhost +local:docker' to allow Docker containers to connect
to the X11 display. This is required for headed mode operation.
Returns:
True if xhost succeeded or was already granted, False on failure.
"""
if self._xhost_granted:
return True
# Try xhost +local:docker (most common approach)
try:
result = subprocess.run(
["xhost", "+local:docker"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
logger.info("X11 access granted: xhost +local:docker")
self._xhost_granted = True
return True
else:
logger.warning(f"xhost +local:docker failed: {result.stderr.strip()}")
except FileNotFoundError:
logger.warning("xhost not found - X11 forwarding may not work")
except subprocess.SubprocessError as e:
logger.warning(f"xhost command failed: {e}")
# Fallback: try xhost +SI:localuser:root for rootless Docker
try:
result = subprocess.run(
["xhost", "+SI:localuser:root"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
logger.info("X11 access granted: xhost +SI:localuser:root")
self._xhost_granted = True
return True
except (FileNotFoundError, subprocess.SubprocessError):
pass
logger.warning(
"Could not grant X11 access. Display may not work. "
"Try running: xhost +local:docker"
)
return False
@property @property
def running(self) -> bool: def running(self) -> bool:
@ -119,6 +218,11 @@ class DOSBoxManager:
"""Get the GDB port.""" """Get the GDB port."""
return self._gdb_port return self._gdb_port
@property
def qmp_port(self) -> int:
"""Get the QMP port."""
return self._qmp_port
@property @property
def pid(self) -> int | None: def pid(self) -> int | None:
"""Get process ID if running natively.""" """Get process ID if running natively."""
@ -184,6 +288,7 @@ class DOSBoxManager:
self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-")) self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-"))
config = config or DOSBoxConfig() config = config or DOSBoxConfig()
self._gdb_port = config.gdb_port self._gdb_port = config.gdb_port
self._qmp_port = config.qmp_port
# If binary specified, set up mount and autoexec # If binary specified, set up mount and autoexec
if binary_path: if binary_path:
@ -201,7 +306,8 @@ class DOSBoxManager:
self._config_path.write_text(config.to_conf()) self._config_path.write_text(config.to_conf())
# Build command line # Build command line
cmd = [dosbox_exe, "-conf", str(self._config_path)] # -hostrun enables clipboard sharing with host
cmd = [dosbox_exe, "-conf", str(self._config_path), "-hostrun"]
if extra_args: if extra_args:
cmd.extend(extra_args) cmd.extend(extra_args)
@ -248,10 +354,24 @@ class DOSBoxManager:
except (subprocess.SubprocessError, FileNotFoundError) as e: except (subprocess.SubprocessError, FileNotFoundError) as e:
raise RuntimeError("Docker not available") from e raise RuntimeError("Docker not available") from e
# Grant X11 access for headed mode
self._setup_x11_access()
# Create temporary directory # Create temporary directory
self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-")) self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-"))
config = config or DOSBoxConfig() config = config or DOSBoxConfig()
self._gdb_port = config.gdb_port self._gdb_port = config.gdb_port
self._qmp_port = config.qmp_port
# If binary specified, set up mount and autoexec for container paths
if binary_path:
binary = Path(binary_path).resolve()
if not binary.exists():
raise FileNotFoundError(f"Binary not found: {binary_path}")
# In container, files are mounted at /dos
config.mounts["C"] = "/dos"
config.autoexec.append("C:")
config.autoexec.append(binary.name)
# Write config file # Write config file
self._config_path = self._temp_dir / "dosbox.conf" self._config_path = self._temp_dir / "dosbox.conf"
@ -265,8 +385,9 @@ class DOSBoxManager:
"--rm", "--rm",
"-d", # Detached "-d", # Detached
"--name", f"dosbox-mcp-{os.getpid()}", "--name", f"dosbox-mcp-{os.getpid()}",
# Network # Network - expose GDB and QMP ports
"-p", f"{self._gdb_port}:{self._gdb_port}", "-p", f"{self._gdb_port}:{self._gdb_port}",
"-p", f"{self._qmp_port}:{self._qmp_port}",
# X11 forwarding # X11 forwarding
"-e", f"DISPLAY={display}", "-e", f"DISPLAY={display}",
"-v", "/tmp/.X11-unix:/tmp/.X11-unix", "-v", "/tmp/.X11-unix:/tmp/.X11-unix",
@ -274,12 +395,11 @@ class DOSBoxManager:
"-v", f"{self._config_path}:/config/dosbox.conf:ro", "-v", f"{self._config_path}:/config/dosbox.conf:ro",
] ]
# Mount binary directory if specified # Mount binary directory if specified (rw for screenshots/capture to work)
if binary_path: if binary_path:
binary = Path(binary_path).resolve() binary = Path(binary_path).resolve()
if not binary.exists(): # Mount as read-write so DOSBox can save screenshots to capture/
raise FileNotFoundError(f"Binary not found: {binary_path}") cmd.extend(["-v", f"{binary.parent}:/dos:rw"])
cmd.extend(["-v", f"{binary.parent}:/dos:ro"])
cmd.append(image) cmd.append(image)

View File

@ -16,8 +16,7 @@ GDB stub, typically running on localhost:1234.
import logging import logging
import socket import socket
import time from collections.abc import Callable
from typing import Callable
from .types import Breakpoint, MemoryRegion, Registers, StopEvent, StopReason from .types import Breakpoint, MemoryRegion, Registers, StopEvent, StopReason
from .utils import ( from .utils import (
@ -26,7 +25,6 @@ from .utils import (
encode_hex, encode_hex,
parse_registers_x86, parse_registers_x86,
parse_stop_reply, parse_stop_reply,
signal_name,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -115,7 +113,7 @@ class GDBClient:
initial = self._socket.recv(1024) initial = self._socket.recv(1024)
if initial: if initial:
logger.debug(f"Initial data from stub: {initial!r}") logger.debug(f"Initial data from stub: {initial!r}")
except socket.timeout: except TimeoutError:
pass pass
finally: finally:
self._socket.settimeout(self.timeout) self._socket.settimeout(self.timeout)
@ -244,7 +242,7 @@ class GDBClient:
logger.debug(f"Received: ${packet_data}#{checksum}") logger.debug(f"Received: ${packet_data}#{checksum}")
return packet_data return packet_data
except socket.timeout: except TimeoutError:
raise GDBError("Receive timeout") from None raise GDBError("Receive timeout") from None
except OSError as e: except OSError as e:
self._connected = False self._connected = False

162
src/dosbox_mcp/resources.py Normal file
View File

@ -0,0 +1,162 @@
"""MCP Resources for DOSBox-X screenshots and captures.
Provides resource URIs for screenshots that can be fetched by MCP clients.
"""
import base64
import glob
import os
from datetime import datetime
from typing import Any
# Track screenshots taken during this session
_screenshot_registry: dict[str, dict[str, Any]] = {}
# Default capture directory (can be overridden)
CAPTURE_DIR = os.environ.get(
"DOS_DIR",
os.path.expanduser("~/claude/dosbox-mcp/dos")
) + "/capture"
def register_screenshot(filename: str, path: str, size: int, timestamp: str | None = None) -> str:
"""Register a screenshot and return its resource URI.
Args:
filename: The screenshot filename (e.g., "ripterm_001.png")
path: Full path to the screenshot file
size: File size in bytes
timestamp: Optional timestamp string
Returns:
Resource URI for the screenshot (e.g., "dosbox://screenshots/ripterm_001.png")
"""
if timestamp is None:
timestamp = datetime.now().isoformat()
_screenshot_registry[filename] = {
"path": path,
"size": size,
"timestamp": timestamp,
"format": "png",
}
return f"dosbox://screenshots/{filename}"
def get_screenshot_info(filename: str) -> dict[str, Any] | None:
"""Get info about a registered screenshot."""
return _screenshot_registry.get(filename)
def get_screenshot_data(filename: str) -> bytes | None:
"""Get the binary data for a screenshot.
Args:
filename: The screenshot filename
Returns:
PNG image data as bytes, or None if not found
"""
info = _screenshot_registry.get(filename)
if info and os.path.isfile(info["path"]):
with open(info["path"], "rb") as f:
return f.read()
# Try to find it in the capture directory
path = os.path.join(CAPTURE_DIR, filename)
if os.path.isfile(path):
with open(path, "rb") as f:
return f.read()
return None
def list_screenshots() -> list[dict[str, Any]]:
"""List all available screenshots.
Returns both registered screenshots and any found in the capture directory.
"""
screenshots = []
# Add registered screenshots
for filename, info in _screenshot_registry.items():
screenshots.append({
"uri": f"dosbox://screenshots/{filename}",
"filename": filename,
"size": info["size"],
"timestamp": info["timestamp"],
})
# Scan capture directory for additional files
if os.path.isdir(CAPTURE_DIR):
for path in glob.glob(os.path.join(CAPTURE_DIR, "*.png")):
filename = os.path.basename(path)
if filename not in _screenshot_registry:
stat = os.stat(path)
screenshots.append({
"uri": f"dosbox://screenshots/{filename}",
"filename": filename,
"size": stat.st_size,
"timestamp": datetime.fromtimestamp(stat.st_mtime).isoformat(),
})
# Sort by timestamp (newest first)
screenshots.sort(key=lambda x: x["timestamp"], reverse=True)
return screenshots
def get_latest_screenshot() -> dict[str, Any] | None:
"""Get the most recent screenshot."""
screenshots = list_screenshots()
return screenshots[0] if screenshots else None
def capture_screen_live() -> bytes | None:
"""Capture the current DOSBox-X screen and return PNG bytes.
This performs a live capture via QMP - no tool call needed.
Useful for the dosbox://screen resource.
Returns:
PNG image data as bytes, or None if capture failed
"""
from .tools.peripheral import _get_qmp_port, _qmp_command
result = _qmp_command("localhost", _get_qmp_port(), "screendump",
{"filename": "_live_capture.png"}, timeout=10.0)
if "error" in result:
return None
# Check if QMP returned base64 data directly
if "return" in result:
ret = result["return"]
# Some QMP implementations return base64 data directly
if "data" in ret:
return base64.b64decode(ret["data"])
# Otherwise try to read from the file it created
if "file" in ret:
filename = os.path.basename(ret["file"])
# Try capture directory
path = os.path.join(CAPTURE_DIR, filename)
if os.path.isfile(path):
with open(path, "rb") as f:
return f.read()
# Try relative path
possible_dirs = [
os.environ.get("DOS_DIR", ""),
os.path.expanduser("~/claude/dosbox-mcp/dos"),
"./dos",
]
for base in possible_dirs:
if not base:
continue
test_path = os.path.join(base, ret["file"])
if os.path.isfile(test_path):
with open(test_path, "rb") as f:
return f.read()
return None

View File

@ -11,8 +11,9 @@ import logging
from importlib.metadata import version from importlib.metadata import version
from fastmcp import FastMCP from fastmcp import FastMCP
from fastmcp.tools import Tool
from . import tools from . import resources, tools
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@ -59,30 +60,114 @@ Address formats supported:
# Register tools from modules # Register tools from modules
# ============================================================================= # =============================================================================
# Execution control # All tool functions to register
mcp.tool()(tools.launch) _TOOLS = [
mcp.tool()(tools.attach) # Execution control
mcp.tool()(tools.continue_execution) tools.launch,
mcp.tool()(tools.step) tools.attach,
mcp.tool()(tools.step_over) tools.continue_execution,
mcp.tool()(tools.quit) tools.step,
tools.step_over,
tools.quit,
# Breakpoints
tools.breakpoint_set,
tools.breakpoint_list,
tools.breakpoint_delete,
# Inspection
tools.registers,
tools.memory_read,
tools.memory_write,
tools.disassemble,
tools.stack,
tools.status,
tools.screen_text,
tools.screen_graphics,
# Peripheral
tools.screenshot,
tools.serial_send,
tools.keyboard_send,
tools.mouse_move,
tools.mouse_click,
tools.mouse_drag,
tools.clipboard_copy,
tools.clipboard_paste,
# Control (QMP-based)
tools.pause,
tools.resume,
tools.reset,
tools.savestate,
tools.loadstate,
tools.memdump,
tools.query_status,
]
# Breakpoints for func in _TOOLS:
mcp.tool()(tools.breakpoint_set) mcp.add_tool(Tool.from_function(func))
mcp.tool()(tools.breakpoint_list)
mcp.tool()(tools.breakpoint_delete)
# Inspection
mcp.tool()(tools.registers)
mcp.tool()(tools.memory_read)
mcp.tool()(tools.memory_write)
mcp.tool()(tools.disassemble)
mcp.tool()(tools.stack)
mcp.tool()(tools.status)
# Peripheral # =============================================================================
mcp.tool()(tools.screenshot) # Resources - Screen and Screenshots
mcp.tool()(tools.serial_send) # =============================================================================
@mcp.resource("dosbox://screen")
def get_screen_resource() -> bytes:
"""Capture and return the current DOSBox-X screen.
This is a live capture - no need to call screenshot() first.
Simply read this resource to get the current display as PNG.
Returns:
PNG image data of the current screen
"""
data = resources.capture_screen_live()
if data is None:
raise ValueError("Failed to capture screen - is DOSBox-X running with QMP enabled?")
return data
@mcp.resource("dosbox://screenshots")
def list_screenshots_resource() -> str:
"""List all available DOSBox-X screenshots.
Returns a JSON list of screenshot metadata including URIs.
"""
import json
screenshots = resources.list_screenshots()
return json.dumps(screenshots, indent=2)
@mcp.resource("dosbox://screenshots/{filename}")
def get_screenshot_resource(filename: str) -> bytes:
"""Get a specific screenshot by filename.
Args:
filename: Screenshot filename (e.g., "ripterm_001.png")
Returns:
PNG image data
"""
data = resources.get_screenshot_data(filename)
if data is None:
raise ValueError(f"Screenshot not found: {filename}")
return data
@mcp.resource("dosbox://screenshots/latest")
def get_latest_screenshot_resource() -> bytes:
"""Get the most recent screenshot.
Returns:
PNG image data of the latest screenshot
"""
latest = resources.get_latest_screenshot()
if latest is None:
raise ValueError("No screenshots available")
data = resources.get_screenshot_data(latest["filename"])
if data is None:
raise ValueError("Screenshot file not found")
return data
# ============================================================================= # =============================================================================

View File

@ -4,13 +4,33 @@ Tools are organized by function:
- execution: launch, attach, continue, step, quit - execution: launch, attach, continue, step, quit
- breakpoints: breakpoint_set, breakpoint_list, breakpoint_delete - breakpoints: breakpoint_set, breakpoint_list, breakpoint_delete
- inspection: registers, memory_read, memory_write, disassemble, stack, status - inspection: registers, memory_read, memory_write, disassemble, stack, status
- peripheral: screenshot, serial_send - peripheral: screenshot, serial_send, keyboard_send, mouse_*
- control: pause, resume, reset, savestate, loadstate, memdump, query_status
""" """
from .breakpoints import breakpoint_delete, breakpoint_list, breakpoint_set from .breakpoints import breakpoint_delete, breakpoint_list, breakpoint_set
from .control import loadstate, memdump, pause, query_status, reset, resume, savestate
from .execution import attach, continue_execution, launch, quit, step, step_over from .execution import attach, continue_execution, launch, quit, step, step_over
from .inspection import disassemble, memory_read, memory_write, registers, stack, status from .inspection import (
from .peripheral import screenshot, serial_send disassemble,
memory_read,
memory_write,
registers,
screen_graphics,
screen_text,
stack,
status,
)
from .peripheral import (
clipboard_copy,
clipboard_paste,
keyboard_send,
mouse_click,
mouse_drag,
mouse_move,
screenshot,
serial_send,
)
__all__ = [ __all__ = [
# Execution # Execution
@ -31,7 +51,23 @@ __all__ = [
"disassemble", "disassemble",
"stack", "stack",
"status", "status",
"screen_text",
"screen_graphics",
# Peripheral # Peripheral
"screenshot", "screenshot",
"serial_send", "serial_send",
"keyboard_send",
"mouse_move",
"mouse_click",
"mouse_drag",
"clipboard_copy",
"clipboard_paste",
# Control
"pause",
"resume",
"reset",
"savestate",
"loadstate",
"memdump",
"query_status",
] ]

View File

@ -0,0 +1,251 @@
"""Control tools: pause, resume, reset, savestate, loadstate."""
from typing import Any
from .peripheral import _get_qmp_port, _qmp_command
def pause() -> dict:
"""Pause DOSBox-X emulation.
Stops the emulator at the current instruction. Use resume() to continue.
This is different from GDB breakpoints - it's an immediate halt.
Returns:
Pause status
"""
result = _qmp_command("localhost", _get_qmp_port(), "stop")
if "error" in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
}
return {
"success": True,
"message": "Emulator paused",
}
def resume() -> dict:
"""Resume DOSBox-X emulation after pause.
Continues emulator execution after a pause() call.
Returns:
Resume status
"""
result = _qmp_command("localhost", _get_qmp_port(), "cont")
if "error" in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
}
return {
"success": True,
"message": "Emulator resumed",
}
def reset(dos_only: bool = False) -> dict:
"""Reset DOSBox-X system.
Args:
dos_only: If True, only reset DOS (warm boot). If False, full system reset.
Returns:
Reset status
"""
args = {}
if dos_only:
args["dos"] = True
result = _qmp_command("localhost", _get_qmp_port(), "system_reset", args if args else None)
if "error" in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
}
return {
"success": True,
"message": "DOS reset" if dos_only else "System reset",
"reset_type": "dos" if dos_only else "full",
}
def savestate(filename: str) -> dict:
"""Save DOSBox-X emulator state to file.
Creates a snapshot of the entire emulator state that can be loaded later.
Useful for creating checkpoints during reverse engineering.
Args:
filename: Path to save state file (relative to DOSBox working dir)
Returns:
Save status with file path
Example:
savestate("checkpoint1.sav")
"""
result = _qmp_command("localhost", _get_qmp_port(), "savestate",
{"filename": filename}, timeout=30.0)
if "error" in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
}
response = {
"success": True,
"message": f"State saved to {filename}",
"filename": filename,
}
# Include any return data from QMP
if "return" in result:
ret = result["return"]
if isinstance(ret, dict):
response.update(ret)
return response
def loadstate(filename: str) -> dict:
"""Load DOSBox-X emulator state from file.
Restores a previously saved snapshot. The emulator will be in exactly
the same state as when the savestate was created.
Args:
filename: Path to state file to load
Returns:
Load status
Example:
loadstate("checkpoint1.sav")
"""
result = _qmp_command("localhost", _get_qmp_port(), "loadstate",
{"filename": filename}, timeout=30.0)
if "error" in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
}
response = {
"success": True,
"message": f"State loaded from {filename}",
"filename": filename,
}
if "return" in result:
ret = result["return"]
if isinstance(ret, dict):
response.update(ret)
return response
def memdump(address: str, length: int, filename: str | None = None) -> dict:
"""Dump memory region to file or return as data.
Uses QMP memdump for efficient binary memory extraction.
More efficient than GDB memory_read for large regions.
Args:
address: Start address in hex format (e.g., "0x0000", "1234:5678")
length: Number of bytes to dump
filename: Optional file to save dump (returns base64 if not specified)
Returns:
Memory dump result with either file path or base64 data
Example:
memdump("0xB8000", 4000) # Dump VGA text buffer
memdump("CS:IP", 256, "code_dump.bin")
"""
# Parse segment:offset if provided
if ':' in address:
seg, off = address.split(':')
flat_addr = (int(seg, 16) << 4) + int(off, 16)
elif address.startswith('0x'):
flat_addr = int(address, 16)
else:
flat_addr = int(address, 16)
args: dict[str, Any] = {
"address": flat_addr,
"length": length,
}
if filename:
args["filename"] = filename
result = _qmp_command("localhost", _get_qmp_port(), "memdump", args, timeout=30.0)
if "error" in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
}
response = {
"success": True,
"address": f"0x{flat_addr:X}",
"length": length,
}
if "return" in result:
ret = result["return"]
if isinstance(ret, dict):
if "file" in ret:
response["filename"] = ret["file"]
response["message"] = f"Memory dumped to {ret['file']}"
if "data" in ret:
# Return base64 data
response["data_base64"] = ret["data"]
response["message"] = f"Dumped {length} bytes"
return response
def query_status() -> dict:
"""Query detailed DOSBox-X emulator status via QMP.
Returns more detailed status than the GDB-based status() tool,
including pause state and debugger activity.
Returns:
Detailed emulator status
"""
result = _qmp_command("localhost", _get_qmp_port(), "query-status")
if "error" in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
}
response = {
"success": True,
}
if "return" in result:
ret = result["return"]
if isinstance(ret, dict):
response["running"] = ret.get("running", False)
response["paused"] = not ret.get("running", False)
response["debugger_active"] = ret.get("debugger", False)
if "reason" in ret:
response["pause_reason"] = ret["reason"]
return response

View File

@ -1,74 +1,122 @@
"""Execution control tools: launch, attach, continue, step, quit.""" """Execution control tools: launch, attach, continue, step, quit."""
import time
from ..dosbox import DOSBoxConfig from ..dosbox import DOSBoxConfig
from ..gdb_client import GDBError from ..gdb_client import GDBError
from ..state import client, manager from ..state import client, manager
from ..utils import format_address from ..utils import format_address
from .peripheral import keyboard_send as _keyboard_send
def launch( def launch(
binary_path: str | None = None, binary_path: str | None = None,
gdb_port: int = 1234, gdb_port: int = 1234,
use_docker: bool = False, qmp_port: int = 4444,
use_docker: bool | None = None,
cycles: str = "auto", cycles: str = "auto",
memsize: int = 16, memsize: int = 16,
joystick: str = "auto",
parallel1: str = "disabled",
) -> dict: ) -> dict:
"""Launch DOSBox-X with GDB debugging enabled. """Launch DOSBox-X with GDB debugging and QMP control enabled.
Automatically uses Docker if native DOSBox-X is not installed.
Args: Args:
binary_path: Path to DOS binary to run (optional) binary_path: Path to DOS binary to run (optional)
gdb_port: Port for GDB stub (default: 1234) gdb_port: Port for GDB stub (default: 1234)
use_docker: Use Docker container instead of native DOSBox-X qmp_port: Port for QMP control - screenshots, keyboard, mouse (default: 4444)
use_docker: Force Docker (True), force native (False), or auto-detect (None/default)
cycles: CPU cycles setting (auto, max, or number) cycles: CPU cycles setting (auto, max, or number)
memsize: Conventional memory in MB (default: 16) memsize: Conventional memory in MB (default: 16)
joystick: Joystick type - auto, none, 2axis, 4axis (default: auto)
parallel1: Parallel port 1 - disabled, file, printer (default: disabled)
Use "file" to capture print output to capture directory
Returns: Returns:
Status dict with connection details Status dict with connection details
Example: Example:
launch("/path/to/GAME.EXE", gdb_port=1234) launch("/path/to/GAME.EXE") # Auto-detects native vs Docker
launch("/path/to/GAME.EXE", joystick="2axis") # With joystick
launch("/path/to/GAME.EXE", parallel1="file") # Capture printer output
""" """
config = DOSBoxConfig( config = DOSBoxConfig(
gdb_port=gdb_port, gdb_port=gdb_port,
gdb_enabled=True, gdb_enabled=True,
qmp_port=qmp_port,
qmp_enabled=True,
cycles=cycles, cycles=cycles,
memsize=memsize, memsize=memsize,
joysticktype=joystick,
parallel1=parallel1,
) )
launch_method = None
try: try:
if use_docker: # Auto-detect: try native first, fallback to Docker
if use_docker is None:
if manager._find_dosbox():
launch_method = "native"
manager.launch_native(binary_path=binary_path, config=config)
else:
# Native not available, try Docker
launch_method = "docker"
manager.launch_docker(binary_path=binary_path, config=config)
elif use_docker:
launch_method = "docker"
manager.launch_docker(binary_path=binary_path, config=config) manager.launch_docker(binary_path=binary_path, config=config)
else: else:
launch_method = "native"
manager.launch_native(binary_path=binary_path, config=config) manager.launch_native(binary_path=binary_path, config=config)
return { return {
"success": True, "success": True,
"message": "DOSBox-X launched successfully", "message": f"DOSBox-X launched successfully ({launch_method})",
"launch_method": launch_method,
"gdb_host": "localhost", "gdb_host": "localhost",
"gdb_port": gdb_port, "gdb_port": gdb_port,
"qmp_port": qmp_port,
"pid": manager.pid, "pid": manager.pid,
"hint": f"Use attach() to connect to the debugger on port {gdb_port}", "hint": f"Use attach() to connect to debugger on port {gdb_port}. QMP (screenshots/keyboard) on port {qmp_port}.",
} }
except Exception as e: except Exception as e:
return { return {
"success": False, "success": False,
"error": str(e), "error": str(e),
"hint": "Ensure Docker is installed and the dosbox-mcp image is built (make build)",
} }
def attach(host: str = "localhost", port: int = 1234) -> dict: def attach(
host: str = "localhost",
port: int | None = None,
auto_keypress: str | None = None,
keypress_delay_ms: int = 500,
) -> dict:
"""Connect to a running DOSBox-X GDB stub. """Connect to a running DOSBox-X GDB stub.
Args: Args:
host: Hostname or IP (default: localhost) host: Hostname or IP (default: localhost)
port: GDB port (default: 1234) port: GDB port (default: uses port from launch(), or 1234)
auto_keypress: Optional key(s) to send after attaching (e.g., "enter", "space")
Useful for dismissing splash screens automatically.
keypress_delay_ms: Delay after connection before sending keypress (default: 500ms)
Returns: Returns:
Connection status and initial register state Connection status and initial register state
Example: Examples:
attach("localhost", 1234) attach() # Auto-detect port from launch()
attach("localhost", 1234) # Explicit port
attach(auto_keypress="enter") # Dismiss splash screen with Enter
""" """
# Use manager's port if not specified, fallback to 1234
if port is None:
port = manager.gdb_port if manager.running else 1234
try: try:
client.connect(host, port) client.connect(host, port)
@ -76,13 +124,26 @@ def attach(host: str = "localhost", port: int = 1234) -> dict:
regs = client.read_registers() regs = client.read_registers()
stop = client.get_stop_reason() stop = client.get_stop_reason()
return { result = {
"success": True, "success": True,
"message": f"Connected to {host}:{port}", "message": f"Connected to {host}:{port}",
"gdb_port": port,
"stop_reason": stop.reason.name.lower(), "stop_reason": stop.reason.name.lower(),
"cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}",
"physical_address": f"{regs.cs_ip:05x}", "physical_address": f"{regs.cs_ip:05x}",
} }
# Send keypress to dismiss splash screen if requested
if auto_keypress:
time.sleep(keypress_delay_ms / 1000.0)
key_result = _keyboard_send(auto_keypress)
result["keypress_sent"] = auto_keypress
result["keypress_result"] = key_result.get("success", False)
if not key_result.get("success"):
result["keypress_error"] = key_result.get("error")
return result
except GDBError as e: except GDBError as e:
return { return {
"success": False, "success": False,

View File

@ -252,6 +252,130 @@ def stack(count: int = 16) -> dict:
} }
def screen_text(rows: int = 25, cols: int = 80) -> dict:
"""Read the VGA text mode screen buffer.
Reads directly from VGA text mode memory at 0xB8000. This is a reliable
way to see what's displayed without needing QMP/screenshots.
Args:
rows: Number of rows to read (default: 25, standard VGA)
cols: Number of columns (default: 80, standard VGA)
Returns:
Screen contents as text lines with optional attributes
Note:
Only works in text mode (mode 3). Graphics modes use 0xA0000.
"""
try:
# VGA text mode buffer at 0xB8000
# Each cell is 2 bytes: [character][attribute]
# Attribute: bits 0-3 = foreground, 4-6 = background, 7 = blink
VIDEO_BASE = 0xB8000
bytes_per_row = cols * 2
total_bytes = rows * bytes_per_row
mem = client.read_memory(VIDEO_BASE, total_bytes)
lines = []
raw_lines = []
for row in range(rows):
offset = row * bytes_per_row
row_data = mem.data[offset:offset + bytes_per_row]
# Extract characters (every other byte)
chars = []
for i in range(0, len(row_data), 2):
char_byte = row_data[i]
# Convert to printable ASCII, replace control chars with space
if 32 <= char_byte < 127:
chars.append(chr(char_byte))
elif char_byte == 0:
chars.append(' ')
else:
chars.append('.') # Non-printable
line = ''.join(chars).rstrip()
lines.append(line)
raw_lines.append(row_data.hex())
# Also provide a compact view (non-empty lines only)
compact = [f"{i:2d}: {line}" for i, line in enumerate(lines) if line.strip()]
return {
"success": True,
"mode": "text",
"dimensions": f"{cols}x{rows}",
"video_address": "B8000",
"lines": lines,
"compact": compact,
"raw_hex": raw_lines,
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
def screen_graphics(width: int = 320, height: int = 200, mode: str = "13h") -> dict:
"""Read VGA graphics mode video memory.
Reads directly from VGA graphics memory at 0xA0000.
Args:
width: Screen width (default: 320 for mode 13h)
height: Screen height (default: 200 for mode 13h)
mode: VGA mode hint - "13h" (320x200x256) or "12h" (640x480x16)
Returns:
Raw video memory data (palette-indexed pixels)
Note:
Returns raw bytes - you'll need palette info to interpret colors.
Mode 13h: 1 byte per pixel (256 colors), linear framebuffer
Mode 12h: 4 planes, more complex
"""
try:
VIDEO_BASE = 0xA0000
if mode == "13h":
# Mode 13h: 320x200, 1 byte per pixel, linear
total_bytes = min(width * height, 64000) # Max 64KB
else:
# Conservative read for other modes
total_bytes = min(width * height // 8, 38400)
mem = client.read_memory(VIDEO_BASE, total_bytes)
# Provide some statistics about the pixel data
pixel_counts = {}
for b in mem.data:
pixel_counts[b] = pixel_counts.get(b, 0) + 1
# Top colors
top_colors = sorted(pixel_counts.items(), key=lambda x: -x[1])[:10]
return {
"success": True,
"mode": mode,
"dimensions": f"{width}x{height}",
"video_address": "A0000",
"bytes_read": len(mem.data),
"top_colors": [{"index": c, "count": n} for c, n in top_colors],
"raw_hex_preview": mem.data[:256].hex(), # First 256 bytes
"hint": "Use VGA palette registers (ports 3C7-3C9) to interpret colors",
}
except GDBError as e:
return {
"success": False,
"error": str(e),
}
def status() -> dict: def status() -> dict:
"""Get current debugger and emulator status. """Get current debugger and emulator status.

View File

@ -1,19 +1,216 @@
"""Peripheral tools: screenshot, serial communication.""" """Peripheral tools: screenshot, serial communication, keyboard input."""
import json
import socket
import time
from typing import Any
from ..state import manager
# Default ports (fallback if manager not configured)
DEFAULT_SERIAL_PORT = 5555 # nullmodem server:5555
DEFAULT_QMP_PORT = 4444 # qmpserver port=4444
def _get_qmp_port() -> int:
"""Get QMP port from manager or use default."""
return manager.qmp_port if manager.running else DEFAULT_QMP_PORT
def _tcp_send(host: str, port: int, data: bytes, timeout: float = 2.0) -> tuple[bool, str]:
"""Send data over TCP socket.
Args:
host: Target hostname
port: Target port
data: Data to send
timeout: Connection timeout
Returns:
Tuple of (success, message)
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect((host, port))
sock.sendall(data)
sock.close()
return True, f"Sent {len(data)} bytes to {host}:{port}"
except TimeoutError:
return False, f"Connection timeout to {host}:{port}"
except ConnectionRefusedError:
return False, f"Connection refused to {host}:{port} - is DOSBox-X running with serial enabled?"
except Exception as e:
return False, f"Socket error: {e}"
def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None = None, timeout: float = 2.0) -> dict:
"""Send QMP command to DOSBox-X.
QMP (QEMU Machine Protocol) is a JSON-based protocol for machine control.
The dosbox-x-remotedebug fork supports QMP on a configurable port.
Args:
host: Target hostname
port: QMP port
command: QMP command name (e.g., "send-key", "quit")
args: Optional command arguments
timeout: Connection timeout
Returns:
QMP response dict or error dict
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect((host, port))
# Read QMP greeting (DOSBox-X sends a banner)
greeting = b""
try:
sock.settimeout(0.1) # Short timeout for greeting
while True:
chunk = sock.recv(1024)
if not chunk:
break
greeting += chunk
except (TimeoutError, socket.timeout):
pass # Expected - no more data
# Build QMP command
cmd = {"execute": command}
if args:
cmd["arguments"] = args
# Send command as JSON line
sock.settimeout(timeout)
sock.sendall(json.dumps(cmd).encode() + b"\n")
# Read response
response = b""
try:
sock.settimeout(0.2) # Short timeout for response
while True:
chunk = sock.recv(1024)
if not chunk:
break
response += chunk
except (TimeoutError, socket.timeout):
pass
sock.close()
# Parse response
if response:
try:
return json.loads(response.decode().strip().split('\n')[-1])
except json.JSONDecodeError:
return {"success": True, "raw_response": response.decode()}
return {"success": True, "message": "Command sent (no response)"}
except TimeoutError:
return {"success": False, "error": f"QMP connection timeout to {host}:{port}"}
except ConnectionRefusedError:
return {"success": False, "error": "QMP connection refused - is DOSBox-X running with qmpserver enabled?"}
except Exception as e:
return {"success": False, "error": f"QMP error: {e}"}
def screenshot(filename: str | None = None) -> dict: def screenshot(filename: str | None = None) -> dict:
"""Capture DOSBox-X display. """Capture DOSBox-X display.
Uses QMP screendump command which calls DOSBox-X's internal capture function.
Returns a resource URI that can be used to fetch the screenshot.
Args: Args:
filename: Optional output filename filename: Optional output filename (DOSBox-X uses auto-naming)
Returns: Returns:
Screenshot info or error Screenshot info including resource URI for fetching the image
""" """
# Placeholder - requires X11 or DOSBox-X specific integration import os
from datetime import datetime
# Import resources module for registration
try:
from dosbox_mcp import resources
except ImportError:
resources = None
# Use QMP screendump command - it returns base64 PNG and saves to capture/
result = _qmp_command("localhost", _get_qmp_port(), "screendump",
{"filename": filename or "screenshot.png"}, timeout=10.0)
# Check for QMP error response
if "error" in result:
error_desc = result.get("error", {})
if isinstance(error_desc, dict):
error_msg = error_desc.get("desc", str(error_desc))
else:
error_msg = str(error_desc)
return {
"success": False,
"error": f"Screenshot failed: {error_msg}",
}
# Parse successful response
if "return" in result:
ret = result["return"]
screenshot_filename = None
full_path = None
# Find the file path
if "file" in ret:
screenshot_filename = os.path.basename(ret["file"])
# Try to find full path on host
possible_dirs = [
os.environ.get("DOS_DIR", ""),
os.path.expanduser("~/claude/dosbox-mcp/dos"),
"./dos",
]
for base in possible_dirs:
if not base:
continue
test_path = os.path.join(base, ret["file"])
if os.path.isfile(test_path):
full_path = test_path
break
# Register with resource module
resource_uri = None
if resources and screenshot_filename and full_path:
resource_uri = resources.register_screenshot(
filename=screenshot_filename,
path=full_path,
size=ret.get("size", 0),
timestamp=datetime.now().isoformat(),
)
response = {
"success": True,
"message": "Screenshot captured",
"format": ret.get("format", "png"),
"size_bytes": ret.get("size", 0),
}
# Include resource URI (the only way clients should access screenshots)
if resource_uri:
response["resource_uri"] = resource_uri
else:
# Fallback: provide filename so client knows what was created
# but encourage using dosbox://screenshots/latest resource
response["hint"] = "Use resource dosbox://screenshots/latest to fetch"
if screenshot_filename:
response["filename"] = screenshot_filename
return response
# Fallback for unexpected response format
return { return {
"success": False, "success": True,
"error": "Screenshot not yet implemented. Use DOSBox-X hotkey F12.", "message": "Screenshot command sent",
"raw_response": result,
} }
@ -30,8 +227,509 @@ def serial_send(data: str, port: int = 1) -> dict:
Returns: Returns:
Send result Send result
""" """
# Placeholder - requires serial port configuration # Parse hex escapes like \x1b for ESC
return { def parse_data(s: str) -> bytes:
"success": False, result = bytearray()
"error": "Serial port communication not yet implemented.", i = 0
while i < len(s):
if s[i:i+2] == '\\x' and i + 4 <= len(s):
try:
result.append(int(s[i+2:i+4], 16))
i += 4
continue
except ValueError:
pass
elif s[i:i+2] == '\\r':
result.append(0x0d)
i += 2
continue
elif s[i:i+2] == '\\n':
result.append(0x0a)
i += 2
continue
result.append(ord(s[i]))
i += 1
return bytes(result)
# Map COM port to TCP port (based on dosbox.conf config)
# serial1=nullmodem server:5555
tcp_port_map = {
1: 5555, # COM1
2: 5556, # COM2 (if configured)
}
if port not in tcp_port_map:
return {
"success": False,
"error": f"Invalid COM port {port}. Supported: 1, 2",
}
tcp_port = tcp_port_map[port]
try:
byte_data = parse_data(data)
except Exception as e:
return {
"success": False,
"error": f"Failed to parse data: {e}",
}
success, message = _tcp_send("localhost", tcp_port, byte_data)
if success:
return {
"success": True,
"message": message,
"bytes_sent": len(byte_data),
"port": f"COM{port}",
"tcp_port": tcp_port,
"data_preview": data[:50] + ("..." if len(data) > 50 else ""),
}
else:
return {
"success": False,
"error": message,
"hint": f"Ensure DOSBox-X is running with serial{port}=nullmodem server:{tcp_port}",
}
# QMP keyboard key mapping (from DOSBox-X remotedebug qmp.cpp)
QMP_KEY_MAP = {
# Letters
"a": "a", "b": "b", "c": "c", "d": "d", "e": "e", "f": "f", "g": "g",
"h": "h", "i": "i", "j": "j", "k": "k", "l": "l", "m": "m", "n": "n",
"o": "o", "p": "p", "q": "q", "r": "r", "s": "s", "t": "t", "u": "u",
"v": "v", "w": "w", "x": "x", "y": "y", "z": "z",
# Numbers
"0": "0", "1": "1", "2": "2", "3": "3", "4": "4",
"5": "5", "6": "6", "7": "7", "8": "8", "9": "9",
# Function keys
"f1": "f1", "f2": "f2", "f3": "f3", "f4": "f4", "f5": "f5", "f6": "f6",
"f7": "f7", "f8": "f8", "f9": "f9", "f10": "f10", "f11": "f11", "f12": "f12",
# Special keys
"ret": "ret", "enter": "ret", "return": "ret",
"esc": "esc", "escape": "esc",
"spc": "spc", "space": "spc", " ": "spc",
"tab": "tab",
"backspace": "backspace", "bs": "backspace",
"delete": "delete", "del": "delete",
"insert": "insert", "ins": "insert",
"home": "home", "end": "end",
"pgup": "pgup", "pageup": "pgup",
"pgdn": "pgdn", "pagedown": "pgdn",
# Arrow keys
"up": "up", "down": "down", "left": "left", "right": "right",
# Modifiers
"shift": "shift", "ctrl": "ctrl", "alt": "alt",
"shift_r": "shift_r", "ctrl_r": "ctrl_r", "alt_r": "alt_r",
# Punctuation
"minus": "minus", "-": "minus",
"equal": "equal", "=": "equal",
"bracket_left": "bracket_left", "[": "bracket_left",
"bracket_right": "bracket_right", "]": "bracket_right",
"backslash": "backslash", "\\": "backslash",
"semicolon": "semicolon", ";": "semicolon",
"apostrophe": "apostrophe", "'": "apostrophe",
"grave_accent": "grave_accent", "`": "grave_accent",
"comma": "comma", ",": "comma",
"dot": "dot", ".": "dot",
"slash": "slash", "/": "slash",
}
def keyboard_send(keys: str, delay_ms: int = 10) -> dict:
"""Send keyboard input to DOSBox-X via QMP protocol.
This simulates keyboard input using the QMP (QEMU Machine Protocol) server.
Useful for triggering RIPscrip command processing when serial port input
isn't configured.
Args:
keys: Keys to send. Can be:
- Single key: "a", "enter", "f1"
- Key sequence: "hello" (sends h, e, l, l, o)
- With modifiers: "ctrl-c", "alt-x", "shift-a"
- Mixed: "!|Z" sends !, |, Z
delay_ms: Delay between keystrokes in milliseconds (default: 50)
Returns:
Result dict with keys sent
Examples:
keyboard_send("hello") # Types "hello"
keyboard_send("enter") # Presses Enter
keyboard_send("ctrl-c") # Ctrl+C
keyboard_send("!|L0000001919") # RIPscrip line command
"""
sent_keys = []
errors = []
# Parse and send each key
i = 0
while i < len(keys):
# Check for modifier combinations like "ctrl-c"
modifier = None
if i + 5 < len(keys) and keys[i:i+5].lower() == "ctrl-":
modifier = "ctrl"
i += 5
elif i + 4 < len(keys) and keys[i:i+4].lower() == "alt-":
modifier = "alt"
i += 4
elif i + 6 < len(keys) and keys[i:i+6].lower() == "shift-":
modifier = "shift"
i += 6
# Check for special key names
key_to_send = None
remaining = keys[i:].lower()
# Try to match known special keys
for key_name in sorted(QMP_KEY_MAP.keys(), key=len, reverse=True):
if remaining.startswith(key_name) and len(key_name) > 1:
key_to_send = QMP_KEY_MAP[key_name]
i += len(key_name)
break
# If no special key matched, treat as single character
if key_to_send is None and i < len(keys):
char = keys[i]
if char.lower() in QMP_KEY_MAP:
key_to_send = QMP_KEY_MAP[char.lower()]
# Handle uppercase (needs shift)
if char.isupper() and modifier is None:
modifier = "shift"
elif char in "!@#$%^&*()_+{}|:\"<>?":
# Shifted punctuation - map to base key with shift
shift_map = {
"!": "1", "@": "2", "#": "3", "$": "4", "%": "5",
"^": "6", "&": "7", "*": "8", "(": "9", ")": "0",
"_": "minus", "+": "equal", "{": "bracket_left",
"}": "bracket_right", "|": "backslash", ":": "semicolon",
"\"": "apostrophe", "<": "comma", ">": "dot", "?": "slash",
}
if char in shift_map:
key_to_send = shift_map[char]
if modifier is None:
modifier = "shift"
i += 1
if key_to_send is None:
errors.append(f"Unknown key: {keys[i-1] if i > 0 else '?'}")
continue
# Build QMP command arguments
key_spec = {"type": "qcode", "data": key_to_send}
args = {"keys": [key_spec]}
# Add modifier if present
if modifier:
args["keys"].insert(0, {"type": "qcode", "data": modifier})
# Send via QMP
result = _qmp_command("localhost", _get_qmp_port(), "send-key", args)
if result.get("success", True) and "error" not in result:
sent_keys.append(f"{modifier}-{key_to_send}" if modifier else key_to_send)
else:
errors.append(f"Failed to send {key_to_send}: {result.get('error', 'unknown')}")
# Small delay between keys
if delay_ms > 0 and i < len(keys):
time.sleep(delay_ms / 1000.0)
if errors:
return {
"success": False,
"error": "; ".join(errors),
"keys_sent": sent_keys,
}
return {
"success": True,
"message": f"Sent {len(sent_keys)} keys",
"keys_sent": sent_keys,
"qmp_port": _get_qmp_port(),
}
def mouse_move(dx: int, dy: int) -> dict:
"""Move mouse cursor by relative amount.
Sends relative mouse movement to DOSBox-X. Positive dx moves right,
positive dy moves down.
Args:
dx: Horizontal movement (positive = right, negative = left)
dy: Vertical movement (positive = down, negative = up)
Returns:
Result dict with movement info
Examples:
mouse_move(10, 0) # Move right 10 pixels
mouse_move(-5, -5) # Move up-left
"""
# QMP input-send-event for relative mouse movement
args = {
"events": [
{
"type": "rel",
"data": {"axis": "x", "value": dx}
},
{
"type": "rel",
"data": {"axis": "y", "value": dy}
}
]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
}
return {
"success": True,
"message": f"Mouse moved ({dx}, {dy})",
"dx": dx,
"dy": dy,
}
def mouse_click(button: str = "left", double: bool = False) -> dict:
"""Click mouse button.
Sends mouse button press and release to DOSBox-X.
Args:
button: Button to click - "left", "right", or "middle"
double: If True, send double-click
Returns:
Result dict with click info
Examples:
mouse_click() # Left click
mouse_click("right") # Right click
mouse_click("left", True) # Double-click
"""
# Map button names to QMP button values
button_map = {
"left": "left",
"right": "right",
"middle": "middle",
"l": "left",
"r": "right",
"m": "middle",
}
btn = button_map.get(button.lower())
if not btn:
return {
"success": False,
"error": f"Unknown button '{button}'. Use: left, right, middle",
}
clicks = 2 if double else 1
errors = []
for _ in range(clicks):
# Press
args = {
"events": [
{"type": "btn", "data": {"button": btn, "down": True}}
]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result:
errors.append(f"Press failed: {result.get('error')}")
time.sleep(0.05) # Brief delay between press and release
# Release
args = {
"events": [
{"type": "btn", "data": {"button": btn, "down": False}}
]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result:
errors.append(f"Release failed: {result.get('error')}")
if double and _ == 0:
time.sleep(0.1) # Delay between clicks for double-click
if errors:
return {
"success": False,
"error": "; ".join(errors),
}
return {
"success": True,
"message": f"{'Double-' if double else ''}Clicked {btn} button",
"button": btn,
"double_click": double,
}
def mouse_drag(start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left") -> dict:
"""Drag mouse from one position to another.
Moves to start position, presses button, moves to end position, releases.
Coordinates are relative movements from current position.
Args:
start_x: Starting X offset from current position
start_y: Starting Y offset from current position
end_x: Ending X offset from start position
end_y: Ending Y offset from start position
button: Button to hold during drag (default: "left")
Returns:
Result dict with drag info
Example:
mouse_drag(0, 0, 100, 50) # Drag right 100, down 50
"""
button_map = {"left": "left", "right": "right", "middle": "middle"}
btn = button_map.get(button.lower(), "left")
errors = []
# Move to start position
if start_x != 0 or start_y != 0:
args = {
"events": [
{"type": "rel", "data": {"axis": "x", "value": start_x}},
{"type": "rel", "data": {"axis": "y", "value": start_y}},
]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result:
errors.append(f"Move to start failed: {result.get('error')}")
time.sleep(0.05)
# Press button
args = {
"events": [{"type": "btn", "data": {"button": btn, "down": True}}]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result:
errors.append(f"Button press failed: {result.get('error')}")
time.sleep(0.05)
# Move to end position (relative to start)
dx = end_x - start_x
dy = end_y - start_y
if dx != 0 or dy != 0:
# Break into smaller steps for smoother drag
steps = max(abs(dx), abs(dy)) // 10 + 1
step_dx = dx / steps
step_dy = dy / steps
for _ in range(steps):
args = {
"events": [
{"type": "rel", "data": {"axis": "x", "value": int(step_dx)}},
{"type": "rel", "data": {"axis": "y", "value": int(step_dy)}},
]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result:
errors.append(f"Drag movement failed: {result.get('error')}")
break
time.sleep(0.02)
time.sleep(0.05)
# Release button
args = {
"events": [{"type": "btn", "data": {"button": btn, "down": False}}]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result:
errors.append(f"Button release failed: {result.get('error')}")
if errors:
return {
"success": False,
"error": "; ".join(errors),
}
return {
"success": True,
"message": f"Dragged from ({start_x},{start_y}) to ({end_x},{end_y})",
"start": (start_x, start_y),
"end": (end_x, end_y),
"button": btn,
}
def clipboard_copy() -> dict:
"""Copy DOS screen text to host clipboard.
Sends Ctrl+F5 to DOSBox-X which copies all text on the DOS screen
to the host system clipboard.
Returns:
Success status
Note:
Only works in text mode. The copied text can then be accessed
from your host system's clipboard.
"""
# DOSBox-X: Ctrl+F5 = Copy screen text to host clipboard
result = _qmp_command("localhost", _get_qmp_port(), "send-key",
{"keys": [
{"type": "qcode", "data": "ctrl"},
{"type": "qcode", "data": "f5"}
]})
if "error" in result:
return {
"success": False,
"error": result.get("error", "Failed to send Ctrl+F5"),
}
return {
"success": True,
"message": "Screen text copied to host clipboard (Ctrl+F5)",
"hint": "Text is now in your host system clipboard",
}
def clipboard_paste() -> dict:
"""Paste host clipboard text into DOSBox-X.
Sends Ctrl+F6 to DOSBox-X which pastes text from the host system
clipboard into the DOS screen as simulated keystrokes.
Returns:
Success status
Note:
Pastes as keystrokes, so works with any DOS application.
Set your host clipboard content first, then call this.
"""
# DOSBox-X: Ctrl+F6 = Paste from host clipboard
result = _qmp_command("localhost", _get_qmp_port(), "send-key",
{"keys": [
{"type": "qcode", "data": "ctrl"},
{"type": "qcode", "data": "f6"}
]})
if "error" in result:
return {
"success": False,
"error": result.get("error", "Failed to send Ctrl+F6"),
}
return {
"success": True,
"message": "Host clipboard pasted into DOSBox-X (Ctrl+F6)",
"hint": "Text from host clipboard was typed as keystrokes",
} }

View File

@ -1,6 +1,5 @@
"""Utility functions for DOSBox-X MCP Server.""" """Utility functions for DOSBox-X MCP Server."""
import re
def parse_address(addr: str) -> int: def parse_address(addr: str) -> int: