Add TTF font support, network tools, and logging capabilities

TTF Font Support:
- Bundle 7 IBM PC fonts from Ultimate Oldschool PC Font Pack (CC BY-SA 4.0)
- Add fonts.py module with resolve_font() for host/Docker path handling
- Add fonts_list() MCP tool for font discovery
- Extend launch() with TTF parameters (output, ttf_font, ttf_ptsize, etc.)
- Mount fonts at /fonts in Docker container for TTF rendering

Network Tools:
- Add port_list() to show configured serial/parallel ports
- Add port_status() to check port connectivity
- Add modem_dial()/modem_hangup() for BBS dial-out
- Extend launch() with serial1/serial2/ipx parameters

Logging Tools:
- Add logging_status/enable/disable for DOSBox-X debug logging
- Add logging_category() for selective category control
- Add log_capture()/log_clear() for log retrieval

Code quality improvements:
- Use contextlib.suppress instead of try-except-pass
- Fix variable naming (VIDEO_BASE -> video_base)
- Apply ruff formatting throughout
This commit is contained in:
Ryan Malloy 2026-01-28 12:00:40 -07:00
parent 1d826254ba
commit 68e8d3c4c4
26 changed files with 2560 additions and 445 deletions

View File

@ -14,6 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
autoconf \ autoconf \
libtool \ libtool \
pkg-config \ pkg-config \
patch \
libsdl2-dev \ libsdl2-dev \
libsdl2-net-dev \ libsdl2-net-dev \
libsdl2-image-dev \ libsdl2-image-dev \
@ -37,11 +38,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
# #
# IMPORTANT: Clone and build MUST be in the same RUN to prevent BuildKit from # IMPORTANT: Clone and build MUST be in the same RUN to prevent BuildKit from
# caching the build step separately from the git clone step. # caching the build step separately from the git clone step.
ARG CACHE_BUST=2026-01-27-v19-add-ncurses-for-debug ARG CACHE_BUST=2026-01-28-v23-git-only
WORKDIR /build WORKDIR /build
# Configure and build with GDB server support # Configure and build with GDB server support
# --enable-remotedebug: Enables C_REMOTEDEBUG flag for GDB/QMP servers # --enable-remotedebug: Enables C_REMOTEDEBUG flag for GDB/QMP servers
# --enable-debug: Enables internal debugger (Alt+Pause) # --enable-debug: Enables internal debugger (Alt+Pause)
# Note: Removed --disable-printer to enable parallel port support
# All patches are now committed to the rsp2k fork (joystick, parport, logging)
RUN echo "Cache bust: ${CACHE_BUST}" && \ RUN echo "Cache bust: ${CACHE_BUST}" && \
git clone --branch remotedebug --depth 1 https://github.com/rsp2k/dosbox-x-remotedebug.git dosbox-x && \ git clone --branch remotedebug --depth 1 https://github.com/rsp2k/dosbox-x-remotedebug.git dosbox-x && \
cd dosbox-x && \ cd dosbox-x && \
@ -50,8 +54,7 @@ RUN echo "Cache bust: ${CACHE_BUST}" && \
--prefix=/opt/dosbox-x \ --prefix=/opt/dosbox-x \
--enable-remotedebug \ --enable-remotedebug \
--enable-debug \ --enable-debug \
--enable-sdl2 \ --enable-sdl2 && \
--disable-printer && \
make -j$(nproc) && \ make -j$(nproc) && \
make install make install

View File

@ -41,11 +41,7 @@ build-backend = "hatchling.build"
packages = ["src/dosbox_mcp"] packages = ["src/dosbox_mcp"]
[tool.hatch.build.targets.sdist] [tool.hatch.build.targets.sdist]
include = ["src/dosbox_mcp"] include = ["src/dosbox_mcp", "src/dosbox_mcp/fonts"]
# This is the key setting for src-layout
[tool.hatch.build]
sources = ["src"]
[tool.ruff] [tool.ruff]
line-length = 100 line-length = 100

View File

@ -7,6 +7,7 @@ This module handles:
- Configuration file handling - Configuration file handling
""" """
import contextlib
import logging import logging
import os import os
import shutil import shutil
@ -16,6 +17,8 @@ import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from .fonts import DOCKER_FONTS_PATH, FONTS_DIR, resolve_font
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -34,6 +37,22 @@ class DOSBoxConfig:
# Display settings # Display settings
fullscreen: bool = False fullscreen: bool = False
windowresolution: str = "800x600" windowresolution: str = "800x600"
output: str = "opengl" # Display output: opengl, ttf, surface, etc.
# TrueType font settings (when output=ttf)
# TTF mode renders text using system TrueType fonts for sharper display
# Useful for terminal programs, word processors, and text-heavy apps
ttf_font: str | None = None # TTF font name (e.g., "Consola", "Nouveau_IBM")
ttf_fontbold: str | None = None # Bold variant
ttf_fontital: str | None = None # Italic variant
ttf_fontboit: str | None = None # Bold-italic variant
ttf_ptsize: int | None = None # Font size in points (default: auto)
ttf_winperc: int | None = None # Window size as percentage (e.g., 75)
ttf_lins: int | None = None # Screen lines (e.g., 50 for taller screen)
ttf_cols: int | None = None # Screen columns (e.g., 132 for wider screen)
ttf_blinkc: str | None = None # Cursor blink rate (0-7, or "false")
ttf_wp: str | None = None # Word processor mode: WP, WS, XY, FE
ttf_colors: str | None = None # Custom color scheme (16 RGB or hex values)
# CPU settings # CPU settings
# CRITICAL: Must use "normal" core for GDB breakpoints to work! # CRITICAL: Must use "normal" core for GDB breakpoints to work!
@ -48,10 +67,20 @@ class DOSBoxConfig:
# Startup # Startup
startbanner: bool = False # Disable splash screen for cleaner automation startbanner: bool = False # Disable splash screen for cleaner automation
# Serial ports (for future RIPscrip work) # Serial ports (for RIPscrip work and modem dial-out)
# Options: disabled, nullmodem, modem, direct, dummy
# For nullmodem: use serial1_port/serial2_port for TCP port
serial1: str = "disabled" serial1: str = "disabled"
serial2: str = "disabled" serial2: str = "disabled"
# TCP ports for nullmodem serial mode
# These are the ports external clients connect to
serial1_port: int = 5555
serial2_port: int = 5556
# IPX networking (for DOS multiplayer games like DOOM, Duke Nukem)
ipx: bool = False
# Parallel ports (LPT) # Parallel ports (LPT)
# Options: disabled, file, printer, reallpt, disney # Options: disabled, file, printer, reallpt, disney
# file: writes output to capture directory (useful for capturing print jobs) # file: writes output to capture directory (useful for capturing print jobs)
@ -73,14 +102,47 @@ class DOSBoxConfig:
# DOS startup files (written to mounted drive) # DOS startup files (written to mounted drive)
# These are actual DOS files, not DOSBox config sections # These are actual DOS files, not DOSBox config sections
autoexec_bat: str | None = None # Content for AUTOEXEC.BAT autoexec_bat: str | None = None # Content for AUTOEXEC.BAT
config_sys: str | None = None # Content for CONFIG.SYS config_sys: str | None = None # Content for CONFIG.SYS
def to_conf(self) -> str: def _build_serial_config(self, mode: str, tcp_port: int) -> str:
"""Generate DOSBox-X configuration file content.""" """Build serial port configuration string.
Args:
mode: Serial mode (disabled, nullmodem, modem, direct, dummy)
tcp_port: TCP port for nullmodem mode
Returns:
DOSBox-X serial configuration string
"""
mode_lower = mode.lower().strip()
if mode_lower == "disabled":
return "disabled"
elif mode_lower == "nullmodem":
# nullmodem server mode - listens on TCP port for incoming connections
return f"nullmodem server:0.0.0.0 port:{tcp_port}"
elif mode_lower == "modem":
# modem mode - allows DOS programs to dial out via AT commands
return "modem"
elif mode_lower in ("direct", "dummy"):
return mode_lower
else:
# Custom configuration string passed directly
return mode
def to_conf(self, container_fonts_path: str | None = None) -> str:
"""Generate DOSBox-X configuration file content.
Args:
container_fonts_path: If running in Docker, the container path where
fonts are mounted (e.g., "/fonts"). If None,
uses host paths.
"""
lines = [ lines = [
"[sdl]", "[sdl]",
f"fullscreen={str(self.fullscreen).lower()}", f"fullscreen={str(self.fullscreen).lower()}",
f"windowresolution={self.windowresolution}", f"windowresolution={self.windowresolution}",
f"output={self.output}",
"", "",
"[cpu]", "[cpu]",
f"core={self.core}", f"core={self.core}",
@ -95,33 +157,43 @@ class DOSBoxConfig:
# GDB stub configuration (lokkju/dosbox-x-remotedebug fork) # GDB stub configuration (lokkju/dosbox-x-remotedebug fork)
# Must be in [dosbox] section with "gdbserver port=" format # Must be in [dosbox] section with "gdbserver port=" format
if self.gdb_enabled: if self.gdb_enabled:
lines.extend([ lines.extend(
"gdbserver=true", [
f"gdbserver port={self.gdb_port}", "gdbserver=true",
]) f"gdbserver port={self.gdb_port}",
]
)
# QMP server configuration (for screenshots, keyboard, mouse) # QMP server configuration (for screenshots, keyboard, mouse)
if self.qmp_enabled: if self.qmp_enabled:
lines.extend([ lines.extend(
"qmpserver=true", [
f"qmpserver port={self.qmp_port}", "qmpserver=true",
]) f"qmpserver port={self.qmp_port}",
]
)
lines.extend([ # Serial port configuration with TCP port handling for nullmodem
"", serial1_config = self._build_serial_config(self.serial1, self.serial1_port)
"[serial]", serial2_config = self._build_serial_config(self.serial2, self.serial2_port)
f"serial1={self.serial1}",
f"serial2={self.serial2}", lines.extend(
"", [
"[parallel]", "",
f"parallel1={self.parallel1}", "[serial]",
f"parallel2={self.parallel2}", f"serial1={serial1_config}",
"", f"serial2={serial2_config}",
"[joystick]", "",
f"joysticktype={self.joysticktype}", "[parallel]",
f"timed={str(self.joystick_timed).lower()}", f"parallel1={self.parallel1}",
"", f"parallel2={self.parallel2}",
]) "",
"[joystick]",
f"joysticktype={self.joysticktype}",
f"timed={str(self.joystick_timed).lower()}",
"",
]
)
# Autoexec section # Autoexec section
lines.append("[autoexec]") lines.append("[autoexec]")
@ -133,7 +205,53 @@ class DOSBoxConfig:
# Add custom autoexec commands # Add custom autoexec commands
lines.extend(self.autoexec) lines.extend(self.autoexec)
return '\n'.join(lines) # IPX networking section (for DOS multiplayer games)
if self.ipx:
lines.extend(
[
"",
"[ipx]",
"ipx=true",
]
)
# TrueType font section (when output=ttf or TTF settings provided)
# resolve_font() converts bundled font names to full paths
# container_fonts_path is used for Docker to map to container paths
ttf_settings = []
font_path = resolve_font(self.ttf_font, container_fonts_path)
if font_path:
ttf_settings.append(f"font={font_path}")
fontbold_path = resolve_font(self.ttf_fontbold, container_fonts_path)
if fontbold_path:
ttf_settings.append(f"fontbold={fontbold_path}")
fontital_path = resolve_font(self.ttf_fontital, container_fonts_path)
if fontital_path:
ttf_settings.append(f"fontital={fontital_path}")
fontboit_path = resolve_font(self.ttf_fontboit, container_fonts_path)
if fontboit_path:
ttf_settings.append(f"fontboit={fontboit_path}")
if self.ttf_ptsize is not None:
ttf_settings.append(f"ptsize={self.ttf_ptsize}")
if self.ttf_winperc is not None:
ttf_settings.append(f"winperc={self.ttf_winperc}")
if self.ttf_lins is not None:
ttf_settings.append(f"lins={self.ttf_lins}")
if self.ttf_cols is not None:
ttf_settings.append(f"cols={self.ttf_cols}")
if self.ttf_blinkc is not None:
ttf_settings.append(f"blinkc={self.ttf_blinkc}")
if self.ttf_wp:
ttf_settings.append(f"wp={self.ttf_wp}")
if self.ttf_colors:
ttf_settings.append(f"colors={self.ttf_colors}")
if ttf_settings:
lines.append("")
lines.append("[ttf]")
lines.extend(ttf_settings)
return "\n".join(lines)
class DOSBoxManager: class DOSBoxManager:
@ -151,6 +269,12 @@ class DOSBoxManager:
self._gdb_port: int = 1234 self._gdb_port: int = 1234
self._qmp_port: int = 4444 self._qmp_port: int = 4444
self._xhost_granted: bool = False self._xhost_granted: bool = False
# Serial port configuration tracking
self._serial1_mode: str = "disabled"
self._serial2_mode: str = "disabled"
self._serial1_port: int = 5555
self._serial2_port: int = 5556
self._ipx_enabled: bool = False
def _setup_x11_access(self) -> bool: def _setup_x11_access(self) -> bool:
"""Grant X11 access for Docker containers. """Grant X11 access for Docker containers.
@ -199,8 +323,7 @@ class DOSBoxManager:
pass pass
logger.warning( logger.warning(
"Could not grant X11 access. Display may not work. " "Could not grant X11 access. Display may not work. Try running: xhost +local:docker"
"Try running: xhost +local:docker"
) )
return False return False
@ -223,6 +346,46 @@ class DOSBoxManager:
"""Get the QMP port.""" """Get the QMP port."""
return self._qmp_port return self._qmp_port
@property
def serial1_mode(self) -> str:
"""Get serial port 1 mode."""
return self._serial1_mode
@property
def serial2_mode(self) -> str:
"""Get serial port 2 mode."""
return self._serial2_mode
@property
def serial1_port(self) -> int:
"""Get serial port 1 TCP port (for nullmodem mode)."""
return self._serial1_port
@property
def serial2_port(self) -> int:
"""Get serial port 2 TCP port (for nullmodem mode)."""
return self._serial2_port
@property
def ipx_enabled(self) -> bool:
"""Check if IPX networking is enabled."""
return self._ipx_enabled
def get_serial_tcp_port(self, com_port: int) -> int | None:
"""Get the TCP port for a COM port if configured for nullmodem.
Args:
com_port: COM port number (1 or 2)
Returns:
TCP port number if nullmodem is configured, None otherwise
"""
if com_port == 1:
return self._serial1_port if self._serial1_mode == "nullmodem" else None
elif com_port == 2:
return self._serial2_port if self._serial2_mode == "nullmodem" else None
return None
@property @property
def pid(self) -> int | None: def pid(self) -> int | None:
"""Get process ID if running natively.""" """Get process ID if running natively."""
@ -239,7 +402,7 @@ class DOSBoxManager:
["docker", "inspect", "-f", "{{.State.Running}}", self._container_id], ["docker", "inspect", "-f", "{{.State.Running}}", self._container_id],
capture_output=True, capture_output=True,
text=True, text=True,
timeout=5 timeout=5,
) )
return result.stdout.strip() == "true" return result.stdout.strip() == "true"
except (subprocess.SubprocessError, FileNotFoundError): except (subprocess.SubprocessError, FileNotFoundError):
@ -280,15 +443,19 @@ class DOSBoxManager:
dosbox_exe = self._find_dosbox() dosbox_exe = self._find_dosbox()
if not dosbox_exe: if not dosbox_exe:
raise RuntimeError( raise RuntimeError("DOSBox-X not found. Install dosbox-x or use Docker container.")
"DOSBox-X not found. Install dosbox-x or use Docker container."
)
# Create temporary directory for config # Create temporary directory for config
self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-")) self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-"))
config = config or DOSBoxConfig() config = config or DOSBoxConfig()
self._gdb_port = config.gdb_port self._gdb_port = config.gdb_port
self._qmp_port = config.qmp_port self._qmp_port = config.qmp_port
# Store serial port configuration
self._serial1_mode = config.serial1
self._serial2_mode = config.serial2
self._serial1_port = config.serial1_port
self._serial2_port = config.serial2_port
self._ipx_enabled = config.ipx
# If binary specified, set up mount and autoexec # If binary specified, set up mount and autoexec
if binary_path: if binary_path:
@ -362,6 +529,12 @@ class DOSBoxManager:
config = config or DOSBoxConfig() config = config or DOSBoxConfig()
self._gdb_port = config.gdb_port self._gdb_port = config.gdb_port
self._qmp_port = config.qmp_port self._qmp_port = config.qmp_port
# Store serial port configuration
self._serial1_mode = config.serial1
self._serial2_mode = config.serial2
self._serial1_port = config.serial1_port
self._serial2_port = config.serial2_port
self._ipx_enabled = config.ipx
# If binary specified, set up mount and autoexec for container paths # If binary specified, set up mount and autoexec for container paths
if binary_path: if binary_path:
@ -373,28 +546,44 @@ class DOSBoxManager:
config.autoexec.append("C:") config.autoexec.append("C:")
config.autoexec.append(binary.name) config.autoexec.append(binary.name)
# Write config file # Write config file - use container font path for Docker
self._config_path = self._temp_dir / "dosbox.conf" self._config_path = self._temp_dir / "dosbox.conf"
self._config_path.write_text(config.to_conf()) self._config_path.write_text(config.to_conf(container_fonts_path=DOCKER_FONTS_PATH))
# Build docker command # Build docker command
display = display or os.environ.get("DISPLAY", ":0") display = display or os.environ.get("DISPLAY", ":0")
cmd = [ cmd = [
"docker", "run", "docker",
"run",
"--rm", "--rm",
"-d", # Detached "-d", # Detached
"--name", f"dosbox-mcp-{os.getpid()}", "--name",
f"dosbox-mcp-{os.getpid()}",
# Network - expose GDB and QMP ports # Network - expose GDB and QMP ports
"-p", f"{self._gdb_port}:{self._gdb_port}", "-p",
"-p", f"{self._qmp_port}:{self._qmp_port}", f"{self._gdb_port}:{self._gdb_port}",
"-p",
f"{self._qmp_port}:{self._qmp_port}",
# X11 forwarding # X11 forwarding
"-e", f"DISPLAY={display}", "-e",
"-v", "/tmp/.X11-unix:/tmp/.X11-unix", f"DISPLAY={display}",
"-v",
"/tmp/.X11-unix:/tmp/.X11-unix",
# Config mount # Config mount
"-v", f"{self._config_path}:/config/dosbox.conf:ro", "-v",
f"{self._config_path}:/config/dosbox.conf:ro",
# Fonts mount (bundled TTF fonts)
"-v",
f"{FONTS_DIR}:{DOCKER_FONTS_PATH}:ro",
] ]
# Expose serial ports for nullmodem mode
if config.serial1.lower() == "nullmodem":
cmd.extend(["-p", f"{self._serial1_port}:{self._serial1_port}"])
if config.serial2.lower() == "nullmodem":
cmd.extend(["-p", f"{self._serial2_port}:{self._serial2_port}"])
# Mount binary directory if specified (rw for screenshots/capture to work) # Mount binary directory if specified (rw for screenshots/capture to work)
if binary_path: if binary_path:
binary = Path(binary_path).resolve() binary = Path(binary_path).resolve()
@ -441,23 +630,18 @@ class DOSBoxManager:
subprocess.run( subprocess.run(
["docker", "stop", "-t", str(int(timeout)), self._container_id], ["docker", "stop", "-t", str(int(timeout)), self._container_id],
capture_output=True, capture_output=True,
timeout=timeout + 5 timeout=timeout + 5,
) )
except subprocess.SubprocessError: except subprocess.SubprocessError:
# Force remove # Force remove
subprocess.run( subprocess.run(["docker", "rm", "-f", self._container_id], capture_output=True)
["docker", "rm", "-f", self._container_id],
capture_output=True
)
self._container_id = None self._container_id = None
logger.info("DOSBox-X container stopped") logger.info("DOSBox-X container stopped")
# Cleanup temp directory # Cleanup temp directory
if self._temp_dir and self._temp_dir.exists(): if self._temp_dir and self._temp_dir.exists():
try: with contextlib.suppress(OSError):
shutil.rmtree(self._temp_dir) shutil.rmtree(self._temp_dir)
except OSError:
pass
self._temp_dir = None self._temp_dir = None
def get_logs(self, lines: int = 50) -> str: def get_logs(self, lines: int = 50) -> str:
@ -479,7 +663,7 @@ class DOSBoxManager:
["docker", "logs", "--tail", str(lines), self._container_id], ["docker", "logs", "--tail", str(lines), self._container_id],
capture_output=True, capture_output=True,
text=True, text=True,
timeout=5 timeout=5,
) )
return result.stdout + result.stderr return result.stdout + result.stderr
except subprocess.SubprocessError: except subprocess.SubprocessError:

114
src/dosbox_mcp/fonts.py Normal file
View File

@ -0,0 +1,114 @@
"""Font utilities for DOSBox-X TTF output.
This module provides access to bundled IBM PC fonts from
The Ultimate Oldschool PC Font Pack by VileR (int10h.org).
Fonts are licensed under CC BY-SA 4.0.
"""
from pathlib import Path
# Font directory location
FONTS_DIR = Path(__file__).parent / "fonts"
# Available bundled fonts (name -> filename mapping)
BUNDLED_FONTS = {
# Strict CP437 encoding
"Px437_IBM_VGA_8x16": "Px437_IBM_VGA_8x16.ttf",
"Px437_IBM_VGA_9x16": "Px437_IBM_VGA_9x16.ttf",
"Px437_IBM_EGA_8x14": "Px437_IBM_EGA_8x14.ttf",
"Px437_IBM_CGA": "Px437_IBM_CGA.ttf",
"Px437_IBM_MDA": "Px437_IBM_MDA.ttf",
# Extended Unicode (CP437 + additional characters)
"PxPlus_IBM_VGA_8x16": "PxPlus_IBM_VGA_8x16.ttf",
"PxPlus_IBM_VGA_9x16": "PxPlus_IBM_VGA_9x16.ttf",
}
# Default font for general use
DEFAULT_FONT = "Px437_IBM_VGA_9x16"
def get_font_path(font_name: str) -> Path | None:
"""Get the full path to a bundled font.
Args:
font_name: Font name without .ttf extension
(e.g., "Px437_IBM_VGA_9x16")
Returns:
Path to font file, or None if not found
Example:
>>> path = get_font_path("Px437_IBM_VGA_9x16")
>>> str(path)
'/path/to/dosbox_mcp/fonts/Px437_IBM_VGA_9x16.ttf'
"""
filename = BUNDLED_FONTS.get(font_name)
if filename:
path = FONTS_DIR / filename
if path.exists():
return path
return None
def list_fonts() -> list[str]:
"""List all available bundled font names.
Returns:
List of font names that can be passed to get_font_path()
Example:
>>> list_fonts()
['Px437_IBM_CGA', 'Px437_IBM_EGA_8x14', ...]
"""
return sorted(BUNDLED_FONTS.keys())
def is_bundled_font(font_name: str) -> bool:
"""Check if a font name refers to a bundled font.
Args:
font_name: Font name to check
Returns:
True if font is bundled with dosbox-mcp
"""
return font_name in BUNDLED_FONTS
def resolve_font(font_name: str | None, container_path: str | None = None) -> str | None:
"""Resolve a font name to a full path if bundled, or return as-is.
This allows users to specify either:
- A bundled font name (e.g., "Px437_IBM_VGA_9x16")
- A system font name (e.g., "Consolas")
- A full path to a TTF file
Args:
font_name: Font name, path, or None
container_path: If set, return container path instead of host path
(e.g., "/fonts" for Docker)
Returns:
Full path to bundled font, or original value if not bundled
"""
if font_name is None:
return None
# Check if it's a bundled font
filename = BUNDLED_FONTS.get(font_name)
if filename:
if container_path:
# Return container-relative path
return f"{container_path}/{filename}"
# Return host path
path = FONTS_DIR / filename
if path.exists():
return str(path)
# Return as-is (system font or path)
return font_name
# Docker container path for mounted fonts
DOCKER_FONTS_PATH = "/fonts"

View File

@ -0,0 +1,428 @@
Attribution-ShareAlike 4.0 International
=======================================================================
Creative Commons Corporation ("Creative Commons") is not a law firm and
does not provide legal services or legal advice. Distribution of
Creative Commons public licenses does not create a lawyer-client or
other relationship. Creative Commons makes its licenses and related
information available on an "as-is" basis. Creative Commons gives no
warranties regarding its licenses, any material licensed under their
terms and conditions, or any related information. Creative Commons
disclaims all liability for damages resulting from their use to the
fullest extent possible.
Using Creative Commons Public Licenses
Creative Commons public licenses provide a standard set of terms and
conditions that creators and other rights holders may use to share
original works of authorship and other material subject to copyright
and certain other rights specified in the public license below. The
following considerations are for informational purposes only, are not
exhaustive, and do not form part of our licenses.
Considerations for licensors: Our public licenses are
intended for use by those authorized to give the public
permission to use material in ways otherwise restricted by
copyright and certain other rights. Our licenses are
irrevocable. Licensors should read and understand the terms
and conditions of the license they choose before applying it.
Licensors should also secure all rights necessary before
applying our licenses so that the public can reuse the
material as expected. Licensors should clearly mark any
material not subject to the license. This includes other CC-
licensed material, or material used under an exception or
limitation to copyright. More considerations for licensors:
wiki.creativecommons.org/Considerations_for_licensors
Considerations for the public: By using one of our public
licenses, a licensor grants the public permission to use the
licensed material under specified terms and conditions. If
the licensor's permission is not necessary for any reason--for
example, because of any applicable exception or limitation to
copyright--then that use is not regulated by the license. Our
licenses grant only permissions under copyright and certain
other rights that a licensor has authority to grant. Use of
the licensed material may still be restricted for other
reasons, including because others have copyright or other
rights in the material. A licensor may make special requests,
such as asking that all changes be marked or described.
Although not required by our licenses, you are encouraged to
respect those requests where reasonable. More_considerations
for the public:
wiki.creativecommons.org/Considerations_for_licensees
=======================================================================
Creative Commons Attribution-ShareAlike 4.0 International Public
License
By exercising the Licensed Rights (defined below), You accept and agree
to be bound by the terms and conditions of this Creative Commons
Attribution-ShareAlike 4.0 International Public License ("Public
License"). To the extent this Public License may be interpreted as a
contract, You are granted the Licensed Rights in consideration of Your
acceptance of these terms and conditions, and the Licensor grants You
such rights in consideration of benefits the Licensor receives from
making the Licensed Material available under these terms and
conditions.
Section 1 -- Definitions.
a. Adapted Material means material subject to Copyright and Similar
Rights that is derived from or based upon the Licensed Material
and in which the Licensed Material is translated, altered,
arranged, transformed, or otherwise modified in a manner requiring
permission under the Copyright and Similar Rights held by the
Licensor. For purposes of this Public License, where the Licensed
Material is a musical work, performance, or sound recording,
Adapted Material is always produced where the Licensed Material is
synched in timed relation with a moving image.
b. Adapter's License means the license You apply to Your Copyright
and Similar Rights in Your contributions to Adapted Material in
accordance with the terms and conditions of this Public License.
c. BY-SA Compatible License means a license listed at
creativecommons.org/compatiblelicenses, approved by Creative
Commons as essentially the equivalent of this Public License.
d. Copyright and Similar Rights means copyright and/or similar rights
closely related to copyright including, without limitation,
performance, broadcast, sound recording, and Sui Generis Database
Rights, without regard to how the rights are labeled or
categorized. For purposes of this Public License, the rights
specified in Section 2(b)(1)-(2) are not Copyright and Similar
Rights.
e. Effective Technological Measures means those measures that, in the
absence of proper authority, may not be circumvented under laws
fulfilling obligations under Article 11 of the WIPO Copyright
Treaty adopted on December 20, 1996, and/or similar international
agreements.
f. Exceptions and Limitations means fair use, fair dealing, and/or
any other exception or limitation to Copyright and Similar Rights
that applies to Your use of the Licensed Material.
g. License Elements means the license attributes listed in the name
of a Creative Commons Public License. The License Elements of this
Public License are Attribution and ShareAlike.
h. Licensed Material means the artistic or literary work, database,
or other material to which the Licensor applied this Public
License.
i. Licensed Rights means the rights granted to You subject to the
terms and conditions of this Public License, which are limited to
all Copyright and Similar Rights that apply to Your use of the
Licensed Material and that the Licensor has authority to license.
j. Licensor means the individual(s) or entity(ies) granting rights
under this Public License.
k. Share means to provide material to the public by any means or
process that requires permission under the Licensed Rights, such
as reproduction, public display, public performance, distribution,
dissemination, communication, or importation, and to make material
available to the public including in ways that members of the
public may access the material from a place and at a time
individually chosen by them.
l. Sui Generis Database Rights means rights other than copyright
resulting from Directive 96/9/EC of the European Parliament and of
the Council of 11 March 1996 on the legal protection of databases,
as amended and/or succeeded, as well as other essentially
equivalent rights anywhere in the world.
m. You means the individual or entity exercising the Licensed Rights
under this Public License. Your has a corresponding meaning.
Section 2 -- Scope.
a. License grant.
1. Subject to the terms and conditions of this Public License,
the Licensor hereby grants You a worldwide, royalty-free,
non-sublicensable, non-exclusive, irrevocable license to
exercise the Licensed Rights in the Licensed Material to:
a. reproduce and Share the Licensed Material, in whole or
in part; and
b. produce, reproduce, and Share Adapted Material.
2. Exceptions and Limitations. For the avoidance of doubt, where
Exceptions and Limitations apply to Your use, this Public
License does not apply, and You do not need to comply with
its terms and conditions.
3. Term. The term of this Public License is specified in Section
6(a).
4. Media and formats; technical modifications allowed. The
Licensor authorizes You to exercise the Licensed Rights in
all media and formats whether now known or hereafter created,
and to make technical modifications necessary to do so. The
Licensor waives and/or agrees not to assert any right or
authority to forbid You from making technical modifications
necessary to exercise the Licensed Rights, including
technical modifications necessary to circumvent Effective
Technological Measures. For purposes of this Public License,
simply making modifications authorized by this Section 2(a)
(4) never produces Adapted Material.
5. Downstream recipients.
a. Offer from the Licensor -- Licensed Material. Every
recipient of the Licensed Material automatically
receives an offer from the Licensor to exercise the
Licensed Rights under the terms and conditions of this
Public License.
b. Additional offer from the Licensor -- Adapted Material.
Every recipient of Adapted Material from You
automatically receives an offer from the Licensor to
exercise the Licensed Rights in the Adapted Material
under the conditions of the Adapter's License You apply.
c. No downstream restrictions. You may not offer or impose
any additional or different terms or conditions on, or
apply any Effective Technological Measures to, the
Licensed Material if doing so restricts exercise of the
Licensed Rights by any recipient of the Licensed
Material.
6. No endorsement. Nothing in this Public License constitutes or
may be construed as permission to assert or imply that You
are, or that Your use of the Licensed Material is, connected
with, or sponsored, endorsed, or granted official status by,
the Licensor or others designated to receive attribution as
provided in Section 3(a)(1)(A)(i).
b. Other rights.
1. Moral rights, such as the right of integrity, are not
licensed under this Public License, nor are publicity,
privacy, and/or other similar personality rights; however, to
the extent possible, the Licensor waives and/or agrees not to
assert any such rights held by the Licensor to the limited
extent necessary to allow You to exercise the Licensed
Rights, but not otherwise.
2. Patent and trademark rights are not licensed under this
Public License.
3. To the extent possible, the Licensor waives any right to
collect royalties from You for the exercise of the Licensed
Rights, whether directly or through a collecting society
under any voluntary or waivable statutory or compulsory
licensing scheme. In all other cases the Licensor expressly
reserves any right to collect such royalties.
Section 3 -- License Conditions.
Your exercise of the Licensed Rights is expressly made subject to the
following conditions.
a. Attribution.
1. If You Share the Licensed Material (including in modified
form), You must:
a. retain the following if it is supplied by the Licensor
with the Licensed Material:
i. identification of the creator(s) of the Licensed
Material and any others designated to receive
attribution, in any reasonable manner requested by
the Licensor (including by pseudonym if
designated);
ii. a copyright notice;
iii. a notice that refers to this Public License;
iv. a notice that refers to the disclaimer of
warranties;
v. a URI or hyperlink to the Licensed Material to the
extent reasonably practicable;
b. indicate if You modified the Licensed Material and
retain an indication of any previous modifications; and
c. indicate the Licensed Material is licensed under this
Public License, and include the text of, or the URI or
hyperlink to, this Public License.
2. You may satisfy the conditions in Section 3(a)(1) in any
reasonable manner based on the medium, means, and context in
which You Share the Licensed Material. For example, it may be
reasonable to satisfy the conditions by providing a URI or
hyperlink to a resource that includes the required
information.
3. If requested by the Licensor, You must remove any of the
information required by Section 3(a)(1)(A) to the extent
reasonably practicable.
b. ShareAlike.
In addition to the conditions in Section 3(a), if You Share
Adapted Material You produce, the following conditions also apply.
1. The Adapter's License You apply must be a Creative Commons
license with the same License Elements, this version or
later, or a BY-SA Compatible License.
2. You must include the text of, or the URI or hyperlink to, the
Adapter's License You apply. You may satisfy this condition
in any reasonable manner based on the medium, means, and
context in which You Share Adapted Material.
3. You may not offer or impose any additional or different terms
or conditions on, or apply any Effective Technological
Measures to, Adapted Material that restrict exercise of the
rights granted under the Adapter's License You apply.
Section 4 -- Sui Generis Database Rights.
Where the Licensed Rights include Sui Generis Database Rights that
apply to Your use of the Licensed Material:
a. for the avoidance of doubt, Section 2(a)(1) grants You the right
to extract, reuse, reproduce, and Share all or a substantial
portion of the contents of the database;
b. if You include all or a substantial portion of the database
contents in a database in which You have Sui Generis Database
Rights, then the database in which You have Sui Generis Database
Rights (but not its individual contents) is Adapted Material,
including for purposes of Section 3(b); and
c. You must comply with the conditions in Section 3(a) if You Share
all or a substantial portion of the contents of the database.
For the avoidance of doubt, this Section 4 supplements and does not
replace Your obligations under this Public License where the Licensed
Rights include other Copyright and Similar Rights.
Section 5 -- Disclaimer of Warranties and Limitation of Liability.
a. UNLESS OTHERWISE SEPARATELY UNDERTAKEN BY THE LICENSOR, TO THE
EXTENT POSSIBLE, THE LICENSOR OFFERS THE LICENSED MATERIAL AS-IS
AND AS-AVAILABLE, AND MAKES NO REPRESENTATIONS OR WARRANTIES OF
ANY KIND CONCERNING THE LICENSED MATERIAL, WHETHER EXPRESS,
IMPLIED, STATUTORY, OR OTHER. THIS INCLUDES, WITHOUT LIMITATION,
WARRANTIES OF TITLE, MERCHANTABILITY, FITNESS FOR A PARTICULAR
PURPOSE, NON-INFRINGEMENT, ABSENCE OF LATENT OR OTHER DEFECTS,
ACCURACY, OR THE PRESENCE OR ABSENCE OF ERRORS, WHETHER OR NOT
KNOWN OR DISCOVERABLE. WHERE DISCLAIMERS OF WARRANTIES ARE NOT
ALLOWED IN FULL OR IN PART, THIS DISCLAIMER MAY NOT APPLY TO YOU.
b. TO THE EXTENT POSSIBLE, IN NO EVENT WILL THE LICENSOR BE LIABLE
TO YOU ON ANY LEGAL THEORY (INCLUDING, WITHOUT LIMITATION,
NEGLIGENCE) OR OTHERWISE FOR ANY DIRECT, SPECIAL, INDIRECT,
INCIDENTAL, CONSEQUENTIAL, PUNITIVE, EXEMPLARY, OR OTHER LOSSES,
COSTS, EXPENSES, OR DAMAGES ARISING OUT OF THIS PUBLIC LICENSE OR
USE OF THE LICENSED MATERIAL, EVEN IF THE LICENSOR HAS BEEN
ADVISED OF THE POSSIBILITY OF SUCH LOSSES, COSTS, EXPENSES, OR
DAMAGES. WHERE A LIMITATION OF LIABILITY IS NOT ALLOWED IN FULL OR
IN PART, THIS LIMITATION MAY NOT APPLY TO YOU.
c. The disclaimer of warranties and limitation of liability provided
above shall be interpreted in a manner that, to the extent
possible, most closely approximates an absolute disclaimer and
waiver of all liability.
Section 6 -- Term and Termination.
a. This Public License applies for the term of the Copyright and
Similar Rights licensed here. However, if You fail to comply with
this Public License, then Your rights under this Public License
terminate automatically.
b. Where Your right to use the Licensed Material has terminated under
Section 6(a), it reinstates:
1. automatically as of the date the violation is cured, provided
it is cured within 30 days of Your discovery of the
violation; or
2. upon express reinstatement by the Licensor.
For the avoidance of doubt, this Section 6(b) does not affect any
right the Licensor may have to seek remedies for Your violations
of this Public License.
c. For the avoidance of doubt, the Licensor may also offer the
Licensed Material under separate terms or conditions or stop
distributing the Licensed Material at any time; however, doing so
will not terminate this Public License.
d. Sections 1, 5, 6, 7, and 8 survive termination of this Public
License.
Section 7 -- Other Terms and Conditions.
a. The Licensor shall not be bound by any additional or different
terms or conditions communicated by You unless expressly agreed.
b. Any arrangements, understandings, or agreements regarding the
Licensed Material not stated herein are separate from and
independent of the terms and conditions of this Public License.
Section 8 -- Interpretation.
a. For the avoidance of doubt, this Public License does not, and
shall not be interpreted to, reduce, limit, restrict, or impose
conditions on any use of the Licensed Material that could lawfully
be made without permission under this Public License.
b. To the extent possible, if any provision of this Public License is
deemed unenforceable, it shall be automatically reformed to the
minimum extent necessary to make it enforceable. If the provision
cannot be reformed, it shall be severed from this Public License
without affecting the enforceability of the remaining terms and
conditions.
c. No term or condition of this Public License will be waived and no
failure to comply consented to unless expressly agreed to by the
Licensor.
d. Nothing in this Public License constitutes or may be interpreted
as a limitation upon, or waiver of, any privileges and immunities
that apply to the Licensor or You, including from the legal
processes of any jurisdiction or authority.
=======================================================================
Creative Commons is not a party to its public
licenses. Notwithstanding, Creative Commons may elect to apply one of
its public licenses to material it publishes and in those instances
will be considered the “Licensor.” The text of the Creative Commons
public licenses is dedicated to the public domain under the CC0 Public
Domain Dedication. Except for the limited purpose of indicating that
material is shared under a Creative Commons public license or as
otherwise permitted by the Creative Commons policies published at
creativecommons.org/policies, Creative Commons does not authorize the
use of the trademark "Creative Commons" or any other trademark or logo
of Creative Commons without its prior written consent including,
without limitation, in connection with any unauthorized modifications
to any of its public licenses or any other arrangements,
understandings, or agreements concerning use of licensed material. For
the avoidance of doubt, this paragraph does not form part of the
public licenses.
Creative Commons may be contacted at creativecommons.org.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,45 @@
# Bundled DOS Fonts
These TrueType fonts are from **The Ultimate Oldschool PC Font Pack** by VileR (int10h.org).
## Included Fonts
| Font | Description | Best For |
|------|-------------|----------|
| `Px437_IBM_VGA_8x16.ttf` | Classic IBM VGA 8x16 | Standard DOS text (80x25) |
| `Px437_IBM_VGA_9x16.ttf` | IBM VGA 9x16 (wider) | Better readability |
| `Px437_IBM_EGA_8x14.ttf` | IBM EGA 8x14 | 80x25 with smaller font |
| `Px437_IBM_CGA.ttf` | IBM CGA | 40-column mode, retro look |
| `Px437_IBM_MDA.ttf` | IBM MDA (Monochrome) | Word processing style |
| `PxPlus_IBM_VGA_8x16.ttf` | VGA 8x16 + Unicode | Extended character support |
| `PxPlus_IBM_VGA_9x16.ttf` | VGA 9x16 + Unicode | Extended + better readability |
## Font Naming Convention
- **Px437_** - Strict CP437 encoding (original DOS character set)
- **PxPlus_** - Extended Unicode with full CP437 + additional characters
## Usage in DOSBox-X
```python
from dosbox_mcp.tools import launch
# Use bundled IBM VGA font
launch(
binary_path="/dos/RIPTERM.EXE",
output="ttf",
ttf_font="Px437_IBM_VGA_9x16", # Font name without .ttf
ttf_ptsize=20,
)
```
## License
These fonts are licensed under **Creative Commons Attribution-ShareAlike 4.0 International (CC BY-SA 4.0)**.
See LICENSE.TXT for full license text.
**Attribution:**
- Font Pack: The Ultimate Oldschool PC Font Pack
- Author: VileR
- Website: https://int10h.org/oldschool-pc-fonts/

View File

@ -139,7 +139,7 @@ class GDBClient:
# Parse supported features # Parse supported features
if response: if response:
supported = response.split(';') supported = response.split(";")
logger.info(f"GDB stub supports: {supported[:5]}...") # First 5 logger.info(f"GDB stub supports: {supported[:5]}...") # First 5
# Enable no-ack mode if supported (faster communication) # Enable no-ack mode if supported (faster communication)
@ -158,10 +158,10 @@ class GDBClient:
def disconnect(self) -> None: def disconnect(self) -> None:
"""Disconnect from GDB stub.""" """Disconnect from GDB stub."""
if self._socket: if self._socket:
try: import contextlib
with contextlib.suppress(OSError):
self._socket.close() self._socket.close()
except OSError:
pass
self._socket = None self._socket = None
self._connected = False self._connected = False
self._host = "" self._host = ""
@ -185,7 +185,7 @@ class GDBClient:
logger.debug(f"Sending: {packet}") logger.debug(f"Sending: {packet}")
try: try:
self._socket.sendall(packet.encode('latin-1')) self._socket.sendall(packet.encode("latin-1"))
except OSError as e: except OSError as e:
self._connected = False self._connected = False
raise GDBError(f"Send failed: {e}") from e raise GDBError(f"Send failed: {e}") from e
@ -212,20 +212,20 @@ class GDBClient:
data += chunk data += chunk
# Look for packet boundaries # Look for packet boundaries
decoded = data.decode('latin-1') decoded = data.decode("latin-1")
# Skip any leading ACK/NAK # Skip any leading ACK/NAK
while decoded and decoded[0] in '+-': while decoded and decoded[0] in "+-":
decoded = decoded[1:] decoded = decoded[1:]
if '$' in decoded and '#' in decoded: if "$" in decoded and "#" in decoded:
# Find packet bounds # Find packet bounds
start = decoded.index('$') start = decoded.index("$")
end = decoded.index('#', start) end = decoded.index("#", start)
if end + 2 <= len(decoded): if end + 2 <= len(decoded):
# Complete packet # Complete packet
packet_data = decoded[start + 1:end] packet_data = decoded[start + 1 : end]
checksum = decoded[end + 1:end + 3] checksum = decoded[end + 1 : end + 3]
# Verify checksum # Verify checksum
expected = calculate_checksum(packet_data) expected = calculate_checksum(packet_data)
@ -234,11 +234,11 @@ class GDBClient:
f"Checksum mismatch: got {checksum}, expected {expected}" f"Checksum mismatch: got {checksum}, expected {expected}"
) )
# Send NAK # Send NAK
self._socket.sendall(b'-') self._socket.sendall(b"-")
continue continue
# Send ACK # Send ACK
self._socket.sendall(b'+') self._socket.sendall(b"+")
logger.debug(f"Received: ${packet_data}#{checksum}") logger.debug(f"Received: ${packet_data}#{checksum}")
return packet_data return packet_data
@ -284,16 +284,28 @@ class GDBClient:
Args: Args:
regs: Registers object with values to write regs: Registers object with values to write
""" """
# Build register string in GDB order (little-endian) # Build register string in GDB order (little-endian)
def le32(val: int) -> str: def le32(val: int) -> str:
return val.to_bytes(4, 'little').hex() return val.to_bytes(4, "little").hex()
hex_data = ( hex_data = (
le32(regs.eax) + le32(regs.ecx) + le32(regs.edx) + le32(regs.ebx) + le32(regs.eax)
le32(regs.esp) + le32(regs.ebp) + le32(regs.esi) + le32(regs.edi) + + le32(regs.ecx)
le32(regs.eip) + le32(regs.eflags) + + le32(regs.edx)
le32(regs.cs) + le32(regs.ss) + le32(regs.ds) + + le32(regs.ebx)
le32(regs.es) + le32(regs.fs) + le32(regs.gs) + le32(regs.esp)
+ le32(regs.ebp)
+ le32(regs.esi)
+ le32(regs.edi)
+ le32(regs.eip)
+ le32(regs.eflags)
+ le32(regs.cs)
+ le32(regs.ss)
+ le32(regs.ds)
+ le32(regs.es)
+ le32(regs.fs)
+ le32(regs.gs)
) )
response = self._command(f"G{hex_data}") response = self._command(f"G{hex_data}")
@ -312,7 +324,7 @@ class GDBClient:
response = self._command(f"p{reg_num:x}") response = self._command(f"p{reg_num:x}")
if response.startswith("E"): if response.startswith("E"):
raise GDBError(f"Failed to read register {reg_num}: {response}") raise GDBError(f"Failed to read register {reg_num}: {response}")
return int.from_bytes(bytes.fromhex(response), 'little') return int.from_bytes(bytes.fromhex(response), "little")
# ========================================================================= # =========================================================================
# Memory Operations # Memory Operations
@ -371,10 +383,7 @@ class GDBClient:
raise GDBError("Breakpoints not supported by this GDB stub") raise GDBError("Breakpoints not supported by this GDB stub")
bp = Breakpoint( bp = Breakpoint(
id=self._next_bp_id, id=self._next_bp_id, address=address, enabled=True, original=f"{address:05x}"
address=address,
enabled=True,
original=f"{address:05x}"
) )
self._breakpoints[bp.id] = bp self._breakpoints[bp.id] = bp
self._next_bp_id += 1 self._next_bp_id += 1
@ -528,24 +537,13 @@ class GDBClient:
reason=StopReason.BREAKPOINT, reason=StopReason.BREAKPOINT,
address=addr, address=addr,
signal=signal, signal=signal,
breakpoint_id=bp.id breakpoint_id=bp.id,
) )
return StopEvent( return StopEvent(reason=StopReason.STEP, address=addr, signal=signal)
reason=StopReason.STEP, return StopEvent(reason=StopReason.SIGNAL, address=0, signal=signal)
address=addr,
signal=signal
)
return StopEvent(
reason=StopReason.SIGNAL,
address=0,
signal=signal
)
elif stop_type == "exit": elif stop_type == "exit":
return StopEvent( return StopEvent(reason=StopReason.EXITED, signal=info.get("code", 0))
reason=StopReason.EXITED,
signal=info.get("code", 0)
)
return StopEvent(reason=StopReason.UNKNOWN) return StopEvent(reason=StopReason.UNKNOWN)
@ -562,7 +560,7 @@ class GDBClient:
response = self._command("qSupported") response = self._command("qSupported")
if response.startswith("E"): if response.startswith("E"):
return [] return []
return response.split(';') return response.split(";")
def query_attached(self) -> bool: def query_attached(self) -> bool:
"""Query if attached to existing process. """Query if attached to existing process.
@ -580,10 +578,10 @@ class GDBClient:
def kill(self) -> None: def kill(self) -> None:
"""Kill the target process.""" """Kill the target process."""
try: import contextlib
with contextlib.suppress(GDBError):
self._command("k") self._command("k")
except GDBError:
pass # Connection may close immediately
self.disconnect() self.disconnect()
# ========================================================================= # =========================================================================
@ -599,7 +597,7 @@ class GDBClient:
raise GDBError("Not connected to GDB stub") raise GDBError("Not connected to GDB stub")
try: try:
self._socket.sendall(b'\x03') self._socket.sendall(b"\x03")
logger.debug("Sent interrupt") logger.debug("Sent interrupt")
except OSError as e: except OSError as e:
self._connected = False self._connected = False

View File

@ -13,10 +13,7 @@ from typing import Any
_screenshot_registry: dict[str, dict[str, Any]] = {} _screenshot_registry: dict[str, dict[str, Any]] = {}
# Default capture directory (can be overridden) # Default capture directory (can be overridden)
CAPTURE_DIR = os.environ.get( CAPTURE_DIR = os.environ.get("DOS_DIR", os.path.expanduser("~/claude/dosbox-mcp/dos")) + "/capture"
"DOS_DIR",
os.path.expanduser("~/claude/dosbox-mcp/dos")
) + "/capture"
def register_screenshot(filename: str, path: str, size: int, timestamp: str | None = None) -> str: def register_screenshot(filename: str, path: str, size: int, timestamp: str | None = None) -> str:
@ -81,12 +78,14 @@ def list_screenshots() -> list[dict[str, Any]]:
# Add registered screenshots # Add registered screenshots
for filename, info in _screenshot_registry.items(): for filename, info in _screenshot_registry.items():
screenshots.append({ screenshots.append(
"uri": f"dosbox://screenshots/{filename}", {
"filename": filename, "uri": f"dosbox://screenshots/{filename}",
"size": info["size"], "filename": filename,
"timestamp": info["timestamp"], "size": info["size"],
}) "timestamp": info["timestamp"],
}
)
# Scan capture directory for additional files # Scan capture directory for additional files
if os.path.isdir(CAPTURE_DIR): if os.path.isdir(CAPTURE_DIR):
@ -94,12 +93,14 @@ def list_screenshots() -> list[dict[str, Any]]:
filename = os.path.basename(path) filename = os.path.basename(path)
if filename not in _screenshot_registry: if filename not in _screenshot_registry:
stat = os.stat(path) stat = os.stat(path)
screenshots.append({ screenshots.append(
"uri": f"dosbox://screenshots/{filename}", {
"filename": filename, "uri": f"dosbox://screenshots/{filename}",
"size": stat.st_size, "filename": filename,
"timestamp": datetime.fromtimestamp(stat.st_mtime).isoformat(), "size": stat.st_size,
}) "timestamp": datetime.fromtimestamp(stat.st_mtime).isoformat(),
}
)
# Sort by timestamp (newest first) # Sort by timestamp (newest first)
screenshots.sort(key=lambda x: x["timestamp"], reverse=True) screenshots.sort(key=lambda x: x["timestamp"], reverse=True)
@ -123,8 +124,9 @@ def capture_screen_live() -> bytes | None:
""" """
from .tools.peripheral import _get_qmp_port, _qmp_command from .tools.peripheral import _get_qmp_port, _qmp_command
result = _qmp_command("localhost", _get_qmp_port(), "screendump", result = _qmp_command(
{"filename": "_live_capture.png"}, timeout=10.0) "localhost", _get_qmp_port(), "screendump", {"filename": "_live_capture.png"}, timeout=10.0
)
if "error" in result: if "error" in result:
return None return None

View File

@ -17,8 +17,7 @@ from . import resources, tools
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -53,7 +52,7 @@ Address formats supported:
- Segment:offset: "1234:5678" (standard DOS format) - Segment:offset: "1234:5678" (standard DOS format)
- Flat hex: "0x12345" or "12345" - Flat hex: "0x12345" or "12345"
- Decimal: "#12345" - Decimal: "#12345"
""" """,
) )
# ============================================================================= # =============================================================================
@ -69,6 +68,7 @@ _TOOLS = [
tools.step, tools.step,
tools.step_over, tools.step_over,
tools.quit, tools.quit,
tools.fonts_list,
# Breakpoints # Breakpoints
tools.breakpoint_set, tools.breakpoint_set,
tools.breakpoint_list, tools.breakpoint_list,
@ -91,6 +91,12 @@ _TOOLS = [
tools.mouse_drag, tools.mouse_drag,
tools.clipboard_copy, tools.clipboard_copy,
tools.clipboard_paste, tools.clipboard_paste,
# Joystick (requires QMP patch)
tools.joystick_move,
tools.joystick_button,
# Parallel Port (requires QMP patch)
tools.parallel_write,
tools.parallel_read,
# Control (QMP-based) # Control (QMP-based)
tools.pause, tools.pause,
tools.resume, tools.resume,
@ -99,6 +105,18 @@ _TOOLS = [
tools.loadstate, tools.loadstate,
tools.memdump, tools.memdump,
tools.query_status, tools.query_status,
# Logging (requires QMP logging patch)
tools.logging_status,
tools.logging_enable,
tools.logging_disable,
tools.logging_category,
tools.log_capture,
tools.log_clear,
# Network (port mapping and modem)
tools.port_list,
tools.port_status,
tools.modem_dial,
tools.modem_hangup,
] ]
for func in _TOOLS: for func in _TOOLS:
@ -110,15 +128,22 @@ for func in _TOOLS:
# ============================================================================= # =============================================================================
@mcp.resource("dosbox://screen") @mcp.resource("dosbox://screen", mime_type="image/png")
def get_screen_resource() -> bytes: def get_screen_resource() -> bytes:
"""Capture and return the current DOSBox-X screen. """Live capture of the current DOSBox-X display.
This is a live capture - no need to call screenshot() first. Use this to see what's currently on screen without saving a file.
Simply read this resource to get the current display as PNG. Returns PNG image data directly - just read this resource.
Requires: DOSBox-X running with QMP enabled (port 4444)
When to use:
- Quick visual check of current display state
- Verifying UI state before/after interactions
- Debugging display issues
Returns: Returns:
PNG image data of the current screen PNG image bytes of the current display
""" """
data = resources.capture_screen_live() data = resources.capture_screen_live()
if data is None: if data is None:
@ -128,24 +153,39 @@ def get_screen_resource() -> bytes:
@mcp.resource("dosbox://screenshots") @mcp.resource("dosbox://screenshots")
def list_screenshots_resource() -> str: def list_screenshots_resource() -> str:
"""List all available DOSBox-X screenshots. """List all saved screenshots with metadata.
Returns a JSON list of screenshot metadata including URIs. Returns JSON array of screenshot info including:
- filename: Use with dosbox://screenshots/{filename} to retrieve
- size: File size in bytes
- timestamp: When the screenshot was taken
Workflow:
1. Call screenshot() tool to capture display
2. Note the filename from the response
3. Access via dosbox://screenshots/{filename}
Returns:
JSON array of screenshot metadata
""" """
import json import json
screenshots = resources.list_screenshots() screenshots = resources.list_screenshots()
return json.dumps(screenshots, indent=2) return json.dumps(screenshots, indent=2)
@mcp.resource("dosbox://screenshots/{filename}") @mcp.resource("dosbox://screenshots/{filename}", mime_type="image/png")
def get_screenshot_resource(filename: str) -> bytes: def get_screenshot_resource(filename: str) -> bytes:
"""Get a specific screenshot by filename. """Retrieve a saved screenshot by exact filename.
Args: Args:
filename: Screenshot filename (e.g., "ripterm_001.png") filename: Exact filename from screenshot() result or list
Example: "screenshot_001.png"
Returns: Returns:
PNG image data PNG image bytes
Note: Use dosbox://screenshots to list available files.
""" """
data = resources.get_screenshot_data(filename) data = resources.get_screenshot_data(filename)
if data is None: if data is None:
@ -153,23 +193,6 @@ def get_screenshot_resource(filename: str) -> bytes:
return data return data
@mcp.resource("dosbox://screenshots/latest")
def get_latest_screenshot_resource() -> bytes:
"""Get the most recent screenshot.
Returns:
PNG image data of the latest screenshot
"""
latest = resources.get_latest_screenshot()
if latest is None:
raise ValueError("No screenshots available")
data = resources.get_screenshot_data(latest["filename"])
if data is None:
raise ValueError("Screenshot file not found")
return data
# ============================================================================= # =============================================================================
# Entry Point # Entry Point
# ============================================================================= # =============================================================================

View File

@ -4,13 +4,15 @@ Tools are organized by function:
- execution: launch, attach, continue, step, quit - execution: launch, attach, continue, step, quit
- breakpoints: breakpoint_set, breakpoint_list, breakpoint_delete - breakpoints: breakpoint_set, breakpoint_list, breakpoint_delete
- inspection: registers, memory_read, memory_write, disassemble, stack, status - inspection: registers, memory_read, memory_write, disassemble, stack, status
- peripheral: screenshot, serial_send, keyboard_send, mouse_* - peripheral: screenshot, serial_send, keyboard_send, mouse_*, joystick_*, parallel_*
- control: pause, resume, reset, savestate, loadstate, memdump, query_status - control: pause, resume, reset, savestate, loadstate, memdump, query_status
- logging: logging_status, logging_enable, logging_disable, log_capture, log_clear
- network: port_list, port_status, modem_dial, modem_hangup
""" """
from .breakpoints import breakpoint_delete, breakpoint_list, breakpoint_set from .breakpoints import breakpoint_delete, breakpoint_list, breakpoint_set
from .control import loadstate, memdump, pause, query_status, reset, resume, savestate from .control import loadstate, memdump, pause, query_status, reset, resume, savestate
from .execution import attach, continue_execution, launch, quit, step, step_over from .execution import attach, continue_execution, fonts_list, launch, quit, step, step_over
from .inspection import ( from .inspection import (
disassemble, disassemble,
memory_read, memory_read,
@ -21,13 +23,26 @@ from .inspection import (
stack, stack,
status, status,
) )
from .logging import (
log_capture,
log_clear,
logging_category,
logging_disable,
logging_enable,
logging_status,
)
from .network import modem_dial, modem_hangup, port_list, port_status
from .peripheral import ( from .peripheral import (
clipboard_copy, clipboard_copy,
clipboard_paste, clipboard_paste,
joystick_button,
joystick_move,
keyboard_send, keyboard_send,
mouse_click, mouse_click,
mouse_drag, mouse_drag,
mouse_move, mouse_move,
parallel_read,
parallel_write,
screenshot, screenshot,
serial_send, serial_send,
) )
@ -40,6 +55,7 @@ __all__ = [
"step", "step",
"step_over", "step_over",
"quit", "quit",
"fonts_list",
# Breakpoints # Breakpoints
"breakpoint_set", "breakpoint_set",
"breakpoint_list", "breakpoint_list",
@ -62,6 +78,10 @@ __all__ = [
"mouse_drag", "mouse_drag",
"clipboard_copy", "clipboard_copy",
"clipboard_paste", "clipboard_paste",
"joystick_move",
"joystick_button",
"parallel_write",
"parallel_read",
# Control # Control
"pause", "pause",
"resume", "resume",
@ -70,4 +90,16 @@ __all__ = [
"loadstate", "loadstate",
"memdump", "memdump",
"query_status", "query_status",
# Logging
"logging_status",
"logging_enable",
"logging_disable",
"logging_category",
"log_capture",
"log_clear",
# Network
"port_list",
"port_status",
"modem_dial",
"modem_hangup",
] ]

View File

@ -1,4 +1,21 @@
"""Breakpoint management tools.""" """Breakpoint management tools for DOS debugging.
Breakpoints pause execution when the CPU reaches a specific address.
Essential for reverse engineering - set breakpoints at interesting
code locations, then continue execution to analyze program behavior.
Address formats accepted:
- Segment:offset: "1234:0100" (standard DOS format)
- Flat hex: "0x12340" or "12340"
- With 0x prefix recommended for flat addresses
Workflow:
1. breakpoint_set("CS:0100") - Set at code segment offset
2. continue_execution() - Run until breakpoint hit
3. registers() / memory_read() - Inspect state
4. breakpoint_list() - See all active breakpoints
5. breakpoint_delete(id=1) - Remove when done
"""
from ..gdb_client import GDBError from ..gdb_client import GDBError
from ..state import client from ..state import client
@ -6,17 +23,26 @@ from ..utils import format_address, parse_address
def breakpoint_set(address: str) -> dict: def breakpoint_set(address: str) -> dict:
"""Set a software breakpoint at the specified address. """Set a software breakpoint at a memory address.
When the CPU's instruction pointer reaches this address,
execution will pause and you can inspect registers/memory.
Args: Args:
address: Memory address (segment:offset or flat hex) address: Where to break. Formats:
- "1234:0100" - segment:offset (DOS standard)
- "0x12340" - flat 20-bit address
- "CS:0100" - relative to code segment
Returns: Returns:
Breakpoint info - breakpoint_id: Reference ID for delete/list operations
- address: Normalized address where breakpoint is set
- original: The address string you provided
Examples: Examples:
breakpoint_set("1234:0100") # segment:offset breakpoint_set("1234:0100") # At segment 1234, offset 0100
breakpoint_set("0x12340") # flat address breakpoint_set("0x12340") # At flat address
breakpoint_set("CS:0100") # Relative to current code segment
""" """
try: try:
addr = parse_address(address) addr = parse_address(address)
@ -35,10 +61,14 @@ def breakpoint_set(address: str) -> dict:
def breakpoint_list() -> dict: def breakpoint_list() -> dict:
"""List all active breakpoints. """List all currently active breakpoints.
Returns: Returns:
List of breakpoint info - count: Number of active breakpoints
- breakpoints: Array of breakpoint info, each with:
- id: Use this ID with breakpoint_delete()
- address: Memory address being watched
- enabled: Whether breakpoint is active
""" """
bps = client.list_breakpoints() bps = client.list_breakpoints()
return { return {
@ -48,14 +78,19 @@ def breakpoint_list() -> dict:
def breakpoint_delete(id: int | None = None, all: bool = False) -> dict: def breakpoint_delete(id: int | None = None, all: bool = False) -> dict:
"""Delete breakpoint(s). """Delete one or all breakpoints.
Args: Args:
id: Specific breakpoint ID to delete id: Breakpoint ID from breakpoint_set() or breakpoint_list()
all: If True, delete all breakpoints all: Set True to delete ALL breakpoints at once
Returns: Returns:
Deletion result - deleted: Count of breakpoints removed (when all=True)
- deleted_id: ID that was removed (when id specified)
Examples:
breakpoint_delete(id=1) # Delete breakpoint #1
breakpoint_delete(all=True) # Clear all breakpoints
""" """
try: try:
if all: if all:

View File

@ -1,4 +1,21 @@
"""Control tools: pause, resume, reset, savestate, loadstate.""" """Emulator control tools via QMP (QEMU Machine Protocol).
These tools control the DOSBox-X emulator itself, not the debugger.
Use these for:
- Pausing/resuming the emulator (different from GDB breakpoints)
- Creating save states for checkpointing
- Resetting the emulated system
- Dumping large memory regions efficiently
QMP vs GDB:
- GDB (port 1234): CPU-level debugging, breakpoints, stepping
- QMP (port 4444): Emulator control, screenshots, keyboard/mouse
Save State Workflow:
1. savestate("before_crash.sav") - Create checkpoint
2. <do something that breaks>
3. loadstate("before_crash.sav") - Restore and try again
"""
from typing import Any from typing import Any
@ -93,8 +110,9 @@ def savestate(filename: str) -> dict:
Example: Example:
savestate("checkpoint1.sav") savestate("checkpoint1.sav")
""" """
result = _qmp_command("localhost", _get_qmp_port(), "savestate", result = _qmp_command(
{"filename": filename}, timeout=30.0) "localhost", _get_qmp_port(), "savestate", {"file": filename}, timeout=30.0
)
if "error" in result: if "error" in result:
return { return {
@ -132,8 +150,9 @@ def loadstate(filename: str) -> dict:
Example: Example:
loadstate("checkpoint1.sav") loadstate("checkpoint1.sav")
""" """
result = _qmp_command("localhost", _get_qmp_port(), "loadstate", result = _qmp_command(
{"filename": filename}, timeout=30.0) "localhost", _get_qmp_port(), "loadstate", {"file": filename}, timeout=30.0
)
if "error" in result: if "error" in result:
return { return {
@ -175,10 +194,10 @@ def memdump(address: str, length: int, filename: str | None = None) -> dict:
""" """
# Parse segment:offset if provided # Parse segment:offset if provided
if ':' in address: if ":" in address:
seg, off = address.split(':') seg, off = address.split(":")
flat_addr = (int(seg, 16) << 4) + int(off, 16) flat_addr = (int(seg, 16) << 4) + int(off, 16)
elif address.startswith('0x'): elif address.startswith("0x"):
flat_addr = int(address, 16) flat_addr = int(address, 16)
else: else:
flat_addr = int(address, 16) flat_addr = int(address, 16)

View File

@ -1,8 +1,26 @@
"""Execution control tools: launch, attach, continue, step, quit.""" """Execution control tools for DOSBox-X debugging sessions.
This module provides tools to:
- Start DOSBox-X with debugging enabled (launch)
- Connect to the GDB debug stub (attach)
- Control execution: step, continue, quit
Typical workflow:
1. launch() - Start DOSBox-X (auto-detects Docker vs native)
2. attach() - Connect to GDB stub
3. Use breakpoints, step, continue to debug
4. quit() - Clean up when done
GDB Stub Connection:
- Default port: 1234
- Protocol: GDB Remote Serial Protocol
- Auto-connects to Docker container or native DOSBox-X
"""
import time import time
from ..dosbox import DOSBoxConfig from ..dosbox import DOSBoxConfig
from ..fonts import get_font_path, list_fonts
from ..gdb_client import GDBError from ..gdb_client import GDBError
from ..state import client, manager from ..state import client, manager
from ..utils import format_address from ..utils import format_address
@ -18,6 +36,20 @@ def launch(
memsize: int = 16, memsize: int = 16,
joystick: str = "auto", joystick: str = "auto",
parallel1: str = "disabled", parallel1: str = "disabled",
serial1: str = "disabled",
serial2: str = "disabled",
serial1_port: int = 5555,
serial2_port: int = 5556,
ipx: bool = False,
# Display output settings
output: str = "opengl",
# TrueType font settings (when output="ttf")
ttf_font: str | None = None,
ttf_ptsize: int | None = None,
ttf_lins: int | None = None,
ttf_cols: int | None = None,
ttf_winperc: int | None = None,
ttf_wp: str | None = None,
) -> dict: ) -> dict:
"""Launch DOSBox-X with GDB debugging and QMP control enabled. """Launch DOSBox-X with GDB debugging and QMP control enabled.
@ -33,6 +65,21 @@ def launch(
joystick: Joystick type - auto, none, 2axis, 4axis (default: auto) joystick: Joystick type - auto, none, 2axis, 4axis (default: auto)
parallel1: Parallel port 1 - disabled, file, printer (default: disabled) parallel1: Parallel port 1 - disabled, file, printer (default: disabled)
Use "file" to capture print output to capture directory Use "file" to capture print output to capture directory
serial1: Serial port 1 mode - disabled, nullmodem, modem (default: disabled)
Use "nullmodem" to accept TCP connections on serial1_port
Use "modem" for dial-out with AT commands (ATDT hostname:port)
serial2: Serial port 2 mode - same options as serial1
serial1_port: TCP port for nullmodem mode on COM1 (default: 5555)
serial2_port: TCP port for nullmodem mode on COM2 (default: 5556)
ipx: Enable IPX networking for DOS multiplayer games (default: False)
output: Display output mode - opengl, ttf, surface (default: opengl)
Use "ttf" for TrueType font rendering (sharper text)
ttf_font: TrueType font name when output="ttf" (e.g., "Consola", "Nouveau_IBM")
ttf_ptsize: Font size in points for TTF mode
ttf_lins: Screen lines for TTF mode (e.g., 50 for taller screen)
ttf_cols: Screen columns for TTF mode (e.g., 132 for wider screen)
ttf_winperc: Window size as percentage for TTF mode (e.g., 75)
ttf_wp: Word processor mode for TTF - WP (WordPerfect), WS (WordStar), XY, FE
Returns: Returns:
Status dict with connection details Status dict with connection details
@ -41,6 +88,10 @@ def launch(
launch("/path/to/GAME.EXE") # Auto-detects native vs Docker launch("/path/to/GAME.EXE") # Auto-detects native vs Docker
launch("/path/to/GAME.EXE", joystick="2axis") # With joystick launch("/path/to/GAME.EXE", joystick="2axis") # With joystick
launch("/path/to/GAME.EXE", parallel1="file") # Capture printer output launch("/path/to/GAME.EXE", parallel1="file") # Capture printer output
launch("/path/to/RIPTERM.EXE", serial1="nullmodem") # RIPscrip testing
launch("/path/to/TELIX.EXE", serial1="modem") # BBS dial-out
launch("/path/to/DOOM.EXE", ipx=True) # Multiplayer gaming
launch("/path/to/WP.EXE", output="ttf", ttf_font="Consola", ttf_lins=50) # TTF mode
""" """
config = DOSBoxConfig( config = DOSBoxConfig(
gdb_port=gdb_port, gdb_port=gdb_port,
@ -51,6 +102,20 @@ def launch(
memsize=memsize, memsize=memsize,
joysticktype=joystick, joysticktype=joystick,
parallel1=parallel1, parallel1=parallel1,
serial1=serial1,
serial2=serial2,
serial1_port=serial1_port,
serial2_port=serial2_port,
ipx=ipx,
# Display output
output=output,
# TTF settings
ttf_font=ttf_font,
ttf_ptsize=ttf_ptsize,
ttf_lins=ttf_lins,
ttf_cols=ttf_cols,
ttf_winperc=ttf_winperc,
ttf_wp=ttf_wp,
) )
launch_method = None launch_method = None
@ -72,7 +137,7 @@ def launch(
launch_method = "native" launch_method = "native"
manager.launch_native(binary_path=binary_path, config=config) manager.launch_native(binary_path=binary_path, config=config)
return { result = {
"success": True, "success": True,
"message": f"DOSBox-X launched successfully ({launch_method})", "message": f"DOSBox-X launched successfully ({launch_method})",
"launch_method": launch_method, "launch_method": launch_method,
@ -82,6 +147,35 @@ def launch(
"pid": manager.pid, "pid": manager.pid,
"hint": f"Use attach() to connect to debugger on port {gdb_port}. QMP (screenshots/keyboard) on port {qmp_port}.", "hint": f"Use attach() to connect to debugger on port {gdb_port}. QMP (screenshots/keyboard) on port {qmp_port}.",
} }
# Add serial port info if configured
if serial1 != "disabled":
result["serial1"] = {
"mode": serial1,
"tcp_port": serial1_port if serial1.lower() == "nullmodem" else None,
}
if serial2 != "disabled":
result["serial2"] = {
"mode": serial2,
"tcp_port": serial2_port if serial2.lower() == "nullmodem" else None,
}
if ipx:
result["ipx"] = True
# Add TTF info if configured
if output == "ttf":
ttf_info = {"output": "ttf"}
if ttf_font:
ttf_info["font"] = ttf_font
if ttf_ptsize:
ttf_info["ptsize"] = ttf_ptsize
if ttf_lins:
ttf_info["lins"] = ttf_lins
if ttf_cols:
ttf_info["cols"] = ttf_cols
result["ttf"] = ttf_info
return result
except Exception as e: except Exception as e:
return { return {
"success": False, "success": False,
@ -152,13 +246,27 @@ def attach(
def continue_execution(timeout: float | None = None) -> dict: def continue_execution(timeout: float | None = None) -> dict:
"""Continue execution until breakpoint or signal. """Continue execution until breakpoint hit or signal received.
Resumes CPU execution. The debugger will stop when:
- A breakpoint is hit
- A signal/interrupt occurs
- Timeout is reached (if specified)
Args: Args:
timeout: Optional timeout in seconds timeout: Maximum seconds to wait (None = wait forever)
Use timeout for programs that may hang or loop
Returns: Returns:
Stop event info (reason, address, breakpoint hit) - stop_reason: Why execution stopped (breakpoint, signal, timeout)
- address: Where execution stopped (segment:offset format)
- breakpoint_id: Which breakpoint was hit (if any)
- cs_ip: Current code segment:instruction pointer
Common stop reasons:
- "breakpoint": Hit a set breakpoint
- "signal": Received interrupt (e.g., Ctrl+C)
- "timeout": Specified timeout elapsed
""" """
try: try:
event = client.continue_execution(timeout=timeout) event = client.continue_execution(timeout=timeout)
@ -254,3 +362,76 @@ def quit() -> dict:
"success": False, "success": False,
"error": str(e), "error": str(e),
} }
def fonts_list() -> dict:
"""List available bundled TrueType fonts for DOSBox-X TTF output.
These fonts are from The Ultimate Oldschool PC Font Pack by VileR (int10h.org)
and are bundled with dosbox-mcp for convenience. Licensed under CC BY-SA 4.0.
Returns:
Dictionary with:
- fonts: List of font info (name, description, best_for)
- usage_hint: How to use fonts with launch()
Font naming:
- Px437_* : Strict CP437 encoding (original DOS character set)
- PxPlus_* : Extended Unicode (CP437 + additional characters)
Example:
fonts = fonts_list()
# Then use with launch():
launch(output="ttf", ttf_font="Px437_IBM_VGA_9x16", ttf_ptsize=18)
"""
font_info = {
"Px437_IBM_VGA_8x16": {
"description": "Classic IBM VGA 8x16",
"best_for": "Standard DOS text (80x25)",
},
"Px437_IBM_VGA_9x16": {
"description": "IBM VGA 9x16 (wider)",
"best_for": "Better readability, recommended default",
},
"Px437_IBM_EGA_8x14": {
"description": "IBM EGA 8x14",
"best_for": "80x25 with smaller font",
},
"Px437_IBM_CGA": {
"description": "IBM CGA 8x8",
"best_for": "40-column mode, retro look",
},
"Px437_IBM_MDA": {
"description": "IBM Monochrome Display Adapter",
"best_for": "Word processing, green phosphor style",
},
"PxPlus_IBM_VGA_8x16": {
"description": "VGA 8x16 + Unicode",
"best_for": "Extended character support",
},
"PxPlus_IBM_VGA_9x16": {
"description": "VGA 9x16 + Unicode",
"best_for": "Extended chars + better readability",
},
}
fonts = []
for name in list_fonts():
info = font_info.get(name, {"description": name, "best_for": "General use"})
path = get_font_path(name)
fonts.append(
{
"name": name,
"description": info["description"],
"best_for": info["best_for"],
"available": path is not None and path.exists(),
}
)
return {
"success": True,
"fonts": fonts,
"recommended": "Px437_IBM_VGA_9x16",
"usage_hint": 'Use with launch(output="ttf", ttf_font="<font_name>", ttf_ptsize=18)',
"license": "CC BY-SA 4.0 - The Ultimate Oldschool PC Font Pack by VileR (int10h.org)",
}

View File

@ -1,4 +1,24 @@
"""Inspection tools: registers, memory, disassemble, stack, status.""" """Inspection tools for examining DOS program state.
These tools read CPU registers, memory, and screen contents.
Essential for reverse engineering - use after hitting a breakpoint
to understand what the program is doing.
Key registers for DOS (16-bit real mode):
- CS:IP = Code Segment:Instruction Pointer (where code runs)
- SS:SP = Stack Segment:Stack Pointer (function call stack)
- DS:SI = Data Segment:Source Index (string operations)
- ES:DI = Extra Segment:Destination Index (string operations)
- AX,BX,CX,DX = General purpose registers
Memory addressing (20-bit real mode):
- Physical = (Segment * 16) + Offset
- Example: 1234:5678 = 0x12340 + 0x5678 = 0x179B8
VGA memory locations:
- 0xB8000 = Text mode buffer (character + attribute pairs)
- 0xA0000 = Graphics mode buffer (mode 13h, etc.)
"""
from typing import Literal from typing import Literal
@ -9,16 +29,25 @@ from ..utils import format_address, hexdump, parse_address
def registers() -> dict: def registers() -> dict:
"""Read all CPU registers. """Read all CPU registers - the core debugging primitive.
Call this after breakpoint hits or stepping to see CPU state.
Returns: Returns:
Complete register state including: 32-bit registers: eax, ebx, ecx, edx, esi, edi, ebp, esp, eip
- 32-bit registers (EAX, EBX, etc.) 16-bit aliases: ax, bx, cx, dx, si, di, bp, sp, ip (lower 16 bits)
- 16-bit aliases (AX, BX, etc.) 8-bit aliases: al, ah, bl, bh, cl, ch, dl, dh
- Segment registers (CS, DS, ES, SS, FS, GS) Segment registers: cs, ds, es, ss, fs, gs
- Instruction pointer (CS:IP) Computed addresses:
- Stack pointer (SS:SP) - cs_ip: Physical address of next instruction
- Flags - ss_sp: Physical address of stack top
Flags: flags register value
Key values for DOS debugging:
- cs:ip = Current execution point
- ds:bx/si = Common data pointers
- ax = Function return values, INT 21h function number
- dx:ax = 32-bit return values
""" """
try: try:
regs = client.read_registers() regs = client.read_registers()
@ -55,17 +84,21 @@ def memory_read(
try: try:
# Handle register-based addresses like "DS:SI" # Handle register-based addresses like "DS:SI"
addr_str = address.upper() addr_str = address.upper()
if ':' in addr_str: if ":" in addr_str:
seg_part, off_part = addr_str.split(':') seg_part, off_part = addr_str.split(":")
# Check if parts are register names # Check if parts are register names
regs = None regs = None
seg_regs = {'CS', 'DS', 'ES', 'SS', 'FS', 'GS'} seg_regs = {"CS", "DS", "ES", "SS", "FS", "GS"}
off_regs = {'IP', 'SP', 'BP', 'SI', 'DI', 'BX', 'AX', 'CX', 'DX'} off_regs = {"IP", "SP", "BP", "SI", "DI", "BX", "AX", "CX", "DX"}
if seg_part in seg_regs or off_part in off_regs: if seg_part in seg_regs or off_part in off_regs:
regs = client.read_registers() regs = client.read_registers()
seg_val = getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16) seg_val = (
off_val = getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16) getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16)
)
off_val = (
getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16)
)
addr = (seg_val << 4) + off_val addr = (seg_val << 4) + off_val
else: else:
addr = parse_address(address) addr = parse_address(address)
@ -121,10 +154,7 @@ def memory_write(
try: try:
addr = parse_address(address) addr = parse_address(address)
if format == "hex": bytes_data = bytes.fromhex(data) if format == "hex" else data.encode("latin-1")
bytes_data = bytes.fromhex(data)
else:
bytes_data = data.encode('latin-1')
client.write_memory(addr, bytes_data) client.write_memory(addr, bytes_data)
@ -176,26 +206,48 @@ def disassemble(address: str | None = None, count: int = 10) -> dict:
0xEB: "JMP short", 0xEB: "JMP short",
0x74: "JZ", 0x74: "JZ",
0x75: "JNZ", 0x75: "JNZ",
0x50: "PUSH AX", 0x51: "PUSH CX", 0x52: "PUSH DX", 0x53: "PUSH BX", 0x50: "PUSH AX",
0x54: "PUSH SP", 0x55: "PUSH BP", 0x56: "PUSH SI", 0x57: "PUSH DI", 0x51: "PUSH CX",
0x58: "POP AX", 0x59: "POP CX", 0x5A: "POP DX", 0x5B: "POP BX", 0x52: "PUSH DX",
0x5C: "POP SP", 0x5D: "POP BP", 0x5E: "POP SI", 0x5F: "POP DI", 0x53: "PUSH BX",
0xB8: "MOV AX,imm", 0xB9: "MOV CX,imm", 0xBA: "MOV DX,imm", 0xBB: "MOV BX,imm", 0x54: "PUSH SP",
0x89: "MOV r/m,r", 0x8B: "MOV r,r/m", 0x55: "PUSH BP",
0x01: "ADD r/m,r", 0x03: "ADD r,r/m", 0x56: "PUSH SI",
0x29: "SUB r/m,r", 0x2B: "SUB r,r/m", 0x57: "PUSH DI",
0x31: "XOR r/m,r", 0x33: "XOR r,r/m", 0x58: "POP AX",
0x39: "CMP r/m,r", 0x3B: "CMP r,r/m", 0x59: "POP CX",
0x5A: "POP DX",
0x5B: "POP BX",
0x5C: "POP SP",
0x5D: "POP BP",
0x5E: "POP SI",
0x5F: "POP DI",
0xB8: "MOV AX,imm",
0xB9: "MOV CX,imm",
0xBA: "MOV DX,imm",
0xBB: "MOV BX,imm",
0x89: "MOV r/m,r",
0x8B: "MOV r,r/m",
0x01: "ADD r/m,r",
0x03: "ADD r,r/m",
0x29: "SUB r/m,r",
0x2B: "SUB r,r/m",
0x31: "XOR r/m,r",
0x33: "XOR r,r/m",
0x39: "CMP r/m,r",
0x3B: "CMP r,r/m",
} }
lines = [] lines = []
for i, b in enumerate(mem.data[:count]): for i, b in enumerate(mem.data[:count]):
hint = opcodes.get(b, f"?? ({b:02x})") hint = opcodes.get(b, f"?? ({b:02x})")
lines.append({ lines.append(
"address": format_address(addr + i), {
"byte": f"{b:02x}", "address": format_address(addr + i),
"hint": hint, "byte": f"{b:02x}",
}) "hint": hint,
}
)
return { return {
"success": True, "success": True,
@ -231,12 +283,14 @@ def stack(count: int = 16) -> dict:
words = [] words = []
for i in range(0, len(mem.data), 2): for i in range(0, len(mem.data), 2):
if i + 1 < len(mem.data): if i + 1 < len(mem.data):
word = int.from_bytes(mem.data[i:i+2], 'little') word = int.from_bytes(mem.data[i : i + 2], "little")
words.append({ words.append(
"offset": f"+{i:02x}", {
"address": format_address(sp_addr + i), "offset": f"+{i:02x}",
"value": f"{word:04x}", "address": format_address(sp_addr + i),
}) "value": f"{word:04x}",
}
)
return { return {
"success": True, "success": True,
@ -272,17 +326,17 @@ def screen_text(rows: int = 25, cols: int = 80) -> dict:
# VGA text mode buffer at 0xB8000 # VGA text mode buffer at 0xB8000
# Each cell is 2 bytes: [character][attribute] # Each cell is 2 bytes: [character][attribute]
# Attribute: bits 0-3 = foreground, 4-6 = background, 7 = blink # Attribute: bits 0-3 = foreground, 4-6 = background, 7 = blink
VIDEO_BASE = 0xB8000 video_base = 0xB8000
bytes_per_row = cols * 2 bytes_per_row = cols * 2
total_bytes = rows * bytes_per_row total_bytes = rows * bytes_per_row
mem = client.read_memory(VIDEO_BASE, total_bytes) mem = client.read_memory(video_base, total_bytes)
lines = [] lines = []
raw_lines = [] raw_lines = []
for row in range(rows): for row in range(rows):
offset = row * bytes_per_row offset = row * bytes_per_row
row_data = mem.data[offset:offset + bytes_per_row] row_data = mem.data[offset : offset + bytes_per_row]
# Extract characters (every other byte) # Extract characters (every other byte)
chars = [] chars = []
@ -292,11 +346,11 @@ def screen_text(rows: int = 25, cols: int = 80) -> dict:
if 32 <= char_byte < 127: if 32 <= char_byte < 127:
chars.append(chr(char_byte)) chars.append(chr(char_byte))
elif char_byte == 0: elif char_byte == 0:
chars.append(' ') chars.append(" ")
else: else:
chars.append('.') # Non-printable chars.append(".") # Non-printable
line = ''.join(chars).rstrip() line = "".join(chars).rstrip()
lines.append(line) lines.append(line)
raw_lines.append(row_data.hex()) raw_lines.append(row_data.hex())
@ -339,7 +393,7 @@ def screen_graphics(width: int = 320, height: int = 200, mode: str = "13h") -> d
Mode 12h: 4 planes, more complex Mode 12h: 4 planes, more complex
""" """
try: try:
VIDEO_BASE = 0xA0000 video_base = 0xA0000
if mode == "13h": if mode == "13h":
# Mode 13h: 320x200, 1 byte per pixel, linear # Mode 13h: 320x200, 1 byte per pixel, linear
@ -348,7 +402,7 @@ def screen_graphics(width: int = 320, height: int = 200, mode: str = "13h") -> d
# Conservative read for other modes # Conservative read for other modes
total_bytes = min(width * height // 8, 38400) total_bytes = min(width * height // 8, 38400)
mem = client.read_memory(VIDEO_BASE, total_bytes) mem = client.read_memory(video_base, total_bytes)
# Provide some statistics about the pixel data # Provide some statistics about the pixel data
pixel_counts = {} pixel_counts = {}

View File

@ -0,0 +1,357 @@
"""Logging control tools: query, enable/disable, capture, and clear log output.
These tools provide programmatic access to DOSBox-X debug logging via QMP,
enabling reverse engineering workflows like tracing INT 21h DOS API calls
and file I/O operations.
"""
from typing import Any
from .peripheral import _get_qmp_port, _qmp_command
def logging_status() -> dict:
"""Query current DOSBox-X logging state.
Returns the current state of logging options including:
- INT 21h call logging (tracks DOS API calls)
- File I/O logging (tracks file operations)
- Current log buffer size
Returns:
Logging state dict with:
- int21: Whether INT 21h logging is enabled
- fileio: Whether file I/O logging is enabled
- buffer_size: Current log buffer line count
Example:
status = logging_status()
if status["int21"]:
print("INT 21h logging is active")
"""
result = _qmp_command("localhost", _get_qmp_port(), "query-logging")
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict):
desc = error.get("desc", str(error))
if "CommandNotFound" in desc:
return {
"success": False,
"error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.",
}
return {"success": False, "error": desc}
return {"success": False, "error": str(error)}
if "return" in result and isinstance(result["return"], dict):
ret = result["return"]
return {
"success": True,
"int21": ret.get("int21", False),
"fileio": ret.get("fileio", False),
"buffer_size": ret.get("buffer_size", 0),
}
return {"success": True, "raw_response": result}
def logging_enable(int21: bool = True, fileio: bool = True) -> dict:
"""Enable DOSBox-X debug logging options.
Enables logging of DOS API calls and/or file operations. This is
essential for reverse engineering - you can trace exactly which
DOS functions a program calls and what files it accesses.
Args:
int21: Enable INT 21h logging (DOS API call tracing)
fileio: Enable file I/O logging (file access tracing)
Returns:
Result dict with new logging state
Examples:
logging_enable() # Enable both
logging_enable(int21=True, fileio=False) # Just INT 21h
logging_enable(fileio=True, int21=False) # Just file I/O
INT 21h Logging:
When enabled, DOSBox-X logs every INT 21h call including:
- Function number (AH register value)
- Parameters passed in other registers
- File handles, strings, memory addresses
Common INT 21h functions you'll see:
- AH=3Dh: Open file
- AH=3Eh: Close file
- AH=3Fh: Read from file/device
- AH=40h: Write to file/device
- AH=4Ch: Exit program
File I/O Logging:
Tracks file-level operations with filenames and byte counts.
"""
args: dict[str, Any] = {}
if int21:
args["int21"] = True
if fileio:
args["fileio"] = True
# If neither specified, enable both (user called with defaults)
if not args:
args = {"int21": True, "fileio": True}
result = _qmp_command("localhost", _get_qmp_port(), "set-logging", args)
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict):
desc = error.get("desc", str(error))
if "CommandNotFound" in desc:
return {
"success": False,
"error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.",
}
return {"success": False, "error": desc}
return {"success": False, "error": str(error)}
if "return" in result and isinstance(result["return"], dict):
ret = result["return"]
enabled = []
if ret.get("int21"):
enabled.append("INT 21h")
if ret.get("fileio"):
enabled.append("File I/O")
return {
"success": True,
"message": f"Logging enabled: {', '.join(enabled) if enabled else 'none'}",
"int21": ret.get("int21", False),
"fileio": ret.get("fileio", False),
"changed": ret.get("changed", False),
}
return {"success": True, "raw_response": result}
def logging_disable(int21: bool = True, fileio: bool = True) -> dict:
"""Disable DOSBox-X debug logging options.
Disables the specified logging options. Call with no arguments to
disable all logging, or specify which types to disable.
Args:
int21: Disable INT 21h logging
fileio: Disable file I/O logging
Returns:
Result dict with new logging state
Examples:
logging_disable() # Disable all
logging_disable(int21=True, fileio=False) # Disable only INT 21h
"""
args: dict[str, Any] = {}
if int21:
args["int21"] = False
if fileio:
args["fileio"] = False
# If neither specified, disable both
if not args:
args = {"int21": False, "fileio": False}
result = _qmp_command("localhost", _get_qmp_port(), "set-logging", args)
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict):
desc = error.get("desc", str(error))
if "CommandNotFound" in desc:
return {
"success": False,
"error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.",
}
return {"success": False, "error": desc}
return {"success": False, "error": str(error)}
if "return" in result and isinstance(result["return"], dict):
ret = result["return"]
still_enabled = []
if ret.get("int21"):
still_enabled.append("INT 21h")
if ret.get("fileio"):
still_enabled.append("File I/O")
return {
"success": True,
"message": f"Logging still enabled: {', '.join(still_enabled)}"
if still_enabled
else "All logging disabled",
"int21": ret.get("int21", False),
"fileio": ret.get("fileio", False),
"changed": ret.get("changed", False),
}
return {"success": True, "raw_response": result}
def logging_category(category: str, level: str) -> dict:
"""Set logging level for a specific category.
DOSBox-X has multiple logging categories (CPU, files, VGA, etc.).
This allows fine-grained control over what gets logged.
Args:
category: Log category name (e.g., "files", "cpu", "int21", "vga")
level: Severity level - one of:
- "debug": Most verbose, all messages
- "normal": Standard logging
- "warn": Warnings and errors only
- "error": Errors only
- "fatal": Fatal errors only
- "never": Disable logging for this category
Returns:
Result dict with applied settings
Examples:
logging_category("files", "debug") # Verbose file logging
logging_category("cpu", "warn") # Only CPU warnings
logging_category("vga", "never") # Silence VGA messages
Common Categories:
- files: File system operations
- int21: DOS INT 21h calls
- cpu: CPU instruction execution
- vga: Video/graphics operations
- io: I/O port access
- dosmisc: Miscellaneous DOS operations
"""
result = _qmp_command(
"localhost", _get_qmp_port(), "set-logging-category", {"category": category, "level": level}
)
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict):
desc = error.get("desc", str(error))
if "CommandNotFound" in desc:
return {
"success": False,
"error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.",
}
return {"success": False, "error": desc}
return {"success": False, "error": str(error)}
if "return" in result and isinstance(result["return"], dict):
ret = result["return"]
return {
"success": True,
"message": f"Category '{ret.get('category')}' set to '{ret.get('level')}'",
"category": ret.get("category"),
"level": ret.get("level"),
}
return {"success": True, "raw_response": result}
def log_capture(limit: int = 100) -> dict:
"""Capture recent log output from DOSBox-X.
Retrieves the current contents of the log buffer. This is the primary
way to see what INT 21h calls and file operations have occurred.
Args:
limit: Maximum number of lines to return (default: 100, max: ~4000)
Returns:
Result dict with:
- lines: The log text (newline-separated)
- line_count: Number of lines returned
Examples:
# Capture last 50 lines of logs
result = log_capture(50)
print(result["lines"])
# Typical workflow: enable logging, run program, capture
logging_enable(int21=True)
# ... let program run ...
logs = log_capture(200)
for line in logs["lines"].split("\\n"):
if "INT 21" in line:
print(line)
Note:
The log buffer is circular with a maximum size (typically 4000 lines).
Older entries are discarded when the buffer fills.
"""
result = _qmp_command(
"localhost", _get_qmp_port(), "get-log-buffer", {"limit": limit}, timeout=10.0
)
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict):
desc = error.get("desc", str(error))
if "CommandNotFound" in desc:
return {
"success": False,
"error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.",
}
return {"success": False, "error": desc}
return {"success": False, "error": str(error)}
if "return" in result and isinstance(result["return"], dict):
ret = result["return"]
lines = ret.get("lines", "")
line_count = ret.get("line_count", 0)
return {
"success": True,
"lines": lines,
"line_count": line_count,
"limit_requested": limit,
}
return {"success": True, "raw_response": result}
def log_clear() -> dict:
"""Clear the DOSBox-X log buffer.
Removes all current log entries. Useful for:
- Starting a clean trace before running a program
- Reducing noise when looking for specific events
- Freeing memory if the buffer is very large
Returns:
Success status
Example:
# Clear logs, enable tracing, run operation, capture clean results
log_clear()
logging_enable(int21=True)
keyboard_send("DIR\\r") # Run DIR command
time.sleep(1)
result = log_capture()
# result["lines"] contains only logs from the DIR command
"""
result = _qmp_command("localhost", _get_qmp_port(), "clear-log-buffer")
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict):
desc = error.get("desc", str(error))
if "CommandNotFound" in desc:
return {
"success": False,
"error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.",
}
return {"success": False, "error": desc}
return {"success": False, "error": str(error)}
return {
"success": True,
"message": "Log buffer cleared",
}

View File

@ -0,0 +1,250 @@
"""Network and port mapping tools for DOSBox-X.
Configure serial ports, IPX networking, and query connection status.
Serial Port Modes:
- disabled: No serial port
- nullmodem: TCP server mode (accepts incoming connections)
- modem: Dial-out mode (DOS programs use AT commands like ATDT hostname:port)
These tools help manage network connectivity for:
- RIPscrip testing via serial port
- BBS dial-out using terminal programs like TELIX
- Multiplayer gaming via IPX networking
"""
import socket
import time
from ..state import manager
def port_list() -> dict:
"""List all configured port mappings.
Returns current serial port, parallel port, and network
configuration from the running DOSBox-X instance.
Returns:
Dictionary with port configuration:
- serial1: {mode, tcp_port (if nullmodem)}
- serial2: {mode, tcp_port (if nullmodem)}
- ipx: boolean
- gdb_port: debugging port
- qmp_port: control port
"""
if not manager.running:
return {
"success": False,
"error": "DOSBox-X is not running. Use launch() first.",
}
result = {
"success": True,
"gdb_port": manager.gdb_port,
"qmp_port": manager.qmp_port,
"ipx_enabled": manager.ipx_enabled,
}
# Serial port 1 configuration
if manager.serial1_mode != "disabled":
serial1_info = {"mode": manager.serial1_mode}
if manager.serial1_mode.lower() == "nullmodem":
serial1_info["tcp_port"] = manager.serial1_port
serial1_info["hint"] = f"Connect clients to localhost:{manager.serial1_port}"
elif manager.serial1_mode.lower() == "modem":
serial1_info["hint"] = "Use ATDT hostname:port to dial out from DOS"
result["serial1"] = serial1_info
else:
result["serial1"] = {"mode": "disabled"}
# Serial port 2 configuration
if manager.serial2_mode != "disabled":
serial2_info = {"mode": manager.serial2_mode}
if manager.serial2_mode.lower() == "nullmodem":
serial2_info["tcp_port"] = manager.serial2_port
serial2_info["hint"] = f"Connect clients to localhost:{manager.serial2_port}"
elif manager.serial2_mode.lower() == "modem":
serial2_info["hint"] = "Use ATDT hostname:port to dial out from DOS"
result["serial2"] = serial2_info
else:
result["serial2"] = {"mode": "disabled"}
return result
def port_status() -> dict:
"""Check connection status of configured ports.
Shows which ports are listening and can accept connections.
Tests connectivity to GDB, QMP, and serial nullmodem ports.
Returns:
Dictionary with status for each port:
- gdb: {listening: bool, port: int}
- qmp: {listening: bool, port: int}
- serial1: {listening: bool, port: int} (if nullmodem)
- serial2: {listening: bool, port: int} (if nullmodem)
"""
if not manager.running:
return {
"success": False,
"error": "DOSBox-X is not running. Use launch() first.",
}
def check_port(port: int, timeout: float = 0.5) -> bool:
"""Check if a port is listening."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex(("localhost", port))
sock.close()
return result == 0
except OSError:
return False
result = {
"success": True,
"gdb": {
"port": manager.gdb_port,
"listening": check_port(manager.gdb_port),
},
"qmp": {
"port": manager.qmp_port,
"listening": check_port(manager.qmp_port),
},
}
# Check serial ports if configured as nullmodem
if manager.serial1_mode.lower() == "nullmodem":
result["serial1"] = {
"mode": "nullmodem",
"port": manager.serial1_port,
"listening": check_port(manager.serial1_port),
}
if manager.serial2_mode.lower() == "nullmodem":
result["serial2"] = {
"mode": "nullmodem",
"port": manager.serial2_port,
"listening": check_port(manager.serial2_port),
}
return result
def modem_dial(address: str, port: int = 1) -> dict:
"""Initiate modem dial-out connection.
Sends AT dial command to connect to a TCP server.
Requires serial port configured as "modem" in launch().
The modem will translate ATDT to a TCP connection, allowing
DOS terminal programs to connect to modern services.
Args:
address: Target address in format "hostname:port" or just "hostname"
Examples: "towel.blinkenlights.nl:23", "bbs.example.com:23"
port: COM port number (1 or 2)
Returns:
Result dict with dial status
Examples:
modem_dial("towel.blinkenlights.nl:23") # ASCII Star Wars!
modem_dial("bbs.synchronet.net:23") # Synchronet BBS
Note:
The DOS program itself (TELIX, Procomm, etc.) may need to
initialize the modem first. This tool directly sends ATDT.
"""
if not manager.running:
return {
"success": False,
"error": "DOSBox-X is not running. Use launch() first.",
}
# Check that the port is configured as modem
mode = manager.serial1_mode if port == 1 else manager.serial2_mode
if mode.lower() != "modem":
return {
"success": False,
"error": f"COM{port} is not configured as modem (current: {mode}). "
f'Use launch(serial{port}="modem") to enable dial-out.',
}
# Import keyboard_send to type the dial command
from .peripheral import keyboard_send
# Format the dial command: ATDT hostname:port followed by Enter
dial_cmd = f"ATDT{address}"
# Send the dial command via keyboard (typed into the DOS program)
result = keyboard_send(dial_cmd, delay_ms=50)
if not result.get("success"):
return {
"success": False,
"error": f"Failed to type dial command: {result.get('error')}",
}
# Send Enter to execute the command
keyboard_send("enter", delay_ms=50)
return {
"success": True,
"message": f"Dial command sent: {dial_cmd}",
"address": address,
"port": f"COM{port}",
"hint": "Check screen to see connection status. Connection time depends on target server.",
}
def modem_hangup(port: int = 1) -> dict:
"""Hang up active modem connection.
Sends ATH (hang-up) command to disconnect the modem.
Use this to cleanly terminate a dial-out connection.
Args:
port: COM port number (1 or 2)
Returns:
Result dict with hangup status
Note:
The modem may need to be in command mode (escaped from data mode)
for ATH to work. Some programs handle this automatically.
"""
if not manager.running:
return {
"success": False,
"error": "DOSBox-X is not running. Use launch() first.",
}
# Check that the port is configured as modem
mode = manager.serial1_mode if port == 1 else manager.serial2_mode
if mode.lower() != "modem":
return {
"success": False,
"error": f"COM{port} is not configured as modem (current: {mode}).",
}
from .peripheral import keyboard_send
# Send escape sequence (+++) to enter command mode, then ATH
# The escape sequence requires 1 second guard time before and after
# Type +++ to escape to command mode
keyboard_send("+++", delay_ms=100)
time.sleep(1.2) # Guard time
# Send ATH to hang up
keyboard_send("ATH", delay_ms=50)
keyboard_send("enter", delay_ms=50)
return {
"success": True,
"message": "Hang-up command sent (ATH)",
"port": f"COM{port}",
"hint": "Connection should terminate shortly. Check screen for confirmation.",
}

View File

@ -1,4 +1,26 @@
"""Peripheral tools: screenshot, serial communication, keyboard input.""" """Peripheral tools for interacting with DOSBox-X I/O.
These tools simulate user input and capture output via QMP:
- Screenshots: Capture display state
- Keyboard: Send keystrokes to DOS programs
- Mouse: Move cursor and click
- Serial: Send data to COM ports (for terminal programs)
- Joystick: Game controller input
- Parallel: Printer port communication
- Clipboard: Copy/paste between host and DOS
All peripheral tools use QMP (port 4444), not GDB.
Screenshot workflow:
1. screenshot() - Capture current display, get filename
2. Read dosbox://screenshots/{filename} resource for image data
3. Or use dosbox://screen resource for live capture without saving
Keyboard input examples:
- keyboard_send("dir") then keyboard_send("enter")
- keyboard_send("ctrl-c") for interrupt
- keyboard_send("alt-x") for menu shortcuts
"""
import json import json
import socket import socket
@ -9,7 +31,7 @@ from ..state import manager
# Default ports (fallback if manager not configured) # Default ports (fallback if manager not configured)
DEFAULT_SERIAL_PORT = 5555 # nullmodem server:5555 DEFAULT_SERIAL_PORT = 5555 # nullmodem server:5555
DEFAULT_QMP_PORT = 4444 # qmpserver port=4444 DEFAULT_QMP_PORT = 4444 # qmpserver port=4444
def _get_qmp_port() -> int: def _get_qmp_port() -> int:
@ -39,12 +61,17 @@ def _tcp_send(host: str, port: int, data: bytes, timeout: float = 2.0) -> tuple[
except TimeoutError: except TimeoutError:
return False, f"Connection timeout to {host}:{port}" return False, f"Connection timeout to {host}:{port}"
except ConnectionRefusedError: except ConnectionRefusedError:
return False, f"Connection refused to {host}:{port} - is DOSBox-X running with serial enabled?" return (
False,
f"Connection refused to {host}:{port} - is DOSBox-X running with serial enabled?",
)
except Exception as e: except Exception as e:
return False, f"Socket error: {e}" return False, f"Socket error: {e}"
def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None = None, timeout: float = 2.0) -> dict: def _qmp_command(
host: str, port: int, command: str, args: dict[str, Any] | None = None, timeout: float = 2.0
) -> dict:
"""Send QMP command to DOSBox-X. """Send QMP command to DOSBox-X.
QMP (QEMU Machine Protocol) is a JSON-based protocol for machine control. QMP (QEMU Machine Protocol) is a JSON-based protocol for machine control.
@ -74,7 +101,7 @@ def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None
if not chunk: if not chunk:
break break
greeting += chunk greeting += chunk
except (TimeoutError, socket.timeout): except TimeoutError:
pass # Expected - no more data pass # Expected - no more data
# Build QMP command # Build QMP command
@ -95,7 +122,7 @@ def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None
if not chunk: if not chunk:
break break
response += chunk response += chunk
except (TimeoutError, socket.timeout): except TimeoutError:
pass pass
sock.close() sock.close()
@ -103,7 +130,7 @@ def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None
# Parse response # Parse response
if response: if response:
try: try:
return json.loads(response.decode().strip().split('\n')[-1]) return json.loads(response.decode().strip().split("\n")[-1])
except json.JSONDecodeError: except json.JSONDecodeError:
return {"success": True, "raw_response": response.decode()} return {"success": True, "raw_response": response.decode()}
@ -112,22 +139,37 @@ def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None
except TimeoutError: except TimeoutError:
return {"success": False, "error": f"QMP connection timeout to {host}:{port}"} return {"success": False, "error": f"QMP connection timeout to {host}:{port}"}
except ConnectionRefusedError: except ConnectionRefusedError:
return {"success": False, "error": "QMP connection refused - is DOSBox-X running with qmpserver enabled?"} return {
"success": False,
"error": "QMP connection refused - is DOSBox-X running with qmpserver enabled?",
}
except Exception as e: except Exception as e:
return {"success": False, "error": f"QMP error: {e}"} return {"success": False, "error": f"QMP error: {e}"}
def screenshot(filename: str | None = None) -> dict: def screenshot(filename: str | None = None) -> dict:
"""Capture DOSBox-X display. """Capture DOSBox-X display and save to file.
Uses QMP screendump command which calls DOSBox-X's internal capture function. Takes a screenshot of the current display. The image is saved to
Returns a resource URI that can be used to fetch the screenshot. DOSBox-X's capture directory and registered for access via resources.
Workflow:
1. Call screenshot() - returns filename in response
2. Access image via dosbox://screenshots/{filename} resource
3. Or use dosbox://screen resource for live capture without saving
Args: Args:
filename: Optional output filename (DOSBox-X uses auto-naming) filename: Output filename (default: DOSBox-X auto-generates name)
Returns: Returns:
Screenshot info including resource URI for fetching the image - success: True if capture succeeded
- filename: Use this with dosbox://screenshots/{filename}
- resource_uri: Direct URI to access the image
- size_bytes: File size
Example response:
{"success": true, "filename": "screenshot_001.png", ...}
Then access via: dosbox://screenshots/screenshot_001.png
""" """
import os import os
from datetime import datetime from datetime import datetime
@ -139,8 +181,13 @@ def screenshot(filename: str | None = None) -> dict:
resources = None resources = None
# Use QMP screendump command - it returns base64 PNG and saves to capture/ # Use QMP screendump command - it returns base64 PNG and saves to capture/
result = _qmp_command("localhost", _get_qmp_port(), "screendump", result = _qmp_command(
{"filename": filename or "screenshot.png"}, timeout=10.0) "localhost",
_get_qmp_port(),
"screendump",
{"filename": filename or "screenshot.png"},
timeout=10.0,
)
# Check for QMP error response # Check for QMP error response
if "error" in result: if "error" in result:
@ -194,15 +241,12 @@ def screenshot(filename: str | None = None) -> dict:
"size_bytes": ret.get("size", 0), "size_bytes": ret.get("size", 0),
} }
# Include resource URI (the only way clients should access screenshots) # Include resource URI for direct access
if resource_uri: if resource_uri:
response["resource_uri"] = resource_uri response["resource_uri"] = resource_uri
else: if screenshot_filename:
# Fallback: provide filename so client knows what was created response["filename"] = screenshot_filename
# but encourage using dosbox://screenshots/latest resource response["hint"] = f"Access via: dosbox://screenshots/{screenshot_filename}"
response["hint"] = "Use resource dosbox://screenshots/latest to fetch"
if screenshot_filename:
response["filename"] = screenshot_filename
return response return response
@ -220,51 +264,65 @@ def serial_send(data: str, port: int = 1) -> dict:
This is useful for RIPscrip testing - send graphics commands This is useful for RIPscrip testing - send graphics commands
to a program listening on COM1. to a program listening on COM1.
Requires serial port configured as "nullmodem" in launch().
The data is sent over TCP to the nullmodem server socket.
Args: Args:
data: Data to send (text or hex with \\x prefix) data: Data to send (text or hex with \\x prefix)
port: COM port number (1 or 2) port: COM port number (1 or 2)
Returns: Returns:
Send result Send result
Example:
launch(serial1="nullmodem") # Enable nullmodem on COM1
serial_send("!|L0000001919\\r") # Send RIPscrip line command
""" """
# Parse hex escapes like \x1b for ESC # Parse hex escapes like \x1b for ESC
def parse_data(s: str) -> bytes: def parse_data(s: str) -> bytes:
result = bytearray() result = bytearray()
i = 0 i = 0
while i < len(s): while i < len(s):
if s[i:i+2] == '\\x' and i + 4 <= len(s): if s[i : i + 2] == "\\x" and i + 4 <= len(s):
try: try:
result.append(int(s[i+2:i+4], 16)) result.append(int(s[i + 2 : i + 4], 16))
i += 4 i += 4
continue continue
except ValueError: except ValueError:
pass pass
elif s[i:i+2] == '\\r': elif s[i : i + 2] == "\\r":
result.append(0x0d) result.append(0x0D)
i += 2 i += 2
continue continue
elif s[i:i+2] == '\\n': elif s[i : i + 2] == "\\n":
result.append(0x0a) result.append(0x0A)
i += 2 i += 2
continue continue
result.append(ord(s[i])) result.append(ord(s[i]))
i += 1 i += 1
return bytes(result) return bytes(result)
# Map COM port to TCP port (based on dosbox.conf config) # Validate port number
# serial1=nullmodem server:5555 if port not in (1, 2):
tcp_port_map = {
1: 5555, # COM1
2: 5556, # COM2 (if configured)
}
if port not in tcp_port_map:
return { return {
"success": False, "success": False,
"error": f"Invalid COM port {port}. Supported: 1, 2", "error": f"Invalid COM port {port}. Supported: 1, 2",
} }
tcp_port = tcp_port_map[port] # Get TCP port from manager if running, otherwise use defaults
if manager.running:
tcp_port = manager.get_serial_tcp_port(port)
if tcp_port is None:
mode = manager.serial1_mode if port == 1 else manager.serial2_mode
return {
"success": False,
"error": f"COM{port} is not configured as nullmodem (current: {mode}). "
f'Use launch(serial{port}="nullmodem") to enable.',
}
else:
# Fallback to defaults when manager not running (for direct testing)
tcp_port = 5555 if port == 1 else 5556
try: try:
byte_data = parse_data(data) byte_data = parse_data(data)
@ -289,51 +347,121 @@ def serial_send(data: str, port: int = 1) -> dict:
return { return {
"success": False, "success": False,
"error": message, "error": message,
"hint": f"Ensure DOSBox-X is running with serial{port}=nullmodem server:{tcp_port}", "hint": f'Ensure DOSBox-X is running with serial{port}="nullmodem". '
f"Expected TCP port {tcp_port}. Use port_status() to check.",
} }
# QMP keyboard key mapping (from DOSBox-X remotedebug qmp.cpp) # QMP keyboard key mapping (from DOSBox-X remotedebug qmp.cpp)
QMP_KEY_MAP = { QMP_KEY_MAP = {
# Letters # Letters
"a": "a", "b": "b", "c": "c", "d": "d", "e": "e", "f": "f", "g": "g", "a": "a",
"h": "h", "i": "i", "j": "j", "k": "k", "l": "l", "m": "m", "n": "n", "b": "b",
"o": "o", "p": "p", "q": "q", "r": "r", "s": "s", "t": "t", "u": "u", "c": "c",
"v": "v", "w": "w", "x": "x", "y": "y", "z": "z", "d": "d",
"e": "e",
"f": "f",
"g": "g",
"h": "h",
"i": "i",
"j": "j",
"k": "k",
"l": "l",
"m": "m",
"n": "n",
"o": "o",
"p": "p",
"q": "q",
"r": "r",
"s": "s",
"t": "t",
"u": "u",
"v": "v",
"w": "w",
"x": "x",
"y": "y",
"z": "z",
# Numbers # Numbers
"0": "0", "1": "1", "2": "2", "3": "3", "4": "4", "0": "0",
"5": "5", "6": "6", "7": "7", "8": "8", "9": "9", "1": "1",
"2": "2",
"3": "3",
"4": "4",
"5": "5",
"6": "6",
"7": "7",
"8": "8",
"9": "9",
# Function keys # Function keys
"f1": "f1", "f2": "f2", "f3": "f3", "f4": "f4", "f5": "f5", "f6": "f6", "f1": "f1",
"f7": "f7", "f8": "f8", "f9": "f9", "f10": "f10", "f11": "f11", "f12": "f12", "f2": "f2",
"f3": "f3",
"f4": "f4",
"f5": "f5",
"f6": "f6",
"f7": "f7",
"f8": "f8",
"f9": "f9",
"f10": "f10",
"f11": "f11",
"f12": "f12",
# Special keys # Special keys
"ret": "ret", "enter": "ret", "return": "ret", "ret": "ret",
"esc": "esc", "escape": "esc", "enter": "ret",
"spc": "spc", "space": "spc", " ": "spc", "return": "ret",
"esc": "esc",
"escape": "esc",
"spc": "spc",
"space": "spc",
" ": "spc",
"tab": "tab", "tab": "tab",
"backspace": "backspace", "bs": "backspace", "backspace": "backspace",
"delete": "delete", "del": "delete", "bs": "backspace",
"insert": "insert", "ins": "insert", "delete": "delete",
"home": "home", "end": "end", "del": "delete",
"pgup": "pgup", "pageup": "pgup", "insert": "insert",
"pgdn": "pgdn", "pagedown": "pgdn", "ins": "insert",
"home": "home",
"end": "end",
"pgup": "pgup",
"pageup": "pgup",
"pgdn": "pgdn",
"pagedown": "pgdn",
# Arrow keys # Arrow keys
"up": "up", "down": "down", "left": "left", "right": "right", "up": "up",
"down": "down",
"left": "left",
"right": "right",
# Modifiers # Modifiers
"shift": "shift", "ctrl": "ctrl", "alt": "alt", "shift": "shift",
"shift_r": "shift_r", "ctrl_r": "ctrl_r", "alt_r": "alt_r", "ctrl": "ctrl",
"alt": "alt",
"shift_r": "shift_r",
"ctrl_r": "ctrl_r",
"alt_r": "alt_r",
# Punctuation # Punctuation
"minus": "minus", "-": "minus", "minus": "minus",
"equal": "equal", "=": "equal", "-": "minus",
"bracket_left": "bracket_left", "[": "bracket_left", "equal": "equal",
"bracket_right": "bracket_right", "]": "bracket_right", "=": "equal",
"backslash": "backslash", "\\": "backslash", "bracket_left": "bracket_left",
"semicolon": "semicolon", ";": "semicolon", "[": "bracket_left",
"apostrophe": "apostrophe", "'": "apostrophe", "bracket_right": "bracket_right",
"grave_accent": "grave_accent", "`": "grave_accent", "]": "bracket_right",
"comma": "comma", ",": "comma", "backslash": "backslash",
"dot": "dot", ".": "dot", "\\": "backslash",
"slash": "slash", "/": "slash", "semicolon": "semicolon",
";": "semicolon",
"apostrophe": "apostrophe",
"'": "apostrophe",
"grave_accent": "grave_accent",
"`": "grave_accent",
"comma": "comma",
",": "comma",
"dot": "dot",
".": "dot",
"slash": "slash",
"/": "slash",
} }
@ -369,13 +497,13 @@ def keyboard_send(keys: str, delay_ms: int = 10) -> dict:
while i < len(keys): while i < len(keys):
# Check for modifier combinations like "ctrl-c" # Check for modifier combinations like "ctrl-c"
modifier = None modifier = None
if i + 5 < len(keys) and keys[i:i+5].lower() == "ctrl-": if i + 5 < len(keys) and keys[i : i + 5].lower() == "ctrl-":
modifier = "ctrl" modifier = "ctrl"
i += 5 i += 5
elif i + 4 < len(keys) and keys[i:i+4].lower() == "alt-": elif i + 4 < len(keys) and keys[i : i + 4].lower() == "alt-":
modifier = "alt" modifier = "alt"
i += 4 i += 4
elif i + 6 < len(keys) and keys[i:i+6].lower() == "shift-": elif i + 6 < len(keys) and keys[i : i + 6].lower() == "shift-":
modifier = "shift" modifier = "shift"
i += 6 i += 6
@ -398,14 +526,29 @@ def keyboard_send(keys: str, delay_ms: int = 10) -> dict:
# Handle uppercase (needs shift) # Handle uppercase (needs shift)
if char.isupper() and modifier is None: if char.isupper() and modifier is None:
modifier = "shift" modifier = "shift"
elif char in "!@#$%^&*()_+{}|:\"<>?": elif char in '!@#$%^&*()_+{}|:"<>?':
# Shifted punctuation - map to base key with shift # Shifted punctuation - map to base key with shift
shift_map = { shift_map = {
"!": "1", "@": "2", "#": "3", "$": "4", "%": "5", "!": "1",
"^": "6", "&": "7", "*": "8", "(": "9", ")": "0", "@": "2",
"_": "minus", "+": "equal", "{": "bracket_left", "#": "3",
"}": "bracket_right", "|": "backslash", ":": "semicolon", "$": "4",
"\"": "apostrophe", "<": "comma", ">": "dot", "?": "slash", "%": "5",
"^": "6",
"&": "7",
"*": "8",
"(": "9",
")": "0",
"_": "minus",
"+": "equal",
"{": "bracket_left",
"}": "bracket_right",
"|": "backslash",
":": "semicolon",
'"': "apostrophe",
"<": "comma",
">": "dot",
"?": "slash",
} }
if char in shift_map: if char in shift_map:
key_to_send = shift_map[char] key_to_send = shift_map[char]
@ -414,7 +557,7 @@ def keyboard_send(keys: str, delay_ms: int = 10) -> dict:
i += 1 i += 1
if key_to_send is None: if key_to_send is None:
errors.append(f"Unknown key: {keys[i-1] if i > 0 else '?'}") errors.append(f"Unknown key: {keys[i - 1] if i > 0 else '?'}")
continue continue
# Build QMP command arguments # Build QMP command arguments
@ -472,14 +615,8 @@ def mouse_move(dx: int, dy: int) -> dict:
# QMP input-send-event for relative mouse movement # QMP input-send-event for relative mouse movement
args = { args = {
"events": [ "events": [
{ {"type": "rel", "data": {"axis": "x", "value": dx}},
"type": "rel", {"type": "rel", "data": {"axis": "y", "value": dy}},
"data": {"axis": "x", "value": dx}
},
{
"type": "rel",
"data": {"axis": "y", "value": dy}
}
] ]
} }
@ -538,11 +675,7 @@ def mouse_click(button: str = "left", double: bool = False) -> dict:
for _ in range(clicks): for _ in range(clicks):
# Press # Press
args = { args = {"events": [{"type": "btn", "data": {"button": btn, "down": True}}]}
"events": [
{"type": "btn", "data": {"button": btn, "down": True}}
]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result: if "error" in result:
errors.append(f"Press failed: {result.get('error')}") errors.append(f"Press failed: {result.get('error')}")
@ -550,11 +683,7 @@ def mouse_click(button: str = "left", double: bool = False) -> dict:
time.sleep(0.05) # Brief delay between press and release time.sleep(0.05) # Brief delay between press and release
# Release # Release
args = { args = {"events": [{"type": "btn", "data": {"button": btn, "down": False}}]}
"events": [
{"type": "btn", "data": {"button": btn, "down": False}}
]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result: if "error" in result:
errors.append(f"Release failed: {result.get('error')}") errors.append(f"Release failed: {result.get('error')}")
@ -614,9 +743,7 @@ def mouse_drag(start_x: int, start_y: int, end_x: int, end_y: int, button: str =
time.sleep(0.05) time.sleep(0.05)
# Press button # Press button
args = { args = {"events": [{"type": "btn", "data": {"button": btn, "down": True}}]}
"events": [{"type": "btn", "data": {"button": btn, "down": True}}]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result: if "error" in result:
errors.append(f"Button press failed: {result.get('error')}") errors.append(f"Button press failed: {result.get('error')}")
@ -647,9 +774,7 @@ def mouse_drag(start_x: int, start_y: int, end_x: int, end_y: int, button: str =
time.sleep(0.05) time.sleep(0.05)
# Release button # Release button
args = { args = {"events": [{"type": "btn", "data": {"button": btn, "down": False}}]}
"events": [{"type": "btn", "data": {"button": btn, "down": False}}]
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args)
if "error" in result: if "error" in result:
errors.append(f"Button release failed: {result.get('error')}") errors.append(f"Button release failed: {result.get('error')}")
@ -683,11 +808,12 @@ def clipboard_copy() -> dict:
from your host system's clipboard. from your host system's clipboard.
""" """
# DOSBox-X: Ctrl+F5 = Copy screen text to host clipboard # DOSBox-X: Ctrl+F5 = Copy screen text to host clipboard
result = _qmp_command("localhost", _get_qmp_port(), "send-key", result = _qmp_command(
{"keys": [ "localhost",
{"type": "qcode", "data": "ctrl"}, _get_qmp_port(),
{"type": "qcode", "data": "f5"} "send-key",
]}) {"keys": [{"type": "qcode", "data": "ctrl"}, {"type": "qcode", "data": "f5"}]},
)
if "error" in result: if "error" in result:
return { return {
@ -716,11 +842,12 @@ def clipboard_paste() -> dict:
Set your host clipboard content first, then call this. Set your host clipboard content first, then call this.
""" """
# DOSBox-X: Ctrl+F6 = Paste from host clipboard # DOSBox-X: Ctrl+F6 = Paste from host clipboard
result = _qmp_command("localhost", _get_qmp_port(), "send-key", result = _qmp_command(
{"keys": [ "localhost",
{"type": "qcode", "data": "ctrl"}, _get_qmp_port(),
{"type": "qcode", "data": "f6"} "send-key",
]}) {"keys": [{"type": "qcode", "data": "ctrl"}, {"type": "qcode", "data": "f6"}]},
)
if "error" in result: if "error" in result:
return { return {
@ -733,3 +860,262 @@ def clipboard_paste() -> dict:
"message": "Host clipboard pasted into DOSBox-X (Ctrl+F6)", "message": "Host clipboard pasted into DOSBox-X (Ctrl+F6)",
"hint": "Text from host clipboard was typed as keystrokes", "hint": "Text from host clipboard was typed as keystrokes",
} }
# =============================================================================
# Joystick Tools
# =============================================================================
def joystick_move(which: int = 0, x: float | None = None, y: float | None = None) -> dict:
"""Move joystick axis to specified position.
Sets joystick axis values. Values range from -1.0 (left/up) to 1.0 (right/down),
with 0.0 being center position.
Args:
which: Joystick number (0 or 1)
x: X-axis value (-1.0 to 1.0), None to leave unchanged
y: Y-axis value (-1.0 to 1.0), None to leave unchanged
Returns:
Result dict with new axis values
Examples:
joystick_move(0, x=1.0) # Push right
joystick_move(0, y=-1.0) # Push up
joystick_move(0, x=0.5, y=0.5) # Diagonal down-right
joystick_move(0, x=0, y=0) # Center position
Note:
Requires QMP joystick support patch applied to DOSBox-X.
"""
events = []
if x is not None:
# Clamp to valid range
x = max(-1.0, min(1.0, x))
events.append({"type": "joy", "data": {"which": which, "axis": "x", "value": x}})
if y is not None:
y = max(-1.0, min(1.0, y))
events.append({"type": "joy", "data": {"which": which, "axis": "y", "value": y}})
if not events:
return {
"success": False,
"error": "Must specify at least x or y axis value",
}
result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", {"events": events})
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict) and "CommandNotFound" in str(error):
return {
"success": False,
"error": "Joystick QMP support not available. Apply qmp-joystick-parport.patch to DOSBox-X.",
}
return {
"success": False,
"error": str(error),
}
return {
"success": True,
"message": f"Joystick {which} moved",
"joystick": which,
"x": x,
"y": y,
}
def joystick_button(which: int = 0, button: int = 0, pressed: bool = True) -> dict:
"""Press or release joystick button.
Args:
which: Joystick number (0 or 1)
button: Button number (0-1)
pressed: True to press, False to release
Returns:
Result dict with button state
Examples:
joystick_button(0, 0, True) # Press button 0
joystick_button(0, 0, False) # Release button 0
joystick_button(0, 1) # Press button 1
Note:
Requires QMP joystick support patch applied to DOSBox-X.
"""
result = _qmp_command(
"localhost",
_get_qmp_port(),
"input-send-event",
{
"events": [
{"type": "joybtn", "data": {"which": which, "button": button, "down": pressed}}
]
},
)
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict) and "CommandNotFound" in str(error):
return {
"success": False,
"error": "Joystick QMP support not available. Apply qmp-joystick-parport.patch to DOSBox-X.",
}
return {
"success": False,
"error": str(error),
}
action = "pressed" if pressed else "released"
return {
"success": True,
"message": f"Joystick {which} button {button} {action}",
"joystick": which,
"button": button,
"pressed": pressed,
}
# =============================================================================
# Parallel Port Tools
# =============================================================================
def parallel_write(data: list[int] | str | bytes, port: int = 1) -> dict:
"""Write data to parallel port (LPT).
Sends bytes to the specified parallel port. Useful for testing
printer output or parallel port communication in DOS programs.
Args:
data: Data to send - can be:
- List of integers (0-255)
- String (ASCII encoded)
- Bytes
port: LPT port number (1-3)
Returns:
Result dict with bytes written
Examples:
parallel_write([0x41, 0x42, 0x43], port=1) # Send "ABC"
parallel_write("Hello LPT1", port=1)
parallel_write(b"\\x1b@", port=1) # ESC @ (printer reset)
Note:
Requires QMP parallel port support patch applied to DOSBox-X.
DOSBox-X must have parallel port configured in dosbox.conf.
"""
# Convert data to list of integers
if isinstance(data, str):
byte_list = list(data.encode("ascii", errors="replace"))
elif isinstance(data, bytes):
byte_list = list(data)
elif isinstance(data, list):
byte_list = [int(b) & 0xFF for b in data]
else:
return {
"success": False,
"error": f"Invalid data type: {type(data).__name__}. Use list, str, or bytes.",
}
result = _qmp_command(
"localhost", _get_qmp_port(), "parport-write", {"port": port, "data": byte_list}
)
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict):
desc = error.get("desc", str(error))
if "CommandNotFound" in desc:
return {
"success": False,
"error": "Parallel port QMP support not available. Apply qmp-joystick-parport.patch to DOSBox-X.",
}
return {"success": False, "error": desc}
return {"success": False, "error": str(error)}
written = 0
if "return" in result and isinstance(result["return"], dict):
written = result["return"].get("written", len(byte_list))
return {
"success": True,
"message": f"Wrote {written} bytes to LPT{port}",
"port": port,
"bytes_written": written,
}
def parallel_read(port: int = 1) -> dict:
"""Read parallel port status and data register.
Reads the current state of the parallel port registers.
Useful for checking printer status or bidirectional communication.
Args:
port: LPT port number (1-3)
Returns:
Result dict with:
- data: Data register value (last byte written or received)
- status: Status register (busy, ack, paper out, etc.)
- control: Control register (strobe, auto-feed, etc.)
Status register bits:
- Bit 7: Busy (inverted)
- Bit 6: Acknowledge
- Bit 5: Paper Out
- Bit 4: Select
- Bit 3: Error
Note:
Requires QMP parallel port support patch applied to DOSBox-X.
"""
result = _qmp_command("localhost", _get_qmp_port(), "parport-read", {"port": port})
if "error" in result:
error = result.get("error", {})
if isinstance(error, dict):
desc = error.get("desc", str(error))
if "CommandNotFound" in desc:
return {
"success": False,
"error": "Parallel port QMP support not available. Apply qmp-joystick-parport.patch to DOSBox-X.",
}
return {"success": False, "error": desc}
return {"success": False, "error": str(error)}
if "return" in result and isinstance(result["return"], dict):
ret = result["return"]
status = ret.get("status", 0)
# Decode status bits
status_flags = {
"busy": not bool(status & 0x80), # Inverted
"ack": bool(status & 0x40),
"paper_out": bool(status & 0x20),
"select": bool(status & 0x10),
"error": bool(status & 0x08),
}
return {
"success": True,
"port": port,
"data": ret.get("data", 0),
"status": status,
"status_flags": status_flags,
"control": ret.get("control", 0),
}
return {
"success": True,
"message": "Read completed but no data returned",
"port": port,
}

View File

@ -138,15 +138,24 @@ class Registers:
- OF (Overflow): bit 11 - OF (Overflow): bit 11
""" """
flag_bits = { flag_bits = {
'cf': 0, 'carry': 0, "cf": 0,
'pf': 2, 'parity': 2, "carry": 0,
'af': 4, 'aux': 4, "pf": 2,
'zf': 6, 'zero': 6, "parity": 2,
'sf': 7, 'sign': 7, "af": 4,
'tf': 8, 'trap': 8, "aux": 4,
'if': 9, 'interrupt': 9, "zf": 6,
'df': 10, 'direction': 10, "zero": 6,
'of': 11, 'overflow': 11, "sf": 7,
"sign": 7,
"tf": 8,
"trap": 8,
"if": 9,
"interrupt": 9,
"df": 10,
"direction": 10,
"of": 11,
"overflow": 11,
} }
bit = flag_bits.get(flag.lower()) bit = flag_bits.get(flag.lower())
if bit is None: if bit is None:
@ -157,47 +166,47 @@ class Registers:
"""Convert to dictionary for JSON serialization.""" """Convert to dictionary for JSON serialization."""
return { return {
# 32-bit registers # 32-bit registers
'eax': f'{self.eax:08x}', "eax": f"{self.eax:08x}",
'ecx': f'{self.ecx:08x}', "ecx": f"{self.ecx:08x}",
'edx': f'{self.edx:08x}', "edx": f"{self.edx:08x}",
'ebx': f'{self.ebx:08x}', "ebx": f"{self.ebx:08x}",
'esp': f'{self.esp:08x}', "esp": f"{self.esp:08x}",
'ebp': f'{self.ebp:08x}', "ebp": f"{self.ebp:08x}",
'esi': f'{self.esi:08x}', "esi": f"{self.esi:08x}",
'edi': f'{self.edi:08x}', "edi": f"{self.edi:08x}",
'eip': f'{self.eip:08x}', "eip": f"{self.eip:08x}",
'eflags': f'{self.eflags:08x}', "eflags": f"{self.eflags:08x}",
# 16-bit aliases # 16-bit aliases
'ax': f'{self.ax:04x}', "ax": f"{self.ax:04x}",
'cx': f'{self.cx:04x}', "cx": f"{self.cx:04x}",
'dx': f'{self.dx:04x}', "dx": f"{self.dx:04x}",
'bx': f'{self.bx:04x}', "bx": f"{self.bx:04x}",
'sp': f'{self.sp:04x}', "sp": f"{self.sp:04x}",
'bp': f'{self.bp:04x}', "bp": f"{self.bp:04x}",
'si': f'{self.si:04x}', "si": f"{self.si:04x}",
'di': f'{self.di:04x}', "di": f"{self.di:04x}",
'ip': f'{self.ip:04x}', "ip": f"{self.ip:04x}",
# Segment registers # Segment registers
'cs': f'{self.cs:04x}', "cs": f"{self.cs:04x}",
'ss': f'{self.ss:04x}', "ss": f"{self.ss:04x}",
'ds': f'{self.ds:04x}', "ds": f"{self.ds:04x}",
'es': f'{self.es:04x}', "es": f"{self.es:04x}",
'fs': f'{self.fs:04x}', "fs": f"{self.fs:04x}",
'gs': f'{self.gs:04x}', "gs": f"{self.gs:04x}",
# Computed addresses # Computed addresses
'cs:ip': f'{self.cs:04x}:{self.ip:04x}', "cs:ip": f"{self.cs:04x}:{self.ip:04x}",
'ss:sp': f'{self.ss:04x}:{self.sp:04x}', "ss:sp": f"{self.ss:04x}:{self.sp:04x}",
# Flags # Flags
'flags': { "flags": {
'carry': self.flag_set('cf'), "carry": self.flag_set("cf"),
'parity': self.flag_set('pf'), "parity": self.flag_set("pf"),
'aux': self.flag_set('af'), "aux": self.flag_set("af"),
'zero': self.flag_set('zf'), "zero": self.flag_set("zf"),
'sign': self.flag_set('sf'), "sign": self.flag_set("sf"),
'trap': self.flag_set('tf'), "trap": self.flag_set("tf"),
'interrupt': self.flag_set('if'), "interrupt": self.flag_set("if"),
'direction': self.flag_set('df'), "direction": self.flag_set("df"),
'overflow': self.flag_set('of'), "overflow": self.flag_set("of"),
}, },
} }
@ -217,11 +226,11 @@ class Breakpoint:
def to_dict(self) -> dict: def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization.""" """Convert to dictionary for JSON serialization."""
return { return {
'id': self.id, "id": self.id,
'address': f'{self.address:05x}', "address": f"{self.address:05x}",
'enabled': self.enabled, "enabled": self.enabled,
'hit_count': self.hit_count, "hit_count": self.hit_count,
'original': self.original, "original": self.original,
} }
@ -237,10 +246,10 @@ class StopEvent:
def to_dict(self) -> dict: def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization.""" """Convert to dictionary for JSON serialization."""
return { return {
'reason': self.reason.name.lower(), "reason": self.reason.name.lower(),
'address': f'{self.address:05x}', "address": f"{self.address:05x}",
'signal': self.signal, "signal": self.signal,
'breakpoint_id': self.breakpoint_id, "breakpoint_id": self.breakpoint_id,
} }
@ -257,18 +266,18 @@ class MemoryRegion:
def to_ascii(self) -> str: def to_ascii(self) -> str:
"""Return data as ASCII with non-printables as dots.""" """Return data as ASCII with non-printables as dots."""
return ''.join(chr(b) if 32 <= b < 127 else '.' for b in self.data) return "".join(chr(b) if 32 <= b < 127 else "." for b in self.data)
def to_dict(self, format: Literal["hex", "ascii", "both"] = "both") -> dict: def to_dict(self, format: Literal["hex", "ascii", "both"] = "both") -> dict:
"""Convert to dictionary for JSON serialization.""" """Convert to dictionary for JSON serialization."""
result = { result = {
'address': f'{self.address:05x}', "address": f"{self.address:05x}",
'length': len(self.data), "length": len(self.data),
} }
if format in ("hex", "both"): if format in ("hex", "both"):
result['hex'] = self.to_hex() result["hex"] = self.to_hex()
if format in ("ascii", "both"): if format in ("ascii", "both"):
result['ascii'] = self.to_ascii() result["ascii"] = self.to_ascii()
return result return result
@ -284,9 +293,9 @@ class DisassemblyLine:
def to_dict(self) -> dict: def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization.""" """Convert to dictionary for JSON serialization."""
return { return {
'address': f'{self.address:05x}', "address": f"{self.address:05x}",
'bytes': self.bytes_hex, "bytes": self.bytes_hex,
'instruction': f'{self.mnemonic} {self.operands}'.strip(), "instruction": f"{self.mnemonic} {self.operands}".strip(),
} }
@ -304,10 +313,10 @@ class DOSBoxStatus:
def to_dict(self) -> dict: def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization.""" """Convert to dictionary for JSON serialization."""
return { return {
'running': self.running, "running": self.running,
'connected': self.connected, "connected": self.connected,
'host': self.host, "host": self.host,
'port': self.port, "port": self.port,
'pid': self.pid, "pid": self.pid,
'breakpoint_count': len(self.breakpoints), "breakpoint_count": len(self.breakpoints),
} }

View File

@ -1,7 +1,6 @@
"""Utility functions for DOSBox-X MCP Server.""" """Utility functions for DOSBox-X MCP Server."""
def parse_address(addr: str) -> int: def parse_address(addr: str) -> int:
"""Parse a DOS address in various formats. """Parse a DOS address in various formats.
@ -30,8 +29,8 @@ def parse_address(addr: str) -> int:
addr = addr.strip().lower() addr = addr.strip().lower()
# Segment:offset format (e.g., "1234:5678") # Segment:offset format (e.g., "1234:5678")
if ':' in addr: if ":" in addr:
parts = addr.split(':') parts = addr.split(":")
if len(parts) != 2: if len(parts) != 2:
raise ValueError(f"Invalid segment:offset format: {addr}") raise ValueError(f"Invalid segment:offset format: {addr}")
segment = int(parts[0], 16) segment = int(parts[0], 16)
@ -39,15 +38,15 @@ def parse_address(addr: str) -> int:
return (segment << 4) + offset return (segment << 4) + offset
# Decimal format (e.g., "#12345") # Decimal format (e.g., "#12345")
if addr.startswith('#'): if addr.startswith("#"):
return int(addr[1:], 10) return int(addr[1:], 10)
# Hex with suffix (e.g., "12345h") # Hex with suffix (e.g., "12345h")
if addr.endswith('h'): if addr.endswith("h"):
return int(addr[:-1], 16) return int(addr[:-1], 16)
# Hex with prefix (e.g., "0x12345") # Hex with prefix (e.g., "0x12345")
if addr.startswith('0x'): if addr.startswith("0x"):
return int(addr, 16) return int(addr, 16)
# Assume hex # Assume hex
@ -76,13 +75,13 @@ def format_address(addr: int, style: str = "flat") -> str:
# Convert to segment:offset (canonical form with offset < 16) # Convert to segment:offset (canonical form with offset < 16)
segment = addr >> 4 segment = addr >> 4
offset = addr & 0x0F offset = addr & 0x0F
return f'{segment:04x}:{offset:04x}' return f"{segment:04x}:{offset:04x}"
elif style == "both": elif style == "both":
segment = addr >> 4 segment = addr >> 4
offset = addr & 0x0F offset = addr & 0x0F
return f'{addr:05x} ({segment:04x}:{offset:04x})' return f"{addr:05x} ({segment:04x}:{offset:04x})"
else: # flat else: # flat
return f'{addr:05x}' return f"{addr:05x}"
def calculate_checksum(data: str) -> str: def calculate_checksum(data: str) -> str:
@ -98,7 +97,7 @@ def calculate_checksum(data: str) -> str:
Two-character hex checksum Two-character hex checksum
""" """
total = sum(ord(c) for c in data) total = sum(ord(c) for c in data)
return f'{total & 0xFF:02x}' return f"{total & 0xFF:02x}"
def encode_hex(data: bytes) -> str: def encode_hex(data: bytes) -> str:
@ -118,11 +117,11 @@ def escape_binary(data: bytes) -> bytes:
Characters that must be escaped: $, #, }, * Characters that must be escaped: $, #, }, *
""" """
result = bytearray() result = bytearray()
escape_chars = {0x24, 0x23, 0x7d, 0x2a} # $, #, }, * escape_chars = {0x24, 0x23, 0x7D, 0x2A} # $, #, }, *
for b in data: for b in data:
if b in escape_chars: if b in escape_chars:
result.append(0x7d) result.append(0x7D)
result.append(b ^ 0x20) result.append(b ^ 0x20)
else: else:
result.append(b) result.append(b)
@ -135,7 +134,7 @@ def unescape_binary(data: bytes) -> bytes:
result = bytearray() result = bytearray()
i = 0 i = 0
while i < len(data): while i < len(data):
if data[i] == 0x7d and i + 1 < len(data): if data[i] == 0x7D and i + 1 < len(data):
result.append(data[i + 1] ^ 0x20) result.append(data[i + 1] ^ 0x20)
i += 2 i += 2
else: else:
@ -160,26 +159,26 @@ def parse_stop_reply(response: str) -> tuple[str, dict]:
if not response: if not response:
return ("unknown", {}) return ("unknown", {})
if response.startswith('S'): if response.startswith("S"):
signal = int(response[1:3], 16) signal = int(response[1:3], 16)
return ("signal", {"signal": signal}) return ("signal", {"signal": signal})
if response.startswith('T'): if response.startswith("T"):
signal = int(response[1:3], 16) signal = int(response[1:3], 16)
info = {"signal": signal} info = {"signal": signal}
# Parse additional key:value pairs # Parse additional key:value pairs
pairs = response[3:].rstrip(';').split(';') pairs = response[3:].rstrip(";").split(";")
for pair in pairs: for pair in pairs:
if ':' in pair: if ":" in pair:
key, value = pair.split(':', 1) key, value = pair.split(":", 1)
info[key] = value info[key] = value
return ("signal", info) return ("signal", info)
if response.startswith('W'): if response.startswith("W"):
code = int(response[1:3], 16) code = int(response[1:3], 16)
return ("exit", {"code": code}) return ("exit", {"code": code})
if response.startswith('X'): if response.startswith("X"):
signal = int(response[1:3], 16) signal = int(response[1:3], 16)
return ("terminated", {"signal": signal}) return ("terminated", {"signal": signal})
@ -199,13 +198,13 @@ def hexdump(data: bytes, address: int = 0, width: int = 16) -> str:
""" """
lines = [] lines = []
for i in range(0, len(data), width): for i in range(0, len(data), width):
chunk = data[i:i + width] chunk = data[i : i + width]
hex_part = ' '.join(f'{b:02x}' for b in chunk) hex_part = " ".join(f"{b:02x}" for b in chunk)
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk) ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
# Pad hex part for alignment # Pad hex part for alignment
hex_part = hex_part.ljust(width * 3 - 1) hex_part = hex_part.ljust(width * 3 - 1)
lines.append(f'{address + i:05x} {hex_part} |{ascii_part}|') lines.append(f"{address + i:05x} {hex_part} |{ascii_part}|")
return '\n'.join(lines) return "\n".join(lines)
def parse_registers_x86(hex_data: str) -> dict[str, int]: def parse_registers_x86(hex_data: str) -> dict[str, int]:
@ -219,40 +218,40 @@ def parse_registers_x86(hex_data: str) -> dict[str, int]:
Segment registers are 4 hex characters. Segment registers are 4 hex characters.
""" """
# Remove any whitespace # Remove any whitespace
hex_data = hex_data.replace(' ', '').replace('\n', '') hex_data = hex_data.replace(" ", "").replace("\n", "")
def read_le32(offset: int) -> int: def read_le32(offset: int) -> int:
"""Read 32-bit little-endian value from hex string.""" """Read 32-bit little-endian value from hex string."""
chunk = hex_data[offset:offset + 8] chunk = hex_data[offset : offset + 8]
if len(chunk) < 8: if len(chunk) < 8:
return 0 return 0
# GDB sends in target byte order (little-endian for x86) # GDB sends in target byte order (little-endian for x86)
return int.from_bytes(bytes.fromhex(chunk), 'little') return int.from_bytes(bytes.fromhex(chunk), "little")
def read_le16(offset: int) -> int: def read_le16(offset: int) -> int:
"""Read 16-bit little-endian value from hex string.""" """Read 16-bit little-endian value from hex string."""
chunk = hex_data[offset:offset + 4] chunk = hex_data[offset : offset + 4]
if len(chunk) < 4: if len(chunk) < 4:
return 0 return 0
return int.from_bytes(bytes.fromhex(chunk), 'little') return int.from_bytes(bytes.fromhex(chunk), "little")
# Parse in GDB order # Parse in GDB order
regs = {} regs = {}
pos = 0 pos = 0
# General purpose registers (32-bit each = 8 hex chars) # General purpose registers (32-bit each = 8 hex chars)
for name in ['eax', 'ecx', 'edx', 'ebx', 'esp', 'ebp', 'esi', 'edi']: for name in ["eax", "ecx", "edx", "ebx", "esp", "ebp", "esi", "edi"]:
regs[name] = read_le32(pos) regs[name] = read_le32(pos)
pos += 8 pos += 8
# EIP and EFLAGS # EIP and EFLAGS
regs['eip'] = read_le32(pos) regs["eip"] = read_le32(pos)
pos += 8 pos += 8
regs['eflags'] = read_le32(pos) regs["eflags"] = read_le32(pos)
pos += 8 pos += 8
# Segment registers (32-bit in GDB response, but only 16-bit meaningful) # Segment registers (32-bit in GDB response, but only 16-bit meaningful)
for name in ['cs', 'ss', 'ds', 'es', 'fs', 'gs']: for name in ["cs", "ss", "ds", "es", "fs", "gs"]:
regs[name] = read_le32(pos) & 0xFFFF regs[name] = read_le32(pos) & 0xFFFF
pos += 8 pos += 8