Add cp210x-mcp FastMCP server for device customization
MCP Tools: - list_devices: List connected CP210x devices - get_device_info: Full device details (part number, VID/PID, strings) - set_product_string: Change USB device name (max 126 chars) - set_manufacturer_string: Change manufacturer (max 45 chars) - set_serial_number: Change serial number (max 63 chars) - set_max_power: Set USB power draw (0-500mA) - set_self_powered: Toggle self/bus powered - reset_device: USB re-enumeration - lock_device: PERMANENTLY lock configuration Requires libcp210xmanufacturing.so (see aur/cp210xmanufacturing) Usage: claude mcp add cp210x -- uvx cp210x-mcp
This commit is contained in:
parent
7c04e0cb6a
commit
fc12d6a2d7
85
README.md
Normal file
85
README.md
Normal file
@ -0,0 +1,85 @@
|
||||
# CP210x MCP Server
|
||||
|
||||
MCP server for customizing Silicon Labs CP210x USB-UART bridge devices. Allows reading and writing USB descriptor strings, power configuration, and more.
|
||||
|
||||
## Features
|
||||
|
||||
- List connected CP210x devices
|
||||
- Read/write USB product string (device name)
|
||||
- Read/write manufacturer string
|
||||
- Read/write serial number
|
||||
- Configure max power draw
|
||||
- Set self-powered/bus-powered mode
|
||||
- Reset device (USB re-enumeration)
|
||||
- Lock device (permanent - prevents further changes)
|
||||
|
||||
## Requirements
|
||||
|
||||
- Linux x86_64
|
||||
- `libcp210xmanufacturing.so` - Install via AUR package or build from source
|
||||
|
||||
## Installation
|
||||
|
||||
### Install the library (Arch Linux)
|
||||
|
||||
```bash
|
||||
cd aur/cp210xmanufacturing
|
||||
makepkg -si
|
||||
```
|
||||
|
||||
Or build from source:
|
||||
|
||||
```bash
|
||||
cd AN721SW/Linux/LibrarySourcePackages/cp210xmanufacturing
|
||||
make LIB_ARCH=64
|
||||
sudo make install
|
||||
sudo ldconfig
|
||||
```
|
||||
|
||||
### Install the MCP server
|
||||
|
||||
```bash
|
||||
# With uv (recommended)
|
||||
uv tool install .
|
||||
|
||||
# Or with pip
|
||||
pip install .
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
### Add to Claude Code
|
||||
|
||||
```bash
|
||||
claude mcp add cp210x -- uvx cp210x-mcp
|
||||
```
|
||||
|
||||
### Available Tools
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `list_devices` | List connected CP210x devices |
|
||||
| `get_device_info` | Get detailed device information |
|
||||
| `set_product_string` | Set USB product string (device name) |
|
||||
| `set_manufacturer_string` | Set USB manufacturer string |
|
||||
| `set_serial_number` | Set USB serial number |
|
||||
| `set_max_power` | Set max USB power draw (mA) |
|
||||
| `set_self_powered` | Set self-powered vs bus-powered |
|
||||
| `reset_device` | Reset device (USB re-enumeration) |
|
||||
| `lock_device` | PERMANENTLY lock device config |
|
||||
|
||||
### Example
|
||||
|
||||
```bash
|
||||
# In Claude Code conversation:
|
||||
> What CP210x devices are connected?
|
||||
> Change the product name of device 0 to "My Custom Device"
|
||||
```
|
||||
|
||||
## Relationship to mcserial
|
||||
|
||||
This MCP server complements [mcserial](https://github.com/ryanmalloy/mcserial) which handles serial port communication. Use this server for **device customization** (changing USB descriptors) and mcserial for **serial communication** (sending/receiving data over UART).
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
53
pyproject.toml
Normal file
53
pyproject.toml
Normal file
@ -0,0 +1,53 @@
|
||||
[project]
|
||||
name = "cp210x-mcp"
|
||||
version = "0.1.0"
|
||||
description = "MCP server for CP210x USB-UART bridge customization"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10"
|
||||
license = "MIT"
|
||||
authors = [
|
||||
{name = "Ryan Malloy", email = "ryan@supported.systems"}
|
||||
]
|
||||
keywords = ["mcp", "cp210x", "usb", "uart", "serial", "silicon-labs"]
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"Topic :: System :: Hardware :: Hardware Drivers",
|
||||
]
|
||||
|
||||
dependencies = [
|
||||
"fastmcp>=2.0.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"ruff",
|
||||
"pytest",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
cp210x-mcp = "cp210x_mcp:main"
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/ryanmalloy/cp210x-mcp"
|
||||
Repository = "https://github.com/ryanmalloy/cp210x-mcp"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/cp210x_mcp"]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 100
|
||||
target-version = "py310"
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E", "F", "I", "UP", "B"]
|
||||
ignore = ["E501"]
|
||||
5
src/cp210x_mcp/__init__.py
Normal file
5
src/cp210x_mcp/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
"""CP210x MCP Server - Device customization for Silicon Labs USB-UART bridges."""
|
||||
|
||||
from .server import mcp, main
|
||||
|
||||
__all__ = ["mcp", "main"]
|
||||
441
src/cp210x_mcp/bindings.py
Normal file
441
src/cp210x_mcp/bindings.py
Normal file
@ -0,0 +1,441 @@
|
||||
"""Low-level ctypes bindings for libcp210xmanufacturing."""
|
||||
|
||||
import ctypes
|
||||
from ctypes import c_int, c_uint, c_ushort, c_ubyte, c_char_p, c_void_p, POINTER, byref
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# Type aliases matching the C library
|
||||
CP210x_STATUS = c_int
|
||||
HANDLE = c_void_p
|
||||
DWORD = c_uint
|
||||
WORD = c_ushort
|
||||
BYTE = c_ubyte
|
||||
BOOL = c_int
|
||||
|
||||
# Return codes
|
||||
CP210x_SUCCESS = 0x00
|
||||
CP210x_DEVICE_NOT_FOUND = 0xFF
|
||||
CP210x_INVALID_HANDLE = 0x01
|
||||
CP210x_INVALID_PARAMETER = 0x02
|
||||
CP210x_DEVICE_IO_FAILED = 0x03
|
||||
CP210x_FUNCTION_NOT_SUPPORTED = 0x04
|
||||
CP210x_GLOBAL_DATA_ERROR = 0x05
|
||||
CP210x_FILE_ERROR = 0x06
|
||||
CP210x_COMMAND_FAILED = 0x08
|
||||
CP210x_INVALID_ACCESS_TYPE = 0x09
|
||||
|
||||
# Part numbers
|
||||
PART_NUMBERS = {
|
||||
0x01: "CP2101",
|
||||
0x02: "CP2102",
|
||||
0x03: "CP2103",
|
||||
0x04: "CP2104",
|
||||
0x05: "CP2105",
|
||||
0x08: "CP2108",
|
||||
0x09: "CP2109",
|
||||
0x20: "CP2102N-QFN28",
|
||||
0x21: "CP2102N-QFN24",
|
||||
0x22: "CP2102N-QFN20",
|
||||
}
|
||||
|
||||
# Buffer sizes
|
||||
MAX_DEVICE_STRLEN = 256
|
||||
MAX_MANUFACTURER_STRLEN = 45
|
||||
MAX_PRODUCT_STRLEN = 126
|
||||
MAX_SERIAL_STRLEN = 63
|
||||
|
||||
# GetProductString flags
|
||||
RETURN_SERIAL_NUMBER = 0x00
|
||||
RETURN_DESCRIPTION = 0x01
|
||||
RETURN_FULL_PATH = 0x02
|
||||
|
||||
|
||||
class CP210xError(Exception):
|
||||
"""Exception raised for CP210x library errors."""
|
||||
|
||||
ERROR_MESSAGES = {
|
||||
CP210x_DEVICE_NOT_FOUND: "Device not found",
|
||||
CP210x_INVALID_HANDLE: "Invalid handle",
|
||||
CP210x_INVALID_PARAMETER: "Invalid parameter",
|
||||
CP210x_DEVICE_IO_FAILED: "Device I/O failed",
|
||||
CP210x_FUNCTION_NOT_SUPPORTED: "Function not supported",
|
||||
CP210x_GLOBAL_DATA_ERROR: "Global data error",
|
||||
CP210x_FILE_ERROR: "File error",
|
||||
CP210x_COMMAND_FAILED: "Command failed",
|
||||
CP210x_INVALID_ACCESS_TYPE: "Invalid access type",
|
||||
}
|
||||
|
||||
def __init__(self, status: int, context: str = ""):
|
||||
self.status = status
|
||||
msg = self.ERROR_MESSAGES.get(status, f"Unknown error 0x{status:02X}")
|
||||
if context:
|
||||
msg = f"{context}: {msg}"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class CP210xLibrary:
|
||||
"""Wrapper for the CP210x manufacturing library."""
|
||||
|
||||
def __init__(self, lib_path: Optional[str] = None):
|
||||
"""Load the CP210x manufacturing library.
|
||||
|
||||
Args:
|
||||
lib_path: Path to libcp210xmanufacturing.so, or None to search default paths.
|
||||
"""
|
||||
if lib_path:
|
||||
self._lib = ctypes.CDLL(lib_path)
|
||||
else:
|
||||
# Search common locations
|
||||
search_paths = [
|
||||
"/usr/lib/libcp210xmanufacturing.so",
|
||||
"/usr/local/lib/libcp210xmanufacturing.so",
|
||||
str(Path(__file__).parent.parent.parent / "AN721SW/Linux/LibrarySourcePackages/cp210xmanufacturing/build/lib/x86_64/libcp210xmanufacturing.so"),
|
||||
]
|
||||
for path in search_paths:
|
||||
if Path(path).exists():
|
||||
self._lib = ctypes.CDLL(path)
|
||||
break
|
||||
else:
|
||||
raise FileNotFoundError(
|
||||
"libcp210xmanufacturing.so not found. Install with: "
|
||||
"cd aur/cp210xmanufacturing && makepkg -si"
|
||||
)
|
||||
|
||||
self._setup_functions()
|
||||
|
||||
def _setup_functions(self):
|
||||
"""Set up function prototypes."""
|
||||
# CP210x_GetNumDevices
|
||||
self._lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)]
|
||||
self._lib.CP210x_GetNumDevices.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetProductString
|
||||
self._lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD]
|
||||
self._lib.CP210x_GetProductString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_Open
|
||||
self._lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)]
|
||||
self._lib.CP210x_Open.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_Close
|
||||
self._lib.CP210x_Close.argtypes = [HANDLE]
|
||||
self._lib.CP210x_Close.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetPartNumber
|
||||
self._lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
self._lib.CP210x_GetPartNumber.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceVid
|
||||
self._lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)]
|
||||
self._lib.CP210x_GetDeviceVid.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDevicePid
|
||||
self._lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)]
|
||||
self._lib.CP210x_GetDevicePid.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceManufacturerString
|
||||
self._lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
self._lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceProductString
|
||||
self._lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
self._lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceSerialNumber
|
||||
self._lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
self._lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetManufacturerString
|
||||
self._lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
self._lib.CP210x_SetManufacturerString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetProductString
|
||||
self._lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
self._lib.CP210x_SetProductString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetSerialNumber
|
||||
self._lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
self._lib.CP210x_SetSerialNumber.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetMaxPower
|
||||
self._lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
self._lib.CP210x_GetMaxPower.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetMaxPower
|
||||
self._lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE]
|
||||
self._lib.CP210x_SetMaxPower.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetSelfPower
|
||||
self._lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)]
|
||||
self._lib.CP210x_GetSelfPower.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetSelfPower
|
||||
self._lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL]
|
||||
self._lib.CP210x_SetSelfPower.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceVersion
|
||||
self._lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)]
|
||||
self._lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetDeviceVersion
|
||||
self._lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD]
|
||||
self._lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetLockValue
|
||||
self._lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
self._lib.CP210x_GetLockValue.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetLockValue
|
||||
self._lib.CP210x_SetLockValue.argtypes = [HANDLE]
|
||||
self._lib.CP210x_SetLockValue.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_Reset
|
||||
self._lib.CP210x_Reset.argtypes = [HANDLE]
|
||||
self._lib.CP210x_Reset.restype = CP210x_STATUS
|
||||
|
||||
def _check_status(self, status: int, context: str = ""):
|
||||
"""Raise exception if status indicates error."""
|
||||
if status != CP210x_SUCCESS:
|
||||
raise CP210xError(status, context)
|
||||
|
||||
def get_num_devices(self) -> int:
|
||||
"""Get the number of connected CP210x devices."""
|
||||
num = DWORD()
|
||||
status = self._lib.CP210x_GetNumDevices(byref(num))
|
||||
self._check_status(status, "GetNumDevices")
|
||||
return num.value
|
||||
|
||||
def get_product_string(self, device_index: int, flag: int = RETURN_DESCRIPTION) -> str:
|
||||
"""Get device string without opening the device.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index
|
||||
flag: What to return - RETURN_SERIAL_NUMBER, RETURN_DESCRIPTION, or RETURN_FULL_PATH
|
||||
"""
|
||||
buf = ctypes.create_string_buffer(MAX_DEVICE_STRLEN)
|
||||
status = self._lib.CP210x_GetProductString(DWORD(device_index), buf, DWORD(flag))
|
||||
self._check_status(status, "GetProductString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def open(self, device_index: int) -> HANDLE:
|
||||
"""Open a device by index."""
|
||||
handle = HANDLE()
|
||||
status = self._lib.CP210x_Open(DWORD(device_index), byref(handle))
|
||||
self._check_status(status, f"Open device {device_index}")
|
||||
return handle
|
||||
|
||||
def close(self, handle: HANDLE):
|
||||
"""Close an open device."""
|
||||
status = self._lib.CP210x_Close(handle)
|
||||
self._check_status(status, "Close")
|
||||
|
||||
def get_part_number(self, handle: HANDLE) -> tuple[int, str]:
|
||||
"""Get the part number of an open device.
|
||||
|
||||
Returns:
|
||||
Tuple of (part_number_code, part_name)
|
||||
"""
|
||||
part = BYTE()
|
||||
status = self._lib.CP210x_GetPartNumber(handle, byref(part))
|
||||
self._check_status(status, "GetPartNumber")
|
||||
return part.value, PART_NUMBERS.get(part.value, f"Unknown (0x{part.value:02X})")
|
||||
|
||||
def get_device_vid(self, handle: HANDLE) -> int:
|
||||
"""Get the USB Vendor ID."""
|
||||
vid = WORD()
|
||||
status = self._lib.CP210x_GetDeviceVid(handle, byref(vid))
|
||||
self._check_status(status, "GetDeviceVid")
|
||||
return vid.value
|
||||
|
||||
def get_device_pid(self, handle: HANDLE) -> int:
|
||||
"""Get the USB Product ID."""
|
||||
pid = WORD()
|
||||
status = self._lib.CP210x_GetDevicePid(handle, byref(pid))
|
||||
self._check_status(status, "GetDevicePid")
|
||||
return pid.value
|
||||
|
||||
def get_manufacturer_string(self, handle: HANDLE) -> str:
|
||||
"""Get the manufacturer string."""
|
||||
buf = ctypes.create_string_buffer(MAX_MANUFACTURER_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceManufacturerString(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceManufacturerString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def get_product_string_device(self, handle: HANDLE) -> str:
|
||||
"""Get the product string from an open device."""
|
||||
buf = ctypes.create_string_buffer(MAX_PRODUCT_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceProductString(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceProductString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def get_serial_number(self, handle: HANDLE) -> str:
|
||||
"""Get the serial number."""
|
||||
buf = ctypes.create_string_buffer(MAX_SERIAL_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceSerialNumber(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceSerialNumber")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def set_manufacturer_string(self, handle: HANDLE, manufacturer: str):
|
||||
"""Set the manufacturer string (max 45 chars)."""
|
||||
data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN]
|
||||
status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetManufacturerString")
|
||||
|
||||
def set_product_string(self, handle: HANDLE, product: str):
|
||||
"""Set the product string (max 126 chars)."""
|
||||
data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN]
|
||||
status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetProductString")
|
||||
|
||||
def set_serial_number(self, handle: HANDLE, serial: str):
|
||||
"""Set the serial number (max 63 chars)."""
|
||||
data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN]
|
||||
status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetSerialNumber")
|
||||
|
||||
def get_max_power(self, handle: HANDLE) -> int:
|
||||
"""Get max power in units of 2mA (multiply by 2 for mA)."""
|
||||
power = BYTE()
|
||||
status = self._lib.CP210x_GetMaxPower(handle, byref(power))
|
||||
self._check_status(status, "GetMaxPower")
|
||||
return power.value
|
||||
|
||||
def set_max_power(self, handle: HANDLE, power_2ma: int):
|
||||
"""Set max power in units of 2mA (e.g., 50 = 100mA)."""
|
||||
if power_2ma > 250:
|
||||
raise ValueError("Max power cannot exceed 250 (500mA)")
|
||||
status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma))
|
||||
self._check_status(status, "SetMaxPower")
|
||||
|
||||
def get_self_power(self, handle: HANDLE) -> bool:
|
||||
"""Check if device is self-powered."""
|
||||
self_power = BOOL()
|
||||
status = self._lib.CP210x_GetSelfPower(handle, byref(self_power))
|
||||
self._check_status(status, "GetSelfPower")
|
||||
return bool(self_power.value)
|
||||
|
||||
def set_self_power(self, handle: HANDLE, self_powered: bool):
|
||||
"""Set whether device is self-powered."""
|
||||
status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0))
|
||||
self._check_status(status, "SetSelfPower")
|
||||
|
||||
def get_device_version(self, handle: HANDLE) -> int:
|
||||
"""Get device version (bcdDevice)."""
|
||||
version = WORD()
|
||||
status = self._lib.CP210x_GetDeviceVersion(handle, byref(version))
|
||||
self._check_status(status, "GetDeviceVersion")
|
||||
return version.value
|
||||
|
||||
def set_device_version(self, handle: HANDLE, version: int):
|
||||
"""Set device version (bcdDevice)."""
|
||||
status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version))
|
||||
self._check_status(status, "SetDeviceVersion")
|
||||
|
||||
def get_lock_value(self, handle: HANDLE) -> int:
|
||||
"""Get lock value (0 = unlocked, 1-255 = locked)."""
|
||||
lock = BYTE()
|
||||
status = self._lib.CP210x_GetLockValue(handle, byref(lock))
|
||||
self._check_status(status, "GetLockValue")
|
||||
return lock.value
|
||||
|
||||
def lock_device(self, handle: HANDLE):
|
||||
"""Lock the device (PERMANENT - cannot be undone!)."""
|
||||
status = self._lib.CP210x_SetLockValue(handle)
|
||||
self._check_status(status, "SetLockValue")
|
||||
|
||||
def reset(self, handle: HANDLE):
|
||||
"""Reset the device (USB disconnect/reconnect)."""
|
||||
status = self._lib.CP210x_Reset(handle)
|
||||
self._check_status(status, "Reset")
|
||||
|
||||
|
||||
# Convenience context manager for device access
|
||||
class CP210xDevice:
|
||||
"""Context manager for safe device access."""
|
||||
|
||||
def __init__(self, lib: CP210xLibrary, device_index: int):
|
||||
self.lib = lib
|
||||
self.device_index = device_index
|
||||
self.handle = None
|
||||
|
||||
def __enter__(self):
|
||||
self.handle = self.lib.open(self.device_index)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if self.handle:
|
||||
self.lib.close(self.handle)
|
||||
self.handle = None
|
||||
return False
|
||||
|
||||
@property
|
||||
def part_number(self) -> tuple[int, str]:
|
||||
return self.lib.get_part_number(self.handle)
|
||||
|
||||
@property
|
||||
def vid(self) -> int:
|
||||
return self.lib.get_device_vid(self.handle)
|
||||
|
||||
@property
|
||||
def pid(self) -> int:
|
||||
return self.lib.get_device_pid(self.handle)
|
||||
|
||||
@property
|
||||
def manufacturer(self) -> str:
|
||||
return self.lib.get_manufacturer_string(self.handle)
|
||||
|
||||
@manufacturer.setter
|
||||
def manufacturer(self, value: str):
|
||||
self.lib.set_manufacturer_string(self.handle, value)
|
||||
|
||||
@property
|
||||
def product(self) -> str:
|
||||
return self.lib.get_product_string_device(self.handle)
|
||||
|
||||
@product.setter
|
||||
def product(self, value: str):
|
||||
self.lib.set_product_string(self.handle, value)
|
||||
|
||||
@property
|
||||
def serial_number(self) -> str:
|
||||
return self.lib.get_serial_number(self.handle)
|
||||
|
||||
@serial_number.setter
|
||||
def serial_number(self, value: str):
|
||||
self.lib.set_serial_number(self.handle, value)
|
||||
|
||||
@property
|
||||
def max_power_ma(self) -> int:
|
||||
"""Max power in milliamps."""
|
||||
return self.lib.get_max_power(self.handle) * 2
|
||||
|
||||
@max_power_ma.setter
|
||||
def max_power_ma(self, value: int):
|
||||
"""Set max power in milliamps (will be rounded down to nearest 2mA)."""
|
||||
self.lib.set_max_power(self.handle, value // 2)
|
||||
|
||||
@property
|
||||
def self_powered(self) -> bool:
|
||||
return self.lib.get_self_power(self.handle)
|
||||
|
||||
@self_powered.setter
|
||||
def self_powered(self, value: bool):
|
||||
self.lib.set_self_power(self.handle, value)
|
||||
|
||||
@property
|
||||
def device_version(self) -> str:
|
||||
"""Device version as BCD string (e.g., '1.00')."""
|
||||
v = self.lib.get_device_version(self.handle)
|
||||
return f"{(v >> 8) & 0xFF}.{v & 0xFF:02d}"
|
||||
|
||||
@property
|
||||
def is_locked(self) -> bool:
|
||||
return self.lib.get_lock_value(self.handle) != 0
|
||||
|
||||
def reset(self):
|
||||
"""Reset the device."""
|
||||
self.lib.reset(self.handle)
|
||||
290
src/cp210x_mcp/server.py
Normal file
290
src/cp210x_mcp/server.py
Normal file
@ -0,0 +1,290 @@
|
||||
"""FastMCP server for CP210x device customization."""
|
||||
|
||||
from typing import Optional
|
||||
from fastmcp import FastMCP
|
||||
|
||||
from .bindings import CP210xLibrary, CP210xDevice, CP210xError, PART_NUMBERS
|
||||
|
||||
mcp = FastMCP(
|
||||
"cp210x",
|
||||
instructions="CP210x USB-UART bridge customization - read/write product strings, serial numbers, and device configuration",
|
||||
)
|
||||
|
||||
# Lazy-loaded library instance
|
||||
_lib: Optional[CP210xLibrary] = None
|
||||
|
||||
|
||||
def get_lib() -> CP210xLibrary:
|
||||
"""Get or create the library instance."""
|
||||
global _lib
|
||||
if _lib is None:
|
||||
_lib = CP210xLibrary()
|
||||
return _lib
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def list_devices() -> list[dict]:
|
||||
"""List all connected CP210x devices.
|
||||
|
||||
Returns a list of devices with their index, description, and serial number.
|
||||
Use the index to reference a specific device in other tools.
|
||||
"""
|
||||
lib = get_lib()
|
||||
num_devices = lib.get_num_devices()
|
||||
|
||||
devices = []
|
||||
for i in range(num_devices):
|
||||
try:
|
||||
desc = lib.get_product_string(i, flag=1) # RETURN_DESCRIPTION
|
||||
serial = lib.get_product_string(i, flag=0) # RETURN_SERIAL_NUMBER
|
||||
devices.append({
|
||||
"index": i,
|
||||
"description": desc,
|
||||
"serial_number": serial,
|
||||
})
|
||||
except CP210xError as e:
|
||||
devices.append({
|
||||
"index": i,
|
||||
"error": str(e),
|
||||
})
|
||||
|
||||
return devices
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_device_info(device_index: int = 0) -> dict:
|
||||
"""Get detailed information about a CP210x device.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based index of the device (default: 0 for first device)
|
||||
|
||||
Returns full device details including part number, VID/PID, strings, and power settings.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
part_code, part_name = dev.part_number
|
||||
return {
|
||||
"part_number": part_name,
|
||||
"part_code": f"0x{part_code:02X}",
|
||||
"vid": f"0x{dev.vid:04X}",
|
||||
"pid": f"0x{dev.pid:04X}",
|
||||
"manufacturer": dev.manufacturer,
|
||||
"product": dev.product,
|
||||
"serial_number": dev.serial_number,
|
||||
"device_version": dev.device_version,
|
||||
"max_power_ma": dev.max_power_ma,
|
||||
"self_powered": dev.self_powered,
|
||||
"is_locked": dev.is_locked,
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def set_product_string(product: str, device_index: int = 0) -> dict:
|
||||
"""Set the USB product string (device name shown to host).
|
||||
|
||||
Args:
|
||||
product: New product string (max 126 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Returns the updated device info. Device may need to be re-plugged for
|
||||
changes to appear on the USB host.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_product = dev.product
|
||||
dev.product = product
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value": old_product,
|
||||
"new_value": product[:126],
|
||||
"note": "Re-plug device for changes to take effect on USB host",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def set_manufacturer_string(manufacturer: str, device_index: int = 0) -> dict:
|
||||
"""Set the USB manufacturer string.
|
||||
|
||||
Args:
|
||||
manufacturer: New manufacturer string (max 45 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_mfr = dev.manufacturer
|
||||
dev.manufacturer = manufacturer
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value": old_mfr,
|
||||
"new_value": manufacturer[:45],
|
||||
"note": "Re-plug device for changes to take effect on USB host",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def set_serial_number(serial_number: str, device_index: int = 0) -> dict:
|
||||
"""Set the USB serial number.
|
||||
|
||||
Args:
|
||||
serial_number: New serial number (max 63 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Note: Changing serial number may affect udev rules that match on serial.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_serial = dev.serial_number
|
||||
dev.serial_number = serial_number
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value": old_serial,
|
||||
"new_value": serial_number[:63],
|
||||
"note": "Re-plug device for changes to take effect on USB host",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def set_max_power(max_power_ma: int, device_index: int = 0) -> dict:
|
||||
"""Set the maximum USB power draw in milliamps.
|
||||
|
||||
Args:
|
||||
max_power_ma: Max power in mA (0-500, will be rounded to nearest 2mA)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
if max_power_ma < 0 or max_power_ma > 500:
|
||||
return {"error": "max_power_ma must be 0-500"}
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_power = dev.max_power_ma
|
||||
dev.max_power_ma = max_power_ma
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value_ma": old_power,
|
||||
"new_value_ma": (max_power_ma // 2) * 2, # Actual value after rounding
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def set_self_powered(self_powered: bool, device_index: int = 0) -> dict:
|
||||
"""Set whether device reports as self-powered or bus-powered.
|
||||
|
||||
Args:
|
||||
self_powered: True for self-powered, False for bus-powered
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_value = dev.self_powered
|
||||
dev.self_powered = self_powered
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value": old_value,
|
||||
"new_value": self_powered,
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def reset_device(device_index: int = 0) -> dict:
|
||||
"""Reset the CP210x device (USB disconnect/reconnect).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
This triggers a USB re-enumeration, which will make any programmed
|
||||
changes visible to the USB host.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
dev.reset()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"note": "Device has been reset. It may take a moment to re-enumerate.",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def lock_device(device_index: int = 0, confirm: bool = False) -> dict:
|
||||
"""PERMANENTLY lock the device to prevent further customization.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
confirm: Must be True to actually lock (safety check)
|
||||
|
||||
WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's
|
||||
configuration cannot be changed. The device will still function normally,
|
||||
but strings, power settings, etc. cannot be modified.
|
||||
"""
|
||||
if not confirm:
|
||||
return {
|
||||
"error": "Safety check failed",
|
||||
"message": "Set confirm=True to actually lock the device. THIS IS PERMANENT!",
|
||||
}
|
||||
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is already locked"}
|
||||
|
||||
lib.lock_device(dev.handle)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"warning": "Device is now PERMANENTLY locked. Configuration cannot be changed.",
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
"""Entry point for the MCP server."""
|
||||
try:
|
||||
from importlib.metadata import version
|
||||
package_version = version("cp210x-mcp")
|
||||
except Exception:
|
||||
package_version = "0.1.0"
|
||||
|
||||
print(f"🔌 CP210x MCP Server v{package_version}")
|
||||
|
||||
try:
|
||||
lib = get_lib()
|
||||
num_devices = lib.get_num_devices()
|
||||
print(f" Found {num_devices} CP210x device(s)")
|
||||
except FileNotFoundError as e:
|
||||
print(f"⚠️ Library not found: {e}")
|
||||
print(" Server will start but tools will fail until library is installed.")
|
||||
except Exception as e:
|
||||
print(f"⚠️ Error initializing: {e}")
|
||||
|
||||
mcp.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
x
Reference in New Issue
Block a user