From 1d826254ba13356678b313d36b23f10c1cdeb1e7 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 27 Jan 2026 23:54:12 -0700 Subject: [PATCH] Add QMP control tools, clipboard support, and peripheral configuration New tools and modules: - control.py: pause, resume, reset, savestate, loadstate, memdump, query_status - resources.py: Live screen capture, screenshot management resources - clipboard_copy/clipboard_paste via DOSBox-X hotkeys (Ctrl+F5/F6) - screen_text/screen_graphics for text buffer and VGA memory access Configuration improvements: - Dynamic QMP/GDB port handling (stored in session state from launch()) - Joystick config: joysticktype, timed options - Parallel port config: parallel1, parallel2 settings - Added -hostrun flag for host clipboard integration Performance and UX: - Reduced keyboard_send delay from 50ms to 10ms default - Refactored server.py to use Tool.from_function() pattern - Screenshot tool now returns MCP resource URIs only MCP Resources: - dosbox://screen - live screen capture (no tool call needed) - dosbox://screenshots - list available screenshots - dosbox://screenshots/{filename} - get specific screenshot - dosbox://screenshots/latest - get most recent screenshot --- Dockerfile | 37 +- docker-compose.yml | 1 + src/dosbox_mcp/dosbox.py | 156 ++++++- src/dosbox_mcp/gdb_client.py | 8 +- src/dosbox_mcp/resources.py | 162 +++++++ src/dosbox_mcp/server.py | 129 +++++- src/dosbox_mcp/tools/__init__.py | 42 +- src/dosbox_mcp/tools/control.py | 251 ++++++++++ src/dosbox_mcp/tools/execution.py | 85 +++- src/dosbox_mcp/tools/inspection.py | 124 +++++ src/dosbox_mcp/tools/peripheral.py | 718 ++++++++++++++++++++++++++++- src/dosbox_mcp/utils.py | 1 - 12 files changed, 1633 insertions(+), 81 deletions(-) create mode 100644 src/dosbox_mcp/resources.py create mode 100644 src/dosbox_mcp/tools/control.py diff --git a/Dockerfile b/Dockerfile index 5c9a368..7fd513d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,30 +25,35 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libavformat-dev \ libavutil-dev \ libswscale-dev \ + libncurses-dev \ nasm \ ca-certificates \ && rm -rf /var/lib/apt/lists/* # Clone DOSBox-X with remote debugging support (GDB server + QMP) # This fork adds --enable-remotedebug which compiles in gdbserver.cpp and qmp.cpp +# Using rsp2k fork with GDB breakpoint address fix (uses physical addresses correctly) # See: https://github.com/joncampbell123/dosbox-x/issues/752 +# +# IMPORTANT: Clone and build MUST be in the same RUN to prevent BuildKit from +# caching the build step separately from the git clone step. +ARG CACHE_BUST=2026-01-27-v19-add-ncurses-for-debug WORKDIR /build -RUN git clone --branch remotedebug --depth 1 https://github.com/lokkju/dosbox-x-remotedebug.git dosbox-x - -WORKDIR /build/dosbox-x - # Configure and build with GDB server support # --enable-remotedebug: Enables C_REMOTEDEBUG flag for GDB/QMP servers # --enable-debug: Enables internal debugger (Alt+Pause) -RUN ./autogen.sh && \ +RUN echo "Cache bust: ${CACHE_BUST}" && \ + git clone --branch remotedebug --depth 1 https://github.com/rsp2k/dosbox-x-remotedebug.git dosbox-x && \ + cd dosbox-x && \ + ./autogen.sh && \ ./configure \ --prefix=/opt/dosbox-x \ --enable-remotedebug \ --enable-debug \ --enable-sdl2 \ - --disable-printer \ - && make -j$(nproc) \ - && make install + --disable-printer && \ + make -j$(nproc) && \ + make install # ============================================================================= # Stage 2: Runtime image @@ -74,6 +79,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libxkbcommon0 \ libxrandr2 \ libxi6 \ + netcat-openbsd \ && rm -rf /var/lib/apt/lists/* # Copy DOSBox-X from builder @@ -86,20 +92,31 @@ RUN ln -s /opt/dosbox-x/bin/dosbox-x /usr/local/bin/dosbox-x RUN mkdir -p /config /dos # Default configuration for DOSBox-X -# GDB server is enabled via --enable-remotedebug at compile time +# Note: --enable-remotedebug compiles in the code, but these runtime settings enable the servers RUN cat > /config/dosbox.conf << 'EOF' +[log] +logfile = /tmp/dosbox.log + [sdl] fullscreen=false windowresolution=800x600 output=opengl [cpu] -core=auto +# CRITICAL: Must use "normal" core for GDB breakpoints to work! +# Dynamic cores (auto/dynamic/dynamic_x86) bypass DEBUG_Breakpoint() +core=normal cputype=auto cycles=auto [dosbox] memsize=16 +# Enable GDB remote debug server +gdbserver = true +gdbserver port = 1234 +# Enable QMP (QEMU Machine Protocol) server for control +qmpserver = true +qmpserver port = 4444 [serial] serial1=disabled diff --git a/docker-compose.yml b/docker-compose.yml index d46ee20..a4473c0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,6 +20,7 @@ services: ports: - "${GDB_PORT:-1234}:1234" # GDB server (standard GDB remote protocol) - "${QMP_PORT:-4444}:4444" # QMP server (QEMU Machine Protocol) + - "5555:5555" # Serial nullmodem for RIPscrip input # X11 forwarding for display environment: diff --git a/src/dosbox_mcp/dosbox.py b/src/dosbox_mcp/dosbox.py index 1305ccd..d439fb7 100644 --- a/src/dosbox_mcp/dosbox.py +++ b/src/dosbox_mcp/dosbox.py @@ -27,28 +27,54 @@ class DOSBoxConfig: gdb_port: int = 1234 gdb_enabled: bool = True + # QMP settings (for screenshots, keyboard, mouse control) + qmp_port: int = 4444 + qmp_enabled: bool = True + # Display settings fullscreen: bool = False windowresolution: str = "800x600" # CPU settings - core: str = "auto" # auto, dynamic, normal, simple + # CRITICAL: Must use "normal" core for GDB breakpoints to work! + # Dynamic cores (auto/dynamic/dynamic_x86) bypass DEBUG_Breakpoint() + core: str = "normal" cputype: str = "auto" cycles: str = "auto" # Memory memsize: int = 16 # MB of conventional memory + # Startup + startbanner: bool = False # Disable splash screen for cleaner automation + # Serial ports (for future RIPscrip work) serial1: str = "disabled" serial2: str = "disabled" + # Parallel ports (LPT) + # Options: disabled, file, printer, reallpt, disney + # file: writes output to capture directory (useful for capturing print jobs) + # printer: virtual dot-matrix printer + parallel1: str = "disabled" + parallel2: str = "disabled" + + # Joystick settings + # Options: auto, none, 2axis, 4axis, 4axis_2, fcs, ch + joysticktype: str = "auto" + joystick_timed: bool = True # Enable timed intervals for axis + # Mount points mounts: dict[str, str] = field(default_factory=dict) - # Autoexec commands + # Autoexec commands (DOSBox-X internal [autoexec] section) autoexec: list[str] = field(default_factory=list) + # DOS startup files (written to mounted drive) + # These are actual DOS files, not DOSBox config sections + autoexec_bat: str | None = None # Content for AUTOEXEC.BAT + config_sys: str | None = None # Content for CONFIG.SYS + def to_conf(self) -> str: """Generate DOSBox-X configuration file content.""" lines = [ @@ -63,21 +89,39 @@ class DOSBoxConfig: "", "[dosbox]", f"memsize={self.memsize}", + f"startbanner={str(self.startbanner).lower()}", + ] + + # GDB stub configuration (lokkju/dosbox-x-remotedebug fork) + # Must be in [dosbox] section with "gdbserver port=" format + if self.gdb_enabled: + lines.extend([ + "gdbserver=true", + f"gdbserver port={self.gdb_port}", + ]) + + # QMP server configuration (for screenshots, keyboard, mouse) + if self.qmp_enabled: + lines.extend([ + "qmpserver=true", + f"qmpserver port={self.qmp_port}", + ]) + + lines.extend([ "", "[serial]", f"serial1={self.serial1}", f"serial2={self.serial2}", "", - ] - - # GDB stub configuration (DOSBox-X specific) - if self.gdb_enabled: - lines.extend([ - "[debugger]", - f"gdbserver=true", - f"gdbport={self.gdb_port}", - "", - ]) + "[parallel]", + f"parallel1={self.parallel1}", + f"parallel2={self.parallel2}", + "", + "[joystick]", + f"joysticktype={self.joysticktype}", + f"timed={str(self.joystick_timed).lower()}", + "", + ]) # Autoexec section lines.append("[autoexec]") @@ -103,7 +147,62 @@ class DOSBoxManager: self._container_id: str | None = None self._config_path: Path | None = None self._temp_dir: Path | None = None + self._dos_files_written: list[Path] = [] # Track files we created self._gdb_port: int = 1234 + self._qmp_port: int = 4444 + self._xhost_granted: bool = False + + def _setup_x11_access(self) -> bool: + """Grant X11 access for Docker containers. + + Runs 'xhost +local:docker' to allow Docker containers to connect + to the X11 display. This is required for headed mode operation. + + Returns: + True if xhost succeeded or was already granted, False on failure. + """ + if self._xhost_granted: + return True + + # Try xhost +local:docker (most common approach) + try: + result = subprocess.run( + ["xhost", "+local:docker"], + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode == 0: + logger.info("X11 access granted: xhost +local:docker") + self._xhost_granted = True + return True + else: + logger.warning(f"xhost +local:docker failed: {result.stderr.strip()}") + except FileNotFoundError: + logger.warning("xhost not found - X11 forwarding may not work") + except subprocess.SubprocessError as e: + logger.warning(f"xhost command failed: {e}") + + # Fallback: try xhost +SI:localuser:root for rootless Docker + try: + result = subprocess.run( + ["xhost", "+SI:localuser:root"], + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode == 0: + logger.info("X11 access granted: xhost +SI:localuser:root") + self._xhost_granted = True + return True + except (FileNotFoundError, subprocess.SubprocessError): + pass + + logger.warning( + "Could not grant X11 access. Display may not work. " + "Try running: xhost +local:docker" + ) + return False @property def running(self) -> bool: @@ -119,6 +218,11 @@ class DOSBoxManager: """Get the GDB port.""" return self._gdb_port + @property + def qmp_port(self) -> int: + """Get the QMP port.""" + return self._qmp_port + @property def pid(self) -> int | None: """Get process ID if running natively.""" @@ -184,6 +288,7 @@ class DOSBoxManager: self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-")) config = config or DOSBoxConfig() self._gdb_port = config.gdb_port + self._qmp_port = config.qmp_port # If binary specified, set up mount and autoexec if binary_path: @@ -201,7 +306,8 @@ class DOSBoxManager: self._config_path.write_text(config.to_conf()) # Build command line - cmd = [dosbox_exe, "-conf", str(self._config_path)] + # -hostrun enables clipboard sharing with host + cmd = [dosbox_exe, "-conf", str(self._config_path), "-hostrun"] if extra_args: cmd.extend(extra_args) @@ -248,10 +354,24 @@ class DOSBoxManager: except (subprocess.SubprocessError, FileNotFoundError) as e: raise RuntimeError("Docker not available") from e + # Grant X11 access for headed mode + self._setup_x11_access() + # Create temporary directory self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-")) config = config or DOSBoxConfig() self._gdb_port = config.gdb_port + self._qmp_port = config.qmp_port + + # If binary specified, set up mount and autoexec for container paths + if binary_path: + binary = Path(binary_path).resolve() + if not binary.exists(): + raise FileNotFoundError(f"Binary not found: {binary_path}") + # In container, files are mounted at /dos + config.mounts["C"] = "/dos" + config.autoexec.append("C:") + config.autoexec.append(binary.name) # Write config file self._config_path = self._temp_dir / "dosbox.conf" @@ -265,8 +385,9 @@ class DOSBoxManager: "--rm", "-d", # Detached "--name", f"dosbox-mcp-{os.getpid()}", - # Network + # Network - expose GDB and QMP ports "-p", f"{self._gdb_port}:{self._gdb_port}", + "-p", f"{self._qmp_port}:{self._qmp_port}", # X11 forwarding "-e", f"DISPLAY={display}", "-v", "/tmp/.X11-unix:/tmp/.X11-unix", @@ -274,12 +395,11 @@ class DOSBoxManager: "-v", f"{self._config_path}:/config/dosbox.conf:ro", ] - # Mount binary directory if specified + # Mount binary directory if specified (rw for screenshots/capture to work) if binary_path: binary = Path(binary_path).resolve() - if not binary.exists(): - raise FileNotFoundError(f"Binary not found: {binary_path}") - cmd.extend(["-v", f"{binary.parent}:/dos:ro"]) + # Mount as read-write so DOSBox can save screenshots to capture/ + cmd.extend(["-v", f"{binary.parent}:/dos:rw"]) cmd.append(image) diff --git a/src/dosbox_mcp/gdb_client.py b/src/dosbox_mcp/gdb_client.py index 4fd75ad..33a34be 100644 --- a/src/dosbox_mcp/gdb_client.py +++ b/src/dosbox_mcp/gdb_client.py @@ -16,8 +16,7 @@ GDB stub, typically running on localhost:1234. import logging import socket -import time -from typing import Callable +from collections.abc import Callable from .types import Breakpoint, MemoryRegion, Registers, StopEvent, StopReason from .utils import ( @@ -26,7 +25,6 @@ from .utils import ( encode_hex, parse_registers_x86, parse_stop_reply, - signal_name, ) logger = logging.getLogger(__name__) @@ -115,7 +113,7 @@ class GDBClient: initial = self._socket.recv(1024) if initial: logger.debug(f"Initial data from stub: {initial!r}") - except socket.timeout: + except TimeoutError: pass finally: self._socket.settimeout(self.timeout) @@ -244,7 +242,7 @@ class GDBClient: logger.debug(f"Received: ${packet_data}#{checksum}") return packet_data - except socket.timeout: + except TimeoutError: raise GDBError("Receive timeout") from None except OSError as e: self._connected = False diff --git a/src/dosbox_mcp/resources.py b/src/dosbox_mcp/resources.py new file mode 100644 index 0000000..8cd8172 --- /dev/null +++ b/src/dosbox_mcp/resources.py @@ -0,0 +1,162 @@ +"""MCP Resources for DOSBox-X screenshots and captures. + +Provides resource URIs for screenshots that can be fetched by MCP clients. +""" + +import base64 +import glob +import os +from datetime import datetime +from typing import Any + +# Track screenshots taken during this session +_screenshot_registry: dict[str, dict[str, Any]] = {} + +# Default capture directory (can be overridden) +CAPTURE_DIR = os.environ.get( + "DOS_DIR", + os.path.expanduser("~/claude/dosbox-mcp/dos") +) + "/capture" + + +def register_screenshot(filename: str, path: str, size: int, timestamp: str | None = None) -> str: + """Register a screenshot and return its resource URI. + + Args: + filename: The screenshot filename (e.g., "ripterm_001.png") + path: Full path to the screenshot file + size: File size in bytes + timestamp: Optional timestamp string + + Returns: + Resource URI for the screenshot (e.g., "dosbox://screenshots/ripterm_001.png") + """ + if timestamp is None: + timestamp = datetime.now().isoformat() + + _screenshot_registry[filename] = { + "path": path, + "size": size, + "timestamp": timestamp, + "format": "png", + } + + return f"dosbox://screenshots/{filename}" + + +def get_screenshot_info(filename: str) -> dict[str, Any] | None: + """Get info about a registered screenshot.""" + return _screenshot_registry.get(filename) + + +def get_screenshot_data(filename: str) -> bytes | None: + """Get the binary data for a screenshot. + + Args: + filename: The screenshot filename + + Returns: + PNG image data as bytes, or None if not found + """ + info = _screenshot_registry.get(filename) + if info and os.path.isfile(info["path"]): + with open(info["path"], "rb") as f: + return f.read() + + # Try to find it in the capture directory + path = os.path.join(CAPTURE_DIR, filename) + if os.path.isfile(path): + with open(path, "rb") as f: + return f.read() + + return None + + +def list_screenshots() -> list[dict[str, Any]]: + """List all available screenshots. + + Returns both registered screenshots and any found in the capture directory. + """ + screenshots = [] + + # Add registered screenshots + for filename, info in _screenshot_registry.items(): + screenshots.append({ + "uri": f"dosbox://screenshots/{filename}", + "filename": filename, + "size": info["size"], + "timestamp": info["timestamp"], + }) + + # Scan capture directory for additional files + if os.path.isdir(CAPTURE_DIR): + for path in glob.glob(os.path.join(CAPTURE_DIR, "*.png")): + filename = os.path.basename(path) + if filename not in _screenshot_registry: + stat = os.stat(path) + screenshots.append({ + "uri": f"dosbox://screenshots/{filename}", + "filename": filename, + "size": stat.st_size, + "timestamp": datetime.fromtimestamp(stat.st_mtime).isoformat(), + }) + + # Sort by timestamp (newest first) + screenshots.sort(key=lambda x: x["timestamp"], reverse=True) + return screenshots + + +def get_latest_screenshot() -> dict[str, Any] | None: + """Get the most recent screenshot.""" + screenshots = list_screenshots() + return screenshots[0] if screenshots else None + + +def capture_screen_live() -> bytes | None: + """Capture the current DOSBox-X screen and return PNG bytes. + + This performs a live capture via QMP - no tool call needed. + Useful for the dosbox://screen resource. + + Returns: + PNG image data as bytes, or None if capture failed + """ + from .tools.peripheral import _get_qmp_port, _qmp_command + + result = _qmp_command("localhost", _get_qmp_port(), "screendump", + {"filename": "_live_capture.png"}, timeout=10.0) + + if "error" in result: + return None + + # Check if QMP returned base64 data directly + if "return" in result: + ret = result["return"] + + # Some QMP implementations return base64 data directly + if "data" in ret: + return base64.b64decode(ret["data"]) + + # Otherwise try to read from the file it created + if "file" in ret: + filename = os.path.basename(ret["file"]) + # Try capture directory + path = os.path.join(CAPTURE_DIR, filename) + if os.path.isfile(path): + with open(path, "rb") as f: + return f.read() + # Try relative path + possible_dirs = [ + os.environ.get("DOS_DIR", ""), + os.path.expanduser("~/claude/dosbox-mcp/dos"), + "./dos", + ] + for base in possible_dirs: + if not base: + continue + test_path = os.path.join(base, ret["file"]) + if os.path.isfile(test_path): + with open(test_path, "rb") as f: + return f.read() + + return None diff --git a/src/dosbox_mcp/server.py b/src/dosbox_mcp/server.py index 9e88c26..04e436d 100644 --- a/src/dosbox_mcp/server.py +++ b/src/dosbox_mcp/server.py @@ -11,8 +11,9 @@ import logging from importlib.metadata import version from fastmcp import FastMCP +from fastmcp.tools import Tool -from . import tools +from . import resources, tools # Configure logging logging.basicConfig( @@ -59,30 +60,114 @@ Address formats supported: # Register tools from modules # ============================================================================= -# Execution control -mcp.tool()(tools.launch) -mcp.tool()(tools.attach) -mcp.tool()(tools.continue_execution) -mcp.tool()(tools.step) -mcp.tool()(tools.step_over) -mcp.tool()(tools.quit) +# All tool functions to register +_TOOLS = [ + # Execution control + tools.launch, + tools.attach, + tools.continue_execution, + tools.step, + tools.step_over, + tools.quit, + # Breakpoints + tools.breakpoint_set, + tools.breakpoint_list, + tools.breakpoint_delete, + # Inspection + tools.registers, + tools.memory_read, + tools.memory_write, + tools.disassemble, + tools.stack, + tools.status, + tools.screen_text, + tools.screen_graphics, + # Peripheral + tools.screenshot, + tools.serial_send, + tools.keyboard_send, + tools.mouse_move, + tools.mouse_click, + tools.mouse_drag, + tools.clipboard_copy, + tools.clipboard_paste, + # Control (QMP-based) + tools.pause, + tools.resume, + tools.reset, + tools.savestate, + tools.loadstate, + tools.memdump, + tools.query_status, +] -# Breakpoints -mcp.tool()(tools.breakpoint_set) -mcp.tool()(tools.breakpoint_list) -mcp.tool()(tools.breakpoint_delete) +for func in _TOOLS: + mcp.add_tool(Tool.from_function(func)) -# Inspection -mcp.tool()(tools.registers) -mcp.tool()(tools.memory_read) -mcp.tool()(tools.memory_write) -mcp.tool()(tools.disassemble) -mcp.tool()(tools.stack) -mcp.tool()(tools.status) -# Peripheral -mcp.tool()(tools.screenshot) -mcp.tool()(tools.serial_send) +# ============================================================================= +# Resources - Screen and Screenshots +# ============================================================================= + + +@mcp.resource("dosbox://screen") +def get_screen_resource() -> bytes: + """Capture and return the current DOSBox-X screen. + + This is a live capture - no need to call screenshot() first. + Simply read this resource to get the current display as PNG. + + Returns: + PNG image data of the current screen + """ + data = resources.capture_screen_live() + if data is None: + raise ValueError("Failed to capture screen - is DOSBox-X running with QMP enabled?") + return data + + +@mcp.resource("dosbox://screenshots") +def list_screenshots_resource() -> str: + """List all available DOSBox-X screenshots. + + Returns a JSON list of screenshot metadata including URIs. + """ + import json + screenshots = resources.list_screenshots() + return json.dumps(screenshots, indent=2) + + +@mcp.resource("dosbox://screenshots/{filename}") +def get_screenshot_resource(filename: str) -> bytes: + """Get a specific screenshot by filename. + + Args: + filename: Screenshot filename (e.g., "ripterm_001.png") + + Returns: + PNG image data + """ + data = resources.get_screenshot_data(filename) + if data is None: + raise ValueError(f"Screenshot not found: {filename}") + return data + + +@mcp.resource("dosbox://screenshots/latest") +def get_latest_screenshot_resource() -> bytes: + """Get the most recent screenshot. + + Returns: + PNG image data of the latest screenshot + """ + latest = resources.get_latest_screenshot() + if latest is None: + raise ValueError("No screenshots available") + + data = resources.get_screenshot_data(latest["filename"]) + if data is None: + raise ValueError("Screenshot file not found") + return data # ============================================================================= diff --git a/src/dosbox_mcp/tools/__init__.py b/src/dosbox_mcp/tools/__init__.py index f8ed067..46db365 100644 --- a/src/dosbox_mcp/tools/__init__.py +++ b/src/dosbox_mcp/tools/__init__.py @@ -4,13 +4,33 @@ Tools are organized by function: - execution: launch, attach, continue, step, quit - breakpoints: breakpoint_set, breakpoint_list, breakpoint_delete - inspection: registers, memory_read, memory_write, disassemble, stack, status -- peripheral: screenshot, serial_send +- peripheral: screenshot, serial_send, keyboard_send, mouse_* +- control: pause, resume, reset, savestate, loadstate, memdump, query_status """ from .breakpoints import breakpoint_delete, breakpoint_list, breakpoint_set +from .control import loadstate, memdump, pause, query_status, reset, resume, savestate from .execution import attach, continue_execution, launch, quit, step, step_over -from .inspection import disassemble, memory_read, memory_write, registers, stack, status -from .peripheral import screenshot, serial_send +from .inspection import ( + disassemble, + memory_read, + memory_write, + registers, + screen_graphics, + screen_text, + stack, + status, +) +from .peripheral import ( + clipboard_copy, + clipboard_paste, + keyboard_send, + mouse_click, + mouse_drag, + mouse_move, + screenshot, + serial_send, +) __all__ = [ # Execution @@ -31,7 +51,23 @@ __all__ = [ "disassemble", "stack", "status", + "screen_text", + "screen_graphics", # Peripheral "screenshot", "serial_send", + "keyboard_send", + "mouse_move", + "mouse_click", + "mouse_drag", + "clipboard_copy", + "clipboard_paste", + # Control + "pause", + "resume", + "reset", + "savestate", + "loadstate", + "memdump", + "query_status", ] diff --git a/src/dosbox_mcp/tools/control.py b/src/dosbox_mcp/tools/control.py new file mode 100644 index 0000000..dcf2872 --- /dev/null +++ b/src/dosbox_mcp/tools/control.py @@ -0,0 +1,251 @@ +"""Control tools: pause, resume, reset, savestate, loadstate.""" + +from typing import Any + +from .peripheral import _get_qmp_port, _qmp_command + + +def pause() -> dict: + """Pause DOSBox-X emulation. + + Stops the emulator at the current instruction. Use resume() to continue. + This is different from GDB breakpoints - it's an immediate halt. + + Returns: + Pause status + """ + result = _qmp_command("localhost", _get_qmp_port(), "stop") + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Unknown error"), + } + + return { + "success": True, + "message": "Emulator paused", + } + + +def resume() -> dict: + """Resume DOSBox-X emulation after pause. + + Continues emulator execution after a pause() call. + + Returns: + Resume status + """ + result = _qmp_command("localhost", _get_qmp_port(), "cont") + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Unknown error"), + } + + return { + "success": True, + "message": "Emulator resumed", + } + + +def reset(dos_only: bool = False) -> dict: + """Reset DOSBox-X system. + + Args: + dos_only: If True, only reset DOS (warm boot). If False, full system reset. + + Returns: + Reset status + """ + args = {} + if dos_only: + args["dos"] = True + + result = _qmp_command("localhost", _get_qmp_port(), "system_reset", args if args else None) + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Unknown error"), + } + + return { + "success": True, + "message": "DOS reset" if dos_only else "System reset", + "reset_type": "dos" if dos_only else "full", + } + + +def savestate(filename: str) -> dict: + """Save DOSBox-X emulator state to file. + + Creates a snapshot of the entire emulator state that can be loaded later. + Useful for creating checkpoints during reverse engineering. + + Args: + filename: Path to save state file (relative to DOSBox working dir) + + Returns: + Save status with file path + + Example: + savestate("checkpoint1.sav") + """ + result = _qmp_command("localhost", _get_qmp_port(), "savestate", + {"filename": filename}, timeout=30.0) + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Unknown error"), + } + + response = { + "success": True, + "message": f"State saved to {filename}", + "filename": filename, + } + + # Include any return data from QMP + if "return" in result: + ret = result["return"] + if isinstance(ret, dict): + response.update(ret) + + return response + + +def loadstate(filename: str) -> dict: + """Load DOSBox-X emulator state from file. + + Restores a previously saved snapshot. The emulator will be in exactly + the same state as when the savestate was created. + + Args: + filename: Path to state file to load + + Returns: + Load status + + Example: + loadstate("checkpoint1.sav") + """ + result = _qmp_command("localhost", _get_qmp_port(), "loadstate", + {"filename": filename}, timeout=30.0) + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Unknown error"), + } + + response = { + "success": True, + "message": f"State loaded from {filename}", + "filename": filename, + } + + if "return" in result: + ret = result["return"] + if isinstance(ret, dict): + response.update(ret) + + return response + + +def memdump(address: str, length: int, filename: str | None = None) -> dict: + """Dump memory region to file or return as data. + + Uses QMP memdump for efficient binary memory extraction. + More efficient than GDB memory_read for large regions. + + Args: + address: Start address in hex format (e.g., "0x0000", "1234:5678") + length: Number of bytes to dump + filename: Optional file to save dump (returns base64 if not specified) + + Returns: + Memory dump result with either file path or base64 data + + Example: + memdump("0xB8000", 4000) # Dump VGA text buffer + memdump("CS:IP", 256, "code_dump.bin") + """ + + # Parse segment:offset if provided + if ':' in address: + seg, off = address.split(':') + flat_addr = (int(seg, 16) << 4) + int(off, 16) + elif address.startswith('0x'): + flat_addr = int(address, 16) + else: + flat_addr = int(address, 16) + + args: dict[str, Any] = { + "address": flat_addr, + "length": length, + } + if filename: + args["filename"] = filename + + result = _qmp_command("localhost", _get_qmp_port(), "memdump", args, timeout=30.0) + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Unknown error"), + } + + response = { + "success": True, + "address": f"0x{flat_addr:X}", + "length": length, + } + + if "return" in result: + ret = result["return"] + if isinstance(ret, dict): + if "file" in ret: + response["filename"] = ret["file"] + response["message"] = f"Memory dumped to {ret['file']}" + if "data" in ret: + # Return base64 data + response["data_base64"] = ret["data"] + response["message"] = f"Dumped {length} bytes" + + return response + + +def query_status() -> dict: + """Query detailed DOSBox-X emulator status via QMP. + + Returns more detailed status than the GDB-based status() tool, + including pause state and debugger activity. + + Returns: + Detailed emulator status + """ + result = _qmp_command("localhost", _get_qmp_port(), "query-status") + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Unknown error"), + } + + response = { + "success": True, + } + + if "return" in result: + ret = result["return"] + if isinstance(ret, dict): + response["running"] = ret.get("running", False) + response["paused"] = not ret.get("running", False) + response["debugger_active"] = ret.get("debugger", False) + if "reason" in ret: + response["pause_reason"] = ret["reason"] + + return response diff --git a/src/dosbox_mcp/tools/execution.py b/src/dosbox_mcp/tools/execution.py index 1ebce66..25662e3 100644 --- a/src/dosbox_mcp/tools/execution.py +++ b/src/dosbox_mcp/tools/execution.py @@ -1,74 +1,122 @@ """Execution control tools: launch, attach, continue, step, quit.""" +import time + from ..dosbox import DOSBoxConfig from ..gdb_client import GDBError from ..state import client, manager from ..utils import format_address +from .peripheral import keyboard_send as _keyboard_send def launch( binary_path: str | None = None, gdb_port: int = 1234, - use_docker: bool = False, + qmp_port: int = 4444, + use_docker: bool | None = None, cycles: str = "auto", memsize: int = 16, + joystick: str = "auto", + parallel1: str = "disabled", ) -> dict: - """Launch DOSBox-X with GDB debugging enabled. + """Launch DOSBox-X with GDB debugging and QMP control enabled. + + Automatically uses Docker if native DOSBox-X is not installed. Args: binary_path: Path to DOS binary to run (optional) gdb_port: Port for GDB stub (default: 1234) - use_docker: Use Docker container instead of native DOSBox-X + qmp_port: Port for QMP control - screenshots, keyboard, mouse (default: 4444) + use_docker: Force Docker (True), force native (False), or auto-detect (None/default) cycles: CPU cycles setting (auto, max, or number) memsize: Conventional memory in MB (default: 16) + joystick: Joystick type - auto, none, 2axis, 4axis (default: auto) + parallel1: Parallel port 1 - disabled, file, printer (default: disabled) + Use "file" to capture print output to capture directory Returns: Status dict with connection details Example: - launch("/path/to/GAME.EXE", gdb_port=1234) + launch("/path/to/GAME.EXE") # Auto-detects native vs Docker + launch("/path/to/GAME.EXE", joystick="2axis") # With joystick + launch("/path/to/GAME.EXE", parallel1="file") # Capture printer output """ config = DOSBoxConfig( gdb_port=gdb_port, gdb_enabled=True, + qmp_port=qmp_port, + qmp_enabled=True, cycles=cycles, memsize=memsize, + joysticktype=joystick, + parallel1=parallel1, ) + launch_method = None + try: - if use_docker: + # Auto-detect: try native first, fallback to Docker + if use_docker is None: + if manager._find_dosbox(): + launch_method = "native" + manager.launch_native(binary_path=binary_path, config=config) + else: + # Native not available, try Docker + launch_method = "docker" + manager.launch_docker(binary_path=binary_path, config=config) + elif use_docker: + launch_method = "docker" manager.launch_docker(binary_path=binary_path, config=config) else: + launch_method = "native" manager.launch_native(binary_path=binary_path, config=config) return { "success": True, - "message": "DOSBox-X launched successfully", + "message": f"DOSBox-X launched successfully ({launch_method})", + "launch_method": launch_method, "gdb_host": "localhost", "gdb_port": gdb_port, + "qmp_port": qmp_port, "pid": manager.pid, - "hint": f"Use attach() to connect to the debugger on port {gdb_port}", + "hint": f"Use attach() to connect to debugger on port {gdb_port}. QMP (screenshots/keyboard) on port {qmp_port}.", } except Exception as e: return { "success": False, "error": str(e), + "hint": "Ensure Docker is installed and the dosbox-mcp image is built (make build)", } -def attach(host: str = "localhost", port: int = 1234) -> dict: +def attach( + host: str = "localhost", + port: int | None = None, + auto_keypress: str | None = None, + keypress_delay_ms: int = 500, +) -> dict: """Connect to a running DOSBox-X GDB stub. Args: host: Hostname or IP (default: localhost) - port: GDB port (default: 1234) + port: GDB port (default: uses port from launch(), or 1234) + auto_keypress: Optional key(s) to send after attaching (e.g., "enter", "space") + Useful for dismissing splash screens automatically. + keypress_delay_ms: Delay after connection before sending keypress (default: 500ms) Returns: Connection status and initial register state - Example: - attach("localhost", 1234) + Examples: + attach() # Auto-detect port from launch() + attach("localhost", 1234) # Explicit port + attach(auto_keypress="enter") # Dismiss splash screen with Enter """ + # Use manager's port if not specified, fallback to 1234 + if port is None: + port = manager.gdb_port if manager.running else 1234 + try: client.connect(host, port) @@ -76,13 +124,26 @@ def attach(host: str = "localhost", port: int = 1234) -> dict: regs = client.read_registers() stop = client.get_stop_reason() - return { + result = { "success": True, "message": f"Connected to {host}:{port}", + "gdb_port": port, "stop_reason": stop.reason.name.lower(), "cs_ip": f"{regs.cs:04x}:{regs.ip:04x}", "physical_address": f"{regs.cs_ip:05x}", } + + # Send keypress to dismiss splash screen if requested + if auto_keypress: + time.sleep(keypress_delay_ms / 1000.0) + key_result = _keyboard_send(auto_keypress) + result["keypress_sent"] = auto_keypress + result["keypress_result"] = key_result.get("success", False) + if not key_result.get("success"): + result["keypress_error"] = key_result.get("error") + + return result + except GDBError as e: return { "success": False, diff --git a/src/dosbox_mcp/tools/inspection.py b/src/dosbox_mcp/tools/inspection.py index 9b02622..840dc3e 100644 --- a/src/dosbox_mcp/tools/inspection.py +++ b/src/dosbox_mcp/tools/inspection.py @@ -252,6 +252,130 @@ def stack(count: int = 16) -> dict: } +def screen_text(rows: int = 25, cols: int = 80) -> dict: + """Read the VGA text mode screen buffer. + + Reads directly from VGA text mode memory at 0xB8000. This is a reliable + way to see what's displayed without needing QMP/screenshots. + + Args: + rows: Number of rows to read (default: 25, standard VGA) + cols: Number of columns (default: 80, standard VGA) + + Returns: + Screen contents as text lines with optional attributes + + Note: + Only works in text mode (mode 3). Graphics modes use 0xA0000. + """ + try: + # VGA text mode buffer at 0xB8000 + # Each cell is 2 bytes: [character][attribute] + # Attribute: bits 0-3 = foreground, 4-6 = background, 7 = blink + VIDEO_BASE = 0xB8000 + bytes_per_row = cols * 2 + total_bytes = rows * bytes_per_row + + mem = client.read_memory(VIDEO_BASE, total_bytes) + + lines = [] + raw_lines = [] + for row in range(rows): + offset = row * bytes_per_row + row_data = mem.data[offset:offset + bytes_per_row] + + # Extract characters (every other byte) + chars = [] + for i in range(0, len(row_data), 2): + char_byte = row_data[i] + # Convert to printable ASCII, replace control chars with space + if 32 <= char_byte < 127: + chars.append(chr(char_byte)) + elif char_byte == 0: + chars.append(' ') + else: + chars.append('.') # Non-printable + + line = ''.join(chars).rstrip() + lines.append(line) + raw_lines.append(row_data.hex()) + + # Also provide a compact view (non-empty lines only) + compact = [f"{i:2d}: {line}" for i, line in enumerate(lines) if line.strip()] + + return { + "success": True, + "mode": "text", + "dimensions": f"{cols}x{rows}", + "video_address": "B8000", + "lines": lines, + "compact": compact, + "raw_hex": raw_lines, + } + + except GDBError as e: + return { + "success": False, + "error": str(e), + } + + +def screen_graphics(width: int = 320, height: int = 200, mode: str = "13h") -> dict: + """Read VGA graphics mode video memory. + + Reads directly from VGA graphics memory at 0xA0000. + + Args: + width: Screen width (default: 320 for mode 13h) + height: Screen height (default: 200 for mode 13h) + mode: VGA mode hint - "13h" (320x200x256) or "12h" (640x480x16) + + Returns: + Raw video memory data (palette-indexed pixels) + + Note: + Returns raw bytes - you'll need palette info to interpret colors. + Mode 13h: 1 byte per pixel (256 colors), linear framebuffer + Mode 12h: 4 planes, more complex + """ + try: + VIDEO_BASE = 0xA0000 + + if mode == "13h": + # Mode 13h: 320x200, 1 byte per pixel, linear + total_bytes = min(width * height, 64000) # Max 64KB + else: + # Conservative read for other modes + total_bytes = min(width * height // 8, 38400) + + mem = client.read_memory(VIDEO_BASE, total_bytes) + + # Provide some statistics about the pixel data + pixel_counts = {} + for b in mem.data: + pixel_counts[b] = pixel_counts.get(b, 0) + 1 + + # Top colors + top_colors = sorted(pixel_counts.items(), key=lambda x: -x[1])[:10] + + return { + "success": True, + "mode": mode, + "dimensions": f"{width}x{height}", + "video_address": "A0000", + "bytes_read": len(mem.data), + "top_colors": [{"index": c, "count": n} for c, n in top_colors], + "raw_hex_preview": mem.data[:256].hex(), # First 256 bytes + "hint": "Use VGA palette registers (ports 3C7-3C9) to interpret colors", + } + + except GDBError as e: + return { + "success": False, + "error": str(e), + } + + def status() -> dict: """Get current debugger and emulator status. diff --git a/src/dosbox_mcp/tools/peripheral.py b/src/dosbox_mcp/tools/peripheral.py index df8c0c1..e458451 100644 --- a/src/dosbox_mcp/tools/peripheral.py +++ b/src/dosbox_mcp/tools/peripheral.py @@ -1,19 +1,216 @@ -"""Peripheral tools: screenshot, serial communication.""" +"""Peripheral tools: screenshot, serial communication, keyboard input.""" + +import json +import socket +import time +from typing import Any + +from ..state import manager + +# Default ports (fallback if manager not configured) +DEFAULT_SERIAL_PORT = 5555 # nullmodem server:5555 +DEFAULT_QMP_PORT = 4444 # qmpserver port=4444 + + +def _get_qmp_port() -> int: + """Get QMP port from manager or use default.""" + return manager.qmp_port if manager.running else DEFAULT_QMP_PORT + + +def _tcp_send(host: str, port: int, data: bytes, timeout: float = 2.0) -> tuple[bool, str]: + """Send data over TCP socket. + + Args: + host: Target hostname + port: Target port + data: Data to send + timeout: Connection timeout + + Returns: + Tuple of (success, message) + """ + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + sock.connect((host, port)) + sock.sendall(data) + sock.close() + return True, f"Sent {len(data)} bytes to {host}:{port}" + except TimeoutError: + return False, f"Connection timeout to {host}:{port}" + except ConnectionRefusedError: + return False, f"Connection refused to {host}:{port} - is DOSBox-X running with serial enabled?" + except Exception as e: + return False, f"Socket error: {e}" + + +def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None = None, timeout: float = 2.0) -> dict: + """Send QMP command to DOSBox-X. + + QMP (QEMU Machine Protocol) is a JSON-based protocol for machine control. + The dosbox-x-remotedebug fork supports QMP on a configurable port. + + Args: + host: Target hostname + port: QMP port + command: QMP command name (e.g., "send-key", "quit") + args: Optional command arguments + timeout: Connection timeout + + Returns: + QMP response dict or error dict + """ + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + sock.connect((host, port)) + + # Read QMP greeting (DOSBox-X sends a banner) + greeting = b"" + try: + sock.settimeout(0.1) # Short timeout for greeting + while True: + chunk = sock.recv(1024) + if not chunk: + break + greeting += chunk + except (TimeoutError, socket.timeout): + pass # Expected - no more data + + # Build QMP command + cmd = {"execute": command} + if args: + cmd["arguments"] = args + + # Send command as JSON line + sock.settimeout(timeout) + sock.sendall(json.dumps(cmd).encode() + b"\n") + + # Read response + response = b"" + try: + sock.settimeout(0.2) # Short timeout for response + while True: + chunk = sock.recv(1024) + if not chunk: + break + response += chunk + except (TimeoutError, socket.timeout): + pass + + sock.close() + + # Parse response + if response: + try: + return json.loads(response.decode().strip().split('\n')[-1]) + except json.JSONDecodeError: + return {"success": True, "raw_response": response.decode()} + + return {"success": True, "message": "Command sent (no response)"} + + except TimeoutError: + return {"success": False, "error": f"QMP connection timeout to {host}:{port}"} + except ConnectionRefusedError: + return {"success": False, "error": "QMP connection refused - is DOSBox-X running with qmpserver enabled?"} + except Exception as e: + return {"success": False, "error": f"QMP error: {e}"} def screenshot(filename: str | None = None) -> dict: """Capture DOSBox-X display. + Uses QMP screendump command which calls DOSBox-X's internal capture function. + Returns a resource URI that can be used to fetch the screenshot. + Args: - filename: Optional output filename + filename: Optional output filename (DOSBox-X uses auto-naming) Returns: - Screenshot info or error + Screenshot info including resource URI for fetching the image """ - # Placeholder - requires X11 or DOSBox-X specific integration + import os + from datetime import datetime + + # Import resources module for registration + try: + from dosbox_mcp import resources + except ImportError: + resources = None + + # Use QMP screendump command - it returns base64 PNG and saves to capture/ + result = _qmp_command("localhost", _get_qmp_port(), "screendump", + {"filename": filename or "screenshot.png"}, timeout=10.0) + + # Check for QMP error response + if "error" in result: + error_desc = result.get("error", {}) + if isinstance(error_desc, dict): + error_msg = error_desc.get("desc", str(error_desc)) + else: + error_msg = str(error_desc) + return { + "success": False, + "error": f"Screenshot failed: {error_msg}", + } + + # Parse successful response + if "return" in result: + ret = result["return"] + screenshot_filename = None + full_path = None + + # Find the file path + if "file" in ret: + screenshot_filename = os.path.basename(ret["file"]) + # Try to find full path on host + possible_dirs = [ + os.environ.get("DOS_DIR", ""), + os.path.expanduser("~/claude/dosbox-mcp/dos"), + "./dos", + ] + for base in possible_dirs: + if not base: + continue + test_path = os.path.join(base, ret["file"]) + if os.path.isfile(test_path): + full_path = test_path + break + + # Register with resource module + resource_uri = None + if resources and screenshot_filename and full_path: + resource_uri = resources.register_screenshot( + filename=screenshot_filename, + path=full_path, + size=ret.get("size", 0), + timestamp=datetime.now().isoformat(), + ) + + response = { + "success": True, + "message": "Screenshot captured", + "format": ret.get("format", "png"), + "size_bytes": ret.get("size", 0), + } + + # Include resource URI (the only way clients should access screenshots) + if resource_uri: + response["resource_uri"] = resource_uri + else: + # Fallback: provide filename so client knows what was created + # but encourage using dosbox://screenshots/latest resource + response["hint"] = "Use resource dosbox://screenshots/latest to fetch" + if screenshot_filename: + response["filename"] = screenshot_filename + + return response + + # Fallback for unexpected response format return { - "success": False, - "error": "Screenshot not yet implemented. Use DOSBox-X hotkey F12.", + "success": True, + "message": "Screenshot command sent", + "raw_response": result, } @@ -30,8 +227,509 @@ def serial_send(data: str, port: int = 1) -> dict: Returns: Send result """ - # Placeholder - requires serial port configuration - return { - "success": False, - "error": "Serial port communication not yet implemented.", + # Parse hex escapes like \x1b for ESC + def parse_data(s: str) -> bytes: + result = bytearray() + i = 0 + while i < len(s): + if s[i:i+2] == '\\x' and i + 4 <= len(s): + try: + result.append(int(s[i+2:i+4], 16)) + i += 4 + continue + except ValueError: + pass + elif s[i:i+2] == '\\r': + result.append(0x0d) + i += 2 + continue + elif s[i:i+2] == '\\n': + result.append(0x0a) + i += 2 + continue + result.append(ord(s[i])) + i += 1 + return bytes(result) + + # Map COM port to TCP port (based on dosbox.conf config) + # serial1=nullmodem server:5555 + tcp_port_map = { + 1: 5555, # COM1 + 2: 5556, # COM2 (if configured) + } + + if port not in tcp_port_map: + return { + "success": False, + "error": f"Invalid COM port {port}. Supported: 1, 2", + } + + tcp_port = tcp_port_map[port] + + try: + byte_data = parse_data(data) + except Exception as e: + return { + "success": False, + "error": f"Failed to parse data: {e}", + } + + success, message = _tcp_send("localhost", tcp_port, byte_data) + + if success: + return { + "success": True, + "message": message, + "bytes_sent": len(byte_data), + "port": f"COM{port}", + "tcp_port": tcp_port, + "data_preview": data[:50] + ("..." if len(data) > 50 else ""), + } + else: + return { + "success": False, + "error": message, + "hint": f"Ensure DOSBox-X is running with serial{port}=nullmodem server:{tcp_port}", + } + + +# QMP keyboard key mapping (from DOSBox-X remotedebug qmp.cpp) +QMP_KEY_MAP = { + # Letters + "a": "a", "b": "b", "c": "c", "d": "d", "e": "e", "f": "f", "g": "g", + "h": "h", "i": "i", "j": "j", "k": "k", "l": "l", "m": "m", "n": "n", + "o": "o", "p": "p", "q": "q", "r": "r", "s": "s", "t": "t", "u": "u", + "v": "v", "w": "w", "x": "x", "y": "y", "z": "z", + # Numbers + "0": "0", "1": "1", "2": "2", "3": "3", "4": "4", + "5": "5", "6": "6", "7": "7", "8": "8", "9": "9", + # Function keys + "f1": "f1", "f2": "f2", "f3": "f3", "f4": "f4", "f5": "f5", "f6": "f6", + "f7": "f7", "f8": "f8", "f9": "f9", "f10": "f10", "f11": "f11", "f12": "f12", + # Special keys + "ret": "ret", "enter": "ret", "return": "ret", + "esc": "esc", "escape": "esc", + "spc": "spc", "space": "spc", " ": "spc", + "tab": "tab", + "backspace": "backspace", "bs": "backspace", + "delete": "delete", "del": "delete", + "insert": "insert", "ins": "insert", + "home": "home", "end": "end", + "pgup": "pgup", "pageup": "pgup", + "pgdn": "pgdn", "pagedown": "pgdn", + # Arrow keys + "up": "up", "down": "down", "left": "left", "right": "right", + # Modifiers + "shift": "shift", "ctrl": "ctrl", "alt": "alt", + "shift_r": "shift_r", "ctrl_r": "ctrl_r", "alt_r": "alt_r", + # Punctuation + "minus": "minus", "-": "minus", + "equal": "equal", "=": "equal", + "bracket_left": "bracket_left", "[": "bracket_left", + "bracket_right": "bracket_right", "]": "bracket_right", + "backslash": "backslash", "\\": "backslash", + "semicolon": "semicolon", ";": "semicolon", + "apostrophe": "apostrophe", "'": "apostrophe", + "grave_accent": "grave_accent", "`": "grave_accent", + "comma": "comma", ",": "comma", + "dot": "dot", ".": "dot", + "slash": "slash", "/": "slash", +} + + +def keyboard_send(keys: str, delay_ms: int = 10) -> dict: + """Send keyboard input to DOSBox-X via QMP protocol. + + This simulates keyboard input using the QMP (QEMU Machine Protocol) server. + Useful for triggering RIPscrip command processing when serial port input + isn't configured. + + Args: + keys: Keys to send. Can be: + - Single key: "a", "enter", "f1" + - Key sequence: "hello" (sends h, e, l, l, o) + - With modifiers: "ctrl-c", "alt-x", "shift-a" + - Mixed: "!|Z" sends !, |, Z + delay_ms: Delay between keystrokes in milliseconds (default: 50) + + Returns: + Result dict with keys sent + + Examples: + keyboard_send("hello") # Types "hello" + keyboard_send("enter") # Presses Enter + keyboard_send("ctrl-c") # Ctrl+C + keyboard_send("!|L0000001919") # RIPscrip line command + """ + sent_keys = [] + errors = [] + + # Parse and send each key + i = 0 + while i < len(keys): + # Check for modifier combinations like "ctrl-c" + modifier = None + if i + 5 < len(keys) and keys[i:i+5].lower() == "ctrl-": + modifier = "ctrl" + i += 5 + elif i + 4 < len(keys) and keys[i:i+4].lower() == "alt-": + modifier = "alt" + i += 4 + elif i + 6 < len(keys) and keys[i:i+6].lower() == "shift-": + modifier = "shift" + i += 6 + + # Check for special key names + key_to_send = None + remaining = keys[i:].lower() + + # Try to match known special keys + for key_name in sorted(QMP_KEY_MAP.keys(), key=len, reverse=True): + if remaining.startswith(key_name) and len(key_name) > 1: + key_to_send = QMP_KEY_MAP[key_name] + i += len(key_name) + break + + # If no special key matched, treat as single character + if key_to_send is None and i < len(keys): + char = keys[i] + if char.lower() in QMP_KEY_MAP: + key_to_send = QMP_KEY_MAP[char.lower()] + # Handle uppercase (needs shift) + if char.isupper() and modifier is None: + modifier = "shift" + elif char in "!@#$%^&*()_+{}|:\"<>?": + # Shifted punctuation - map to base key with shift + shift_map = { + "!": "1", "@": "2", "#": "3", "$": "4", "%": "5", + "^": "6", "&": "7", "*": "8", "(": "9", ")": "0", + "_": "minus", "+": "equal", "{": "bracket_left", + "}": "bracket_right", "|": "backslash", ":": "semicolon", + "\"": "apostrophe", "<": "comma", ">": "dot", "?": "slash", + } + if char in shift_map: + key_to_send = shift_map[char] + if modifier is None: + modifier = "shift" + i += 1 + + if key_to_send is None: + errors.append(f"Unknown key: {keys[i-1] if i > 0 else '?'}") + continue + + # Build QMP command arguments + key_spec = {"type": "qcode", "data": key_to_send} + args = {"keys": [key_spec]} + + # Add modifier if present + if modifier: + args["keys"].insert(0, {"type": "qcode", "data": modifier}) + + # Send via QMP + result = _qmp_command("localhost", _get_qmp_port(), "send-key", args) + + if result.get("success", True) and "error" not in result: + sent_keys.append(f"{modifier}-{key_to_send}" if modifier else key_to_send) + else: + errors.append(f"Failed to send {key_to_send}: {result.get('error', 'unknown')}") + + # Small delay between keys + if delay_ms > 0 and i < len(keys): + time.sleep(delay_ms / 1000.0) + + if errors: + return { + "success": False, + "error": "; ".join(errors), + "keys_sent": sent_keys, + } + + return { + "success": True, + "message": f"Sent {len(sent_keys)} keys", + "keys_sent": sent_keys, + "qmp_port": _get_qmp_port(), + } + + +def mouse_move(dx: int, dy: int) -> dict: + """Move mouse cursor by relative amount. + + Sends relative mouse movement to DOSBox-X. Positive dx moves right, + positive dy moves down. + + Args: + dx: Horizontal movement (positive = right, negative = left) + dy: Vertical movement (positive = down, negative = up) + + Returns: + Result dict with movement info + + Examples: + mouse_move(10, 0) # Move right 10 pixels + mouse_move(-5, -5) # Move up-left + """ + # QMP input-send-event for relative mouse movement + args = { + "events": [ + { + "type": "rel", + "data": {"axis": "x", "value": dx} + }, + { + "type": "rel", + "data": {"axis": "y", "value": dy} + } + ] + } + + result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Unknown error"), + } + + return { + "success": True, + "message": f"Mouse moved ({dx}, {dy})", + "dx": dx, + "dy": dy, + } + + +def mouse_click(button: str = "left", double: bool = False) -> dict: + """Click mouse button. + + Sends mouse button press and release to DOSBox-X. + + Args: + button: Button to click - "left", "right", or "middle" + double: If True, send double-click + + Returns: + Result dict with click info + + Examples: + mouse_click() # Left click + mouse_click("right") # Right click + mouse_click("left", True) # Double-click + """ + # Map button names to QMP button values + button_map = { + "left": "left", + "right": "right", + "middle": "middle", + "l": "left", + "r": "right", + "m": "middle", + } + + btn = button_map.get(button.lower()) + if not btn: + return { + "success": False, + "error": f"Unknown button '{button}'. Use: left, right, middle", + } + + clicks = 2 if double else 1 + errors = [] + + for _ in range(clicks): + # Press + args = { + "events": [ + {"type": "btn", "data": {"button": btn, "down": True}} + ] + } + result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) + if "error" in result: + errors.append(f"Press failed: {result.get('error')}") + + time.sleep(0.05) # Brief delay between press and release + + # Release + args = { + "events": [ + {"type": "btn", "data": {"button": btn, "down": False}} + ] + } + result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) + if "error" in result: + errors.append(f"Release failed: {result.get('error')}") + + if double and _ == 0: + time.sleep(0.1) # Delay between clicks for double-click + + if errors: + return { + "success": False, + "error": "; ".join(errors), + } + + return { + "success": True, + "message": f"{'Double-' if double else ''}Clicked {btn} button", + "button": btn, + "double_click": double, + } + + +def mouse_drag(start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left") -> dict: + """Drag mouse from one position to another. + + Moves to start position, presses button, moves to end position, releases. + Coordinates are relative movements from current position. + + Args: + start_x: Starting X offset from current position + start_y: Starting Y offset from current position + end_x: Ending X offset from start position + end_y: Ending Y offset from start position + button: Button to hold during drag (default: "left") + + Returns: + Result dict with drag info + + Example: + mouse_drag(0, 0, 100, 50) # Drag right 100, down 50 + """ + button_map = {"left": "left", "right": "right", "middle": "middle"} + btn = button_map.get(button.lower(), "left") + + errors = [] + + # Move to start position + if start_x != 0 or start_y != 0: + args = { + "events": [ + {"type": "rel", "data": {"axis": "x", "value": start_x}}, + {"type": "rel", "data": {"axis": "y", "value": start_y}}, + ] + } + result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) + if "error" in result: + errors.append(f"Move to start failed: {result.get('error')}") + time.sleep(0.05) + + # Press button + args = { + "events": [{"type": "btn", "data": {"button": btn, "down": True}}] + } + result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) + if "error" in result: + errors.append(f"Button press failed: {result.get('error')}") + time.sleep(0.05) + + # Move to end position (relative to start) + dx = end_x - start_x + dy = end_y - start_y + if dx != 0 or dy != 0: + # Break into smaller steps for smoother drag + steps = max(abs(dx), abs(dy)) // 10 + 1 + step_dx = dx / steps + step_dy = dy / steps + + for _ in range(steps): + args = { + "events": [ + {"type": "rel", "data": {"axis": "x", "value": int(step_dx)}}, + {"type": "rel", "data": {"axis": "y", "value": int(step_dy)}}, + ] + } + result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) + if "error" in result: + errors.append(f"Drag movement failed: {result.get('error')}") + break + time.sleep(0.02) + + time.sleep(0.05) + + # Release button + args = { + "events": [{"type": "btn", "data": {"button": btn, "down": False}}] + } + result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) + if "error" in result: + errors.append(f"Button release failed: {result.get('error')}") + + if errors: + return { + "success": False, + "error": "; ".join(errors), + } + + return { + "success": True, + "message": f"Dragged from ({start_x},{start_y}) to ({end_x},{end_y})", + "start": (start_x, start_y), + "end": (end_x, end_y), + "button": btn, + } + + +def clipboard_copy() -> dict: + """Copy DOS screen text to host clipboard. + + Sends Ctrl+F5 to DOSBox-X which copies all text on the DOS screen + to the host system clipboard. + + Returns: + Success status + + Note: + Only works in text mode. The copied text can then be accessed + from your host system's clipboard. + """ + # DOSBox-X: Ctrl+F5 = Copy screen text to host clipboard + result = _qmp_command("localhost", _get_qmp_port(), "send-key", + {"keys": [ + {"type": "qcode", "data": "ctrl"}, + {"type": "qcode", "data": "f5"} + ]}) + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Failed to send Ctrl+F5"), + } + + return { + "success": True, + "message": "Screen text copied to host clipboard (Ctrl+F5)", + "hint": "Text is now in your host system clipboard", + } + + +def clipboard_paste() -> dict: + """Paste host clipboard text into DOSBox-X. + + Sends Ctrl+F6 to DOSBox-X which pastes text from the host system + clipboard into the DOS screen as simulated keystrokes. + + Returns: + Success status + + Note: + Pastes as keystrokes, so works with any DOS application. + Set your host clipboard content first, then call this. + """ + # DOSBox-X: Ctrl+F6 = Paste from host clipboard + result = _qmp_command("localhost", _get_qmp_port(), "send-key", + {"keys": [ + {"type": "qcode", "data": "ctrl"}, + {"type": "qcode", "data": "f6"} + ]}) + + if "error" in result: + return { + "success": False, + "error": result.get("error", "Failed to send Ctrl+F6"), + } + + return { + "success": True, + "message": "Host clipboard pasted into DOSBox-X (Ctrl+F6)", + "hint": "Text from host clipboard was typed as keystrokes", } diff --git a/src/dosbox_mcp/utils.py b/src/dosbox_mcp/utils.py index 146c33b..accc047 100644 --- a/src/dosbox_mcp/utils.py +++ b/src/dosbox_mcp/utils.py @@ -1,6 +1,5 @@ """Utility functions for DOSBox-X MCP Server.""" -import re def parse_address(addr: str) -> int: