Initial MCP serial server implementation

FastMCP server for serial port access via Model Context Protocol:
- 10 tools: list, open, close, read, write, configure, flush, status
- 4 dynamic resources: ports list, data, status, raw hex
- USB device filtering (hides phantom ttyS ports by default)
- Full pyserial support with configurable baudrate/parity/etc
This commit is contained in:
Ryan Malloy 2026-01-27 22:05:59 -07:00
commit 1a26109fc1
6 changed files with 685 additions and 0 deletions

13
.env.example Normal file
View File

@ -0,0 +1,13 @@
# MCP Serial Server Configuration
# Default baud rate for serial connections
MCSERIAL_DEFAULT_BAUDRATE=9600
# Default timeout in seconds for read operations
MCSERIAL_DEFAULT_TIMEOUT=1.0
# Maximum number of concurrent serial port connections
MCSERIAL_MAX_CONNECTIONS=10
# Log level (DEBUG, INFO, WARNING, ERROR)
MCSERIAL_LOG_LEVEL=INFO

53
.gitignore vendored Normal file
View File

@ -0,0 +1,53 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.venv/
venv/
ENV/
env/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
.nox/
# uv
.uv/
uv.lock
# Environment
.env
.env.local
# OS
.DS_Store
Thumbs.db

74
README.md Normal file
View File

@ -0,0 +1,74 @@
# MCP Serial Server
FastMCP server for serial port access via Model Context Protocol.
## Features
- **Tools** for serial port control (open, close, write, configure)
- **Dynamic Resources** for reading data (`serial://{port}/data`)
- Full pyserial support (baudrate, parity, stop bits, etc.)
- Multiple concurrent port connections
- Raw byte and text modes
## Installation
```bash
# With uvx (recommended)
uvx mcserial
# Or install directly
uv pip install mcserial
```
## Usage with Claude Code
```bash
# Add to Claude Code
claude mcp add mcserial "uvx mcserial"
```
## Tools
| Tool | Description |
|------|-------------|
| `list_serial_ports` | Discover available serial ports |
| `open_serial_port` | Open a connection with config |
| `close_serial_port` | Close a connection |
| `write_serial` | Send text data |
| `write_serial_bytes` | Send raw bytes |
| `read_serial` | Read available data |
| `read_serial_line` | Read until newline |
| `configure_serial` | Change port settings |
| `flush_serial` | Clear buffers |
| `get_connection_status` | List open connections |
## Resources
| URI | Description |
|-----|-------------|
| `serial://ports` | List available ports |
| `serial://{port}/data` | Read data from open port |
| `serial://{port}/status` | Port configuration info |
| `serial://{port}/raw` | Read as hex dump |
## Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `MCSERIAL_DEFAULT_BAUDRATE` | 9600 | Default baud rate |
| `MCSERIAL_DEFAULT_TIMEOUT` | 1.0 | Read timeout (seconds) |
| `MCSERIAL_MAX_CONNECTIONS` | 10 | Max concurrent ports |
## Example Workflow
```
1. list_serial_ports → find /dev/ttyUSB0
2. open_serial_port(port="/dev/ttyUSB0", baudrate=115200)
3. write_serial(port="/dev/ttyUSB0", data="AT\r\n")
4. Read resource: serial:///dev/ttyUSB0/data
5. close_serial_port(port="/dev/ttyUSB0")
```
## License
MIT

51
pyproject.toml Normal file
View File

@ -0,0 +1,51 @@
[project]
name = "mcserial"
version = "2025.01.27"
description = "MCP Serial Port Server - FastMCP server for serial port access"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [{ name = "Ryan Malloy", email = "ryan@supported.systems" }]
keywords = ["mcp", "serial", "fastmcp", "pyserial", "uart", "rs232"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Communications",
"Topic :: System :: Hardware",
]
dependencies = [
"fastmcp>=2.14.4",
"pyserial>=3.5",
]
[project.optional-dependencies]
dev = [
"ruff>=0.9.0",
"pytest>=8.0.0",
]
[project.scripts]
mcserial = "mcserial:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mcserial"]
[tool.ruff]
target-version = "py310"
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
ignore = ["E501"]
[tool.ruff.lint.isort]
known-first-party = ["mcserial"]

5
src/mcserial/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""MCP Serial Port Server - FastMCP server for serial port access."""
from mcserial.server import main, mcp
__all__ = ["main", "mcp"]

489
src/mcserial/server.py Normal file
View File

@ -0,0 +1,489 @@
"""FastMCP Serial Port Server - Tools and Resources for serial communication."""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from typing import Literal
import serial
import serial.tools.list_ports
from fastmcp import FastMCP
# Configuration from environment
DEFAULT_BAUDRATE = int(os.getenv("MCSERIAL_DEFAULT_BAUDRATE", "9600"))
DEFAULT_TIMEOUT = float(os.getenv("MCSERIAL_DEFAULT_TIMEOUT", "1.0"))
MAX_CONNECTIONS = int(os.getenv("MCSERIAL_MAX_CONNECTIONS", "10"))
@dataclass
class SerialConnection:
"""Tracks an open serial port connection."""
port: str
connection: serial.Serial
buffer: bytes = field(default_factory=bytes)
# Active connections registry
_connections: dict[str, SerialConnection] = {}
mcp = FastMCP(
name="mcserial",
instructions="""Serial port MCP server. Use tools to open/close/write to serial ports.
Use resources to read data from open ports (serial://{port}/data).
Always list_serial_ports first to discover available ports.""",
)
# ============================================================================
# TOOLS - Actions that modify state
# ============================================================================
@mcp.tool()
def list_serial_ports(usb_only: bool = True) -> list[dict]:
"""List available serial ports on the system.
Args:
usb_only: If True (default), only show USB serial devices.
Set False to include legacy/phantom ttyS ports.
Returns a list of ports with their device path, description, and hardware ID.
Call this first to discover what ports are available before opening one.
"""
ports = []
for port in serial.tools.list_ports.comports():
# Filter out phantom/legacy ports if usb_only is True
if usb_only and port.hwid == "n/a":
continue
ports.append({
"device": port.device,
"description": port.description,
"hwid": port.hwid,
"manufacturer": port.manufacturer,
"product": port.product,
"serial_number": port.serial_number,
"is_open": port.device in _connections,
})
# Sort USB ports first (ttyUSB, ttyACM), then others
ports.sort(key=lambda p: (0 if "USB" in p["device"] or "ACM" in p["device"] else 1, p["device"]))
return ports
@mcp.tool()
def open_serial_port(
port: str,
baudrate: int = DEFAULT_BAUDRATE,
bytesize: Literal[5, 6, 7, 8] = 8,
parity: Literal["N", "E", "O", "M", "S"] = "N",
stopbits: Literal[1, 1.5, 2] = 1,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
"""Open a serial port connection.
Args:
port: Device path (e.g., '/dev/ttyUSB0', 'COM3')
baudrate: Baud rate (default from env or 9600)
bytesize: Data bits (5, 6, 7, or 8)
parity: Parity checking (N=None, E=Even, O=Odd, M=Mark, S=Space)
stopbits: Stop bits (1, 1.5, or 2)
timeout: Read timeout in seconds
Returns:
Connection status and details
"""
if port in _connections:
return {"error": f"Port {port} is already open", "success": False}
if len(_connections) >= MAX_CONNECTIONS:
return {"error": f"Maximum connections ({MAX_CONNECTIONS}) reached", "success": False}
try:
conn = serial.Serial(
port=port,
baudrate=baudrate,
bytesize=bytesize,
parity=parity,
stopbits=stopbits,
timeout=timeout,
)
_connections[port] = SerialConnection(port=port, connection=conn)
return {
"success": True,
"port": port,
"baudrate": baudrate,
"bytesize": bytesize,
"parity": parity,
"stopbits": stopbits,
"resource_uri": f"serial://{port}/data",
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def close_serial_port(port: str) -> dict:
"""Close an open serial port connection.
Args:
port: Device path of the port to close
Returns:
Status of the close operation
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
_connections[port].connection.close()
del _connections[port]
return {"success": True, "port": port, "message": "Port closed"}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def write_serial(port: str, data: str, encoding: str = "utf-8") -> dict:
"""Write data to an open serial port.
Args:
port: Device path of the port to write to
data: String data to send
encoding: Character encoding (default: utf-8)
Returns:
Number of bytes written
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
encoded = data.encode(encoding)
bytes_written = conn.write(encoded)
conn.flush()
return {"success": True, "bytes_written": bytes_written, "port": port}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def write_serial_bytes(port: str, data: list[int]) -> dict:
"""Write raw bytes to an open serial port.
Args:
port: Device path of the port to write to
data: List of byte values (0-255)
Returns:
Number of bytes written
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
raw_bytes = bytes(data)
bytes_written = conn.write(raw_bytes)
conn.flush()
return {"success": True, "bytes_written": bytes_written, "port": port}
except (serial.SerialException, ValueError) as e:
return {"error": str(e), "success": False}
@mcp.tool()
def read_serial(
port: str,
size: int | None = None,
encoding: str = "utf-8",
timeout: float | None = None,
) -> dict:
"""Read data from an open serial port.
Args:
port: Device path of the port to read from
size: Maximum bytes to read (None = read all available)
encoding: Character encoding for decoding (default: utf-8)
timeout: Override timeout for this read (None = use port default)
Returns:
Data read from the port
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
# Temporarily override timeout if specified
original_timeout = conn.timeout
if timeout is not None:
conn.timeout = timeout
try:
raw = conn.read(size) if size is not None else conn.read(conn.in_waiting or 1)
finally:
if timeout is not None:
conn.timeout = original_timeout
return {
"success": True,
"data": raw.decode(encoding, errors="replace"),
"bytes_read": len(raw),
"raw_hex": raw.hex(),
"port": port,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def read_serial_line(port: str, encoding: str = "utf-8") -> dict:
"""Read a single line (until newline) from an open serial port.
Args:
port: Device path of the port to read from
encoding: Character encoding for decoding (default: utf-8)
Returns:
Line of data read from the port
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
raw = conn.readline()
return {
"success": True,
"line": raw.decode(encoding, errors="replace").rstrip("\r\n"),
"bytes_read": len(raw),
"port": port,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def configure_serial(
port: str,
baudrate: int | None = None,
timeout: float | None = None,
rts: bool | None = None,
dtr: bool | None = None,
) -> dict:
"""Configure settings on an open serial port.
Args:
port: Device path of the port to configure
baudrate: New baud rate (None = no change)
timeout: New read timeout in seconds (None = no change)
rts: Set RTS line state (None = no change)
dtr: Set DTR line state (None = no change)
Returns:
Updated port configuration
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
if baudrate is not None:
conn.baudrate = baudrate
if timeout is not None:
conn.timeout = timeout
if rts is not None:
conn.rts = rts
if dtr is not None:
conn.dtr = dtr
return {
"success": True,
"port": port,
"baudrate": conn.baudrate,
"timeout": conn.timeout,
"rts": conn.rts,
"dtr": conn.dtr,
}
except serial.SerialException as e:
return {"error": str(e), "success": False}
@mcp.tool()
def get_connection_status() -> dict:
"""Get status of all open serial connections.
Returns:
Dictionary of open connections with their settings
"""
status = {}
for port, sc in _connections.items():
conn = sc.connection
try:
status[port] = {
"is_open": conn.is_open,
"baudrate": conn.baudrate,
"bytesize": conn.bytesize,
"parity": conn.parity,
"stopbits": conn.stopbits,
"timeout": conn.timeout,
"in_waiting": conn.in_waiting,
"out_waiting": conn.out_waiting,
"cts": conn.cts,
"dsr": conn.dsr,
"rts": conn.rts,
"dtr": conn.dtr,
"resource_uri": f"serial://{port}/data",
}
except serial.SerialException:
status[port] = {"is_open": False, "error": "Port disconnected"}
return {"connections": status, "count": len(_connections)}
@mcp.tool()
def flush_serial(port: str, input_buffer: bool = True, output_buffer: bool = True) -> dict:
"""Flush serial port buffers.
Args:
port: Device path of the port to flush
input_buffer: Clear input/receive buffer
output_buffer: Clear output/transmit buffer
Returns:
Flush operation status
"""
if port not in _connections:
return {"error": f"Port {port} is not open", "success": False}
try:
conn = _connections[port].connection
if input_buffer:
conn.reset_input_buffer()
if output_buffer:
conn.reset_output_buffer()
return {"success": True, "port": port, "flushed_input": input_buffer, "flushed_output": output_buffer}
except serial.SerialException as e:
return {"error": str(e), "success": False}
# ============================================================================
# RESOURCES - Dynamic data access via URIs
# ============================================================================
@mcp.resource("serial://ports")
def resource_list_ports() -> str:
"""List available serial ports as a resource."""
ports = list_serial_ports()
lines = ["# Available Serial Ports\n"]
for p in ports:
status = "[OPEN]" if p["is_open"] else "[closed]"
lines.append(f"- {p['device']} {status}: {p['description']}")
return "\n".join(lines)
@mcp.resource("serial://{port}/data")
def resource_read_data(port: str) -> str:
"""Read available data from an open serial port.
The {port} parameter should be URL-encoded if it contains special characters.
For example: serial:///dev/ttyUSB0/data
"""
# Handle URL-encoded port paths
import urllib.parse
port = urllib.parse.unquote(port)
if port not in _connections:
return f"Error: Port {port} is not open. Use open_serial_port tool first."
try:
conn = _connections[port].connection
available = conn.in_waiting
if available == 0:
return f"[No data available on {port}]"
raw = conn.read(available)
text = raw.decode("utf-8", errors="replace")
return f"[{len(raw)} bytes from {port}]\n{text}"
except serial.SerialException as e:
return f"Error reading from {port}: {e}"
@mcp.resource("serial://{port}/status")
def resource_port_status(port: str) -> str:
"""Get status information for a serial port."""
import urllib.parse
port = urllib.parse.unquote(port)
if port not in _connections:
return f"Port {port} is not open."
try:
conn = _connections[port].connection
lines = [
f"# Serial Port Status: {port}",
f"- Open: {conn.is_open}",
f"- Baudrate: {conn.baudrate}",
f"- Bytesize: {conn.bytesize}",
f"- Parity: {conn.parity}",
f"- Stopbits: {conn.stopbits}",
f"- Timeout: {conn.timeout}s",
f"- Bytes waiting (in): {conn.in_waiting}",
f"- Bytes waiting (out): {conn.out_waiting}",
f"- CTS: {conn.cts}",
f"- DSR: {conn.dsr}",
f"- RTS: {conn.rts}",
f"- DTR: {conn.dtr}",
]
return "\n".join(lines)
except serial.SerialException as e:
return f"Error getting status for {port}: {e}"
@mcp.resource("serial://{port}/raw")
def resource_read_raw(port: str) -> str:
"""Read available data as hex dump from an open serial port."""
import urllib.parse
port = urllib.parse.unquote(port)
if port not in _connections:
return f"Error: Port {port} is not open."
try:
conn = _connections[port].connection
available = conn.in_waiting
if available == 0:
return f"[No data available on {port}]"
raw = conn.read(available)
# Format as hex dump
hex_str = " ".join(f"{b:02x}" for b in raw)
return f"[{len(raw)} bytes from {port}]\n{hex_str}"
except serial.SerialException as e:
return f"Error reading from {port}: {e}"
# ============================================================================
# ENTRY POINT
# ============================================================================
def main():
"""Run the MCP serial server."""
try:
from importlib.metadata import version
package_version = version("mcserial")
except Exception:
package_version = "dev"
print(f"🔌 MCP Serial Server v{package_version}")
print(f" Default baudrate: {DEFAULT_BAUDRATE}")
print(f" Max connections: {MAX_CONNECTIONS}")
mcp.run()
if __name__ == "__main__":
main()