diff --git a/Dockerfile b/Dockerfile index 7fd513d..5c8b367 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,6 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ autoconf \ libtool \ pkg-config \ + patch \ libsdl2-dev \ libsdl2-net-dev \ libsdl2-image-dev \ @@ -37,11 +38,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # # IMPORTANT: Clone and build MUST be in the same RUN to prevent BuildKit from # caching the build step separately from the git clone step. -ARG CACHE_BUST=2026-01-27-v19-add-ncurses-for-debug +ARG CACHE_BUST=2026-01-28-v23-git-only WORKDIR /build + # Configure and build with GDB server support # --enable-remotedebug: Enables C_REMOTEDEBUG flag for GDB/QMP servers # --enable-debug: Enables internal debugger (Alt+Pause) +# Note: Removed --disable-printer to enable parallel port support +# All patches are now committed to the rsp2k fork (joystick, parport, logging) RUN echo "Cache bust: ${CACHE_BUST}" && \ git clone --branch remotedebug --depth 1 https://github.com/rsp2k/dosbox-x-remotedebug.git dosbox-x && \ cd dosbox-x && \ @@ -50,8 +54,7 @@ RUN echo "Cache bust: ${CACHE_BUST}" && \ --prefix=/opt/dosbox-x \ --enable-remotedebug \ --enable-debug \ - --enable-sdl2 \ - --disable-printer && \ + --enable-sdl2 && \ make -j$(nproc) && \ make install diff --git a/pyproject.toml b/pyproject.toml index cfdada5..31b0161 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,11 +41,7 @@ build-backend = "hatchling.build" packages = ["src/dosbox_mcp"] [tool.hatch.build.targets.sdist] -include = ["src/dosbox_mcp"] - -# This is the key setting for src-layout -[tool.hatch.build] -sources = ["src"] +include = ["src/dosbox_mcp", "src/dosbox_mcp/fonts"] [tool.ruff] line-length = 100 diff --git a/src/dosbox_mcp/dosbox.py b/src/dosbox_mcp/dosbox.py index d439fb7..242d5e5 100644 --- a/src/dosbox_mcp/dosbox.py +++ b/src/dosbox_mcp/dosbox.py @@ -7,6 +7,7 @@ This module handles: - Configuration file handling """ +import contextlib import logging import os import shutil @@ -16,6 +17,8 @@ import time from dataclasses import dataclass, field from pathlib import Path +from .fonts import DOCKER_FONTS_PATH, FONTS_DIR, resolve_font + logger = logging.getLogger(__name__) @@ -34,6 +37,22 @@ class DOSBoxConfig: # Display settings fullscreen: bool = False windowresolution: str = "800x600" + output: str = "opengl" # Display output: opengl, ttf, surface, etc. + + # TrueType font settings (when output=ttf) + # TTF mode renders text using system TrueType fonts for sharper display + # Useful for terminal programs, word processors, and text-heavy apps + ttf_font: str | None = None # TTF font name (e.g., "Consola", "Nouveau_IBM") + ttf_fontbold: str | None = None # Bold variant + ttf_fontital: str | None = None # Italic variant + ttf_fontboit: str | None = None # Bold-italic variant + ttf_ptsize: int | None = None # Font size in points (default: auto) + ttf_winperc: int | None = None # Window size as percentage (e.g., 75) + ttf_lins: int | None = None # Screen lines (e.g., 50 for taller screen) + ttf_cols: int | None = None # Screen columns (e.g., 132 for wider screen) + ttf_blinkc: str | None = None # Cursor blink rate (0-7, or "false") + ttf_wp: str | None = None # Word processor mode: WP, WS, XY, FE + ttf_colors: str | None = None # Custom color scheme (16 RGB or hex values) # CPU settings # CRITICAL: Must use "normal" core for GDB breakpoints to work! @@ -48,10 +67,20 @@ class DOSBoxConfig: # Startup startbanner: bool = False # Disable splash screen for cleaner automation - # Serial ports (for future RIPscrip work) + # Serial ports (for RIPscrip work and modem dial-out) + # Options: disabled, nullmodem, modem, direct, dummy + # For nullmodem: use serial1_port/serial2_port for TCP port serial1: str = "disabled" serial2: str = "disabled" + # TCP ports for nullmodem serial mode + # These are the ports external clients connect to + serial1_port: int = 5555 + serial2_port: int = 5556 + + # IPX networking (for DOS multiplayer games like DOOM, Duke Nukem) + ipx: bool = False + # Parallel ports (LPT) # Options: disabled, file, printer, reallpt, disney # file: writes output to capture directory (useful for capturing print jobs) @@ -73,14 +102,47 @@ class DOSBoxConfig: # DOS startup files (written to mounted drive) # These are actual DOS files, not DOSBox config sections autoexec_bat: str | None = None # Content for AUTOEXEC.BAT - config_sys: str | None = None # Content for CONFIG.SYS + config_sys: str | None = None # Content for CONFIG.SYS - def to_conf(self) -> str: - """Generate DOSBox-X configuration file content.""" + def _build_serial_config(self, mode: str, tcp_port: int) -> str: + """Build serial port configuration string. + + Args: + mode: Serial mode (disabled, nullmodem, modem, direct, dummy) + tcp_port: TCP port for nullmodem mode + + Returns: + DOSBox-X serial configuration string + """ + mode_lower = mode.lower().strip() + + if mode_lower == "disabled": + return "disabled" + elif mode_lower == "nullmodem": + # nullmodem server mode - listens on TCP port for incoming connections + return f"nullmodem server:0.0.0.0 port:{tcp_port}" + elif mode_lower == "modem": + # modem mode - allows DOS programs to dial out via AT commands + return "modem" + elif mode_lower in ("direct", "dummy"): + return mode_lower + else: + # Custom configuration string passed directly + return mode + + def to_conf(self, container_fonts_path: str | None = None) -> str: + """Generate DOSBox-X configuration file content. + + Args: + container_fonts_path: If running in Docker, the container path where + fonts are mounted (e.g., "/fonts"). If None, + uses host paths. + """ lines = [ "[sdl]", f"fullscreen={str(self.fullscreen).lower()}", f"windowresolution={self.windowresolution}", + f"output={self.output}", "", "[cpu]", f"core={self.core}", @@ -95,33 +157,43 @@ class DOSBoxConfig: # GDB stub configuration (lokkju/dosbox-x-remotedebug fork) # Must be in [dosbox] section with "gdbserver port=" format if self.gdb_enabled: - lines.extend([ - "gdbserver=true", - f"gdbserver port={self.gdb_port}", - ]) + lines.extend( + [ + "gdbserver=true", + f"gdbserver port={self.gdb_port}", + ] + ) # QMP server configuration (for screenshots, keyboard, mouse) if self.qmp_enabled: - lines.extend([ - "qmpserver=true", - f"qmpserver port={self.qmp_port}", - ]) + lines.extend( + [ + "qmpserver=true", + f"qmpserver port={self.qmp_port}", + ] + ) - lines.extend([ - "", - "[serial]", - f"serial1={self.serial1}", - f"serial2={self.serial2}", - "", - "[parallel]", - f"parallel1={self.parallel1}", - f"parallel2={self.parallel2}", - "", - "[joystick]", - f"joysticktype={self.joysticktype}", - f"timed={str(self.joystick_timed).lower()}", - "", - ]) + # Serial port configuration with TCP port handling for nullmodem + serial1_config = self._build_serial_config(self.serial1, self.serial1_port) + serial2_config = self._build_serial_config(self.serial2, self.serial2_port) + + lines.extend( + [ + "", + "[serial]", + f"serial1={serial1_config}", + f"serial2={serial2_config}", + "", + "[parallel]", + f"parallel1={self.parallel1}", + f"parallel2={self.parallel2}", + "", + "[joystick]", + f"joysticktype={self.joysticktype}", + f"timed={str(self.joystick_timed).lower()}", + "", + ] + ) # Autoexec section lines.append("[autoexec]") @@ -133,7 +205,53 @@ class DOSBoxConfig: # Add custom autoexec commands lines.extend(self.autoexec) - return '\n'.join(lines) + # IPX networking section (for DOS multiplayer games) + if self.ipx: + lines.extend( + [ + "", + "[ipx]", + "ipx=true", + ] + ) + + # TrueType font section (when output=ttf or TTF settings provided) + # resolve_font() converts bundled font names to full paths + # container_fonts_path is used for Docker to map to container paths + ttf_settings = [] + font_path = resolve_font(self.ttf_font, container_fonts_path) + if font_path: + ttf_settings.append(f"font={font_path}") + fontbold_path = resolve_font(self.ttf_fontbold, container_fonts_path) + if fontbold_path: + ttf_settings.append(f"fontbold={fontbold_path}") + fontital_path = resolve_font(self.ttf_fontital, container_fonts_path) + if fontital_path: + ttf_settings.append(f"fontital={fontital_path}") + fontboit_path = resolve_font(self.ttf_fontboit, container_fonts_path) + if fontboit_path: + ttf_settings.append(f"fontboit={fontboit_path}") + if self.ttf_ptsize is not None: + ttf_settings.append(f"ptsize={self.ttf_ptsize}") + if self.ttf_winperc is not None: + ttf_settings.append(f"winperc={self.ttf_winperc}") + if self.ttf_lins is not None: + ttf_settings.append(f"lins={self.ttf_lins}") + if self.ttf_cols is not None: + ttf_settings.append(f"cols={self.ttf_cols}") + if self.ttf_blinkc is not None: + ttf_settings.append(f"blinkc={self.ttf_blinkc}") + if self.ttf_wp: + ttf_settings.append(f"wp={self.ttf_wp}") + if self.ttf_colors: + ttf_settings.append(f"colors={self.ttf_colors}") + + if ttf_settings: + lines.append("") + lines.append("[ttf]") + lines.extend(ttf_settings) + + return "\n".join(lines) class DOSBoxManager: @@ -151,6 +269,12 @@ class DOSBoxManager: self._gdb_port: int = 1234 self._qmp_port: int = 4444 self._xhost_granted: bool = False + # Serial port configuration tracking + self._serial1_mode: str = "disabled" + self._serial2_mode: str = "disabled" + self._serial1_port: int = 5555 + self._serial2_port: int = 5556 + self._ipx_enabled: bool = False def _setup_x11_access(self) -> bool: """Grant X11 access for Docker containers. @@ -199,8 +323,7 @@ class DOSBoxManager: pass logger.warning( - "Could not grant X11 access. Display may not work. " - "Try running: xhost +local:docker" + "Could not grant X11 access. Display may not work. Try running: xhost +local:docker" ) return False @@ -223,6 +346,46 @@ class DOSBoxManager: """Get the QMP port.""" return self._qmp_port + @property + def serial1_mode(self) -> str: + """Get serial port 1 mode.""" + return self._serial1_mode + + @property + def serial2_mode(self) -> str: + """Get serial port 2 mode.""" + return self._serial2_mode + + @property + def serial1_port(self) -> int: + """Get serial port 1 TCP port (for nullmodem mode).""" + return self._serial1_port + + @property + def serial2_port(self) -> int: + """Get serial port 2 TCP port (for nullmodem mode).""" + return self._serial2_port + + @property + def ipx_enabled(self) -> bool: + """Check if IPX networking is enabled.""" + return self._ipx_enabled + + def get_serial_tcp_port(self, com_port: int) -> int | None: + """Get the TCP port for a COM port if configured for nullmodem. + + Args: + com_port: COM port number (1 or 2) + + Returns: + TCP port number if nullmodem is configured, None otherwise + """ + if com_port == 1: + return self._serial1_port if self._serial1_mode == "nullmodem" else None + elif com_port == 2: + return self._serial2_port if self._serial2_mode == "nullmodem" else None + return None + @property def pid(self) -> int | None: """Get process ID if running natively.""" @@ -239,7 +402,7 @@ class DOSBoxManager: ["docker", "inspect", "-f", "{{.State.Running}}", self._container_id], capture_output=True, text=True, - timeout=5 + timeout=5, ) return result.stdout.strip() == "true" except (subprocess.SubprocessError, FileNotFoundError): @@ -280,15 +443,19 @@ class DOSBoxManager: dosbox_exe = self._find_dosbox() if not dosbox_exe: - raise RuntimeError( - "DOSBox-X not found. Install dosbox-x or use Docker container." - ) + raise RuntimeError("DOSBox-X not found. Install dosbox-x or use Docker container.") # Create temporary directory for config self._temp_dir = Path(tempfile.mkdtemp(prefix="dosbox-mcp-")) config = config or DOSBoxConfig() self._gdb_port = config.gdb_port self._qmp_port = config.qmp_port + # Store serial port configuration + self._serial1_mode = config.serial1 + self._serial2_mode = config.serial2 + self._serial1_port = config.serial1_port + self._serial2_port = config.serial2_port + self._ipx_enabled = config.ipx # If binary specified, set up mount and autoexec if binary_path: @@ -362,6 +529,12 @@ class DOSBoxManager: config = config or DOSBoxConfig() self._gdb_port = config.gdb_port self._qmp_port = config.qmp_port + # Store serial port configuration + self._serial1_mode = config.serial1 + self._serial2_mode = config.serial2 + self._serial1_port = config.serial1_port + self._serial2_port = config.serial2_port + self._ipx_enabled = config.ipx # If binary specified, set up mount and autoexec for container paths if binary_path: @@ -373,28 +546,44 @@ class DOSBoxManager: config.autoexec.append("C:") config.autoexec.append(binary.name) - # Write config file + # Write config file - use container font path for Docker self._config_path = self._temp_dir / "dosbox.conf" - self._config_path.write_text(config.to_conf()) + self._config_path.write_text(config.to_conf(container_fonts_path=DOCKER_FONTS_PATH)) # Build docker command display = display or os.environ.get("DISPLAY", ":0") cmd = [ - "docker", "run", + "docker", + "run", "--rm", "-d", # Detached - "--name", f"dosbox-mcp-{os.getpid()}", + "--name", + f"dosbox-mcp-{os.getpid()}", # Network - expose GDB and QMP ports - "-p", f"{self._gdb_port}:{self._gdb_port}", - "-p", f"{self._qmp_port}:{self._qmp_port}", + "-p", + f"{self._gdb_port}:{self._gdb_port}", + "-p", + f"{self._qmp_port}:{self._qmp_port}", # X11 forwarding - "-e", f"DISPLAY={display}", - "-v", "/tmp/.X11-unix:/tmp/.X11-unix", + "-e", + f"DISPLAY={display}", + "-v", + "/tmp/.X11-unix:/tmp/.X11-unix", # Config mount - "-v", f"{self._config_path}:/config/dosbox.conf:ro", + "-v", + f"{self._config_path}:/config/dosbox.conf:ro", + # Fonts mount (bundled TTF fonts) + "-v", + f"{FONTS_DIR}:{DOCKER_FONTS_PATH}:ro", ] + # Expose serial ports for nullmodem mode + if config.serial1.lower() == "nullmodem": + cmd.extend(["-p", f"{self._serial1_port}:{self._serial1_port}"]) + if config.serial2.lower() == "nullmodem": + cmd.extend(["-p", f"{self._serial2_port}:{self._serial2_port}"]) + # Mount binary directory if specified (rw for screenshots/capture to work) if binary_path: binary = Path(binary_path).resolve() @@ -441,23 +630,18 @@ class DOSBoxManager: subprocess.run( ["docker", "stop", "-t", str(int(timeout)), self._container_id], capture_output=True, - timeout=timeout + 5 + timeout=timeout + 5, ) except subprocess.SubprocessError: # Force remove - subprocess.run( - ["docker", "rm", "-f", self._container_id], - capture_output=True - ) + subprocess.run(["docker", "rm", "-f", self._container_id], capture_output=True) self._container_id = None logger.info("DOSBox-X container stopped") # Cleanup temp directory if self._temp_dir and self._temp_dir.exists(): - try: + with contextlib.suppress(OSError): shutil.rmtree(self._temp_dir) - except OSError: - pass self._temp_dir = None def get_logs(self, lines: int = 50) -> str: @@ -479,7 +663,7 @@ class DOSBoxManager: ["docker", "logs", "--tail", str(lines), self._container_id], capture_output=True, text=True, - timeout=5 + timeout=5, ) return result.stdout + result.stderr except subprocess.SubprocessError: diff --git a/src/dosbox_mcp/fonts.py b/src/dosbox_mcp/fonts.py new file mode 100644 index 0000000..3a3f560 --- /dev/null +++ b/src/dosbox_mcp/fonts.py @@ -0,0 +1,114 @@ +"""Font utilities for DOSBox-X TTF output. + +This module provides access to bundled IBM PC fonts from +The Ultimate Oldschool PC Font Pack by VileR (int10h.org). + +Fonts are licensed under CC BY-SA 4.0. +""" + +from pathlib import Path + +# Font directory location +FONTS_DIR = Path(__file__).parent / "fonts" + +# Available bundled fonts (name -> filename mapping) +BUNDLED_FONTS = { + # Strict CP437 encoding + "Px437_IBM_VGA_8x16": "Px437_IBM_VGA_8x16.ttf", + "Px437_IBM_VGA_9x16": "Px437_IBM_VGA_9x16.ttf", + "Px437_IBM_EGA_8x14": "Px437_IBM_EGA_8x14.ttf", + "Px437_IBM_CGA": "Px437_IBM_CGA.ttf", + "Px437_IBM_MDA": "Px437_IBM_MDA.ttf", + # Extended Unicode (CP437 + additional characters) + "PxPlus_IBM_VGA_8x16": "PxPlus_IBM_VGA_8x16.ttf", + "PxPlus_IBM_VGA_9x16": "PxPlus_IBM_VGA_9x16.ttf", +} + +# Default font for general use +DEFAULT_FONT = "Px437_IBM_VGA_9x16" + + +def get_font_path(font_name: str) -> Path | None: + """Get the full path to a bundled font. + + Args: + font_name: Font name without .ttf extension + (e.g., "Px437_IBM_VGA_9x16") + + Returns: + Path to font file, or None if not found + + Example: + >>> path = get_font_path("Px437_IBM_VGA_9x16") + >>> str(path) + '/path/to/dosbox_mcp/fonts/Px437_IBM_VGA_9x16.ttf' + """ + filename = BUNDLED_FONTS.get(font_name) + if filename: + path = FONTS_DIR / filename + if path.exists(): + return path + return None + + +def list_fonts() -> list[str]: + """List all available bundled font names. + + Returns: + List of font names that can be passed to get_font_path() + + Example: + >>> list_fonts() + ['Px437_IBM_CGA', 'Px437_IBM_EGA_8x14', ...] + """ + return sorted(BUNDLED_FONTS.keys()) + + +def is_bundled_font(font_name: str) -> bool: + """Check if a font name refers to a bundled font. + + Args: + font_name: Font name to check + + Returns: + True if font is bundled with dosbox-mcp + """ + return font_name in BUNDLED_FONTS + + +def resolve_font(font_name: str | None, container_path: str | None = None) -> str | None: + """Resolve a font name to a full path if bundled, or return as-is. + + This allows users to specify either: + - A bundled font name (e.g., "Px437_IBM_VGA_9x16") + - A system font name (e.g., "Consolas") + - A full path to a TTF file + + Args: + font_name: Font name, path, or None + container_path: If set, return container path instead of host path + (e.g., "/fonts" for Docker) + + Returns: + Full path to bundled font, or original value if not bundled + """ + if font_name is None: + return None + + # Check if it's a bundled font + filename = BUNDLED_FONTS.get(font_name) + if filename: + if container_path: + # Return container-relative path + return f"{container_path}/{filename}" + # Return host path + path = FONTS_DIR / filename + if path.exists(): + return str(path) + + # Return as-is (system font or path) + return font_name + + +# Docker container path for mounted fonts +DOCKER_FONTS_PATH = "/fonts" diff --git a/src/dosbox_mcp/fonts/LICENSE.TXT b/src/dosbox_mcp/fonts/LICENSE.TXT new file mode 100644 index 0000000..fd662a7 --- /dev/null +++ b/src/dosbox_mcp/fonts/LICENSE.TXT @@ -0,0 +1,428 @@ +Attribution-ShareAlike 4.0 International + +======================================================================= + +Creative Commons Corporation ("Creative Commons") is not a law firm and +does not provide legal services or legal advice. Distribution of +Creative Commons public licenses does not create a lawyer-client or +other relationship. Creative Commons makes its licenses and related +information available on an "as-is" basis. Creative Commons gives no +warranties regarding its licenses, any material licensed under their +terms and conditions, or any related information. Creative Commons +disclaims all liability for damages resulting from their use to the +fullest extent possible. + +Using Creative Commons Public Licenses + +Creative Commons public licenses provide a standard set of terms and +conditions that creators and other rights holders may use to share +original works of authorship and other material subject to copyright +and certain other rights specified in the public license below. The +following considerations are for informational purposes only, are not +exhaustive, and do not form part of our licenses. + + Considerations for licensors: Our public licenses are + intended for use by those authorized to give the public + permission to use material in ways otherwise restricted by + copyright and certain other rights. Our licenses are + irrevocable. Licensors should read and understand the terms + and conditions of the license they choose before applying it. + Licensors should also secure all rights necessary before + applying our licenses so that the public can reuse the + material as expected. Licensors should clearly mark any + material not subject to the license. This includes other CC- + licensed material, or material used under an exception or + limitation to copyright. More considerations for licensors: + wiki.creativecommons.org/Considerations_for_licensors + + Considerations for the public: By using one of our public + licenses, a licensor grants the public permission to use the + licensed material under specified terms and conditions. If + the licensor's permission is not necessary for any reason--for + example, because of any applicable exception or limitation to + copyright--then that use is not regulated by the license. Our + licenses grant only permissions under copyright and certain + other rights that a licensor has authority to grant. Use of + the licensed material may still be restricted for other + reasons, including because others have copyright or other + rights in the material. A licensor may make special requests, + such as asking that all changes be marked or described. + Although not required by our licenses, you are encouraged to + respect those requests where reasonable. More_considerations + for the public: + wiki.creativecommons.org/Considerations_for_licensees + +======================================================================= + +Creative Commons Attribution-ShareAlike 4.0 International Public +License + +By exercising the Licensed Rights (defined below), You accept and agree +to be bound by the terms and conditions of this Creative Commons +Attribution-ShareAlike 4.0 International Public License ("Public +License"). To the extent this Public License may be interpreted as a +contract, You are granted the Licensed Rights in consideration of Your +acceptance of these terms and conditions, and the Licensor grants You +such rights in consideration of benefits the Licensor receives from +making the Licensed Material available under these terms and +conditions. + + +Section 1 -- Definitions. + + a. Adapted Material means material subject to Copyright and Similar + Rights that is derived from or based upon the Licensed Material + and in which the Licensed Material is translated, altered, + arranged, transformed, or otherwise modified in a manner requiring + permission under the Copyright and Similar Rights held by the + Licensor. For purposes of this Public License, where the Licensed + Material is a musical work, performance, or sound recording, + Adapted Material is always produced where the Licensed Material is + synched in timed relation with a moving image. + + b. Adapter's License means the license You apply to Your Copyright + and Similar Rights in Your contributions to Adapted Material in + accordance with the terms and conditions of this Public License. + + c. BY-SA Compatible License means a license listed at + creativecommons.org/compatiblelicenses, approved by Creative + Commons as essentially the equivalent of this Public License. + + d. Copyright and Similar Rights means copyright and/or similar rights + closely related to copyright including, without limitation, + performance, broadcast, sound recording, and Sui Generis Database + Rights, without regard to how the rights are labeled or + categorized. For purposes of this Public License, the rights + specified in Section 2(b)(1)-(2) are not Copyright and Similar + Rights. + + e. Effective Technological Measures means those measures that, in the + absence of proper authority, may not be circumvented under laws + fulfilling obligations under Article 11 of the WIPO Copyright + Treaty adopted on December 20, 1996, and/or similar international + agreements. + + f. Exceptions and Limitations means fair use, fair dealing, and/or + any other exception or limitation to Copyright and Similar Rights + that applies to Your use of the Licensed Material. + + g. License Elements means the license attributes listed in the name + of a Creative Commons Public License. The License Elements of this + Public License are Attribution and ShareAlike. + + h. Licensed Material means the artistic or literary work, database, + or other material to which the Licensor applied this Public + License. + + i. Licensed Rights means the rights granted to You subject to the + terms and conditions of this Public License, which are limited to + all Copyright and Similar Rights that apply to Your use of the + Licensed Material and that the Licensor has authority to license. + + j. Licensor means the individual(s) or entity(ies) granting rights + under this Public License. + + k. Share means to provide material to the public by any means or + process that requires permission under the Licensed Rights, such + as reproduction, public display, public performance, distribution, + dissemination, communication, or importation, and to make material + available to the public including in ways that members of the + public may access the material from a place and at a time + individually chosen by them. + + l. Sui Generis Database Rights means rights other than copyright + resulting from Directive 96/9/EC of the European Parliament and of + the Council of 11 March 1996 on the legal protection of databases, + as amended and/or succeeded, as well as other essentially + equivalent rights anywhere in the world. + + m. You means the individual or entity exercising the Licensed Rights + under this Public License. Your has a corresponding meaning. + + +Section 2 -- Scope. + + a. License grant. + + 1. Subject to the terms and conditions of this Public License, + the Licensor hereby grants You a worldwide, royalty-free, + non-sublicensable, non-exclusive, irrevocable license to + exercise the Licensed Rights in the Licensed Material to: + + a. reproduce and Share the Licensed Material, in whole or + in part; and + + b. produce, reproduce, and Share Adapted Material. + + 2. Exceptions and Limitations. For the avoidance of doubt, where + Exceptions and Limitations apply to Your use, this Public + License does not apply, and You do not need to comply with + its terms and conditions. + + 3. Term. The term of this Public License is specified in Section + 6(a). + + 4. Media and formats; technical modifications allowed. The + Licensor authorizes You to exercise the Licensed Rights in + all media and formats whether now known or hereafter created, + and to make technical modifications necessary to do so. The + Licensor waives and/or agrees not to assert any right or + authority to forbid You from making technical modifications + necessary to exercise the Licensed Rights, including + technical modifications necessary to circumvent Effective + Technological Measures. For purposes of this Public License, + simply making modifications authorized by this Section 2(a) + (4) never produces Adapted Material. + + 5. Downstream recipients. + + a. Offer from the Licensor -- Licensed Material. Every + recipient of the Licensed Material automatically + receives an offer from the Licensor to exercise the + Licensed Rights under the terms and conditions of this + Public License. + + b. Additional offer from the Licensor -- Adapted Material. + Every recipient of Adapted Material from You + automatically receives an offer from the Licensor to + exercise the Licensed Rights in the Adapted Material + under the conditions of the Adapter's License You apply. + + c. No downstream restrictions. You may not offer or impose + any additional or different terms or conditions on, or + apply any Effective Technological Measures to, the + Licensed Material if doing so restricts exercise of the + Licensed Rights by any recipient of the Licensed + Material. + + 6. No endorsement. Nothing in this Public License constitutes or + may be construed as permission to assert or imply that You + are, or that Your use of the Licensed Material is, connected + with, or sponsored, endorsed, or granted official status by, + the Licensor or others designated to receive attribution as + provided in Section 3(a)(1)(A)(i). + + b. Other rights. + + 1. Moral rights, such as the right of integrity, are not + licensed under this Public License, nor are publicity, + privacy, and/or other similar personality rights; however, to + the extent possible, the Licensor waives and/or agrees not to + assert any such rights held by the Licensor to the limited + extent necessary to allow You to exercise the Licensed + Rights, but not otherwise. + + 2. Patent and trademark rights are not licensed under this + Public License. + + 3. To the extent possible, the Licensor waives any right to + collect royalties from You for the exercise of the Licensed + Rights, whether directly or through a collecting society + under any voluntary or waivable statutory or compulsory + licensing scheme. In all other cases the Licensor expressly + reserves any right to collect such royalties. + + +Section 3 -- License Conditions. + +Your exercise of the Licensed Rights is expressly made subject to the +following conditions. + + a. Attribution. + + 1. If You Share the Licensed Material (including in modified + form), You must: + + a. retain the following if it is supplied by the Licensor + with the Licensed Material: + + i. identification of the creator(s) of the Licensed + Material and any others designated to receive + attribution, in any reasonable manner requested by + the Licensor (including by pseudonym if + designated); + + ii. a copyright notice; + + iii. a notice that refers to this Public License; + + iv. a notice that refers to the disclaimer of + warranties; + + v. a URI or hyperlink to the Licensed Material to the + extent reasonably practicable; + + b. indicate if You modified the Licensed Material and + retain an indication of any previous modifications; and + + c. indicate the Licensed Material is licensed under this + Public License, and include the text of, or the URI or + hyperlink to, this Public License. + + 2. You may satisfy the conditions in Section 3(a)(1) in any + reasonable manner based on the medium, means, and context in + which You Share the Licensed Material. For example, it may be + reasonable to satisfy the conditions by providing a URI or + hyperlink to a resource that includes the required + information. + + 3. If requested by the Licensor, You must remove any of the + information required by Section 3(a)(1)(A) to the extent + reasonably practicable. + + b. ShareAlike. + + In addition to the conditions in Section 3(a), if You Share + Adapted Material You produce, the following conditions also apply. + + 1. The Adapter's License You apply must be a Creative Commons + license with the same License Elements, this version or + later, or a BY-SA Compatible License. + + 2. You must include the text of, or the URI or hyperlink to, the + Adapter's License You apply. You may satisfy this condition + in any reasonable manner based on the medium, means, and + context in which You Share Adapted Material. + + 3. You may not offer or impose any additional or different terms + or conditions on, or apply any Effective Technological + Measures to, Adapted Material that restrict exercise of the + rights granted under the Adapter's License You apply. + + +Section 4 -- Sui Generis Database Rights. + +Where the Licensed Rights include Sui Generis Database Rights that +apply to Your use of the Licensed Material: + + a. for the avoidance of doubt, Section 2(a)(1) grants You the right + to extract, reuse, reproduce, and Share all or a substantial + portion of the contents of the database; + + b. if You include all or a substantial portion of the database + contents in a database in which You have Sui Generis Database + Rights, then the database in which You have Sui Generis Database + Rights (but not its individual contents) is Adapted Material, + + including for purposes of Section 3(b); and + c. You must comply with the conditions in Section 3(a) if You Share + all or a substantial portion of the contents of the database. + +For the avoidance of doubt, this Section 4 supplements and does not +replace Your obligations under this Public License where the Licensed +Rights include other Copyright and Similar Rights. + + +Section 5 -- Disclaimer of Warranties and Limitation of Liability. + + a. UNLESS OTHERWISE SEPARATELY UNDERTAKEN BY THE LICENSOR, TO THE + EXTENT POSSIBLE, THE LICENSOR OFFERS THE LICENSED MATERIAL AS-IS + AND AS-AVAILABLE, AND MAKES NO REPRESENTATIONS OR WARRANTIES OF + ANY KIND CONCERNING THE LICENSED MATERIAL, WHETHER EXPRESS, + IMPLIED, STATUTORY, OR OTHER. THIS INCLUDES, WITHOUT LIMITATION, + WARRANTIES OF TITLE, MERCHANTABILITY, FITNESS FOR A PARTICULAR + PURPOSE, NON-INFRINGEMENT, ABSENCE OF LATENT OR OTHER DEFECTS, + ACCURACY, OR THE PRESENCE OR ABSENCE OF ERRORS, WHETHER OR NOT + KNOWN OR DISCOVERABLE. WHERE DISCLAIMERS OF WARRANTIES ARE NOT + ALLOWED IN FULL OR IN PART, THIS DISCLAIMER MAY NOT APPLY TO YOU. + + b. TO THE EXTENT POSSIBLE, IN NO EVENT WILL THE LICENSOR BE LIABLE + TO YOU ON ANY LEGAL THEORY (INCLUDING, WITHOUT LIMITATION, + NEGLIGENCE) OR OTHERWISE FOR ANY DIRECT, SPECIAL, INDIRECT, + INCIDENTAL, CONSEQUENTIAL, PUNITIVE, EXEMPLARY, OR OTHER LOSSES, + COSTS, EXPENSES, OR DAMAGES ARISING OUT OF THIS PUBLIC LICENSE OR + USE OF THE LICENSED MATERIAL, EVEN IF THE LICENSOR HAS BEEN + ADVISED OF THE POSSIBILITY OF SUCH LOSSES, COSTS, EXPENSES, OR + DAMAGES. WHERE A LIMITATION OF LIABILITY IS NOT ALLOWED IN FULL OR + IN PART, THIS LIMITATION MAY NOT APPLY TO YOU. + + c. The disclaimer of warranties and limitation of liability provided + above shall be interpreted in a manner that, to the extent + possible, most closely approximates an absolute disclaimer and + waiver of all liability. + + +Section 6 -- Term and Termination. + + a. This Public License applies for the term of the Copyright and + Similar Rights licensed here. However, if You fail to comply with + this Public License, then Your rights under this Public License + terminate automatically. + + b. Where Your right to use the Licensed Material has terminated under + Section 6(a), it reinstates: + + 1. automatically as of the date the violation is cured, provided + it is cured within 30 days of Your discovery of the + violation; or + + 2. upon express reinstatement by the Licensor. + + For the avoidance of doubt, this Section 6(b) does not affect any + right the Licensor may have to seek remedies for Your violations + of this Public License. + + c. For the avoidance of doubt, the Licensor may also offer the + Licensed Material under separate terms or conditions or stop + distributing the Licensed Material at any time; however, doing so + will not terminate this Public License. + + d. Sections 1, 5, 6, 7, and 8 survive termination of this Public + License. + + +Section 7 -- Other Terms and Conditions. + + a. The Licensor shall not be bound by any additional or different + terms or conditions communicated by You unless expressly agreed. + + b. Any arrangements, understandings, or agreements regarding the + Licensed Material not stated herein are separate from and + independent of the terms and conditions of this Public License. + + +Section 8 -- Interpretation. + + a. For the avoidance of doubt, this Public License does not, and + shall not be interpreted to, reduce, limit, restrict, or impose + conditions on any use of the Licensed Material that could lawfully + be made without permission under this Public License. + + b. To the extent possible, if any provision of this Public License is + deemed unenforceable, it shall be automatically reformed to the + minimum extent necessary to make it enforceable. If the provision + cannot be reformed, it shall be severed from this Public License + without affecting the enforceability of the remaining terms and + conditions. + + c. No term or condition of this Public License will be waived and no + failure to comply consented to unless expressly agreed to by the + Licensor. + + d. Nothing in this Public License constitutes or may be interpreted + as a limitation upon, or waiver of, any privileges and immunities + that apply to the Licensor or You, including from the legal + processes of any jurisdiction or authority. + + +======================================================================= + +Creative Commons is not a party to its public +licenses. Notwithstanding, Creative Commons may elect to apply one of +its public licenses to material it publishes and in those instances +will be considered the “Licensor.” The text of the Creative Commons +public licenses is dedicated to the public domain under the CC0 Public +Domain Dedication. Except for the limited purpose of indicating that +material is shared under a Creative Commons public license or as +otherwise permitted by the Creative Commons policies published at +creativecommons.org/policies, Creative Commons does not authorize the +use of the trademark "Creative Commons" or any other trademark or logo +of Creative Commons without its prior written consent including, +without limitation, in connection with any unauthorized modifications +to any of its public licenses or any other arrangements, +understandings, or agreements concerning use of licensed material. For +the avoidance of doubt, this paragraph does not form part of the +public licenses. + +Creative Commons may be contacted at creativecommons.org. + diff --git a/src/dosbox_mcp/fonts/Px437_IBM_CGA.ttf b/src/dosbox_mcp/fonts/Px437_IBM_CGA.ttf new file mode 100644 index 0000000..2e4945a Binary files /dev/null and b/src/dosbox_mcp/fonts/Px437_IBM_CGA.ttf differ diff --git a/src/dosbox_mcp/fonts/Px437_IBM_EGA_8x14.ttf b/src/dosbox_mcp/fonts/Px437_IBM_EGA_8x14.ttf new file mode 100644 index 0000000..e0fafb5 Binary files /dev/null and b/src/dosbox_mcp/fonts/Px437_IBM_EGA_8x14.ttf differ diff --git a/src/dosbox_mcp/fonts/Px437_IBM_MDA.ttf b/src/dosbox_mcp/fonts/Px437_IBM_MDA.ttf new file mode 100644 index 0000000..37ad05a Binary files /dev/null and b/src/dosbox_mcp/fonts/Px437_IBM_MDA.ttf differ diff --git a/src/dosbox_mcp/fonts/Px437_IBM_VGA_8x16.ttf b/src/dosbox_mcp/fonts/Px437_IBM_VGA_8x16.ttf new file mode 100644 index 0000000..7bee1e9 Binary files /dev/null and b/src/dosbox_mcp/fonts/Px437_IBM_VGA_8x16.ttf differ diff --git a/src/dosbox_mcp/fonts/Px437_IBM_VGA_9x16.ttf b/src/dosbox_mcp/fonts/Px437_IBM_VGA_9x16.ttf new file mode 100644 index 0000000..0032bb2 Binary files /dev/null and b/src/dosbox_mcp/fonts/Px437_IBM_VGA_9x16.ttf differ diff --git a/src/dosbox_mcp/fonts/PxPlus_IBM_VGA_8x16.ttf b/src/dosbox_mcp/fonts/PxPlus_IBM_VGA_8x16.ttf new file mode 100644 index 0000000..3bf32d0 Binary files /dev/null and b/src/dosbox_mcp/fonts/PxPlus_IBM_VGA_8x16.ttf differ diff --git a/src/dosbox_mcp/fonts/PxPlus_IBM_VGA_9x16.ttf b/src/dosbox_mcp/fonts/PxPlus_IBM_VGA_9x16.ttf new file mode 100644 index 0000000..ac76345 Binary files /dev/null and b/src/dosbox_mcp/fonts/PxPlus_IBM_VGA_9x16.ttf differ diff --git a/src/dosbox_mcp/fonts/README.md b/src/dosbox_mcp/fonts/README.md new file mode 100644 index 0000000..932f44a --- /dev/null +++ b/src/dosbox_mcp/fonts/README.md @@ -0,0 +1,45 @@ +# Bundled DOS Fonts + +These TrueType fonts are from **The Ultimate Oldschool PC Font Pack** by VileR (int10h.org). + +## Included Fonts + +| Font | Description | Best For | +|------|-------------|----------| +| `Px437_IBM_VGA_8x16.ttf` | Classic IBM VGA 8x16 | Standard DOS text (80x25) | +| `Px437_IBM_VGA_9x16.ttf` | IBM VGA 9x16 (wider) | Better readability | +| `Px437_IBM_EGA_8x14.ttf` | IBM EGA 8x14 | 80x25 with smaller font | +| `Px437_IBM_CGA.ttf` | IBM CGA | 40-column mode, retro look | +| `Px437_IBM_MDA.ttf` | IBM MDA (Monochrome) | Word processing style | +| `PxPlus_IBM_VGA_8x16.ttf` | VGA 8x16 + Unicode | Extended character support | +| `PxPlus_IBM_VGA_9x16.ttf` | VGA 9x16 + Unicode | Extended + better readability | + +## Font Naming Convention + +- **Px437_** - Strict CP437 encoding (original DOS character set) +- **PxPlus_** - Extended Unicode with full CP437 + additional characters + +## Usage in DOSBox-X + +```python +from dosbox_mcp.tools import launch + +# Use bundled IBM VGA font +launch( + binary_path="/dos/RIPTERM.EXE", + output="ttf", + ttf_font="Px437_IBM_VGA_9x16", # Font name without .ttf + ttf_ptsize=20, +) +``` + +## License + +These fonts are licensed under **Creative Commons Attribution-ShareAlike 4.0 International (CC BY-SA 4.0)**. + +See LICENSE.TXT for full license text. + +**Attribution:** +- Font Pack: The Ultimate Oldschool PC Font Pack +- Author: VileR +- Website: https://int10h.org/oldschool-pc-fonts/ diff --git a/src/dosbox_mcp/gdb_client.py b/src/dosbox_mcp/gdb_client.py index 33a34be..c552dbe 100644 --- a/src/dosbox_mcp/gdb_client.py +++ b/src/dosbox_mcp/gdb_client.py @@ -139,7 +139,7 @@ class GDBClient: # Parse supported features if response: - supported = response.split(';') + supported = response.split(";") logger.info(f"GDB stub supports: {supported[:5]}...") # First 5 # Enable no-ack mode if supported (faster communication) @@ -158,10 +158,10 @@ class GDBClient: def disconnect(self) -> None: """Disconnect from GDB stub.""" if self._socket: - try: + import contextlib + + with contextlib.suppress(OSError): self._socket.close() - except OSError: - pass self._socket = None self._connected = False self._host = "" @@ -185,7 +185,7 @@ class GDBClient: logger.debug(f"Sending: {packet}") try: - self._socket.sendall(packet.encode('latin-1')) + self._socket.sendall(packet.encode("latin-1")) except OSError as e: self._connected = False raise GDBError(f"Send failed: {e}") from e @@ -212,20 +212,20 @@ class GDBClient: data += chunk # Look for packet boundaries - decoded = data.decode('latin-1') + decoded = data.decode("latin-1") # Skip any leading ACK/NAK - while decoded and decoded[0] in '+-': + while decoded and decoded[0] in "+-": decoded = decoded[1:] - if '$' in decoded and '#' in decoded: + if "$" in decoded and "#" in decoded: # Find packet bounds - start = decoded.index('$') - end = decoded.index('#', start) + start = decoded.index("$") + end = decoded.index("#", start) if end + 2 <= len(decoded): # Complete packet - packet_data = decoded[start + 1:end] - checksum = decoded[end + 1:end + 3] + packet_data = decoded[start + 1 : end] + checksum = decoded[end + 1 : end + 3] # Verify checksum expected = calculate_checksum(packet_data) @@ -234,11 +234,11 @@ class GDBClient: f"Checksum mismatch: got {checksum}, expected {expected}" ) # Send NAK - self._socket.sendall(b'-') + self._socket.sendall(b"-") continue # Send ACK - self._socket.sendall(b'+') + self._socket.sendall(b"+") logger.debug(f"Received: ${packet_data}#{checksum}") return packet_data @@ -284,16 +284,28 @@ class GDBClient: Args: regs: Registers object with values to write """ + # Build register string in GDB order (little-endian) def le32(val: int) -> str: - return val.to_bytes(4, 'little').hex() + return val.to_bytes(4, "little").hex() hex_data = ( - le32(regs.eax) + le32(regs.ecx) + le32(regs.edx) + le32(regs.ebx) + - le32(regs.esp) + le32(regs.ebp) + le32(regs.esi) + le32(regs.edi) + - le32(regs.eip) + le32(regs.eflags) + - le32(regs.cs) + le32(regs.ss) + le32(regs.ds) + - le32(regs.es) + le32(regs.fs) + le32(regs.gs) + le32(regs.eax) + + le32(regs.ecx) + + le32(regs.edx) + + le32(regs.ebx) + + le32(regs.esp) + + le32(regs.ebp) + + le32(regs.esi) + + le32(regs.edi) + + le32(regs.eip) + + le32(regs.eflags) + + le32(regs.cs) + + le32(regs.ss) + + le32(regs.ds) + + le32(regs.es) + + le32(regs.fs) + + le32(regs.gs) ) response = self._command(f"G{hex_data}") @@ -312,7 +324,7 @@ class GDBClient: response = self._command(f"p{reg_num:x}") if response.startswith("E"): raise GDBError(f"Failed to read register {reg_num}: {response}") - return int.from_bytes(bytes.fromhex(response), 'little') + return int.from_bytes(bytes.fromhex(response), "little") # ========================================================================= # Memory Operations @@ -371,10 +383,7 @@ class GDBClient: raise GDBError("Breakpoints not supported by this GDB stub") bp = Breakpoint( - id=self._next_bp_id, - address=address, - enabled=True, - original=f"{address:05x}" + id=self._next_bp_id, address=address, enabled=True, original=f"{address:05x}" ) self._breakpoints[bp.id] = bp self._next_bp_id += 1 @@ -528,24 +537,13 @@ class GDBClient: reason=StopReason.BREAKPOINT, address=addr, signal=signal, - breakpoint_id=bp.id + breakpoint_id=bp.id, ) - return StopEvent( - reason=StopReason.STEP, - address=addr, - signal=signal - ) - return StopEvent( - reason=StopReason.SIGNAL, - address=0, - signal=signal - ) + return StopEvent(reason=StopReason.STEP, address=addr, signal=signal) + return StopEvent(reason=StopReason.SIGNAL, address=0, signal=signal) elif stop_type == "exit": - return StopEvent( - reason=StopReason.EXITED, - signal=info.get("code", 0) - ) + return StopEvent(reason=StopReason.EXITED, signal=info.get("code", 0)) return StopEvent(reason=StopReason.UNKNOWN) @@ -562,7 +560,7 @@ class GDBClient: response = self._command("qSupported") if response.startswith("E"): return [] - return response.split(';') + return response.split(";") def query_attached(self) -> bool: """Query if attached to existing process. @@ -580,10 +578,10 @@ class GDBClient: def kill(self) -> None: """Kill the target process.""" - try: + import contextlib + + with contextlib.suppress(GDBError): self._command("k") - except GDBError: - pass # Connection may close immediately self.disconnect() # ========================================================================= @@ -599,7 +597,7 @@ class GDBClient: raise GDBError("Not connected to GDB stub") try: - self._socket.sendall(b'\x03') + self._socket.sendall(b"\x03") logger.debug("Sent interrupt") except OSError as e: self._connected = False diff --git a/src/dosbox_mcp/resources.py b/src/dosbox_mcp/resources.py index 8cd8172..ecdba02 100644 --- a/src/dosbox_mcp/resources.py +++ b/src/dosbox_mcp/resources.py @@ -13,10 +13,7 @@ from typing import Any _screenshot_registry: dict[str, dict[str, Any]] = {} # Default capture directory (can be overridden) -CAPTURE_DIR = os.environ.get( - "DOS_DIR", - os.path.expanduser("~/claude/dosbox-mcp/dos") -) + "/capture" +CAPTURE_DIR = os.environ.get("DOS_DIR", os.path.expanduser("~/claude/dosbox-mcp/dos")) + "/capture" def register_screenshot(filename: str, path: str, size: int, timestamp: str | None = None) -> str: @@ -81,12 +78,14 @@ def list_screenshots() -> list[dict[str, Any]]: # Add registered screenshots for filename, info in _screenshot_registry.items(): - screenshots.append({ - "uri": f"dosbox://screenshots/{filename}", - "filename": filename, - "size": info["size"], - "timestamp": info["timestamp"], - }) + screenshots.append( + { + "uri": f"dosbox://screenshots/{filename}", + "filename": filename, + "size": info["size"], + "timestamp": info["timestamp"], + } + ) # Scan capture directory for additional files if os.path.isdir(CAPTURE_DIR): @@ -94,12 +93,14 @@ def list_screenshots() -> list[dict[str, Any]]: filename = os.path.basename(path) if filename not in _screenshot_registry: stat = os.stat(path) - screenshots.append({ - "uri": f"dosbox://screenshots/{filename}", - "filename": filename, - "size": stat.st_size, - "timestamp": datetime.fromtimestamp(stat.st_mtime).isoformat(), - }) + screenshots.append( + { + "uri": f"dosbox://screenshots/{filename}", + "filename": filename, + "size": stat.st_size, + "timestamp": datetime.fromtimestamp(stat.st_mtime).isoformat(), + } + ) # Sort by timestamp (newest first) screenshots.sort(key=lambda x: x["timestamp"], reverse=True) @@ -123,8 +124,9 @@ def capture_screen_live() -> bytes | None: """ from .tools.peripheral import _get_qmp_port, _qmp_command - result = _qmp_command("localhost", _get_qmp_port(), "screendump", - {"filename": "_live_capture.png"}, timeout=10.0) + result = _qmp_command( + "localhost", _get_qmp_port(), "screendump", {"filename": "_live_capture.png"}, timeout=10.0 + ) if "error" in result: return None diff --git a/src/dosbox_mcp/server.py b/src/dosbox_mcp/server.py index 04e436d..b6473a4 100644 --- a/src/dosbox_mcp/server.py +++ b/src/dosbox_mcp/server.py @@ -17,8 +17,7 @@ from . import resources, tools # Configure logging logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) @@ -53,7 +52,7 @@ Address formats supported: - Segment:offset: "1234:5678" (standard DOS format) - Flat hex: "0x12345" or "12345" - Decimal: "#12345" -""" +""", ) # ============================================================================= @@ -69,6 +68,7 @@ _TOOLS = [ tools.step, tools.step_over, tools.quit, + tools.fonts_list, # Breakpoints tools.breakpoint_set, tools.breakpoint_list, @@ -91,6 +91,12 @@ _TOOLS = [ tools.mouse_drag, tools.clipboard_copy, tools.clipboard_paste, + # Joystick (requires QMP patch) + tools.joystick_move, + tools.joystick_button, + # Parallel Port (requires QMP patch) + tools.parallel_write, + tools.parallel_read, # Control (QMP-based) tools.pause, tools.resume, @@ -99,6 +105,18 @@ _TOOLS = [ tools.loadstate, tools.memdump, tools.query_status, + # Logging (requires QMP logging patch) + tools.logging_status, + tools.logging_enable, + tools.logging_disable, + tools.logging_category, + tools.log_capture, + tools.log_clear, + # Network (port mapping and modem) + tools.port_list, + tools.port_status, + tools.modem_dial, + tools.modem_hangup, ] for func in _TOOLS: @@ -110,15 +128,22 @@ for func in _TOOLS: # ============================================================================= -@mcp.resource("dosbox://screen") +@mcp.resource("dosbox://screen", mime_type="image/png") def get_screen_resource() -> bytes: - """Capture and return the current DOSBox-X screen. + """Live capture of the current DOSBox-X display. - This is a live capture - no need to call screenshot() first. - Simply read this resource to get the current display as PNG. + Use this to see what's currently on screen without saving a file. + Returns PNG image data directly - just read this resource. + + Requires: DOSBox-X running with QMP enabled (port 4444) + + When to use: + - Quick visual check of current display state + - Verifying UI state before/after interactions + - Debugging display issues Returns: - PNG image data of the current screen + PNG image bytes of the current display """ data = resources.capture_screen_live() if data is None: @@ -128,24 +153,39 @@ def get_screen_resource() -> bytes: @mcp.resource("dosbox://screenshots") def list_screenshots_resource() -> str: - """List all available DOSBox-X screenshots. + """List all saved screenshots with metadata. - Returns a JSON list of screenshot metadata including URIs. + Returns JSON array of screenshot info including: + - filename: Use with dosbox://screenshots/{filename} to retrieve + - size: File size in bytes + - timestamp: When the screenshot was taken + + Workflow: + 1. Call screenshot() tool to capture display + 2. Note the filename from the response + 3. Access via dosbox://screenshots/{filename} + + Returns: + JSON array of screenshot metadata """ import json + screenshots = resources.list_screenshots() return json.dumps(screenshots, indent=2) -@mcp.resource("dosbox://screenshots/{filename}") +@mcp.resource("dosbox://screenshots/{filename}", mime_type="image/png") def get_screenshot_resource(filename: str) -> bytes: - """Get a specific screenshot by filename. + """Retrieve a saved screenshot by exact filename. Args: - filename: Screenshot filename (e.g., "ripterm_001.png") + filename: Exact filename from screenshot() result or list + Example: "screenshot_001.png" Returns: - PNG image data + PNG image bytes + + Note: Use dosbox://screenshots to list available files. """ data = resources.get_screenshot_data(filename) if data is None: @@ -153,23 +193,6 @@ def get_screenshot_resource(filename: str) -> bytes: return data -@mcp.resource("dosbox://screenshots/latest") -def get_latest_screenshot_resource() -> bytes: - """Get the most recent screenshot. - - Returns: - PNG image data of the latest screenshot - """ - latest = resources.get_latest_screenshot() - if latest is None: - raise ValueError("No screenshots available") - - data = resources.get_screenshot_data(latest["filename"]) - if data is None: - raise ValueError("Screenshot file not found") - return data - - # ============================================================================= # Entry Point # ============================================================================= diff --git a/src/dosbox_mcp/tools/__init__.py b/src/dosbox_mcp/tools/__init__.py index 46db365..77e6576 100644 --- a/src/dosbox_mcp/tools/__init__.py +++ b/src/dosbox_mcp/tools/__init__.py @@ -4,13 +4,15 @@ Tools are organized by function: - execution: launch, attach, continue, step, quit - breakpoints: breakpoint_set, breakpoint_list, breakpoint_delete - inspection: registers, memory_read, memory_write, disassemble, stack, status -- peripheral: screenshot, serial_send, keyboard_send, mouse_* +- peripheral: screenshot, serial_send, keyboard_send, mouse_*, joystick_*, parallel_* - control: pause, resume, reset, savestate, loadstate, memdump, query_status +- logging: logging_status, logging_enable, logging_disable, log_capture, log_clear +- network: port_list, port_status, modem_dial, modem_hangup """ from .breakpoints import breakpoint_delete, breakpoint_list, breakpoint_set from .control import loadstate, memdump, pause, query_status, reset, resume, savestate -from .execution import attach, continue_execution, launch, quit, step, step_over +from .execution import attach, continue_execution, fonts_list, launch, quit, step, step_over from .inspection import ( disassemble, memory_read, @@ -21,13 +23,26 @@ from .inspection import ( stack, status, ) +from .logging import ( + log_capture, + log_clear, + logging_category, + logging_disable, + logging_enable, + logging_status, +) +from .network import modem_dial, modem_hangup, port_list, port_status from .peripheral import ( clipboard_copy, clipboard_paste, + joystick_button, + joystick_move, keyboard_send, mouse_click, mouse_drag, mouse_move, + parallel_read, + parallel_write, screenshot, serial_send, ) @@ -40,6 +55,7 @@ __all__ = [ "step", "step_over", "quit", + "fonts_list", # Breakpoints "breakpoint_set", "breakpoint_list", @@ -62,6 +78,10 @@ __all__ = [ "mouse_drag", "clipboard_copy", "clipboard_paste", + "joystick_move", + "joystick_button", + "parallel_write", + "parallel_read", # Control "pause", "resume", @@ -70,4 +90,16 @@ __all__ = [ "loadstate", "memdump", "query_status", + # Logging + "logging_status", + "logging_enable", + "logging_disable", + "logging_category", + "log_capture", + "log_clear", + # Network + "port_list", + "port_status", + "modem_dial", + "modem_hangup", ] diff --git a/src/dosbox_mcp/tools/breakpoints.py b/src/dosbox_mcp/tools/breakpoints.py index b18929b..da0213d 100644 --- a/src/dosbox_mcp/tools/breakpoints.py +++ b/src/dosbox_mcp/tools/breakpoints.py @@ -1,4 +1,21 @@ -"""Breakpoint management tools.""" +"""Breakpoint management tools for DOS debugging. + +Breakpoints pause execution when the CPU reaches a specific address. +Essential for reverse engineering - set breakpoints at interesting +code locations, then continue execution to analyze program behavior. + +Address formats accepted: +- Segment:offset: "1234:0100" (standard DOS format) +- Flat hex: "0x12340" or "12340" +- With 0x prefix recommended for flat addresses + +Workflow: +1. breakpoint_set("CS:0100") - Set at code segment offset +2. continue_execution() - Run until breakpoint hit +3. registers() / memory_read() - Inspect state +4. breakpoint_list() - See all active breakpoints +5. breakpoint_delete(id=1) - Remove when done +""" from ..gdb_client import GDBError from ..state import client @@ -6,17 +23,26 @@ from ..utils import format_address, parse_address def breakpoint_set(address: str) -> dict: - """Set a software breakpoint at the specified address. + """Set a software breakpoint at a memory address. + + When the CPU's instruction pointer reaches this address, + execution will pause and you can inspect registers/memory. Args: - address: Memory address (segment:offset or flat hex) + address: Where to break. Formats: + - "1234:0100" - segment:offset (DOS standard) + - "0x12340" - flat 20-bit address + - "CS:0100" - relative to code segment Returns: - Breakpoint info + - breakpoint_id: Reference ID for delete/list operations + - address: Normalized address where breakpoint is set + - original: The address string you provided Examples: - breakpoint_set("1234:0100") # segment:offset - breakpoint_set("0x12340") # flat address + breakpoint_set("1234:0100") # At segment 1234, offset 0100 + breakpoint_set("0x12340") # At flat address + breakpoint_set("CS:0100") # Relative to current code segment """ try: addr = parse_address(address) @@ -35,10 +61,14 @@ def breakpoint_set(address: str) -> dict: def breakpoint_list() -> dict: - """List all active breakpoints. + """List all currently active breakpoints. Returns: - List of breakpoint info + - count: Number of active breakpoints + - breakpoints: Array of breakpoint info, each with: + - id: Use this ID with breakpoint_delete() + - address: Memory address being watched + - enabled: Whether breakpoint is active """ bps = client.list_breakpoints() return { @@ -48,14 +78,19 @@ def breakpoint_list() -> dict: def breakpoint_delete(id: int | None = None, all: bool = False) -> dict: - """Delete breakpoint(s). + """Delete one or all breakpoints. Args: - id: Specific breakpoint ID to delete - all: If True, delete all breakpoints + id: Breakpoint ID from breakpoint_set() or breakpoint_list() + all: Set True to delete ALL breakpoints at once Returns: - Deletion result + - deleted: Count of breakpoints removed (when all=True) + - deleted_id: ID that was removed (when id specified) + + Examples: + breakpoint_delete(id=1) # Delete breakpoint #1 + breakpoint_delete(all=True) # Clear all breakpoints """ try: if all: diff --git a/src/dosbox_mcp/tools/control.py b/src/dosbox_mcp/tools/control.py index dcf2872..6e3a979 100644 --- a/src/dosbox_mcp/tools/control.py +++ b/src/dosbox_mcp/tools/control.py @@ -1,4 +1,21 @@ -"""Control tools: pause, resume, reset, savestate, loadstate.""" +"""Emulator control tools via QMP (QEMU Machine Protocol). + +These tools control the DOSBox-X emulator itself, not the debugger. +Use these for: +- Pausing/resuming the emulator (different from GDB breakpoints) +- Creating save states for checkpointing +- Resetting the emulated system +- Dumping large memory regions efficiently + +QMP vs GDB: +- GDB (port 1234): CPU-level debugging, breakpoints, stepping +- QMP (port 4444): Emulator control, screenshots, keyboard/mouse + +Save State Workflow: +1. savestate("before_crash.sav") - Create checkpoint +2. +3. loadstate("before_crash.sav") - Restore and try again +""" from typing import Any @@ -93,8 +110,9 @@ def savestate(filename: str) -> dict: Example: savestate("checkpoint1.sav") """ - result = _qmp_command("localhost", _get_qmp_port(), "savestate", - {"filename": filename}, timeout=30.0) + result = _qmp_command( + "localhost", _get_qmp_port(), "savestate", {"file": filename}, timeout=30.0 + ) if "error" in result: return { @@ -132,8 +150,9 @@ def loadstate(filename: str) -> dict: Example: loadstate("checkpoint1.sav") """ - result = _qmp_command("localhost", _get_qmp_port(), "loadstate", - {"filename": filename}, timeout=30.0) + result = _qmp_command( + "localhost", _get_qmp_port(), "loadstate", {"file": filename}, timeout=30.0 + ) if "error" in result: return { @@ -175,10 +194,10 @@ def memdump(address: str, length: int, filename: str | None = None) -> dict: """ # Parse segment:offset if provided - if ':' in address: - seg, off = address.split(':') + if ":" in address: + seg, off = address.split(":") flat_addr = (int(seg, 16) << 4) + int(off, 16) - elif address.startswith('0x'): + elif address.startswith("0x"): flat_addr = int(address, 16) else: flat_addr = int(address, 16) diff --git a/src/dosbox_mcp/tools/execution.py b/src/dosbox_mcp/tools/execution.py index 25662e3..0ccb602 100644 --- a/src/dosbox_mcp/tools/execution.py +++ b/src/dosbox_mcp/tools/execution.py @@ -1,8 +1,26 @@ -"""Execution control tools: launch, attach, continue, step, quit.""" +"""Execution control tools for DOSBox-X debugging sessions. + +This module provides tools to: +- Start DOSBox-X with debugging enabled (launch) +- Connect to the GDB debug stub (attach) +- Control execution: step, continue, quit + +Typical workflow: +1. launch() - Start DOSBox-X (auto-detects Docker vs native) +2. attach() - Connect to GDB stub +3. Use breakpoints, step, continue to debug +4. quit() - Clean up when done + +GDB Stub Connection: +- Default port: 1234 +- Protocol: GDB Remote Serial Protocol +- Auto-connects to Docker container or native DOSBox-X +""" import time from ..dosbox import DOSBoxConfig +from ..fonts import get_font_path, list_fonts from ..gdb_client import GDBError from ..state import client, manager from ..utils import format_address @@ -18,6 +36,20 @@ def launch( memsize: int = 16, joystick: str = "auto", parallel1: str = "disabled", + serial1: str = "disabled", + serial2: str = "disabled", + serial1_port: int = 5555, + serial2_port: int = 5556, + ipx: bool = False, + # Display output settings + output: str = "opengl", + # TrueType font settings (when output="ttf") + ttf_font: str | None = None, + ttf_ptsize: int | None = None, + ttf_lins: int | None = None, + ttf_cols: int | None = None, + ttf_winperc: int | None = None, + ttf_wp: str | None = None, ) -> dict: """Launch DOSBox-X with GDB debugging and QMP control enabled. @@ -33,6 +65,21 @@ def launch( joystick: Joystick type - auto, none, 2axis, 4axis (default: auto) parallel1: Parallel port 1 - disabled, file, printer (default: disabled) Use "file" to capture print output to capture directory + serial1: Serial port 1 mode - disabled, nullmodem, modem (default: disabled) + Use "nullmodem" to accept TCP connections on serial1_port + Use "modem" for dial-out with AT commands (ATDT hostname:port) + serial2: Serial port 2 mode - same options as serial1 + serial1_port: TCP port for nullmodem mode on COM1 (default: 5555) + serial2_port: TCP port for nullmodem mode on COM2 (default: 5556) + ipx: Enable IPX networking for DOS multiplayer games (default: False) + output: Display output mode - opengl, ttf, surface (default: opengl) + Use "ttf" for TrueType font rendering (sharper text) + ttf_font: TrueType font name when output="ttf" (e.g., "Consola", "Nouveau_IBM") + ttf_ptsize: Font size in points for TTF mode + ttf_lins: Screen lines for TTF mode (e.g., 50 for taller screen) + ttf_cols: Screen columns for TTF mode (e.g., 132 for wider screen) + ttf_winperc: Window size as percentage for TTF mode (e.g., 75) + ttf_wp: Word processor mode for TTF - WP (WordPerfect), WS (WordStar), XY, FE Returns: Status dict with connection details @@ -41,6 +88,10 @@ def launch( launch("/path/to/GAME.EXE") # Auto-detects native vs Docker launch("/path/to/GAME.EXE", joystick="2axis") # With joystick launch("/path/to/GAME.EXE", parallel1="file") # Capture printer output + launch("/path/to/RIPTERM.EXE", serial1="nullmodem") # RIPscrip testing + launch("/path/to/TELIX.EXE", serial1="modem") # BBS dial-out + launch("/path/to/DOOM.EXE", ipx=True) # Multiplayer gaming + launch("/path/to/WP.EXE", output="ttf", ttf_font="Consola", ttf_lins=50) # TTF mode """ config = DOSBoxConfig( gdb_port=gdb_port, @@ -51,6 +102,20 @@ def launch( memsize=memsize, joysticktype=joystick, parallel1=parallel1, + serial1=serial1, + serial2=serial2, + serial1_port=serial1_port, + serial2_port=serial2_port, + ipx=ipx, + # Display output + output=output, + # TTF settings + ttf_font=ttf_font, + ttf_ptsize=ttf_ptsize, + ttf_lins=ttf_lins, + ttf_cols=ttf_cols, + ttf_winperc=ttf_winperc, + ttf_wp=ttf_wp, ) launch_method = None @@ -72,7 +137,7 @@ def launch( launch_method = "native" manager.launch_native(binary_path=binary_path, config=config) - return { + result = { "success": True, "message": f"DOSBox-X launched successfully ({launch_method})", "launch_method": launch_method, @@ -82,6 +147,35 @@ def launch( "pid": manager.pid, "hint": f"Use attach() to connect to debugger on port {gdb_port}. QMP (screenshots/keyboard) on port {qmp_port}.", } + + # Add serial port info if configured + if serial1 != "disabled": + result["serial1"] = { + "mode": serial1, + "tcp_port": serial1_port if serial1.lower() == "nullmodem" else None, + } + if serial2 != "disabled": + result["serial2"] = { + "mode": serial2, + "tcp_port": serial2_port if serial2.lower() == "nullmodem" else None, + } + if ipx: + result["ipx"] = True + + # Add TTF info if configured + if output == "ttf": + ttf_info = {"output": "ttf"} + if ttf_font: + ttf_info["font"] = ttf_font + if ttf_ptsize: + ttf_info["ptsize"] = ttf_ptsize + if ttf_lins: + ttf_info["lins"] = ttf_lins + if ttf_cols: + ttf_info["cols"] = ttf_cols + result["ttf"] = ttf_info + + return result except Exception as e: return { "success": False, @@ -152,13 +246,27 @@ def attach( def continue_execution(timeout: float | None = None) -> dict: - """Continue execution until breakpoint or signal. + """Continue execution until breakpoint hit or signal received. + + Resumes CPU execution. The debugger will stop when: + - A breakpoint is hit + - A signal/interrupt occurs + - Timeout is reached (if specified) Args: - timeout: Optional timeout in seconds + timeout: Maximum seconds to wait (None = wait forever) + Use timeout for programs that may hang or loop Returns: - Stop event info (reason, address, breakpoint hit) + - stop_reason: Why execution stopped (breakpoint, signal, timeout) + - address: Where execution stopped (segment:offset format) + - breakpoint_id: Which breakpoint was hit (if any) + - cs_ip: Current code segment:instruction pointer + + Common stop reasons: + - "breakpoint": Hit a set breakpoint + - "signal": Received interrupt (e.g., Ctrl+C) + - "timeout": Specified timeout elapsed """ try: event = client.continue_execution(timeout=timeout) @@ -254,3 +362,76 @@ def quit() -> dict: "success": False, "error": str(e), } + + +def fonts_list() -> dict: + """List available bundled TrueType fonts for DOSBox-X TTF output. + + These fonts are from The Ultimate Oldschool PC Font Pack by VileR (int10h.org) + and are bundled with dosbox-mcp for convenience. Licensed under CC BY-SA 4.0. + + Returns: + Dictionary with: + - fonts: List of font info (name, description, best_for) + - usage_hint: How to use fonts with launch() + + Font naming: + - Px437_* : Strict CP437 encoding (original DOS character set) + - PxPlus_* : Extended Unicode (CP437 + additional characters) + + Example: + fonts = fonts_list() + # Then use with launch(): + launch(output="ttf", ttf_font="Px437_IBM_VGA_9x16", ttf_ptsize=18) + """ + font_info = { + "Px437_IBM_VGA_8x16": { + "description": "Classic IBM VGA 8x16", + "best_for": "Standard DOS text (80x25)", + }, + "Px437_IBM_VGA_9x16": { + "description": "IBM VGA 9x16 (wider)", + "best_for": "Better readability, recommended default", + }, + "Px437_IBM_EGA_8x14": { + "description": "IBM EGA 8x14", + "best_for": "80x25 with smaller font", + }, + "Px437_IBM_CGA": { + "description": "IBM CGA 8x8", + "best_for": "40-column mode, retro look", + }, + "Px437_IBM_MDA": { + "description": "IBM Monochrome Display Adapter", + "best_for": "Word processing, green phosphor style", + }, + "PxPlus_IBM_VGA_8x16": { + "description": "VGA 8x16 + Unicode", + "best_for": "Extended character support", + }, + "PxPlus_IBM_VGA_9x16": { + "description": "VGA 9x16 + Unicode", + "best_for": "Extended chars + better readability", + }, + } + + fonts = [] + for name in list_fonts(): + info = font_info.get(name, {"description": name, "best_for": "General use"}) + path = get_font_path(name) + fonts.append( + { + "name": name, + "description": info["description"], + "best_for": info["best_for"], + "available": path is not None and path.exists(), + } + ) + + return { + "success": True, + "fonts": fonts, + "recommended": "Px437_IBM_VGA_9x16", + "usage_hint": 'Use with launch(output="ttf", ttf_font="", ttf_ptsize=18)', + "license": "CC BY-SA 4.0 - The Ultimate Oldschool PC Font Pack by VileR (int10h.org)", + } diff --git a/src/dosbox_mcp/tools/inspection.py b/src/dosbox_mcp/tools/inspection.py index 840dc3e..61b0b51 100644 --- a/src/dosbox_mcp/tools/inspection.py +++ b/src/dosbox_mcp/tools/inspection.py @@ -1,4 +1,24 @@ -"""Inspection tools: registers, memory, disassemble, stack, status.""" +"""Inspection tools for examining DOS program state. + +These tools read CPU registers, memory, and screen contents. +Essential for reverse engineering - use after hitting a breakpoint +to understand what the program is doing. + +Key registers for DOS (16-bit real mode): +- CS:IP = Code Segment:Instruction Pointer (where code runs) +- SS:SP = Stack Segment:Stack Pointer (function call stack) +- DS:SI = Data Segment:Source Index (string operations) +- ES:DI = Extra Segment:Destination Index (string operations) +- AX,BX,CX,DX = General purpose registers + +Memory addressing (20-bit real mode): +- Physical = (Segment * 16) + Offset +- Example: 1234:5678 = 0x12340 + 0x5678 = 0x179B8 + +VGA memory locations: +- 0xB8000 = Text mode buffer (character + attribute pairs) +- 0xA0000 = Graphics mode buffer (mode 13h, etc.) +""" from typing import Literal @@ -9,16 +29,25 @@ from ..utils import format_address, hexdump, parse_address def registers() -> dict: - """Read all CPU registers. + """Read all CPU registers - the core debugging primitive. + + Call this after breakpoint hits or stepping to see CPU state. Returns: - Complete register state including: - - 32-bit registers (EAX, EBX, etc.) - - 16-bit aliases (AX, BX, etc.) - - Segment registers (CS, DS, ES, SS, FS, GS) - - Instruction pointer (CS:IP) - - Stack pointer (SS:SP) - - Flags + 32-bit registers: eax, ebx, ecx, edx, esi, edi, ebp, esp, eip + 16-bit aliases: ax, bx, cx, dx, si, di, bp, sp, ip (lower 16 bits) + 8-bit aliases: al, ah, bl, bh, cl, ch, dl, dh + Segment registers: cs, ds, es, ss, fs, gs + Computed addresses: + - cs_ip: Physical address of next instruction + - ss_sp: Physical address of stack top + Flags: flags register value + + Key values for DOS debugging: + - cs:ip = Current execution point + - ds:bx/si = Common data pointers + - ax = Function return values, INT 21h function number + - dx:ax = 32-bit return values """ try: regs = client.read_registers() @@ -55,17 +84,21 @@ def memory_read( try: # Handle register-based addresses like "DS:SI" addr_str = address.upper() - if ':' in addr_str: - seg_part, off_part = addr_str.split(':') + if ":" in addr_str: + seg_part, off_part = addr_str.split(":") # Check if parts are register names regs = None - seg_regs = {'CS', 'DS', 'ES', 'SS', 'FS', 'GS'} - off_regs = {'IP', 'SP', 'BP', 'SI', 'DI', 'BX', 'AX', 'CX', 'DX'} + seg_regs = {"CS", "DS", "ES", "SS", "FS", "GS"} + off_regs = {"IP", "SP", "BP", "SI", "DI", "BX", "AX", "CX", "DX"} if seg_part in seg_regs or off_part in off_regs: regs = client.read_registers() - seg_val = getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16) - off_val = getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16) + seg_val = ( + getattr(regs, seg_part.lower()) if seg_part in seg_regs else int(seg_part, 16) + ) + off_val = ( + getattr(regs, off_part.lower()) if off_part in off_regs else int(off_part, 16) + ) addr = (seg_val << 4) + off_val else: addr = parse_address(address) @@ -121,10 +154,7 @@ def memory_write( try: addr = parse_address(address) - if format == "hex": - bytes_data = bytes.fromhex(data) - else: - bytes_data = data.encode('latin-1') + bytes_data = bytes.fromhex(data) if format == "hex" else data.encode("latin-1") client.write_memory(addr, bytes_data) @@ -176,26 +206,48 @@ def disassemble(address: str | None = None, count: int = 10) -> dict: 0xEB: "JMP short", 0x74: "JZ", 0x75: "JNZ", - 0x50: "PUSH AX", 0x51: "PUSH CX", 0x52: "PUSH DX", 0x53: "PUSH BX", - 0x54: "PUSH SP", 0x55: "PUSH BP", 0x56: "PUSH SI", 0x57: "PUSH DI", - 0x58: "POP AX", 0x59: "POP CX", 0x5A: "POP DX", 0x5B: "POP BX", - 0x5C: "POP SP", 0x5D: "POP BP", 0x5E: "POP SI", 0x5F: "POP DI", - 0xB8: "MOV AX,imm", 0xB9: "MOV CX,imm", 0xBA: "MOV DX,imm", 0xBB: "MOV BX,imm", - 0x89: "MOV r/m,r", 0x8B: "MOV r,r/m", - 0x01: "ADD r/m,r", 0x03: "ADD r,r/m", - 0x29: "SUB r/m,r", 0x2B: "SUB r,r/m", - 0x31: "XOR r/m,r", 0x33: "XOR r,r/m", - 0x39: "CMP r/m,r", 0x3B: "CMP r,r/m", + 0x50: "PUSH AX", + 0x51: "PUSH CX", + 0x52: "PUSH DX", + 0x53: "PUSH BX", + 0x54: "PUSH SP", + 0x55: "PUSH BP", + 0x56: "PUSH SI", + 0x57: "PUSH DI", + 0x58: "POP AX", + 0x59: "POP CX", + 0x5A: "POP DX", + 0x5B: "POP BX", + 0x5C: "POP SP", + 0x5D: "POP BP", + 0x5E: "POP SI", + 0x5F: "POP DI", + 0xB8: "MOV AX,imm", + 0xB9: "MOV CX,imm", + 0xBA: "MOV DX,imm", + 0xBB: "MOV BX,imm", + 0x89: "MOV r/m,r", + 0x8B: "MOV r,r/m", + 0x01: "ADD r/m,r", + 0x03: "ADD r,r/m", + 0x29: "SUB r/m,r", + 0x2B: "SUB r,r/m", + 0x31: "XOR r/m,r", + 0x33: "XOR r,r/m", + 0x39: "CMP r/m,r", + 0x3B: "CMP r,r/m", } lines = [] for i, b in enumerate(mem.data[:count]): hint = opcodes.get(b, f"?? ({b:02x})") - lines.append({ - "address": format_address(addr + i), - "byte": f"{b:02x}", - "hint": hint, - }) + lines.append( + { + "address": format_address(addr + i), + "byte": f"{b:02x}", + "hint": hint, + } + ) return { "success": True, @@ -231,12 +283,14 @@ def stack(count: int = 16) -> dict: words = [] for i in range(0, len(mem.data), 2): if i + 1 < len(mem.data): - word = int.from_bytes(mem.data[i:i+2], 'little') - words.append({ - "offset": f"+{i:02x}", - "address": format_address(sp_addr + i), - "value": f"{word:04x}", - }) + word = int.from_bytes(mem.data[i : i + 2], "little") + words.append( + { + "offset": f"+{i:02x}", + "address": format_address(sp_addr + i), + "value": f"{word:04x}", + } + ) return { "success": True, @@ -272,17 +326,17 @@ def screen_text(rows: int = 25, cols: int = 80) -> dict: # VGA text mode buffer at 0xB8000 # Each cell is 2 bytes: [character][attribute] # Attribute: bits 0-3 = foreground, 4-6 = background, 7 = blink - VIDEO_BASE = 0xB8000 + video_base = 0xB8000 bytes_per_row = cols * 2 total_bytes = rows * bytes_per_row - mem = client.read_memory(VIDEO_BASE, total_bytes) + mem = client.read_memory(video_base, total_bytes) lines = [] raw_lines = [] for row in range(rows): offset = row * bytes_per_row - row_data = mem.data[offset:offset + bytes_per_row] + row_data = mem.data[offset : offset + bytes_per_row] # Extract characters (every other byte) chars = [] @@ -292,11 +346,11 @@ def screen_text(rows: int = 25, cols: int = 80) -> dict: if 32 <= char_byte < 127: chars.append(chr(char_byte)) elif char_byte == 0: - chars.append(' ') + chars.append(" ") else: - chars.append('.') # Non-printable + chars.append(".") # Non-printable - line = ''.join(chars).rstrip() + line = "".join(chars).rstrip() lines.append(line) raw_lines.append(row_data.hex()) @@ -339,7 +393,7 @@ def screen_graphics(width: int = 320, height: int = 200, mode: str = "13h") -> d Mode 12h: 4 planes, more complex """ try: - VIDEO_BASE = 0xA0000 + video_base = 0xA0000 if mode == "13h": # Mode 13h: 320x200, 1 byte per pixel, linear @@ -348,7 +402,7 @@ def screen_graphics(width: int = 320, height: int = 200, mode: str = "13h") -> d # Conservative read for other modes total_bytes = min(width * height // 8, 38400) - mem = client.read_memory(VIDEO_BASE, total_bytes) + mem = client.read_memory(video_base, total_bytes) # Provide some statistics about the pixel data pixel_counts = {} diff --git a/src/dosbox_mcp/tools/logging.py b/src/dosbox_mcp/tools/logging.py new file mode 100644 index 0000000..d1a0e21 --- /dev/null +++ b/src/dosbox_mcp/tools/logging.py @@ -0,0 +1,357 @@ +"""Logging control tools: query, enable/disable, capture, and clear log output. + +These tools provide programmatic access to DOSBox-X debug logging via QMP, +enabling reverse engineering workflows like tracing INT 21h DOS API calls +and file I/O operations. +""" + +from typing import Any + +from .peripheral import _get_qmp_port, _qmp_command + + +def logging_status() -> dict: + """Query current DOSBox-X logging state. + + Returns the current state of logging options including: + - INT 21h call logging (tracks DOS API calls) + - File I/O logging (tracks file operations) + - Current log buffer size + + Returns: + Logging state dict with: + - int21: Whether INT 21h logging is enabled + - fileio: Whether file I/O logging is enabled + - buffer_size: Current log buffer line count + + Example: + status = logging_status() + if status["int21"]: + print("INT 21h logging is active") + """ + result = _qmp_command("localhost", _get_qmp_port(), "query-logging") + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict): + desc = error.get("desc", str(error)) + if "CommandNotFound" in desc: + return { + "success": False, + "error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.", + } + return {"success": False, "error": desc} + return {"success": False, "error": str(error)} + + if "return" in result and isinstance(result["return"], dict): + ret = result["return"] + return { + "success": True, + "int21": ret.get("int21", False), + "fileio": ret.get("fileio", False), + "buffer_size": ret.get("buffer_size", 0), + } + + return {"success": True, "raw_response": result} + + +def logging_enable(int21: bool = True, fileio: bool = True) -> dict: + """Enable DOSBox-X debug logging options. + + Enables logging of DOS API calls and/or file operations. This is + essential for reverse engineering - you can trace exactly which + DOS functions a program calls and what files it accesses. + + Args: + int21: Enable INT 21h logging (DOS API call tracing) + fileio: Enable file I/O logging (file access tracing) + + Returns: + Result dict with new logging state + + Examples: + logging_enable() # Enable both + logging_enable(int21=True, fileio=False) # Just INT 21h + logging_enable(fileio=True, int21=False) # Just file I/O + + INT 21h Logging: + When enabled, DOSBox-X logs every INT 21h call including: + - Function number (AH register value) + - Parameters passed in other registers + - File handles, strings, memory addresses + + Common INT 21h functions you'll see: + - AH=3Dh: Open file + - AH=3Eh: Close file + - AH=3Fh: Read from file/device + - AH=40h: Write to file/device + - AH=4Ch: Exit program + + File I/O Logging: + Tracks file-level operations with filenames and byte counts. + """ + args: dict[str, Any] = {} + if int21: + args["int21"] = True + if fileio: + args["fileio"] = True + + # If neither specified, enable both (user called with defaults) + if not args: + args = {"int21": True, "fileio": True} + + result = _qmp_command("localhost", _get_qmp_port(), "set-logging", args) + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict): + desc = error.get("desc", str(error)) + if "CommandNotFound" in desc: + return { + "success": False, + "error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.", + } + return {"success": False, "error": desc} + return {"success": False, "error": str(error)} + + if "return" in result and isinstance(result["return"], dict): + ret = result["return"] + enabled = [] + if ret.get("int21"): + enabled.append("INT 21h") + if ret.get("fileio"): + enabled.append("File I/O") + + return { + "success": True, + "message": f"Logging enabled: {', '.join(enabled) if enabled else 'none'}", + "int21": ret.get("int21", False), + "fileio": ret.get("fileio", False), + "changed": ret.get("changed", False), + } + + return {"success": True, "raw_response": result} + + +def logging_disable(int21: bool = True, fileio: bool = True) -> dict: + """Disable DOSBox-X debug logging options. + + Disables the specified logging options. Call with no arguments to + disable all logging, or specify which types to disable. + + Args: + int21: Disable INT 21h logging + fileio: Disable file I/O logging + + Returns: + Result dict with new logging state + + Examples: + logging_disable() # Disable all + logging_disable(int21=True, fileio=False) # Disable only INT 21h + """ + args: dict[str, Any] = {} + if int21: + args["int21"] = False + if fileio: + args["fileio"] = False + + # If neither specified, disable both + if not args: + args = {"int21": False, "fileio": False} + + result = _qmp_command("localhost", _get_qmp_port(), "set-logging", args) + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict): + desc = error.get("desc", str(error)) + if "CommandNotFound" in desc: + return { + "success": False, + "error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.", + } + return {"success": False, "error": desc} + return {"success": False, "error": str(error)} + + if "return" in result and isinstance(result["return"], dict): + ret = result["return"] + still_enabled = [] + if ret.get("int21"): + still_enabled.append("INT 21h") + if ret.get("fileio"): + still_enabled.append("File I/O") + + return { + "success": True, + "message": f"Logging still enabled: {', '.join(still_enabled)}" + if still_enabled + else "All logging disabled", + "int21": ret.get("int21", False), + "fileio": ret.get("fileio", False), + "changed": ret.get("changed", False), + } + + return {"success": True, "raw_response": result} + + +def logging_category(category: str, level: str) -> dict: + """Set logging level for a specific category. + + DOSBox-X has multiple logging categories (CPU, files, VGA, etc.). + This allows fine-grained control over what gets logged. + + Args: + category: Log category name (e.g., "files", "cpu", "int21", "vga") + level: Severity level - one of: + - "debug": Most verbose, all messages + - "normal": Standard logging + - "warn": Warnings and errors only + - "error": Errors only + - "fatal": Fatal errors only + - "never": Disable logging for this category + + Returns: + Result dict with applied settings + + Examples: + logging_category("files", "debug") # Verbose file logging + logging_category("cpu", "warn") # Only CPU warnings + logging_category("vga", "never") # Silence VGA messages + + Common Categories: + - files: File system operations + - int21: DOS INT 21h calls + - cpu: CPU instruction execution + - vga: Video/graphics operations + - io: I/O port access + - dosmisc: Miscellaneous DOS operations + """ + result = _qmp_command( + "localhost", _get_qmp_port(), "set-logging-category", {"category": category, "level": level} + ) + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict): + desc = error.get("desc", str(error)) + if "CommandNotFound" in desc: + return { + "success": False, + "error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.", + } + return {"success": False, "error": desc} + return {"success": False, "error": str(error)} + + if "return" in result and isinstance(result["return"], dict): + ret = result["return"] + return { + "success": True, + "message": f"Category '{ret.get('category')}' set to '{ret.get('level')}'", + "category": ret.get("category"), + "level": ret.get("level"), + } + + return {"success": True, "raw_response": result} + + +def log_capture(limit: int = 100) -> dict: + """Capture recent log output from DOSBox-X. + + Retrieves the current contents of the log buffer. This is the primary + way to see what INT 21h calls and file operations have occurred. + + Args: + limit: Maximum number of lines to return (default: 100, max: ~4000) + + Returns: + Result dict with: + - lines: The log text (newline-separated) + - line_count: Number of lines returned + + Examples: + # Capture last 50 lines of logs + result = log_capture(50) + print(result["lines"]) + + # Typical workflow: enable logging, run program, capture + logging_enable(int21=True) + # ... let program run ... + logs = log_capture(200) + for line in logs["lines"].split("\\n"): + if "INT 21" in line: + print(line) + + Note: + The log buffer is circular with a maximum size (typically 4000 lines). + Older entries are discarded when the buffer fills. + """ + result = _qmp_command( + "localhost", _get_qmp_port(), "get-log-buffer", {"limit": limit}, timeout=10.0 + ) + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict): + desc = error.get("desc", str(error)) + if "CommandNotFound" in desc: + return { + "success": False, + "error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.", + } + return {"success": False, "error": desc} + return {"success": False, "error": str(error)} + + if "return" in result and isinstance(result["return"], dict): + ret = result["return"] + lines = ret.get("lines", "") + line_count = ret.get("line_count", 0) + + return { + "success": True, + "lines": lines, + "line_count": line_count, + "limit_requested": limit, + } + + return {"success": True, "raw_response": result} + + +def log_clear() -> dict: + """Clear the DOSBox-X log buffer. + + Removes all current log entries. Useful for: + - Starting a clean trace before running a program + - Reducing noise when looking for specific events + - Freeing memory if the buffer is very large + + Returns: + Success status + + Example: + # Clear logs, enable tracing, run operation, capture clean results + log_clear() + logging_enable(int21=True) + keyboard_send("DIR\\r") # Run DIR command + time.sleep(1) + result = log_capture() + # result["lines"] contains only logs from the DIR command + """ + result = _qmp_command("localhost", _get_qmp_port(), "clear-log-buffer") + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict): + desc = error.get("desc", str(error)) + if "CommandNotFound" in desc: + return { + "success": False, + "error": "Logging QMP commands not available. Apply qmp-logging.patch to DOSBox-X.", + } + return {"success": False, "error": desc} + return {"success": False, "error": str(error)} + + return { + "success": True, + "message": "Log buffer cleared", + } diff --git a/src/dosbox_mcp/tools/network.py b/src/dosbox_mcp/tools/network.py new file mode 100644 index 0000000..4026142 --- /dev/null +++ b/src/dosbox_mcp/tools/network.py @@ -0,0 +1,250 @@ +"""Network and port mapping tools for DOSBox-X. + +Configure serial ports, IPX networking, and query connection status. + +Serial Port Modes: +- disabled: No serial port +- nullmodem: TCP server mode (accepts incoming connections) +- modem: Dial-out mode (DOS programs use AT commands like ATDT hostname:port) + +These tools help manage network connectivity for: +- RIPscrip testing via serial port +- BBS dial-out using terminal programs like TELIX +- Multiplayer gaming via IPX networking +""" + +import socket +import time + +from ..state import manager + + +def port_list() -> dict: + """List all configured port mappings. + + Returns current serial port, parallel port, and network + configuration from the running DOSBox-X instance. + + Returns: + Dictionary with port configuration: + - serial1: {mode, tcp_port (if nullmodem)} + - serial2: {mode, tcp_port (if nullmodem)} + - ipx: boolean + - gdb_port: debugging port + - qmp_port: control port + """ + if not manager.running: + return { + "success": False, + "error": "DOSBox-X is not running. Use launch() first.", + } + + result = { + "success": True, + "gdb_port": manager.gdb_port, + "qmp_port": manager.qmp_port, + "ipx_enabled": manager.ipx_enabled, + } + + # Serial port 1 configuration + if manager.serial1_mode != "disabled": + serial1_info = {"mode": manager.serial1_mode} + if manager.serial1_mode.lower() == "nullmodem": + serial1_info["tcp_port"] = manager.serial1_port + serial1_info["hint"] = f"Connect clients to localhost:{manager.serial1_port}" + elif manager.serial1_mode.lower() == "modem": + serial1_info["hint"] = "Use ATDT hostname:port to dial out from DOS" + result["serial1"] = serial1_info + else: + result["serial1"] = {"mode": "disabled"} + + # Serial port 2 configuration + if manager.serial2_mode != "disabled": + serial2_info = {"mode": manager.serial2_mode} + if manager.serial2_mode.lower() == "nullmodem": + serial2_info["tcp_port"] = manager.serial2_port + serial2_info["hint"] = f"Connect clients to localhost:{manager.serial2_port}" + elif manager.serial2_mode.lower() == "modem": + serial2_info["hint"] = "Use ATDT hostname:port to dial out from DOS" + result["serial2"] = serial2_info + else: + result["serial2"] = {"mode": "disabled"} + + return result + + +def port_status() -> dict: + """Check connection status of configured ports. + + Shows which ports are listening and can accept connections. + Tests connectivity to GDB, QMP, and serial nullmodem ports. + + Returns: + Dictionary with status for each port: + - gdb: {listening: bool, port: int} + - qmp: {listening: bool, port: int} + - serial1: {listening: bool, port: int} (if nullmodem) + - serial2: {listening: bool, port: int} (if nullmodem) + """ + if not manager.running: + return { + "success": False, + "error": "DOSBox-X is not running. Use launch() first.", + } + + def check_port(port: int, timeout: float = 0.5) -> bool: + """Check if a port is listening.""" + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + result = sock.connect_ex(("localhost", port)) + sock.close() + return result == 0 + except OSError: + return False + + result = { + "success": True, + "gdb": { + "port": manager.gdb_port, + "listening": check_port(manager.gdb_port), + }, + "qmp": { + "port": manager.qmp_port, + "listening": check_port(manager.qmp_port), + }, + } + + # Check serial ports if configured as nullmodem + if manager.serial1_mode.lower() == "nullmodem": + result["serial1"] = { + "mode": "nullmodem", + "port": manager.serial1_port, + "listening": check_port(manager.serial1_port), + } + + if manager.serial2_mode.lower() == "nullmodem": + result["serial2"] = { + "mode": "nullmodem", + "port": manager.serial2_port, + "listening": check_port(manager.serial2_port), + } + + return result + + +def modem_dial(address: str, port: int = 1) -> dict: + """Initiate modem dial-out connection. + + Sends AT dial command to connect to a TCP server. + Requires serial port configured as "modem" in launch(). + + The modem will translate ATDT to a TCP connection, allowing + DOS terminal programs to connect to modern services. + + Args: + address: Target address in format "hostname:port" or just "hostname" + Examples: "towel.blinkenlights.nl:23", "bbs.example.com:23" + port: COM port number (1 or 2) + + Returns: + Result dict with dial status + + Examples: + modem_dial("towel.blinkenlights.nl:23") # ASCII Star Wars! + modem_dial("bbs.synchronet.net:23") # Synchronet BBS + + Note: + The DOS program itself (TELIX, Procomm, etc.) may need to + initialize the modem first. This tool directly sends ATDT. + """ + if not manager.running: + return { + "success": False, + "error": "DOSBox-X is not running. Use launch() first.", + } + + # Check that the port is configured as modem + mode = manager.serial1_mode if port == 1 else manager.serial2_mode + if mode.lower() != "modem": + return { + "success": False, + "error": f"COM{port} is not configured as modem (current: {mode}). " + f'Use launch(serial{port}="modem") to enable dial-out.', + } + + # Import keyboard_send to type the dial command + from .peripheral import keyboard_send + + # Format the dial command: ATDT hostname:port followed by Enter + dial_cmd = f"ATDT{address}" + + # Send the dial command via keyboard (typed into the DOS program) + result = keyboard_send(dial_cmd, delay_ms=50) + if not result.get("success"): + return { + "success": False, + "error": f"Failed to type dial command: {result.get('error')}", + } + + # Send Enter to execute the command + keyboard_send("enter", delay_ms=50) + + return { + "success": True, + "message": f"Dial command sent: {dial_cmd}", + "address": address, + "port": f"COM{port}", + "hint": "Check screen to see connection status. Connection time depends on target server.", + } + + +def modem_hangup(port: int = 1) -> dict: + """Hang up active modem connection. + + Sends ATH (hang-up) command to disconnect the modem. + Use this to cleanly terminate a dial-out connection. + + Args: + port: COM port number (1 or 2) + + Returns: + Result dict with hangup status + + Note: + The modem may need to be in command mode (escaped from data mode) + for ATH to work. Some programs handle this automatically. + """ + if not manager.running: + return { + "success": False, + "error": "DOSBox-X is not running. Use launch() first.", + } + + # Check that the port is configured as modem + mode = manager.serial1_mode if port == 1 else manager.serial2_mode + if mode.lower() != "modem": + return { + "success": False, + "error": f"COM{port} is not configured as modem (current: {mode}).", + } + + from .peripheral import keyboard_send + + # Send escape sequence (+++) to enter command mode, then ATH + # The escape sequence requires 1 second guard time before and after + + # Type +++ to escape to command mode + keyboard_send("+++", delay_ms=100) + time.sleep(1.2) # Guard time + + # Send ATH to hang up + keyboard_send("ATH", delay_ms=50) + keyboard_send("enter", delay_ms=50) + + return { + "success": True, + "message": "Hang-up command sent (ATH)", + "port": f"COM{port}", + "hint": "Connection should terminate shortly. Check screen for confirmation.", + } diff --git a/src/dosbox_mcp/tools/peripheral.py b/src/dosbox_mcp/tools/peripheral.py index e458451..61a6622 100644 --- a/src/dosbox_mcp/tools/peripheral.py +++ b/src/dosbox_mcp/tools/peripheral.py @@ -1,4 +1,26 @@ -"""Peripheral tools: screenshot, serial communication, keyboard input.""" +"""Peripheral tools for interacting with DOSBox-X I/O. + +These tools simulate user input and capture output via QMP: +- Screenshots: Capture display state +- Keyboard: Send keystrokes to DOS programs +- Mouse: Move cursor and click +- Serial: Send data to COM ports (for terminal programs) +- Joystick: Game controller input +- Parallel: Printer port communication +- Clipboard: Copy/paste between host and DOS + +All peripheral tools use QMP (port 4444), not GDB. + +Screenshot workflow: +1. screenshot() - Capture current display, get filename +2. Read dosbox://screenshots/{filename} resource for image data +3. Or use dosbox://screen resource for live capture without saving + +Keyboard input examples: +- keyboard_send("dir") then keyboard_send("enter") +- keyboard_send("ctrl-c") for interrupt +- keyboard_send("alt-x") for menu shortcuts +""" import json import socket @@ -9,7 +31,7 @@ from ..state import manager # Default ports (fallback if manager not configured) DEFAULT_SERIAL_PORT = 5555 # nullmodem server:5555 -DEFAULT_QMP_PORT = 4444 # qmpserver port=4444 +DEFAULT_QMP_PORT = 4444 # qmpserver port=4444 def _get_qmp_port() -> int: @@ -39,12 +61,17 @@ def _tcp_send(host: str, port: int, data: bytes, timeout: float = 2.0) -> tuple[ except TimeoutError: return False, f"Connection timeout to {host}:{port}" except ConnectionRefusedError: - return False, f"Connection refused to {host}:{port} - is DOSBox-X running with serial enabled?" + return ( + False, + f"Connection refused to {host}:{port} - is DOSBox-X running with serial enabled?", + ) except Exception as e: return False, f"Socket error: {e}" -def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None = None, timeout: float = 2.0) -> dict: +def _qmp_command( + host: str, port: int, command: str, args: dict[str, Any] | None = None, timeout: float = 2.0 +) -> dict: """Send QMP command to DOSBox-X. QMP (QEMU Machine Protocol) is a JSON-based protocol for machine control. @@ -74,7 +101,7 @@ def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None if not chunk: break greeting += chunk - except (TimeoutError, socket.timeout): + except TimeoutError: pass # Expected - no more data # Build QMP command @@ -95,7 +122,7 @@ def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None if not chunk: break response += chunk - except (TimeoutError, socket.timeout): + except TimeoutError: pass sock.close() @@ -103,7 +130,7 @@ def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None # Parse response if response: try: - return json.loads(response.decode().strip().split('\n')[-1]) + return json.loads(response.decode().strip().split("\n")[-1]) except json.JSONDecodeError: return {"success": True, "raw_response": response.decode()} @@ -112,22 +139,37 @@ def _qmp_command(host: str, port: int, command: str, args: dict[str, Any] | None except TimeoutError: return {"success": False, "error": f"QMP connection timeout to {host}:{port}"} except ConnectionRefusedError: - return {"success": False, "error": "QMP connection refused - is DOSBox-X running with qmpserver enabled?"} + return { + "success": False, + "error": "QMP connection refused - is DOSBox-X running with qmpserver enabled?", + } except Exception as e: return {"success": False, "error": f"QMP error: {e}"} def screenshot(filename: str | None = None) -> dict: - """Capture DOSBox-X display. + """Capture DOSBox-X display and save to file. - Uses QMP screendump command which calls DOSBox-X's internal capture function. - Returns a resource URI that can be used to fetch the screenshot. + Takes a screenshot of the current display. The image is saved to + DOSBox-X's capture directory and registered for access via resources. + + Workflow: + 1. Call screenshot() - returns filename in response + 2. Access image via dosbox://screenshots/{filename} resource + 3. Or use dosbox://screen resource for live capture without saving Args: - filename: Optional output filename (DOSBox-X uses auto-naming) + filename: Output filename (default: DOSBox-X auto-generates name) Returns: - Screenshot info including resource URI for fetching the image + - success: True if capture succeeded + - filename: Use this with dosbox://screenshots/{filename} + - resource_uri: Direct URI to access the image + - size_bytes: File size + + Example response: + {"success": true, "filename": "screenshot_001.png", ...} + Then access via: dosbox://screenshots/screenshot_001.png """ import os from datetime import datetime @@ -139,8 +181,13 @@ def screenshot(filename: str | None = None) -> dict: resources = None # Use QMP screendump command - it returns base64 PNG and saves to capture/ - result = _qmp_command("localhost", _get_qmp_port(), "screendump", - {"filename": filename or "screenshot.png"}, timeout=10.0) + result = _qmp_command( + "localhost", + _get_qmp_port(), + "screendump", + {"filename": filename or "screenshot.png"}, + timeout=10.0, + ) # Check for QMP error response if "error" in result: @@ -194,15 +241,12 @@ def screenshot(filename: str | None = None) -> dict: "size_bytes": ret.get("size", 0), } - # Include resource URI (the only way clients should access screenshots) + # Include resource URI for direct access if resource_uri: response["resource_uri"] = resource_uri - else: - # Fallback: provide filename so client knows what was created - # but encourage using dosbox://screenshots/latest resource - response["hint"] = "Use resource dosbox://screenshots/latest to fetch" - if screenshot_filename: - response["filename"] = screenshot_filename + if screenshot_filename: + response["filename"] = screenshot_filename + response["hint"] = f"Access via: dosbox://screenshots/{screenshot_filename}" return response @@ -220,51 +264,65 @@ def serial_send(data: str, port: int = 1) -> dict: This is useful for RIPscrip testing - send graphics commands to a program listening on COM1. + Requires serial port configured as "nullmodem" in launch(). + The data is sent over TCP to the nullmodem server socket. + Args: data: Data to send (text or hex with \\x prefix) port: COM port number (1 or 2) Returns: Send result + + Example: + launch(serial1="nullmodem") # Enable nullmodem on COM1 + serial_send("!|L0000001919\\r") # Send RIPscrip line command """ + # Parse hex escapes like \x1b for ESC def parse_data(s: str) -> bytes: result = bytearray() i = 0 while i < len(s): - if s[i:i+2] == '\\x' and i + 4 <= len(s): + if s[i : i + 2] == "\\x" and i + 4 <= len(s): try: - result.append(int(s[i+2:i+4], 16)) + result.append(int(s[i + 2 : i + 4], 16)) i += 4 continue except ValueError: pass - elif s[i:i+2] == '\\r': - result.append(0x0d) + elif s[i : i + 2] == "\\r": + result.append(0x0D) i += 2 continue - elif s[i:i+2] == '\\n': - result.append(0x0a) + elif s[i : i + 2] == "\\n": + result.append(0x0A) i += 2 continue result.append(ord(s[i])) i += 1 return bytes(result) - # Map COM port to TCP port (based on dosbox.conf config) - # serial1=nullmodem server:5555 - tcp_port_map = { - 1: 5555, # COM1 - 2: 5556, # COM2 (if configured) - } - - if port not in tcp_port_map: + # Validate port number + if port not in (1, 2): return { "success": False, "error": f"Invalid COM port {port}. Supported: 1, 2", } - tcp_port = tcp_port_map[port] + # Get TCP port from manager if running, otherwise use defaults + if manager.running: + tcp_port = manager.get_serial_tcp_port(port) + if tcp_port is None: + mode = manager.serial1_mode if port == 1 else manager.serial2_mode + return { + "success": False, + "error": f"COM{port} is not configured as nullmodem (current: {mode}). " + f'Use launch(serial{port}="nullmodem") to enable.', + } + else: + # Fallback to defaults when manager not running (for direct testing) + tcp_port = 5555 if port == 1 else 5556 try: byte_data = parse_data(data) @@ -289,51 +347,121 @@ def serial_send(data: str, port: int = 1) -> dict: return { "success": False, "error": message, - "hint": f"Ensure DOSBox-X is running with serial{port}=nullmodem server:{tcp_port}", + "hint": f'Ensure DOSBox-X is running with serial{port}="nullmodem". ' + f"Expected TCP port {tcp_port}. Use port_status() to check.", } # QMP keyboard key mapping (from DOSBox-X remotedebug qmp.cpp) QMP_KEY_MAP = { # Letters - "a": "a", "b": "b", "c": "c", "d": "d", "e": "e", "f": "f", "g": "g", - "h": "h", "i": "i", "j": "j", "k": "k", "l": "l", "m": "m", "n": "n", - "o": "o", "p": "p", "q": "q", "r": "r", "s": "s", "t": "t", "u": "u", - "v": "v", "w": "w", "x": "x", "y": "y", "z": "z", + "a": "a", + "b": "b", + "c": "c", + "d": "d", + "e": "e", + "f": "f", + "g": "g", + "h": "h", + "i": "i", + "j": "j", + "k": "k", + "l": "l", + "m": "m", + "n": "n", + "o": "o", + "p": "p", + "q": "q", + "r": "r", + "s": "s", + "t": "t", + "u": "u", + "v": "v", + "w": "w", + "x": "x", + "y": "y", + "z": "z", # Numbers - "0": "0", "1": "1", "2": "2", "3": "3", "4": "4", - "5": "5", "6": "6", "7": "7", "8": "8", "9": "9", + "0": "0", + "1": "1", + "2": "2", + "3": "3", + "4": "4", + "5": "5", + "6": "6", + "7": "7", + "8": "8", + "9": "9", # Function keys - "f1": "f1", "f2": "f2", "f3": "f3", "f4": "f4", "f5": "f5", "f6": "f6", - "f7": "f7", "f8": "f8", "f9": "f9", "f10": "f10", "f11": "f11", "f12": "f12", + "f1": "f1", + "f2": "f2", + "f3": "f3", + "f4": "f4", + "f5": "f5", + "f6": "f6", + "f7": "f7", + "f8": "f8", + "f9": "f9", + "f10": "f10", + "f11": "f11", + "f12": "f12", # Special keys - "ret": "ret", "enter": "ret", "return": "ret", - "esc": "esc", "escape": "esc", - "spc": "spc", "space": "spc", " ": "spc", + "ret": "ret", + "enter": "ret", + "return": "ret", + "esc": "esc", + "escape": "esc", + "spc": "spc", + "space": "spc", + " ": "spc", "tab": "tab", - "backspace": "backspace", "bs": "backspace", - "delete": "delete", "del": "delete", - "insert": "insert", "ins": "insert", - "home": "home", "end": "end", - "pgup": "pgup", "pageup": "pgup", - "pgdn": "pgdn", "pagedown": "pgdn", + "backspace": "backspace", + "bs": "backspace", + "delete": "delete", + "del": "delete", + "insert": "insert", + "ins": "insert", + "home": "home", + "end": "end", + "pgup": "pgup", + "pageup": "pgup", + "pgdn": "pgdn", + "pagedown": "pgdn", # Arrow keys - "up": "up", "down": "down", "left": "left", "right": "right", + "up": "up", + "down": "down", + "left": "left", + "right": "right", # Modifiers - "shift": "shift", "ctrl": "ctrl", "alt": "alt", - "shift_r": "shift_r", "ctrl_r": "ctrl_r", "alt_r": "alt_r", + "shift": "shift", + "ctrl": "ctrl", + "alt": "alt", + "shift_r": "shift_r", + "ctrl_r": "ctrl_r", + "alt_r": "alt_r", # Punctuation - "minus": "minus", "-": "minus", - "equal": "equal", "=": "equal", - "bracket_left": "bracket_left", "[": "bracket_left", - "bracket_right": "bracket_right", "]": "bracket_right", - "backslash": "backslash", "\\": "backslash", - "semicolon": "semicolon", ";": "semicolon", - "apostrophe": "apostrophe", "'": "apostrophe", - "grave_accent": "grave_accent", "`": "grave_accent", - "comma": "comma", ",": "comma", - "dot": "dot", ".": "dot", - "slash": "slash", "/": "slash", + "minus": "minus", + "-": "minus", + "equal": "equal", + "=": "equal", + "bracket_left": "bracket_left", + "[": "bracket_left", + "bracket_right": "bracket_right", + "]": "bracket_right", + "backslash": "backslash", + "\\": "backslash", + "semicolon": "semicolon", + ";": "semicolon", + "apostrophe": "apostrophe", + "'": "apostrophe", + "grave_accent": "grave_accent", + "`": "grave_accent", + "comma": "comma", + ",": "comma", + "dot": "dot", + ".": "dot", + "slash": "slash", + "/": "slash", } @@ -369,13 +497,13 @@ def keyboard_send(keys: str, delay_ms: int = 10) -> dict: while i < len(keys): # Check for modifier combinations like "ctrl-c" modifier = None - if i + 5 < len(keys) and keys[i:i+5].lower() == "ctrl-": + if i + 5 < len(keys) and keys[i : i + 5].lower() == "ctrl-": modifier = "ctrl" i += 5 - elif i + 4 < len(keys) and keys[i:i+4].lower() == "alt-": + elif i + 4 < len(keys) and keys[i : i + 4].lower() == "alt-": modifier = "alt" i += 4 - elif i + 6 < len(keys) and keys[i:i+6].lower() == "shift-": + elif i + 6 < len(keys) and keys[i : i + 6].lower() == "shift-": modifier = "shift" i += 6 @@ -398,14 +526,29 @@ def keyboard_send(keys: str, delay_ms: int = 10) -> dict: # Handle uppercase (needs shift) if char.isupper() and modifier is None: modifier = "shift" - elif char in "!@#$%^&*()_+{}|:\"<>?": + elif char in '!@#$%^&*()_+{}|:"<>?': # Shifted punctuation - map to base key with shift shift_map = { - "!": "1", "@": "2", "#": "3", "$": "4", "%": "5", - "^": "6", "&": "7", "*": "8", "(": "9", ")": "0", - "_": "minus", "+": "equal", "{": "bracket_left", - "}": "bracket_right", "|": "backslash", ":": "semicolon", - "\"": "apostrophe", "<": "comma", ">": "dot", "?": "slash", + "!": "1", + "@": "2", + "#": "3", + "$": "4", + "%": "5", + "^": "6", + "&": "7", + "*": "8", + "(": "9", + ")": "0", + "_": "minus", + "+": "equal", + "{": "bracket_left", + "}": "bracket_right", + "|": "backslash", + ":": "semicolon", + '"': "apostrophe", + "<": "comma", + ">": "dot", + "?": "slash", } if char in shift_map: key_to_send = shift_map[char] @@ -414,7 +557,7 @@ def keyboard_send(keys: str, delay_ms: int = 10) -> dict: i += 1 if key_to_send is None: - errors.append(f"Unknown key: {keys[i-1] if i > 0 else '?'}") + errors.append(f"Unknown key: {keys[i - 1] if i > 0 else '?'}") continue # Build QMP command arguments @@ -472,14 +615,8 @@ def mouse_move(dx: int, dy: int) -> dict: # QMP input-send-event for relative mouse movement args = { "events": [ - { - "type": "rel", - "data": {"axis": "x", "value": dx} - }, - { - "type": "rel", - "data": {"axis": "y", "value": dy} - } + {"type": "rel", "data": {"axis": "x", "value": dx}}, + {"type": "rel", "data": {"axis": "y", "value": dy}}, ] } @@ -538,11 +675,7 @@ def mouse_click(button: str = "left", double: bool = False) -> dict: for _ in range(clicks): # Press - args = { - "events": [ - {"type": "btn", "data": {"button": btn, "down": True}} - ] - } + args = {"events": [{"type": "btn", "data": {"button": btn, "down": True}}]} result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) if "error" in result: errors.append(f"Press failed: {result.get('error')}") @@ -550,11 +683,7 @@ def mouse_click(button: str = "left", double: bool = False) -> dict: time.sleep(0.05) # Brief delay between press and release # Release - args = { - "events": [ - {"type": "btn", "data": {"button": btn, "down": False}} - ] - } + args = {"events": [{"type": "btn", "data": {"button": btn, "down": False}}]} result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) if "error" in result: errors.append(f"Release failed: {result.get('error')}") @@ -614,9 +743,7 @@ def mouse_drag(start_x: int, start_y: int, end_x: int, end_y: int, button: str = time.sleep(0.05) # Press button - args = { - "events": [{"type": "btn", "data": {"button": btn, "down": True}}] - } + args = {"events": [{"type": "btn", "data": {"button": btn, "down": True}}]} result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) if "error" in result: errors.append(f"Button press failed: {result.get('error')}") @@ -647,9 +774,7 @@ def mouse_drag(start_x: int, start_y: int, end_x: int, end_y: int, button: str = time.sleep(0.05) # Release button - args = { - "events": [{"type": "btn", "data": {"button": btn, "down": False}}] - } + args = {"events": [{"type": "btn", "data": {"button": btn, "down": False}}]} result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", args) if "error" in result: errors.append(f"Button release failed: {result.get('error')}") @@ -683,11 +808,12 @@ def clipboard_copy() -> dict: from your host system's clipboard. """ # DOSBox-X: Ctrl+F5 = Copy screen text to host clipboard - result = _qmp_command("localhost", _get_qmp_port(), "send-key", - {"keys": [ - {"type": "qcode", "data": "ctrl"}, - {"type": "qcode", "data": "f5"} - ]}) + result = _qmp_command( + "localhost", + _get_qmp_port(), + "send-key", + {"keys": [{"type": "qcode", "data": "ctrl"}, {"type": "qcode", "data": "f5"}]}, + ) if "error" in result: return { @@ -716,11 +842,12 @@ def clipboard_paste() -> dict: Set your host clipboard content first, then call this. """ # DOSBox-X: Ctrl+F6 = Paste from host clipboard - result = _qmp_command("localhost", _get_qmp_port(), "send-key", - {"keys": [ - {"type": "qcode", "data": "ctrl"}, - {"type": "qcode", "data": "f6"} - ]}) + result = _qmp_command( + "localhost", + _get_qmp_port(), + "send-key", + {"keys": [{"type": "qcode", "data": "ctrl"}, {"type": "qcode", "data": "f6"}]}, + ) if "error" in result: return { @@ -733,3 +860,262 @@ def clipboard_paste() -> dict: "message": "Host clipboard pasted into DOSBox-X (Ctrl+F6)", "hint": "Text from host clipboard was typed as keystrokes", } + + +# ============================================================================= +# Joystick Tools +# ============================================================================= + + +def joystick_move(which: int = 0, x: float | None = None, y: float | None = None) -> dict: + """Move joystick axis to specified position. + + Sets joystick axis values. Values range from -1.0 (left/up) to 1.0 (right/down), + with 0.0 being center position. + + Args: + which: Joystick number (0 or 1) + x: X-axis value (-1.0 to 1.0), None to leave unchanged + y: Y-axis value (-1.0 to 1.0), None to leave unchanged + + Returns: + Result dict with new axis values + + Examples: + joystick_move(0, x=1.0) # Push right + joystick_move(0, y=-1.0) # Push up + joystick_move(0, x=0.5, y=0.5) # Diagonal down-right + joystick_move(0, x=0, y=0) # Center position + + Note: + Requires QMP joystick support patch applied to DOSBox-X. + """ + events = [] + + if x is not None: + # Clamp to valid range + x = max(-1.0, min(1.0, x)) + events.append({"type": "joy", "data": {"which": which, "axis": "x", "value": x}}) + + if y is not None: + y = max(-1.0, min(1.0, y)) + events.append({"type": "joy", "data": {"which": which, "axis": "y", "value": y}}) + + if not events: + return { + "success": False, + "error": "Must specify at least x or y axis value", + } + + result = _qmp_command("localhost", _get_qmp_port(), "input-send-event", {"events": events}) + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict) and "CommandNotFound" in str(error): + return { + "success": False, + "error": "Joystick QMP support not available. Apply qmp-joystick-parport.patch to DOSBox-X.", + } + return { + "success": False, + "error": str(error), + } + + return { + "success": True, + "message": f"Joystick {which} moved", + "joystick": which, + "x": x, + "y": y, + } + + +def joystick_button(which: int = 0, button: int = 0, pressed: bool = True) -> dict: + """Press or release joystick button. + + Args: + which: Joystick number (0 or 1) + button: Button number (0-1) + pressed: True to press, False to release + + Returns: + Result dict with button state + + Examples: + joystick_button(0, 0, True) # Press button 0 + joystick_button(0, 0, False) # Release button 0 + joystick_button(0, 1) # Press button 1 + + Note: + Requires QMP joystick support patch applied to DOSBox-X. + """ + result = _qmp_command( + "localhost", + _get_qmp_port(), + "input-send-event", + { + "events": [ + {"type": "joybtn", "data": {"which": which, "button": button, "down": pressed}} + ] + }, + ) + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict) and "CommandNotFound" in str(error): + return { + "success": False, + "error": "Joystick QMP support not available. Apply qmp-joystick-parport.patch to DOSBox-X.", + } + return { + "success": False, + "error": str(error), + } + + action = "pressed" if pressed else "released" + return { + "success": True, + "message": f"Joystick {which} button {button} {action}", + "joystick": which, + "button": button, + "pressed": pressed, + } + + +# ============================================================================= +# Parallel Port Tools +# ============================================================================= + + +def parallel_write(data: list[int] | str | bytes, port: int = 1) -> dict: + """Write data to parallel port (LPT). + + Sends bytes to the specified parallel port. Useful for testing + printer output or parallel port communication in DOS programs. + + Args: + data: Data to send - can be: + - List of integers (0-255) + - String (ASCII encoded) + - Bytes + port: LPT port number (1-3) + + Returns: + Result dict with bytes written + + Examples: + parallel_write([0x41, 0x42, 0x43], port=1) # Send "ABC" + parallel_write("Hello LPT1", port=1) + parallel_write(b"\\x1b@", port=1) # ESC @ (printer reset) + + Note: + Requires QMP parallel port support patch applied to DOSBox-X. + DOSBox-X must have parallel port configured in dosbox.conf. + """ + # Convert data to list of integers + if isinstance(data, str): + byte_list = list(data.encode("ascii", errors="replace")) + elif isinstance(data, bytes): + byte_list = list(data) + elif isinstance(data, list): + byte_list = [int(b) & 0xFF for b in data] + else: + return { + "success": False, + "error": f"Invalid data type: {type(data).__name__}. Use list, str, or bytes.", + } + + result = _qmp_command( + "localhost", _get_qmp_port(), "parport-write", {"port": port, "data": byte_list} + ) + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict): + desc = error.get("desc", str(error)) + if "CommandNotFound" in desc: + return { + "success": False, + "error": "Parallel port QMP support not available. Apply qmp-joystick-parport.patch to DOSBox-X.", + } + return {"success": False, "error": desc} + return {"success": False, "error": str(error)} + + written = 0 + if "return" in result and isinstance(result["return"], dict): + written = result["return"].get("written", len(byte_list)) + + return { + "success": True, + "message": f"Wrote {written} bytes to LPT{port}", + "port": port, + "bytes_written": written, + } + + +def parallel_read(port: int = 1) -> dict: + """Read parallel port status and data register. + + Reads the current state of the parallel port registers. + Useful for checking printer status or bidirectional communication. + + Args: + port: LPT port number (1-3) + + Returns: + Result dict with: + - data: Data register value (last byte written or received) + - status: Status register (busy, ack, paper out, etc.) + - control: Control register (strobe, auto-feed, etc.) + + Status register bits: + - Bit 7: Busy (inverted) + - Bit 6: Acknowledge + - Bit 5: Paper Out + - Bit 4: Select + - Bit 3: Error + + Note: + Requires QMP parallel port support patch applied to DOSBox-X. + """ + result = _qmp_command("localhost", _get_qmp_port(), "parport-read", {"port": port}) + + if "error" in result: + error = result.get("error", {}) + if isinstance(error, dict): + desc = error.get("desc", str(error)) + if "CommandNotFound" in desc: + return { + "success": False, + "error": "Parallel port QMP support not available. Apply qmp-joystick-parport.patch to DOSBox-X.", + } + return {"success": False, "error": desc} + return {"success": False, "error": str(error)} + + if "return" in result and isinstance(result["return"], dict): + ret = result["return"] + status = ret.get("status", 0) + + # Decode status bits + status_flags = { + "busy": not bool(status & 0x80), # Inverted + "ack": bool(status & 0x40), + "paper_out": bool(status & 0x20), + "select": bool(status & 0x10), + "error": bool(status & 0x08), + } + + return { + "success": True, + "port": port, + "data": ret.get("data", 0), + "status": status, + "status_flags": status_flags, + "control": ret.get("control", 0), + } + + return { + "success": True, + "message": "Read completed but no data returned", + "port": port, + } diff --git a/src/dosbox_mcp/types.py b/src/dosbox_mcp/types.py index e131476..6fdb1fe 100644 --- a/src/dosbox_mcp/types.py +++ b/src/dosbox_mcp/types.py @@ -138,15 +138,24 @@ class Registers: - OF (Overflow): bit 11 """ flag_bits = { - 'cf': 0, 'carry': 0, - 'pf': 2, 'parity': 2, - 'af': 4, 'aux': 4, - 'zf': 6, 'zero': 6, - 'sf': 7, 'sign': 7, - 'tf': 8, 'trap': 8, - 'if': 9, 'interrupt': 9, - 'df': 10, 'direction': 10, - 'of': 11, 'overflow': 11, + "cf": 0, + "carry": 0, + "pf": 2, + "parity": 2, + "af": 4, + "aux": 4, + "zf": 6, + "zero": 6, + "sf": 7, + "sign": 7, + "tf": 8, + "trap": 8, + "if": 9, + "interrupt": 9, + "df": 10, + "direction": 10, + "of": 11, + "overflow": 11, } bit = flag_bits.get(flag.lower()) if bit is None: @@ -157,47 +166,47 @@ class Registers: """Convert to dictionary for JSON serialization.""" return { # 32-bit registers - 'eax': f'{self.eax:08x}', - 'ecx': f'{self.ecx:08x}', - 'edx': f'{self.edx:08x}', - 'ebx': f'{self.ebx:08x}', - 'esp': f'{self.esp:08x}', - 'ebp': f'{self.ebp:08x}', - 'esi': f'{self.esi:08x}', - 'edi': f'{self.edi:08x}', - 'eip': f'{self.eip:08x}', - 'eflags': f'{self.eflags:08x}', + "eax": f"{self.eax:08x}", + "ecx": f"{self.ecx:08x}", + "edx": f"{self.edx:08x}", + "ebx": f"{self.ebx:08x}", + "esp": f"{self.esp:08x}", + "ebp": f"{self.ebp:08x}", + "esi": f"{self.esi:08x}", + "edi": f"{self.edi:08x}", + "eip": f"{self.eip:08x}", + "eflags": f"{self.eflags:08x}", # 16-bit aliases - 'ax': f'{self.ax:04x}', - 'cx': f'{self.cx:04x}', - 'dx': f'{self.dx:04x}', - 'bx': f'{self.bx:04x}', - 'sp': f'{self.sp:04x}', - 'bp': f'{self.bp:04x}', - 'si': f'{self.si:04x}', - 'di': f'{self.di:04x}', - 'ip': f'{self.ip:04x}', + "ax": f"{self.ax:04x}", + "cx": f"{self.cx:04x}", + "dx": f"{self.dx:04x}", + "bx": f"{self.bx:04x}", + "sp": f"{self.sp:04x}", + "bp": f"{self.bp:04x}", + "si": f"{self.si:04x}", + "di": f"{self.di:04x}", + "ip": f"{self.ip:04x}", # Segment registers - 'cs': f'{self.cs:04x}', - 'ss': f'{self.ss:04x}', - 'ds': f'{self.ds:04x}', - 'es': f'{self.es:04x}', - 'fs': f'{self.fs:04x}', - 'gs': f'{self.gs:04x}', + "cs": f"{self.cs:04x}", + "ss": f"{self.ss:04x}", + "ds": f"{self.ds:04x}", + "es": f"{self.es:04x}", + "fs": f"{self.fs:04x}", + "gs": f"{self.gs:04x}", # Computed addresses - 'cs:ip': f'{self.cs:04x}:{self.ip:04x}', - 'ss:sp': f'{self.ss:04x}:{self.sp:04x}', + "cs:ip": f"{self.cs:04x}:{self.ip:04x}", + "ss:sp": f"{self.ss:04x}:{self.sp:04x}", # Flags - 'flags': { - 'carry': self.flag_set('cf'), - 'parity': self.flag_set('pf'), - 'aux': self.flag_set('af'), - 'zero': self.flag_set('zf'), - 'sign': self.flag_set('sf'), - 'trap': self.flag_set('tf'), - 'interrupt': self.flag_set('if'), - 'direction': self.flag_set('df'), - 'overflow': self.flag_set('of'), + "flags": { + "carry": self.flag_set("cf"), + "parity": self.flag_set("pf"), + "aux": self.flag_set("af"), + "zero": self.flag_set("zf"), + "sign": self.flag_set("sf"), + "trap": self.flag_set("tf"), + "interrupt": self.flag_set("if"), + "direction": self.flag_set("df"), + "overflow": self.flag_set("of"), }, } @@ -217,11 +226,11 @@ class Breakpoint: def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { - 'id': self.id, - 'address': f'{self.address:05x}', - 'enabled': self.enabled, - 'hit_count': self.hit_count, - 'original': self.original, + "id": self.id, + "address": f"{self.address:05x}", + "enabled": self.enabled, + "hit_count": self.hit_count, + "original": self.original, } @@ -237,10 +246,10 @@ class StopEvent: def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { - 'reason': self.reason.name.lower(), - 'address': f'{self.address:05x}', - 'signal': self.signal, - 'breakpoint_id': self.breakpoint_id, + "reason": self.reason.name.lower(), + "address": f"{self.address:05x}", + "signal": self.signal, + "breakpoint_id": self.breakpoint_id, } @@ -257,18 +266,18 @@ class MemoryRegion: def to_ascii(self) -> str: """Return data as ASCII with non-printables as dots.""" - return ''.join(chr(b) if 32 <= b < 127 else '.' for b in self.data) + return "".join(chr(b) if 32 <= b < 127 else "." for b in self.data) def to_dict(self, format: Literal["hex", "ascii", "both"] = "both") -> dict: """Convert to dictionary for JSON serialization.""" result = { - 'address': f'{self.address:05x}', - 'length': len(self.data), + "address": f"{self.address:05x}", + "length": len(self.data), } if format in ("hex", "both"): - result['hex'] = self.to_hex() + result["hex"] = self.to_hex() if format in ("ascii", "both"): - result['ascii'] = self.to_ascii() + result["ascii"] = self.to_ascii() return result @@ -284,9 +293,9 @@ class DisassemblyLine: def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { - 'address': f'{self.address:05x}', - 'bytes': self.bytes_hex, - 'instruction': f'{self.mnemonic} {self.operands}'.strip(), + "address": f"{self.address:05x}", + "bytes": self.bytes_hex, + "instruction": f"{self.mnemonic} {self.operands}".strip(), } @@ -304,10 +313,10 @@ class DOSBoxStatus: def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { - 'running': self.running, - 'connected': self.connected, - 'host': self.host, - 'port': self.port, - 'pid': self.pid, - 'breakpoint_count': len(self.breakpoints), + "running": self.running, + "connected": self.connected, + "host": self.host, + "port": self.port, + "pid": self.pid, + "breakpoint_count": len(self.breakpoints), } diff --git a/src/dosbox_mcp/utils.py b/src/dosbox_mcp/utils.py index accc047..da1b47a 100644 --- a/src/dosbox_mcp/utils.py +++ b/src/dosbox_mcp/utils.py @@ -1,7 +1,6 @@ """Utility functions for DOSBox-X MCP Server.""" - def parse_address(addr: str) -> int: """Parse a DOS address in various formats. @@ -30,8 +29,8 @@ def parse_address(addr: str) -> int: addr = addr.strip().lower() # Segment:offset format (e.g., "1234:5678") - if ':' in addr: - parts = addr.split(':') + if ":" in addr: + parts = addr.split(":") if len(parts) != 2: raise ValueError(f"Invalid segment:offset format: {addr}") segment = int(parts[0], 16) @@ -39,15 +38,15 @@ def parse_address(addr: str) -> int: return (segment << 4) + offset # Decimal format (e.g., "#12345") - if addr.startswith('#'): + if addr.startswith("#"): return int(addr[1:], 10) # Hex with suffix (e.g., "12345h") - if addr.endswith('h'): + if addr.endswith("h"): return int(addr[:-1], 16) # Hex with prefix (e.g., "0x12345") - if addr.startswith('0x'): + if addr.startswith("0x"): return int(addr, 16) # Assume hex @@ -76,13 +75,13 @@ def format_address(addr: int, style: str = "flat") -> str: # Convert to segment:offset (canonical form with offset < 16) segment = addr >> 4 offset = addr & 0x0F - return f'{segment:04x}:{offset:04x}' + return f"{segment:04x}:{offset:04x}" elif style == "both": segment = addr >> 4 offset = addr & 0x0F - return f'{addr:05x} ({segment:04x}:{offset:04x})' + return f"{addr:05x} ({segment:04x}:{offset:04x})" else: # flat - return f'{addr:05x}' + return f"{addr:05x}" def calculate_checksum(data: str) -> str: @@ -98,7 +97,7 @@ def calculate_checksum(data: str) -> str: Two-character hex checksum """ total = sum(ord(c) for c in data) - return f'{total & 0xFF:02x}' + return f"{total & 0xFF:02x}" def encode_hex(data: bytes) -> str: @@ -118,11 +117,11 @@ def escape_binary(data: bytes) -> bytes: Characters that must be escaped: $, #, }, * """ result = bytearray() - escape_chars = {0x24, 0x23, 0x7d, 0x2a} # $, #, }, * + escape_chars = {0x24, 0x23, 0x7D, 0x2A} # $, #, }, * for b in data: if b in escape_chars: - result.append(0x7d) + result.append(0x7D) result.append(b ^ 0x20) else: result.append(b) @@ -135,7 +134,7 @@ def unescape_binary(data: bytes) -> bytes: result = bytearray() i = 0 while i < len(data): - if data[i] == 0x7d and i + 1 < len(data): + if data[i] == 0x7D and i + 1 < len(data): result.append(data[i + 1] ^ 0x20) i += 2 else: @@ -160,26 +159,26 @@ def parse_stop_reply(response: str) -> tuple[str, dict]: if not response: return ("unknown", {}) - if response.startswith('S'): + if response.startswith("S"): signal = int(response[1:3], 16) return ("signal", {"signal": signal}) - if response.startswith('T'): + if response.startswith("T"): signal = int(response[1:3], 16) info = {"signal": signal} # Parse additional key:value pairs - pairs = response[3:].rstrip(';').split(';') + pairs = response[3:].rstrip(";").split(";") for pair in pairs: - if ':' in pair: - key, value = pair.split(':', 1) + if ":" in pair: + key, value = pair.split(":", 1) info[key] = value return ("signal", info) - if response.startswith('W'): + if response.startswith("W"): code = int(response[1:3], 16) return ("exit", {"code": code}) - if response.startswith('X'): + if response.startswith("X"): signal = int(response[1:3], 16) return ("terminated", {"signal": signal}) @@ -199,13 +198,13 @@ def hexdump(data: bytes, address: int = 0, width: int = 16) -> str: """ lines = [] for i in range(0, len(data), width): - chunk = data[i:i + width] - hex_part = ' '.join(f'{b:02x}' for b in chunk) - ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk) + chunk = data[i : i + width] + hex_part = " ".join(f"{b:02x}" for b in chunk) + ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) # Pad hex part for alignment hex_part = hex_part.ljust(width * 3 - 1) - lines.append(f'{address + i:05x} {hex_part} |{ascii_part}|') - return '\n'.join(lines) + lines.append(f"{address + i:05x} {hex_part} |{ascii_part}|") + return "\n".join(lines) def parse_registers_x86(hex_data: str) -> dict[str, int]: @@ -219,40 +218,40 @@ def parse_registers_x86(hex_data: str) -> dict[str, int]: Segment registers are 4 hex characters. """ # Remove any whitespace - hex_data = hex_data.replace(' ', '').replace('\n', '') + hex_data = hex_data.replace(" ", "").replace("\n", "") def read_le32(offset: int) -> int: """Read 32-bit little-endian value from hex string.""" - chunk = hex_data[offset:offset + 8] + chunk = hex_data[offset : offset + 8] if len(chunk) < 8: return 0 # GDB sends in target byte order (little-endian for x86) - return int.from_bytes(bytes.fromhex(chunk), 'little') + return int.from_bytes(bytes.fromhex(chunk), "little") def read_le16(offset: int) -> int: """Read 16-bit little-endian value from hex string.""" - chunk = hex_data[offset:offset + 4] + chunk = hex_data[offset : offset + 4] if len(chunk) < 4: return 0 - return int.from_bytes(bytes.fromhex(chunk), 'little') + return int.from_bytes(bytes.fromhex(chunk), "little") # Parse in GDB order regs = {} pos = 0 # General purpose registers (32-bit each = 8 hex chars) - for name in ['eax', 'ecx', 'edx', 'ebx', 'esp', 'ebp', 'esi', 'edi']: + for name in ["eax", "ecx", "edx", "ebx", "esp", "ebp", "esi", "edi"]: regs[name] = read_le32(pos) pos += 8 # EIP and EFLAGS - regs['eip'] = read_le32(pos) + regs["eip"] = read_le32(pos) pos += 8 - regs['eflags'] = read_le32(pos) + regs["eflags"] = read_le32(pos) pos += 8 # Segment registers (32-bit in GDB response, but only 16-bit meaningful) - for name in ['cs', 'ss', 'ds', 'es', 'fs', 'gs']: + for name in ["cs", "ss", "ds", "es", "fs", "gs"]: regs[name] = read_le32(pos) & 0xFFFF pos += 8