From 1a26109fc1f0bed6f3a53708878782d2c30b5103 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 27 Jan 2026 22:05:59 -0700 Subject: [PATCH] Initial MCP serial server implementation FastMCP server for serial port access via Model Context Protocol: - 10 tools: list, open, close, read, write, configure, flush, status - 4 dynamic resources: ports list, data, status, raw hex - USB device filtering (hides phantom ttyS ports by default) - Full pyserial support with configurable baudrate/parity/etc --- .env.example | 13 ++ .gitignore | 53 +++++ README.md | 74 ++++++ pyproject.toml | 51 ++++ src/mcserial/__init__.py | 5 + src/mcserial/server.py | 489 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 685 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pyproject.toml create mode 100644 src/mcserial/__init__.py create mode 100644 src/mcserial/server.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..997d1fe --- /dev/null +++ b/.env.example @@ -0,0 +1,13 @@ +# MCP Serial Server Configuration + +# Default baud rate for serial connections +MCSERIAL_DEFAULT_BAUDRATE=9600 + +# Default timeout in seconds for read operations +MCSERIAL_DEFAULT_TIMEOUT=1.0 + +# Maximum number of concurrent serial port connections +MCSERIAL_MAX_CONNECTIONS=10 + +# Log level (DEBUG, INFO, WARNING, ERROR) +MCSERIAL_LOG_LEVEL=INFO diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cb88f92 --- /dev/null +++ b/.gitignore @@ -0,0 +1,53 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ +env/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.nox/ + +# uv +.uv/ +uv.lock + +# Environment +.env +.env.local + +# OS +.DS_Store +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..fe25901 --- /dev/null +++ b/README.md @@ -0,0 +1,74 @@ +# MCP Serial Server + +FastMCP server for serial port access via Model Context Protocol. + +## Features + +- **Tools** for serial port control (open, close, write, configure) +- **Dynamic Resources** for reading data (`serial://{port}/data`) +- Full pyserial support (baudrate, parity, stop bits, etc.) +- Multiple concurrent port connections +- Raw byte and text modes + +## Installation + +```bash +# With uvx (recommended) +uvx mcserial + +# Or install directly +uv pip install mcserial +``` + +## Usage with Claude Code + +```bash +# Add to Claude Code +claude mcp add mcserial "uvx mcserial" +``` + +## Tools + +| Tool | Description | +|------|-------------| +| `list_serial_ports` | Discover available serial ports | +| `open_serial_port` | Open a connection with config | +| `close_serial_port` | Close a connection | +| `write_serial` | Send text data | +| `write_serial_bytes` | Send raw bytes | +| `read_serial` | Read available data | +| `read_serial_line` | Read until newline | +| `configure_serial` | Change port settings | +| `flush_serial` | Clear buffers | +| `get_connection_status` | List open connections | + +## Resources + +| URI | Description | +|-----|-------------| +| `serial://ports` | List available ports | +| `serial://{port}/data` | Read data from open port | +| `serial://{port}/status` | Port configuration info | +| `serial://{port}/raw` | Read as hex dump | + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `MCSERIAL_DEFAULT_BAUDRATE` | 9600 | Default baud rate | +| `MCSERIAL_DEFAULT_TIMEOUT` | 1.0 | Read timeout (seconds) | +| `MCSERIAL_MAX_CONNECTIONS` | 10 | Max concurrent ports | + +## Example Workflow + +``` +1. list_serial_ports → find /dev/ttyUSB0 +2. open_serial_port(port="/dev/ttyUSB0", baudrate=115200) +3. write_serial(port="/dev/ttyUSB0", data="AT\r\n") +4. Read resource: serial:///dev/ttyUSB0/data +5. close_serial_port(port="/dev/ttyUSB0") +``` + +## License + +MIT diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9b59ce4 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,51 @@ +[project] +name = "mcserial" +version = "2025.01.27" +description = "MCP Serial Port Server - FastMCP server for serial port access" +readme = "README.md" +requires-python = ">=3.10" +license = { text = "MIT" } +authors = [{ name = "Ryan Malloy", email = "ryan@supported.systems" }] +keywords = ["mcp", "serial", "fastmcp", "pyserial", "uart", "rs232"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Communications", + "Topic :: System :: Hardware", +] +dependencies = [ + "fastmcp>=2.14.4", + "pyserial>=3.5", +] + +[project.optional-dependencies] +dev = [ + "ruff>=0.9.0", + "pytest>=8.0.0", +] + +[project.scripts] +mcserial = "mcserial:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/mcserial"] + +[tool.ruff] +target-version = "py310" +line-length = 100 + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "B", "SIM"] +ignore = ["E501"] + +[tool.ruff.lint.isort] +known-first-party = ["mcserial"] diff --git a/src/mcserial/__init__.py b/src/mcserial/__init__.py new file mode 100644 index 0000000..81ea2f9 --- /dev/null +++ b/src/mcserial/__init__.py @@ -0,0 +1,5 @@ +"""MCP Serial Port Server - FastMCP server for serial port access.""" + +from mcserial.server import main, mcp + +__all__ = ["main", "mcp"] diff --git a/src/mcserial/server.py b/src/mcserial/server.py new file mode 100644 index 0000000..b92d374 --- /dev/null +++ b/src/mcserial/server.py @@ -0,0 +1,489 @@ +"""FastMCP Serial Port Server - Tools and Resources for serial communication.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import Literal + +import serial +import serial.tools.list_ports +from fastmcp import FastMCP + +# Configuration from environment +DEFAULT_BAUDRATE = int(os.getenv("MCSERIAL_DEFAULT_BAUDRATE", "9600")) +DEFAULT_TIMEOUT = float(os.getenv("MCSERIAL_DEFAULT_TIMEOUT", "1.0")) +MAX_CONNECTIONS = int(os.getenv("MCSERIAL_MAX_CONNECTIONS", "10")) + + +@dataclass +class SerialConnection: + """Tracks an open serial port connection.""" + + port: str + connection: serial.Serial + buffer: bytes = field(default_factory=bytes) + + +# Active connections registry +_connections: dict[str, SerialConnection] = {} + +mcp = FastMCP( + name="mcserial", + instructions="""Serial port MCP server. Use tools to open/close/write to serial ports. +Use resources to read data from open ports (serial://{port}/data). +Always list_serial_ports first to discover available ports.""", +) + + +# ============================================================================ +# TOOLS - Actions that modify state +# ============================================================================ + + +@mcp.tool() +def list_serial_ports(usb_only: bool = True) -> list[dict]: + """List available serial ports on the system. + + Args: + usb_only: If True (default), only show USB serial devices. + Set False to include legacy/phantom ttyS ports. + + Returns a list of ports with their device path, description, and hardware ID. + Call this first to discover what ports are available before opening one. + """ + ports = [] + for port in serial.tools.list_ports.comports(): + # Filter out phantom/legacy ports if usb_only is True + if usb_only and port.hwid == "n/a": + continue + ports.append({ + "device": port.device, + "description": port.description, + "hwid": port.hwid, + "manufacturer": port.manufacturer, + "product": port.product, + "serial_number": port.serial_number, + "is_open": port.device in _connections, + }) + # Sort USB ports first (ttyUSB, ttyACM), then others + ports.sort(key=lambda p: (0 if "USB" in p["device"] or "ACM" in p["device"] else 1, p["device"])) + return ports + + +@mcp.tool() +def open_serial_port( + port: str, + baudrate: int = DEFAULT_BAUDRATE, + bytesize: Literal[5, 6, 7, 8] = 8, + parity: Literal["N", "E", "O", "M", "S"] = "N", + stopbits: Literal[1, 1.5, 2] = 1, + timeout: float = DEFAULT_TIMEOUT, +) -> dict: + """Open a serial port connection. + + Args: + port: Device path (e.g., '/dev/ttyUSB0', 'COM3') + baudrate: Baud rate (default from env or 9600) + bytesize: Data bits (5, 6, 7, or 8) + parity: Parity checking (N=None, E=Even, O=Odd, M=Mark, S=Space) + stopbits: Stop bits (1, 1.5, or 2) + timeout: Read timeout in seconds + + Returns: + Connection status and details + """ + if port in _connections: + return {"error": f"Port {port} is already open", "success": False} + + if len(_connections) >= MAX_CONNECTIONS: + return {"error": f"Maximum connections ({MAX_CONNECTIONS}) reached", "success": False} + + try: + conn = serial.Serial( + port=port, + baudrate=baudrate, + bytesize=bytesize, + parity=parity, + stopbits=stopbits, + timeout=timeout, + ) + _connections[port] = SerialConnection(port=port, connection=conn) + return { + "success": True, + "port": port, + "baudrate": baudrate, + "bytesize": bytesize, + "parity": parity, + "stopbits": stopbits, + "resource_uri": f"serial://{port}/data", + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def close_serial_port(port: str) -> dict: + """Close an open serial port connection. + + Args: + port: Device path of the port to close + + Returns: + Status of the close operation + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + _connections[port].connection.close() + del _connections[port] + return {"success": True, "port": port, "message": "Port closed"} + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def write_serial(port: str, data: str, encoding: str = "utf-8") -> dict: + """Write data to an open serial port. + + Args: + port: Device path of the port to write to + data: String data to send + encoding: Character encoding (default: utf-8) + + Returns: + Number of bytes written + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + encoded = data.encode(encoding) + bytes_written = conn.write(encoded) + conn.flush() + return {"success": True, "bytes_written": bytes_written, "port": port} + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def write_serial_bytes(port: str, data: list[int]) -> dict: + """Write raw bytes to an open serial port. + + Args: + port: Device path of the port to write to + data: List of byte values (0-255) + + Returns: + Number of bytes written + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + raw_bytes = bytes(data) + bytes_written = conn.write(raw_bytes) + conn.flush() + return {"success": True, "bytes_written": bytes_written, "port": port} + except (serial.SerialException, ValueError) as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def read_serial( + port: str, + size: int | None = None, + encoding: str = "utf-8", + timeout: float | None = None, +) -> dict: + """Read data from an open serial port. + + Args: + port: Device path of the port to read from + size: Maximum bytes to read (None = read all available) + encoding: Character encoding for decoding (default: utf-8) + timeout: Override timeout for this read (None = use port default) + + Returns: + Data read from the port + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + + # Temporarily override timeout if specified + original_timeout = conn.timeout + if timeout is not None: + conn.timeout = timeout + + try: + raw = conn.read(size) if size is not None else conn.read(conn.in_waiting or 1) + finally: + if timeout is not None: + conn.timeout = original_timeout + + return { + "success": True, + "data": raw.decode(encoding, errors="replace"), + "bytes_read": len(raw), + "raw_hex": raw.hex(), + "port": port, + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def read_serial_line(port: str, encoding: str = "utf-8") -> dict: + """Read a single line (until newline) from an open serial port. + + Args: + port: Device path of the port to read from + encoding: Character encoding for decoding (default: utf-8) + + Returns: + Line of data read from the port + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + raw = conn.readline() + return { + "success": True, + "line": raw.decode(encoding, errors="replace").rstrip("\r\n"), + "bytes_read": len(raw), + "port": port, + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def configure_serial( + port: str, + baudrate: int | None = None, + timeout: float | None = None, + rts: bool | None = None, + dtr: bool | None = None, +) -> dict: + """Configure settings on an open serial port. + + Args: + port: Device path of the port to configure + baudrate: New baud rate (None = no change) + timeout: New read timeout in seconds (None = no change) + rts: Set RTS line state (None = no change) + dtr: Set DTR line state (None = no change) + + Returns: + Updated port configuration + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + + if baudrate is not None: + conn.baudrate = baudrate + if timeout is not None: + conn.timeout = timeout + if rts is not None: + conn.rts = rts + if dtr is not None: + conn.dtr = dtr + + return { + "success": True, + "port": port, + "baudrate": conn.baudrate, + "timeout": conn.timeout, + "rts": conn.rts, + "dtr": conn.dtr, + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def get_connection_status() -> dict: + """Get status of all open serial connections. + + Returns: + Dictionary of open connections with their settings + """ + status = {} + for port, sc in _connections.items(): + conn = sc.connection + try: + status[port] = { + "is_open": conn.is_open, + "baudrate": conn.baudrate, + "bytesize": conn.bytesize, + "parity": conn.parity, + "stopbits": conn.stopbits, + "timeout": conn.timeout, + "in_waiting": conn.in_waiting, + "out_waiting": conn.out_waiting, + "cts": conn.cts, + "dsr": conn.dsr, + "rts": conn.rts, + "dtr": conn.dtr, + "resource_uri": f"serial://{port}/data", + } + except serial.SerialException: + status[port] = {"is_open": False, "error": "Port disconnected"} + return {"connections": status, "count": len(_connections)} + + +@mcp.tool() +def flush_serial(port: str, input_buffer: bool = True, output_buffer: bool = True) -> dict: + """Flush serial port buffers. + + Args: + port: Device path of the port to flush + input_buffer: Clear input/receive buffer + output_buffer: Clear output/transmit buffer + + Returns: + Flush operation status + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + if input_buffer: + conn.reset_input_buffer() + if output_buffer: + conn.reset_output_buffer() + return {"success": True, "port": port, "flushed_input": input_buffer, "flushed_output": output_buffer} + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +# ============================================================================ +# RESOURCES - Dynamic data access via URIs +# ============================================================================ + + +@mcp.resource("serial://ports") +def resource_list_ports() -> str: + """List available serial ports as a resource.""" + ports = list_serial_ports() + lines = ["# Available Serial Ports\n"] + for p in ports: + status = "[OPEN]" if p["is_open"] else "[closed]" + lines.append(f"- {p['device']} {status}: {p['description']}") + return "\n".join(lines) + + +@mcp.resource("serial://{port}/data") +def resource_read_data(port: str) -> str: + """Read available data from an open serial port. + + The {port} parameter should be URL-encoded if it contains special characters. + For example: serial:///dev/ttyUSB0/data + """ + # Handle URL-encoded port paths + import urllib.parse + port = urllib.parse.unquote(port) + + if port not in _connections: + return f"Error: Port {port} is not open. Use open_serial_port tool first." + + try: + conn = _connections[port].connection + available = conn.in_waiting + if available == 0: + return f"[No data available on {port}]" + + raw = conn.read(available) + text = raw.decode("utf-8", errors="replace") + return f"[{len(raw)} bytes from {port}]\n{text}" + except serial.SerialException as e: + return f"Error reading from {port}: {e}" + + +@mcp.resource("serial://{port}/status") +def resource_port_status(port: str) -> str: + """Get status information for a serial port.""" + import urllib.parse + port = urllib.parse.unquote(port) + + if port not in _connections: + return f"Port {port} is not open." + + try: + conn = _connections[port].connection + lines = [ + f"# Serial Port Status: {port}", + f"- Open: {conn.is_open}", + f"- Baudrate: {conn.baudrate}", + f"- Bytesize: {conn.bytesize}", + f"- Parity: {conn.parity}", + f"- Stopbits: {conn.stopbits}", + f"- Timeout: {conn.timeout}s", + f"- Bytes waiting (in): {conn.in_waiting}", + f"- Bytes waiting (out): {conn.out_waiting}", + f"- CTS: {conn.cts}", + f"- DSR: {conn.dsr}", + f"- RTS: {conn.rts}", + f"- DTR: {conn.dtr}", + ] + return "\n".join(lines) + except serial.SerialException as e: + return f"Error getting status for {port}: {e}" + + +@mcp.resource("serial://{port}/raw") +def resource_read_raw(port: str) -> str: + """Read available data as hex dump from an open serial port.""" + import urllib.parse + port = urllib.parse.unquote(port) + + if port not in _connections: + return f"Error: Port {port} is not open." + + try: + conn = _connections[port].connection + available = conn.in_waiting + if available == 0: + return f"[No data available on {port}]" + + raw = conn.read(available) + # Format as hex dump + hex_str = " ".join(f"{b:02x}" for b in raw) + return f"[{len(raw)} bytes from {port}]\n{hex_str}" + except serial.SerialException as e: + return f"Error reading from {port}: {e}" + + +# ============================================================================ +# ENTRY POINT +# ============================================================================ + + +def main(): + """Run the MCP serial server.""" + try: + from importlib.metadata import version + package_version = version("mcserial") + except Exception: + package_version = "dev" + + print(f"🔌 MCP Serial Server v{package_version}") + print(f" Default baudrate: {DEFAULT_BAUDRATE}") + print(f" Max connections: {MAX_CONNECTIONS}") + mcp.run() + + +if __name__ == "__main__": + main()