Compare commits

..

No commits in common. "07d6ee6ff19c0040bde73008fe423f6953e7077c" and "a28eed3849e4ccfe9fd8511341e34628600a8e59" have entirely different histories.

10 changed files with 979 additions and 1986 deletions

View File

@ -7,7 +7,7 @@
"run", "run",
"--directory", "--directory",
"/home/rpm/claude/lora/cp2102-uart", "/home/rpm/claude/lora/cp2102-uart",
"mcp210x-uart" "mcp210x"
], ],
"env": {} "env": {}
} }

113
README.md
View File

@ -1,10 +1,10 @@
# mcp210x-uart # mcp210x
It's MCP. It's CP210x. It was right there the whole time. It's MCP. It's CP210x. It was right there the whole time.
An MCP server for customizing Silicon Labs CP210x USB-UART bridge devices — product strings, serial numbers, power config, GPIO port config, baud rate tables, udev rules, and device locking — through natural language in Claude Code. An MCP server for customizing Silicon Labs CP210x USB-UART bridge devices — product strings, serial numbers, power config, udev rules, and device locking — through natural language in Claude Code.
Built on [FastMCP](https://gofastmcp.com/) with Python ctypes bindings to all 45 functions in Silicon Labs' native `libcp210xmanufacturing` library. Covers the full CP210x family: CP2101, CP2102, CP2102N, CP2103, CP2104, CP2105, CP2108, and CP2109. Built on [FastMCP](https://gofastmcp.com/) with Python ctypes bindings to Silicon Labs' native `libcp210xmanufacturing` library.
## The problem ## The problem
@ -32,78 +32,26 @@ Two devices found:
Each device gets a unique product string baked into its USB descriptor EPROM. Udev rules match on that string to create stable symlinks. Devices survive reboots, port reordering, and hub changes. Each device gets a unique product string baked into its USB descriptor EPROM. Udev rules match on that string to create stable symlinks. Devices survive reboots, port reordering, and hub changes.
## Tools ## Features
### Read-only (no confirmation) - **List and inspect** connected CP210x devices (part number, VID/PID, strings, power, lock state)
- **Write USB descriptors** — product string, manufacturer, serial number
| Tool | Description | - **Configure power** — max current draw, self-powered vs bus-powered
|------|-------------| - **Generate udev rules** — stable `/dev/` symlinks based on product string
| `list_devices` | List all connected CP210x devices | - **Reset device** — trigger USB re-enumeration after changes
| `get_device_info` | Full device details — part number, VID/PID, strings, power, lock state, firmware version (CP2102N), flush config, device mode | - **Lock device** — permanently freeze configuration (with strict confirmation gate)
| `get_firmware_version` | Firmware version (CP2102N only) |
| `get_flush_buffer_config` | Flush buffer configuration (CP2104/CP2105/CP2108) |
| `get_device_mode` | Device mode — ECI/SCI assignment (CP2105 only) |
| `get_interface_string` | USB interface string (CP2105/CP2108 only) |
| `get_baud_rate_config` | Baud rate alias table (32 entries) |
| `get_port_config` | GPIO port configuration (auto-detects CP2103/4/5/8) |
| `get_raw_config` | Raw EPROM configuration blob (hex string) |
| `create_hex_file` | Dump device config to Intel HEX file |
| `reset_device` | USB disconnect/reconnect to apply changes |
### Normal writes (elicitation → fallback to proceed)
| Tool | Description |
|------|-------------|
| `set_product_string` | Write USB product string (max 126 chars) |
| `set_manufacturer_string` | Write USB manufacturer string (max 45 chars) |
| `set_serial_number` | Write USB serial number (max 63 chars) |
| `set_max_power` | Set max USB power draw in mA (0500, rounded to nearest 2) |
| `set_self_powered` | Toggle self-powered vs bus-powered reporting |
| `set_device_version` | Set device version (bcdDevice field) |
| `set_interface_string` | Set USB interface string (CP2105/CP2108 only) |
| `set_flush_buffer_config` | Set flush buffer configuration (CP2104/CP2105/CP2108) |
| `set_device_mode` | Set device mode (CP2105 only) |
| `set_baud_rate_config` | Set full 32-entry baud rate alias table |
| `set_baud_rate_alias` | Modify a single baud rate alias entry (read-modify-write) |
| `set_port_config` | Set GPIO port configuration (auto-detects CP2103/4/5/8) |
| `setup_udev_rule` | Generate and install a udev rule for a stable `/dev/` symlink |
### Strict writes (elicitation required — hard-refuses without it)
| Tool | Description |
|------|-------------|
| `set_vid` | Set USB Vendor ID (can break driver matching) |
| `set_pid` | Set USB Product ID (can break driver matching) |
| `set_raw_config` | Write raw configuration blob to EPROM |
| `update_firmware` | Update device firmware (CP2102N only) |
| `lock_device` | Permanently freeze device configuration |
## Safety model ## Safety model
CP210x descriptor EPROM is one-time-programmable with limited write cycles. Writes can't be undone. Locks are permanent. The server enforces a tiered confirmation model: CP210x descriptor EPROM is one-time-programmable with limited write cycles. Writes can't be undone. Locks are permanent. The server enforces a tiered confirmation model:
| Tier | Behavior | Tools | | Operation | Confirmation |
|------|----------|-------| |-----------|-------------|
| **None** | No confirmation | All `get_*`, `list_devices`, `reset_device`, `create_hex_file` | | Reads | None |
| **Normal** | Elicitation if client supports it; proceeds otherwise | String setters, power, baud config, port config, `setup_udev_rule` | | Writes (strings, power) | MCP elicitation if client supports it; proceeds otherwise |
| **Strict** | Elicitation **required**; returns error without it | `set_vid`, `set_pid`, `set_raw_config`, `update_firmware`, `lock_device` | | Lock | Elicitation **required**; hard-refuses without it |
The strict gate isn't a warning — it returns an error and does not proceed if the MCP client can't present a confirmation dialog. The lock gate isn't just a warning — it returns an error and does not proceed if the MCP client can't present a confirmation dialog.
## Part-number support
Different CP210x parts expose different features. The server auto-gates tools to supported parts:
| Feature | Supported parts |
|---------|----------------|
| Core (strings, power, baud) | All CP210x |
| Flush buffer config | CP2104, CP2105, CP2108 |
| Device mode | CP2105 |
| Interface strings | CP2105, CP2108 |
| Port config | CP2103, CP2104 |
| Dual port config | CP2105 |
| Quad port config | CP2108 |
| Firmware version/update | CP2102N |
## Requirements ## Requirements
@ -145,15 +93,30 @@ uv tool install .
### 3. Claude Code ### 3. Claude Code
```bash ```bash
claude mcp add cp210x -- uvx mcp210x-uart claude mcp add cp210x -- uvx mcp210x
``` ```
For development (runs from source): For development (runs from source):
```bash ```bash
claude mcp add cp210x-local -- uv run --directory /path/to/this-repo mcp210x-uart claude mcp add cp210x-local -- uv run --directory /path/to/this-repo mcp210x
``` ```
## Tools
| Tool | Description |
|------|-------------|
| `list_devices` | List connected CP210x devices with description and serial |
| `get_device_info` | Full device details — part number, VID/PID, strings, power, lock state |
| `set_product_string` | Write USB product string (max 126 chars) |
| `set_manufacturer_string` | Write USB manufacturer string (max 45 chars) |
| `set_serial_number` | Write USB serial number (max 63 chars) |
| `set_max_power` | Set max USB power draw in mA (0-500, rounded to nearest 2) |
| `set_self_powered` | Toggle self-powered vs bus-powered reporting |
| `reset_device` | USB disconnect/reconnect to apply changes |
| `lock_device` | Permanently freeze device configuration |
| `setup_udev_rule` | Generate and install a udev rule for a stable `/dev/` symlink |
## Architecture ## Architecture
``` ```
@ -174,10 +137,10 @@ The native library uses **libusb** for device access, separate from the kernel's
## Project structure ## Project structure
``` ```
mcp210x-uart/ mcp210x/
├── src/mcp210x_uart/ ├── src/mcp210x/
│ ├── server.py # FastMCP tool definitions, elicitation, part-number dispatch │ ├── server.py # FastMCP tool definitions and elicitation logic
│ ├── bindings.py # ctypes: structs, prototypes, bitmask helpers, device wrapper │ ├── bindings.py # ctypes wrapper for libcp210xmanufacturing.so
│ └── __init__.py │ └── __init__.py
├── aur/cp210xmanufacturing/ ├── aur/cp210xmanufacturing/
│ ├── PKGBUILD # Arch Linux package for the native library │ ├── PKGBUILD # Arch Linux package for the native library
@ -189,7 +152,7 @@ mcp210x-uart/
## Complementary tools ## Complementary tools
This server handles **device customization** (USB descriptors, power config, GPIO, baud tables). For **serial communication** (sending/receiving data over UART), use [mcserial](https://git.supported.systems/MCP/mcserial). This server handles **device customization** (USB descriptors, power config). For **serial communication** (sending/receiving data over UART), use [mcserial](https://git.supported.systems/MCP/mcserial).
## Reference ## Reference

View File

@ -1,6 +1,6 @@
[project] [project]
name = "mcp210x-uart" name = "mcp210x"
version = "0.2.0" version = "0.1.0"
description = "MCP server for CP210x USB-UART bridge customization" description = "MCP server for CP210x USB-UART bridge customization"
readme = "README.md" readme = "README.md"
requires-python = ">=3.10" requires-python = ">=3.10"
@ -31,18 +31,18 @@ dev = [
] ]
[project.scripts] [project.scripts]
mcp210x-uart = "mcp210x_uart:main" mcp210x = "mcp210x:main"
[project.urls] [project.urls]
Homepage = "https://forge.supported.systems/MCP/mcp210x-uart" Homepage = "https://forge.supported.systems/MCP/mcp210x"
Repository = "https://forge.supported.systems/MCP/mcp210x-uart" Repository = "https://forge.supported.systems/MCP/mcp210x"
[build-system] [build-system]
requires = ["hatchling"] requires = ["hatchling"]
build-backend = "hatchling.build" build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel] [tool.hatch.build.targets.wheel]
packages = ["src/mcp210x_uart"] packages = ["src/mcp210x"]
[tool.ruff] [tool.ruff]
line-length = 100 line-length = 100

5
src/mcp210x/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""mcp210x - MCP server for Silicon Labs CP210x USB-UART bridge customization."""
from .server import mcp, main
__all__ = ["mcp", "main"]

441
src/mcp210x/bindings.py Normal file
View File

@ -0,0 +1,441 @@
"""Low-level ctypes bindings for libcp210xmanufacturing."""
import ctypes
from ctypes import c_int, c_uint, c_ushort, c_ubyte, c_char_p, c_void_p, POINTER, byref
from pathlib import Path
from typing import Optional
# Type aliases matching the C library
CP210x_STATUS = c_int
HANDLE = c_void_p
DWORD = c_uint
WORD = c_ushort
BYTE = c_ubyte
BOOL = c_int
# Return codes
CP210x_SUCCESS = 0x00
CP210x_DEVICE_NOT_FOUND = 0xFF
CP210x_INVALID_HANDLE = 0x01
CP210x_INVALID_PARAMETER = 0x02
CP210x_DEVICE_IO_FAILED = 0x03
CP210x_FUNCTION_NOT_SUPPORTED = 0x04
CP210x_GLOBAL_DATA_ERROR = 0x05
CP210x_FILE_ERROR = 0x06
CP210x_COMMAND_FAILED = 0x08
CP210x_INVALID_ACCESS_TYPE = 0x09
# Part numbers
PART_NUMBERS = {
0x01: "CP2101",
0x02: "CP2102",
0x03: "CP2103",
0x04: "CP2104",
0x05: "CP2105",
0x08: "CP2108",
0x09: "CP2109",
0x20: "CP2102N-QFN28",
0x21: "CP2102N-QFN24",
0x22: "CP2102N-QFN20",
}
# Buffer sizes
MAX_DEVICE_STRLEN = 256
MAX_MANUFACTURER_STRLEN = 45
MAX_PRODUCT_STRLEN = 126
MAX_SERIAL_STRLEN = 63
# GetProductString flags
RETURN_SERIAL_NUMBER = 0x00
RETURN_DESCRIPTION = 0x01
RETURN_FULL_PATH = 0x02
class CP210xError(Exception):
"""Exception raised for CP210x library errors."""
ERROR_MESSAGES = {
CP210x_DEVICE_NOT_FOUND: "Device not found",
CP210x_INVALID_HANDLE: "Invalid handle",
CP210x_INVALID_PARAMETER: "Invalid parameter",
CP210x_DEVICE_IO_FAILED: "Device I/O failed",
CP210x_FUNCTION_NOT_SUPPORTED: "Function not supported",
CP210x_GLOBAL_DATA_ERROR: "Global data error",
CP210x_FILE_ERROR: "File error",
CP210x_COMMAND_FAILED: "Command failed",
CP210x_INVALID_ACCESS_TYPE: "Invalid access type",
}
def __init__(self, status: int, context: str = ""):
self.status = status
msg = self.ERROR_MESSAGES.get(status, f"Unknown error 0x{status:02X}")
if context:
msg = f"{context}: {msg}"
super().__init__(msg)
class CP210xLibrary:
"""Wrapper for the CP210x manufacturing library."""
def __init__(self, lib_path: Optional[str] = None):
"""Load the CP210x manufacturing library.
Args:
lib_path: Path to libcp210xmanufacturing.so, or None to search default paths.
"""
if lib_path:
self._lib = ctypes.CDLL(lib_path)
else:
# Search common locations
search_paths = [
"/usr/lib/libcp210xmanufacturing.so",
"/usr/local/lib/libcp210xmanufacturing.so",
str(Path(__file__).parent.parent.parent / "AN721SW/Linux/LibrarySourcePackages/cp210xmanufacturing/build/lib/x86_64/libcp210xmanufacturing.so"),
]
for path in search_paths:
if Path(path).exists():
self._lib = ctypes.CDLL(path)
break
else:
raise FileNotFoundError(
"libcp210xmanufacturing.so not found. Install with: "
"cd aur/cp210xmanufacturing && makepkg -si"
)
self._setup_functions()
def _setup_functions(self):
"""Set up function prototypes."""
# CP210x_GetNumDevices
self._lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)]
self._lib.CP210x_GetNumDevices.restype = CP210x_STATUS
# CP210x_GetProductString
self._lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD]
self._lib.CP210x_GetProductString.restype = CP210x_STATUS
# CP210x_Open
self._lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)]
self._lib.CP210x_Open.restype = CP210x_STATUS
# CP210x_Close
self._lib.CP210x_Close.argtypes = [HANDLE]
self._lib.CP210x_Close.restype = CP210x_STATUS
# CP210x_GetPartNumber
self._lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)]
self._lib.CP210x_GetPartNumber.restype = CP210x_STATUS
# CP210x_GetDeviceVid
self._lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)]
self._lib.CP210x_GetDeviceVid.restype = CP210x_STATUS
# CP210x_GetDevicePid
self._lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)]
self._lib.CP210x_GetDevicePid.restype = CP210x_STATUS
# CP210x_GetDeviceManufacturerString
self._lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
self._lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS
# CP210x_GetDeviceProductString
self._lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
self._lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS
# CP210x_GetDeviceSerialNumber
self._lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
self._lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS
# CP210x_SetManufacturerString
self._lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
self._lib.CP210x_SetManufacturerString.restype = CP210x_STATUS
# CP210x_SetProductString
self._lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
self._lib.CP210x_SetProductString.restype = CP210x_STATUS
# CP210x_SetSerialNumber
self._lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
self._lib.CP210x_SetSerialNumber.restype = CP210x_STATUS
# CP210x_GetMaxPower
self._lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)]
self._lib.CP210x_GetMaxPower.restype = CP210x_STATUS
# CP210x_SetMaxPower
self._lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE]
self._lib.CP210x_SetMaxPower.restype = CP210x_STATUS
# CP210x_GetSelfPower
self._lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)]
self._lib.CP210x_GetSelfPower.restype = CP210x_STATUS
# CP210x_SetSelfPower
self._lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL]
self._lib.CP210x_SetSelfPower.restype = CP210x_STATUS
# CP210x_GetDeviceVersion
self._lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)]
self._lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS
# CP210x_SetDeviceVersion
self._lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD]
self._lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS
# CP210x_GetLockValue
self._lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)]
self._lib.CP210x_GetLockValue.restype = CP210x_STATUS
# CP210x_SetLockValue
self._lib.CP210x_SetLockValue.argtypes = [HANDLE]
self._lib.CP210x_SetLockValue.restype = CP210x_STATUS
# CP210x_Reset
self._lib.CP210x_Reset.argtypes = [HANDLE]
self._lib.CP210x_Reset.restype = CP210x_STATUS
def _check_status(self, status: int, context: str = ""):
"""Raise exception if status indicates error."""
if status != CP210x_SUCCESS:
raise CP210xError(status, context)
def get_num_devices(self) -> int:
"""Get the number of connected CP210x devices."""
num = DWORD()
status = self._lib.CP210x_GetNumDevices(byref(num))
self._check_status(status, "GetNumDevices")
return num.value
def get_product_string(self, device_index: int, flag: int = RETURN_DESCRIPTION) -> str:
"""Get device string without opening the device.
Args:
device_index: Zero-based device index
flag: What to return - RETURN_SERIAL_NUMBER, RETURN_DESCRIPTION, or RETURN_FULL_PATH
"""
buf = ctypes.create_string_buffer(MAX_DEVICE_STRLEN)
status = self._lib.CP210x_GetProductString(DWORD(device_index), buf, DWORD(flag))
self._check_status(status, "GetProductString")
return buf.value.decode('utf-8', errors='replace')
def open(self, device_index: int) -> HANDLE:
"""Open a device by index."""
handle = HANDLE()
status = self._lib.CP210x_Open(DWORD(device_index), byref(handle))
self._check_status(status, f"Open device {device_index}")
return handle
def close(self, handle: HANDLE):
"""Close an open device."""
status = self._lib.CP210x_Close(handle)
self._check_status(status, "Close")
def get_part_number(self, handle: HANDLE) -> tuple[int, str]:
"""Get the part number of an open device.
Returns:
Tuple of (part_number_code, part_name)
"""
part = BYTE()
status = self._lib.CP210x_GetPartNumber(handle, byref(part))
self._check_status(status, "GetPartNumber")
return part.value, PART_NUMBERS.get(part.value, f"Unknown (0x{part.value:02X})")
def get_device_vid(self, handle: HANDLE) -> int:
"""Get the USB Vendor ID."""
vid = WORD()
status = self._lib.CP210x_GetDeviceVid(handle, byref(vid))
self._check_status(status, "GetDeviceVid")
return vid.value
def get_device_pid(self, handle: HANDLE) -> int:
"""Get the USB Product ID."""
pid = WORD()
status = self._lib.CP210x_GetDevicePid(handle, byref(pid))
self._check_status(status, "GetDevicePid")
return pid.value
def get_manufacturer_string(self, handle: HANDLE) -> str:
"""Get the manufacturer string."""
buf = ctypes.create_string_buffer(MAX_MANUFACTURER_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceManufacturerString(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceManufacturerString")
return buf.value.decode('utf-8', errors='replace')
def get_product_string_device(self, handle: HANDLE) -> str:
"""Get the product string from an open device."""
buf = ctypes.create_string_buffer(MAX_PRODUCT_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceProductString(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceProductString")
return buf.value.decode('utf-8', errors='replace')
def get_serial_number(self, handle: HANDLE) -> str:
"""Get the serial number."""
buf = ctypes.create_string_buffer(MAX_SERIAL_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceSerialNumber(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceSerialNumber")
return buf.value.decode('utf-8', errors='replace')
def set_manufacturer_string(self, handle: HANDLE, manufacturer: str):
"""Set the manufacturer string (max 45 chars)."""
data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN]
status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetManufacturerString")
def set_product_string(self, handle: HANDLE, product: str):
"""Set the product string (max 126 chars)."""
data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN]
status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetProductString")
def set_serial_number(self, handle: HANDLE, serial: str):
"""Set the serial number (max 63 chars)."""
data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN]
status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetSerialNumber")
def get_max_power(self, handle: HANDLE) -> int:
"""Get max power in units of 2mA (multiply by 2 for mA)."""
power = BYTE()
status = self._lib.CP210x_GetMaxPower(handle, byref(power))
self._check_status(status, "GetMaxPower")
return power.value
def set_max_power(self, handle: HANDLE, power_2ma: int):
"""Set max power in units of 2mA (e.g., 50 = 100mA)."""
if power_2ma > 250:
raise ValueError("Max power cannot exceed 250 (500mA)")
status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma))
self._check_status(status, "SetMaxPower")
def get_self_power(self, handle: HANDLE) -> bool:
"""Check if device is self-powered."""
self_power = BOOL()
status = self._lib.CP210x_GetSelfPower(handle, byref(self_power))
self._check_status(status, "GetSelfPower")
return bool(self_power.value)
def set_self_power(self, handle: HANDLE, self_powered: bool):
"""Set whether device is self-powered."""
status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0))
self._check_status(status, "SetSelfPower")
def get_device_version(self, handle: HANDLE) -> int:
"""Get device version (bcdDevice)."""
version = WORD()
status = self._lib.CP210x_GetDeviceVersion(handle, byref(version))
self._check_status(status, "GetDeviceVersion")
return version.value
def set_device_version(self, handle: HANDLE, version: int):
"""Set device version (bcdDevice)."""
status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version))
self._check_status(status, "SetDeviceVersion")
def get_lock_value(self, handle: HANDLE) -> int:
"""Get lock value (0 = unlocked, 1-255 = locked)."""
lock = BYTE()
status = self._lib.CP210x_GetLockValue(handle, byref(lock))
self._check_status(status, "GetLockValue")
return lock.value
def lock_device(self, handle: HANDLE):
"""Lock the device (PERMANENT - cannot be undone!)."""
status = self._lib.CP210x_SetLockValue(handle)
self._check_status(status, "SetLockValue")
def reset(self, handle: HANDLE):
"""Reset the device (USB disconnect/reconnect)."""
status = self._lib.CP210x_Reset(handle)
self._check_status(status, "Reset")
# Convenience context manager for device access
class CP210xDevice:
"""Context manager for safe device access."""
def __init__(self, lib: CP210xLibrary, device_index: int):
self.lib = lib
self.device_index = device_index
self.handle = None
def __enter__(self):
self.handle = self.lib.open(self.device_index)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.handle:
self.lib.close(self.handle)
self.handle = None
return False
@property
def part_number(self) -> tuple[int, str]:
return self.lib.get_part_number(self.handle)
@property
def vid(self) -> int:
return self.lib.get_device_vid(self.handle)
@property
def pid(self) -> int:
return self.lib.get_device_pid(self.handle)
@property
def manufacturer(self) -> str:
return self.lib.get_manufacturer_string(self.handle)
@manufacturer.setter
def manufacturer(self, value: str):
self.lib.set_manufacturer_string(self.handle, value)
@property
def product(self) -> str:
return self.lib.get_product_string_device(self.handle)
@product.setter
def product(self, value: str):
self.lib.set_product_string(self.handle, value)
@property
def serial_number(self) -> str:
return self.lib.get_serial_number(self.handle)
@serial_number.setter
def serial_number(self, value: str):
self.lib.set_serial_number(self.handle, value)
@property
def max_power_ma(self) -> int:
"""Max power in milliamps."""
return self.lib.get_max_power(self.handle) * 2
@max_power_ma.setter
def max_power_ma(self, value: int):
"""Set max power in milliamps (will be rounded down to nearest 2mA)."""
self.lib.set_max_power(self.handle, value // 2)
@property
def self_powered(self) -> bool:
return self.lib.get_self_power(self.handle)
@self_powered.setter
def self_powered(self, value: bool):
self.lib.set_self_power(self.handle, value)
@property
def device_version(self) -> str:
"""Device version as BCD string (e.g., '1.00')."""
v = self.lib.get_device_version(self.handle)
return f"{(v >> 8) & 0xFF}.{v & 0xFF:02d}"
@property
def is_locked(self) -> bool:
return self.lib.get_lock_value(self.handle) != 0
def reset(self):
"""Reset the device."""
self.lib.reset(self.handle)

483
src/mcp210x/server.py Normal file
View File

@ -0,0 +1,483 @@
"""FastMCP server for CP210x device customization."""
import subprocess
from typing import Optional
from fastmcp import FastMCP, Context
from fastmcp.server.elicitation import AcceptedElicitation
from .bindings import CP210xLibrary, CP210xDevice, CP210xError, PART_NUMBERS
mcp = FastMCP(
"cp210x",
instructions="CP210x USB-UART bridge customization - read/write product strings, serial numbers, and device configuration",
)
# Lazy-loaded library instance
_lib: Optional[CP210xLibrary] = None
def get_lib() -> CP210xLibrary:
"""Get or create the library instance."""
global _lib
if _lib is None:
_lib = CP210xLibrary()
return _lib
async def confirm_write(ctx: Context, message: str) -> bool:
"""Ask user to confirm a write operation via elicitation.
Falls back to proceeding without confirmation if the client
doesn't support elicitation.
"""
try:
result = await ctx.elicit(message, ["Confirm", "Cancel"])
return isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
# Client doesn't support elicitation — proceed as usual
return True
@mcp.tool()
def list_devices() -> list[dict]:
"""List all connected CP210x devices.
Returns a list of devices with their index, description, and serial number.
Use the index to reference a specific device in other tools.
"""
lib = get_lib()
num_devices = lib.get_num_devices()
devices = []
for i in range(num_devices):
try:
desc = lib.get_product_string(i, flag=1) # RETURN_DESCRIPTION
serial = lib.get_product_string(i, flag=0) # RETURN_SERIAL_NUMBER
devices.append({
"index": i,
"description": desc,
"serial_number": serial,
})
except CP210xError as e:
devices.append({
"index": i,
"error": str(e),
})
return devices
@mcp.tool()
def get_device_info(device_index: int = 0) -> dict:
"""Get detailed information about a CP210x device.
Args:
device_index: Zero-based index of the device (default: 0 for first device)
Returns full device details including part number, VID/PID, strings, and power settings.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
return {
"part_number": part_name,
"part_code": f"0x{part_code:02X}",
"vid": f"0x{dev.vid:04X}",
"pid": f"0x{dev.pid:04X}",
"manufacturer": dev.manufacturer,
"product": dev.product,
"serial_number": dev.serial_number,
"device_version": dev.device_version,
"max_power_ma": dev.max_power_ma,
"self_powered": dev.self_powered,
"is_locked": dev.is_locked,
}
@mcp.tool()
async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB product string (device name shown to host).
Args:
product: New product string (max 126 characters)
device_index: Zero-based device index (default: 0)
Returns the updated device info. Device may need to be re-plugged for
changes to appear on the USB host.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_product = dev.product
new_value = product[:126]
if ctx and not await confirm_write(
ctx,
f"Write product string to CP210x OTP EPROM?\n\n"
f" Old: {old_product}\n"
f" New: {new_value}\n\n"
f"OTP writes are limited — this cannot be undone easily.",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.product = product
return {
"success": True,
"old_value": old_product,
"new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host",
}
@mcp.tool()
async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB manufacturer string.
Args:
manufacturer: New manufacturer string (max 45 characters)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_mfr = dev.manufacturer
new_value = manufacturer[:45]
if ctx and not await confirm_write(
ctx,
f"Write manufacturer string to CP210x OTP EPROM?\n\n"
f" Old: {old_mfr}\n"
f" New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.manufacturer = manufacturer
return {
"success": True,
"old_value": old_mfr,
"new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host",
}
@mcp.tool()
async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB serial number.
Args:
serial_number: New serial number (max 63 characters)
device_index: Zero-based device index (default: 0)
Note: Changing serial number may affect udev rules that match on serial.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_serial = dev.serial_number
new_value = serial_number[:63]
if ctx and not await confirm_write(
ctx,
f"Write serial number to CP210x OTP EPROM?\n\n"
f" Old: {old_serial}\n"
f" New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.serial_number = serial_number
return {
"success": True,
"old_value": old_serial,
"new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host",
}
@mcp.tool()
async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the maximum USB power draw in milliamps.
Args:
max_power_ma: Max power in mA (0-500, will be rounded to nearest 2mA)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
if max_power_ma < 0 or max_power_ma > 500:
return {"error": "max_power_ma must be 0-500"}
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_power = dev.max_power_ma
actual_value = (max_power_ma // 2) * 2
if ctx and not await confirm_write(
ctx,
f"Write max power to CP210x OTP EPROM?\n\n"
f" Old: {old_power} mA\n"
f" New: {actual_value} mA",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.max_power_ma = max_power_ma
return {
"success": True,
"old_value_ma": old_power,
"new_value_ma": actual_value,
}
@mcp.tool()
async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Context = None) -> dict:
"""Set whether device reports as self-powered or bus-powered.
Args:
self_powered: True for self-powered, False for bus-powered
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_value = dev.self_powered
mode = "self-powered" if self_powered else "bus-powered"
if ctx and not await confirm_write(
ctx,
f"Write power mode to CP210x OTP EPROM?\n\n"
f" Old: {'self-powered' if old_value else 'bus-powered'}\n"
f" New: {mode}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.self_powered = self_powered
return {
"success": True,
"old_value": old_value,
"new_value": self_powered,
}
@mcp.tool()
def reset_device(device_index: int = 0) -> dict:
"""Reset the CP210x device (USB disconnect/reconnect).
Args:
device_index: Zero-based device index (default: 0)
This triggers a USB re-enumeration, which will make any programmed
changes visible to the USB host.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
dev.reset()
return {
"success": True,
"note": "Device has been reset. It may take a moment to re-enumerate.",
}
@mcp.tool()
async def lock_device(device_index: int = 0, ctx: Context = None) -> dict:
"""PERMANENTLY lock the device to prevent further customization.
Args:
device_index: Zero-based device index (default: 0)
WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's
configuration cannot be changed. The device will still function normally,
but strings, power settings, etc. cannot be modified.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is already locked"}
part_code, part_name = dev.part_number
info = (
f"PERMANENTLY lock this {part_name}?\n\n"
f" Product: {dev.product}\n"
f" Serial: {dev.serial_number}\n\n"
f"THIS CANNOT BE UNDONE. The device configuration\n"
f"will be frozen forever."
)
if not ctx:
return {
"error": "Lock requires interactive confirmation",
"message": "This operation is too dangerous without user confirmation.",
}
# Strict confirmation — do NOT fall back to True on failure.
# Unlike regular writes, an irreversible lock must get explicit consent.
try:
result = await ctx.elicit(info, ["Confirm", "Cancel"])
confirmed = isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
return {
"error": "Lock requires elicitation support",
"message": "Your MCP client does not support elicitation. "
"Lock is too dangerous to proceed without explicit confirmation.",
}
if not confirmed:
return {"cancelled": True, "message": "Lock cancelled by user"}
lib.lock_device(dev.handle)
return {
"success": True,
"warning": "Device is now PERMANENTLY locked. Configuration cannot be changed.",
}
@mcp.tool()
async def setup_udev_rule(
device_index: int = 0,
symlink_name: Optional[str] = None,
ctx: Context = None,
) -> dict:
"""Create a udev rule for stable /dev/ symlink for a CP210x device.
Generates a rule that matches the device's product string and creates
a persistent symlink (e.g., /dev/rylr998-0D27) that survives re-plugs
and port reordering.
Args:
device_index: Zero-based device index (default: 0)
symlink_name: Custom symlink name. If not provided, auto-generates
from the product string (e.g., "RYLR998 ...0D27" -> "rylr998-0D27")
Requires sudo for rule installation.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
product = dev.product
vid = dev.vid
pid = dev.pid
if not product or product == "CP2102 USB to UART Bridge Controller":
return {
"error": "Device has default product string",
"message": "Customize the product string first to create a unique match rule.",
}
# Auto-generate symlink name from product string
if not symlink_name:
# "RYLR998 0033001104645C0B00000D27" -> "rylr998-0D27"
parts = product.split()
if len(parts) >= 2:
prefix = parts[0].lower()
suffix = parts[-1][-4:] # Last 4 chars of EUI/address
symlink_name = f"{prefix}-{suffix}"
else:
symlink_name = product.lower().replace(" ", "-")[:32]
# Build the udev rule
# Use glob pattern on product string to match the unique suffix
match_suffix = product[-4:]
rule = (
f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", '
f'ATTRS{{idProduct}}=="{pid:04x}", '
f'ATTRS{{product}}=="*{match_suffix}", '
f'SYMLINK+="{symlink_name}"'
)
rules_path = f"/usr/lib/udev/rules.d/99-cp210x-{symlink_name}.rules"
rule_content = (
f"# Auto-generated by mcp210x for: {product}\n"
f"# Creates /dev/{symlink_name} symlink\n"
f"{rule}\n"
)
# Elicit confirmation before installing
install_msg = (
f"Install udev rule for stable device symlink?\n\n"
f" Device: {product}\n"
f" Symlink: /dev/{symlink_name}\n"
f" File: {rules_path}\n\n"
f"Requires sudo to install."
)
if ctx and not await confirm_write(ctx, install_msg):
return {
"cancelled": True,
"rule": rule_content,
"message": "Rule not installed. You can install it manually.",
}
# Install the rule
try:
result = subprocess.run(
["sudo", "tee", rules_path],
input=rule_content,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return {
"error": f"Failed to install rule: {result.stderr.strip()}",
"rule": rule_content,
"message": "Install manually with: sudo tee " + rules_path,
}
# Reload udev rules
subprocess.run(
["sudo", "udevadm", "control", "--reload-rules"],
capture_output=True,
timeout=10,
)
subprocess.run(
["sudo", "udevadm", "trigger"],
capture_output=True,
timeout=10,
)
except subprocess.TimeoutExpired:
return {
"error": "Sudo timed out — may need interactive password",
"rule": rule_content,
"message": "Install manually with: sudo tee " + rules_path,
}
return {
"success": True,
"symlink": f"/dev/{symlink_name}",
"rules_file": rules_path,
"rule": rule_content,
"note": "Symlink will appear after device re-plug or udevadm trigger",
}
def main():
"""Entry point for the MCP server."""
mcp.run()
if __name__ == "__main__":
main()

View File

@ -1,5 +0,0 @@
"""mcp210x-uart - MCP server for Silicon Labs CP210x USB-UART bridge customization."""
from .server import main, mcp
__all__ = ["mcp", "main"]

View File

@ -1,937 +0,0 @@
"""Low-level ctypes bindings for libcp210xmanufacturing."""
import ctypes
from ctypes import POINTER, Structure, byref, c_char_p, c_int, c_ubyte, c_uint, c_ushort, c_void_p
from pathlib import Path
# Type aliases matching the C library
CP210x_STATUS = c_int
HANDLE = c_void_p
DWORD = c_uint
WORD = c_ushort
BYTE = c_ubyte
BOOL = c_int
# Return codes
CP210x_SUCCESS = 0x00
CP210x_DEVICE_NOT_FOUND = 0xFF
CP210x_INVALID_HANDLE = 0x01
CP210x_INVALID_PARAMETER = 0x02
CP210x_DEVICE_IO_FAILED = 0x03
CP210x_FUNCTION_NOT_SUPPORTED = 0x04
CP210x_GLOBAL_DATA_ERROR = 0x05
CP210x_FILE_ERROR = 0x06
CP210x_COMMAND_FAILED = 0x08
CP210x_INVALID_ACCESS_TYPE = 0x09
# Part numbers
PART_NUMBERS = {
0x01: "CP2101",
0x02: "CP2102",
0x03: "CP2103",
0x04: "CP2104",
0x05: "CP2105",
0x08: "CP2108",
0x09: "CP2109",
0x20: "CP2102N-QFN28",
0x21: "CP2102N-QFN24",
0x22: "CP2102N-QFN20",
}
# Part number groups for feature gating
PARTS_CP2102N = {0x20, 0x21, 0x22}
PARTS_WITH_FLUSH_CONFIG = {0x04, 0x05, 0x08} # CP2104, CP2105, CP2108
PARTS_WITH_DEVICE_MODE = {0x05} # CP2105 only
PARTS_WITH_INTERFACE_STRING = {0x05, 0x08} # CP2105, CP2108
PARTS_WITH_PORT_CONFIG = {0x03, 0x04} # CP2103, CP2104
PARTS_WITH_DUAL_PORT_CONFIG = {0x05} # CP2105
PARTS_WITH_QUAD_PORT_CONFIG = {0x08} # CP2108
# Buffer sizes
MAX_DEVICE_STRLEN = 256
MAX_MANUFACTURER_STRLEN = 45
MAX_PRODUCT_STRLEN = 126
MAX_SERIAL_STRLEN = 63
CP2105_MAX_INTERFACE_STRLEN = 32
CP2108_MAX_INTERFACE_STRLEN = 126
# GetProductString flags
RETURN_SERIAL_NUMBER = 0x00
RETURN_DESCRIPTION = 0x01
RETURN_FULL_PATH = 0x02
# Flush buffer config flags (CP2104)
FC_OPEN_TX = 0x01
FC_OPEN_RX = 0x02
FC_CLOSE_TX = 0x04
FC_CLOSE_RX = 0x08
# CP2105 standard port
FC_OPEN_TX_SCI = FC_OPEN_TX
FC_OPEN_RX_SCI = FC_OPEN_RX
FC_CLOSE_TX_SCI = FC_CLOSE_TX
FC_CLOSE_RX_SCI = FC_CLOSE_RX
# CP2105 enhanced port
FC_OPEN_TX_ECI = 0x10
FC_OPEN_RX_ECI = 0x20
FC_CLOSE_TX_ECI = 0x40
FC_CLOSE_RX_ECI = 0x80
# CP2108 per-interface
FC_OPEN_TX_IFC0 = 0x0001
FC_OPEN_RX_IFC0 = 0x0002
FC_CLOSE_TX_IFC0 = 0x0004
FC_CLOSE_RX_IFC0 = 0x0008
FC_OPEN_TX_IFC1 = 0x0010
FC_OPEN_RX_IFC1 = 0x0020
FC_CLOSE_TX_IFC1 = 0x0040
FC_CLOSE_RX_IFC1 = 0x0080
FC_OPEN_TX_IFC2 = 0x0100
FC_OPEN_RX_IFC2 = 0x0200
FC_CLOSE_TX_IFC2 = 0x0400
FC_CLOSE_RX_IFC2 = 0x0800
FC_OPEN_TX_IFC3 = 0x1000
FC_OPEN_RX_IFC3 = 0x2000
FC_CLOSE_TX_IFC3 = 0x4000
FC_CLOSE_RX_IFC3 = 0x8000
# Baud rate config
NUM_BAUD_CONFIGS = 32
BAUD_CONFIG_SIZE = 10
# Port config pin flags (CP2103/CP2104)
PORT_RI_ON = 0x0001
PORT_DCD_ON = 0x0002
PORT_DTR_ON = 0x0004
PORT_DSR_ON = 0x0008
PORT_TXD_ON = 0x0010
PORT_RXD_ON = 0x0020
PORT_RTS_ON = 0x0040
PORT_CTS_ON = 0x0080
PORT_GPIO_0_ON = 0x0100
PORT_GPIO_1_ON = 0x0200
PORT_GPIO_2_ON = 0x0400
PORT_GPIO_3_ON = 0x0800
PORT_SUSPEND_ON = 0x4000
PORT_SUSPEND_BAR_ON = 0x8000
# Enhanced function flags (CP2103/CP2104)
EF_GPIO_0_TXLED = 0x01
EF_GPIO_1_RXLED = 0x02
EF_GPIO_2_RS485 = 0x04
EF_RS485_INVERT = 0x08
EF_WEAKPULLUP = 0x10
EF_RESERVED_1 = 0x20
EF_SERIAL_DYNAMIC_SUSPEND = 0x40
EF_GPIO_DYNAMIC_SUSPEND = 0x80
# Flush buffer flag names for human-readable output
FLUSH_FLAGS_CP2104 = {
FC_OPEN_TX: "open_tx",
FC_OPEN_RX: "open_rx",
FC_CLOSE_TX: "close_tx",
FC_CLOSE_RX: "close_rx",
}
FLUSH_FLAGS_CP2105 = {
FC_OPEN_TX_SCI: "open_tx_sci",
FC_OPEN_RX_SCI: "open_rx_sci",
FC_CLOSE_TX_SCI: "close_tx_sci",
FC_CLOSE_RX_SCI: "close_rx_sci",
FC_OPEN_TX_ECI: "open_tx_eci",
FC_OPEN_RX_ECI: "open_rx_eci",
FC_CLOSE_TX_ECI: "close_tx_eci",
FC_CLOSE_RX_ECI: "close_rx_eci",
}
ENHANCED_FXN_FLAGS = {
EF_GPIO_0_TXLED: "gpio0_txled",
EF_GPIO_1_RXLED: "gpio1_rxled",
EF_GPIO_2_RS485: "gpio2_rs485",
EF_RS485_INVERT: "rs485_invert",
EF_WEAKPULLUP: "weak_pullup",
EF_SERIAL_DYNAMIC_SUSPEND: "serial_dynamic_suspend",
EF_GPIO_DYNAMIC_SUSPEND: "gpio_dynamic_suspend",
}
PORT_PIN_FLAGS = {
PORT_RI_ON: "RI",
PORT_DCD_ON: "DCD",
PORT_DTR_ON: "DTR",
PORT_DSR_ON: "DSR",
PORT_TXD_ON: "TXD",
PORT_RXD_ON: "RXD",
PORT_RTS_ON: "RTS",
PORT_CTS_ON: "CTS",
PORT_GPIO_0_ON: "GPIO0",
PORT_GPIO_1_ON: "GPIO1",
PORT_GPIO_2_ON: "GPIO2",
PORT_GPIO_3_ON: "GPIO3",
PORT_SUSPEND_ON: "SUSPEND",
PORT_SUSPEND_BAR_ON: "SUSPEND_BAR",
}
def decode_bitmask(value: int, flag_map: dict) -> dict[str, bool]:
"""Decode a bitmask into a dict of named flags."""
return {name: bool(value & bit) for bit, name in flag_map.items()}
def encode_bitmask(flags: dict[str, bool], flag_map: dict) -> int:
"""Encode a dict of named flags back into a bitmask."""
name_to_bit = {name: bit for bit, name in flag_map.items()}
value = 0
for name, enabled in flags.items():
if name in name_to_bit and enabled:
value |= name_to_bit[name]
return value
# --- ctypes Structures ---
class BAUD_CONFIG(Structure):
_pack_ = 1
_fields_ = [
("BaudGen", WORD),
("Timer0Reload", WORD),
("Prescaler", BYTE),
("_pad", BYTE),
("BaudRate", DWORD),
]
assert ctypes.sizeof(BAUD_CONFIG) == BAUD_CONFIG_SIZE
BAUD_CONFIG_DATA = BAUD_CONFIG * NUM_BAUD_CONFIGS
class PORT_CONFIG(Structure):
_fields_ = [
("Mode", WORD),
("Reset_Latch", WORD),
("Suspend_Latch", WORD),
("EnhancedFxn", BYTE),
]
class DUAL_PORT_CONFIG(Structure):
_fields_ = [
("Mode", WORD),
("Reset_Latch", WORD),
("Suspend_Latch", WORD),
("EnhancedFxn_ECI", BYTE),
("EnhancedFxn_SCI", BYTE),
("EnhancedFxn_Device", BYTE),
]
class QUAD_PORT_STATE(Structure):
_fields_ = [
("Mode_PB0", WORD),
("Mode_PB1", WORD),
("Mode_PB2", WORD),
("Mode_PB3", WORD),
("Mode_PB4", WORD),
("LowPower_PB0", WORD),
("LowPower_PB1", WORD),
("LowPower_PB2", WORD),
("LowPower_PB3", WORD),
("LowPower_PB4", WORD),
("Latch_PB0", WORD),
("Latch_PB1", WORD),
("Latch_PB2", WORD),
("Latch_PB3", WORD),
("Latch_PB4", WORD),
]
class QUAD_PORT_CONFIG(Structure):
_fields_ = [
("Reset_Latch", QUAD_PORT_STATE),
("Suspend_Latch", QUAD_PORT_STATE),
("IPDelay_IFC0", BYTE),
("IPDelay_IFC1", BYTE),
("IPDelay_IFC2", BYTE),
("IPDelay_IFC3", BYTE),
("EnhancedFxn_IFC0", BYTE),
("EnhancedFxn_IFC1", BYTE),
("EnhancedFxn_IFC2", BYTE),
("EnhancedFxn_IFC3", BYTE),
("EnhancedFxn_Device", BYTE),
("ExtClk0Freq", BYTE),
("ExtClk1Freq", BYTE),
("ExtClk2Freq", BYTE),
("ExtClk3Freq", BYTE),
]
class FIRMWARE_T(Structure):
_fields_ = [
("major", BYTE),
("minor", BYTE),
("build", BYTE),
]
class CP210xError(Exception):
"""Exception raised for CP210x library errors."""
ERROR_MESSAGES = {
CP210x_DEVICE_NOT_FOUND: "Device not found",
CP210x_INVALID_HANDLE: "Invalid handle",
CP210x_INVALID_PARAMETER: "Invalid parameter",
CP210x_DEVICE_IO_FAILED: "Device I/O failed",
CP210x_FUNCTION_NOT_SUPPORTED: "Function not supported",
CP210x_GLOBAL_DATA_ERROR: "Global data error",
CP210x_FILE_ERROR: "File error",
CP210x_COMMAND_FAILED: "Command failed",
CP210x_INVALID_ACCESS_TYPE: "Invalid access type",
}
def __init__(self, status: int, context: str = ""):
self.status = status
msg = self.ERROR_MESSAGES.get(status, f"Unknown error 0x{status:02X}")
if context:
msg = f"{context}: {msg}"
super().__init__(msg)
class CP210xLibrary:
"""Wrapper for the CP210x manufacturing library."""
def __init__(self, lib_path: str | None = None):
if lib_path:
self._lib = ctypes.CDLL(lib_path)
else:
search_paths = [
"/usr/lib/libcp210xmanufacturing.so",
"/usr/local/lib/libcp210xmanufacturing.so",
str(Path(__file__).parent.parent.parent / "AN721SW/Linux/LibrarySourcePackages/cp210xmanufacturing/build/lib/x86_64/libcp210xmanufacturing.so"),
]
for path in search_paths:
if Path(path).exists():
self._lib = ctypes.CDLL(path)
break
else:
raise FileNotFoundError(
"libcp210xmanufacturing.so not found. Install with: "
"cd aur/cp210xmanufacturing && makepkg -si"
)
self._setup_functions()
def _setup_functions(self):
"""Set up function prototypes for all library functions."""
lib = self._lib
# --- Device enumeration ---
lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)]
lib.CP210x_GetNumDevices.restype = CP210x_STATUS
lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD]
lib.CP210x_GetProductString.restype = CP210x_STATUS
lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)]
lib.CP210x_Open.restype = CP210x_STATUS
lib.CP210x_Close.argtypes = [HANDLE]
lib.CP210x_Close.restype = CP210x_STATUS
# --- Getters (scalars) ---
lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)]
lib.CP210x_GetPartNumber.restype = CP210x_STATUS
lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)]
lib.CP210x_GetDeviceVid.restype = CP210x_STATUS
lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)]
lib.CP210x_GetDevicePid.restype = CP210x_STATUS
lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS
lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS
lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS
lib.CP210x_GetDeviceInterfaceString.argtypes = [HANDLE, BYTE, c_void_p, POINTER(BYTE), BOOL]
lib.CP210x_GetDeviceInterfaceString.restype = CP210x_STATUS
lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)]
lib.CP210x_GetMaxPower.restype = CP210x_STATUS
lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)]
lib.CP210x_GetSelfPower.restype = CP210x_STATUS
lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)]
lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS
lib.CP210x_GetFlushBufferConfig.argtypes = [HANDLE, POINTER(WORD)]
lib.CP210x_GetFlushBufferConfig.restype = CP210x_STATUS
lib.CP210x_GetDeviceMode.argtypes = [HANDLE, POINTER(BYTE), POINTER(BYTE)]
lib.CP210x_GetDeviceMode.restype = CP210x_STATUS
lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)]
lib.CP210x_GetLockValue.restype = CP210x_STATUS
lib.CP210x_GetFirmwareVersion.argtypes = [HANDLE, POINTER(FIRMWARE_T)]
lib.CP210x_GetFirmwareVersion.restype = CP210x_STATUS
# --- Setters (scalars) ---
lib.CP210x_SetVid.argtypes = [HANDLE, WORD]
lib.CP210x_SetVid.restype = CP210x_STATUS
lib.CP210x_SetPid.argtypes = [HANDLE, WORD]
lib.CP210x_SetPid.restype = CP210x_STATUS
lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
lib.CP210x_SetManufacturerString.restype = CP210x_STATUS
lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
lib.CP210x_SetProductString.restype = CP210x_STATUS
lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
lib.CP210x_SetSerialNumber.restype = CP210x_STATUS
lib.CP210x_SetInterfaceString.argtypes = [HANDLE, BYTE, c_void_p, BYTE, BOOL]
lib.CP210x_SetInterfaceString.restype = CP210x_STATUS
lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE]
lib.CP210x_SetMaxPower.restype = CP210x_STATUS
lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL]
lib.CP210x_SetSelfPower.restype = CP210x_STATUS
lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD]
lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS
lib.CP210x_SetFlushBufferConfig.argtypes = [HANDLE, WORD]
lib.CP210x_SetFlushBufferConfig.restype = CP210x_STATUS
lib.CP210x_SetDeviceMode.argtypes = [HANDLE, BYTE, BYTE]
lib.CP210x_SetDeviceMode.restype = CP210x_STATUS
lib.CP210x_SetLockValue.argtypes = [HANDLE]
lib.CP210x_SetLockValue.restype = CP210x_STATUS
# --- Struct-based configs ---
lib.CP210x_GetBaudRateConfig.argtypes = [HANDLE, POINTER(BAUD_CONFIG)]
lib.CP210x_GetBaudRateConfig.restype = CP210x_STATUS
lib.CP210x_SetBaudRateConfig.argtypes = [HANDLE, POINTER(BAUD_CONFIG)]
lib.CP210x_SetBaudRateConfig.restype = CP210x_STATUS
lib.CP210x_GetPortConfig.argtypes = [HANDLE, POINTER(PORT_CONFIG)]
lib.CP210x_GetPortConfig.restype = CP210x_STATUS
lib.CP210x_SetPortConfig.argtypes = [HANDLE, POINTER(PORT_CONFIG)]
lib.CP210x_SetPortConfig.restype = CP210x_STATUS
lib.CP210x_GetDualPortConfig.argtypes = [HANDLE, POINTER(DUAL_PORT_CONFIG)]
lib.CP210x_GetDualPortConfig.restype = CP210x_STATUS
lib.CP210x_SetDualPortConfig.argtypes = [HANDLE, POINTER(DUAL_PORT_CONFIG)]
lib.CP210x_SetDualPortConfig.restype = CP210x_STATUS
lib.CP210x_GetQuadPortConfig.argtypes = [HANDLE, POINTER(QUAD_PORT_CONFIG)]
lib.CP210x_GetQuadPortConfig.restype = CP210x_STATUS
lib.CP210x_SetQuadPortConfig.argtypes = [HANDLE, POINTER(QUAD_PORT_CONFIG)]
lib.CP210x_SetQuadPortConfig.restype = CP210x_STATUS
# --- Advanced / raw ---
lib.CP210x_GetConfig.argtypes = [HANDLE, POINTER(BYTE), WORD]
lib.CP210x_GetConfig.restype = CP210x_STATUS
lib.CP210x_SetConfig.argtypes = [HANDLE, POINTER(BYTE), WORD]
lib.CP210x_SetConfig.restype = CP210x_STATUS
lib.CP210x_GetGeneric.argtypes = [HANDLE, POINTER(BYTE), WORD]
lib.CP210x_GetGeneric.restype = CP210x_STATUS
lib.CP210x_SetGeneric.argtypes = [HANDLE, POINTER(BYTE), WORD]
lib.CP210x_SetGeneric.restype = CP210x_STATUS
lib.CP210x_CreateHexFile.argtypes = [HANDLE, c_char_p]
lib.CP210x_CreateHexFile.restype = CP210x_STATUS
lib.CP210x_UpdateFirmware.argtypes = [HANDLE]
lib.CP210x_UpdateFirmware.restype = CP210x_STATUS
lib.CP210x_Reset.argtypes = [HANDLE]
lib.CP210x_Reset.restype = CP210x_STATUS
def _check_status(self, status: int, context: str = ""):
if status != CP210x_SUCCESS:
raise CP210xError(status, context)
# --- Device enumeration ---
def get_num_devices(self) -> int:
num = DWORD()
status = self._lib.CP210x_GetNumDevices(byref(num))
self._check_status(status, "GetNumDevices")
return num.value
def get_product_string(self, device_index: int, flag: int = RETURN_DESCRIPTION) -> str:
buf = ctypes.create_string_buffer(MAX_DEVICE_STRLEN)
status = self._lib.CP210x_GetProductString(DWORD(device_index), buf, DWORD(flag))
self._check_status(status, "GetProductString")
return buf.value.decode('utf-8', errors='replace')
def open(self, device_index: int) -> HANDLE:
handle = HANDLE()
status = self._lib.CP210x_Open(DWORD(device_index), byref(handle))
self._check_status(status, f"Open device {device_index}")
return handle
def close(self, handle: HANDLE):
status = self._lib.CP210x_Close(handle)
self._check_status(status, "Close")
# --- Scalar getters ---
def get_part_number(self, handle: HANDLE) -> tuple[int, str]:
part = BYTE()
status = self._lib.CP210x_GetPartNumber(handle, byref(part))
self._check_status(status, "GetPartNumber")
return part.value, PART_NUMBERS.get(part.value, f"Unknown (0x{part.value:02X})")
def get_device_vid(self, handle: HANDLE) -> int:
vid = WORD()
status = self._lib.CP210x_GetDeviceVid(handle, byref(vid))
self._check_status(status, "GetDeviceVid")
return vid.value
def get_device_pid(self, handle: HANDLE) -> int:
pid = WORD()
status = self._lib.CP210x_GetDevicePid(handle, byref(pid))
self._check_status(status, "GetDevicePid")
return pid.value
def get_manufacturer_string(self, handle: HANDLE) -> str:
buf = ctypes.create_string_buffer(MAX_MANUFACTURER_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceManufacturerString(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceManufacturerString")
return buf.value.decode('utf-8', errors='replace')
def get_product_string_device(self, handle: HANDLE) -> str:
buf = ctypes.create_string_buffer(MAX_PRODUCT_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceProductString(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceProductString")
return buf.value.decode('utf-8', errors='replace')
def get_serial_number(self, handle: HANDLE) -> str:
buf = ctypes.create_string_buffer(MAX_SERIAL_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceSerialNumber(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceSerialNumber")
return buf.value.decode('utf-8', errors='replace')
def get_interface_string(self, handle: HANDLE, interface_number: int) -> str:
buf = ctypes.create_string_buffer(CP2108_MAX_INTERFACE_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceInterfaceString(
handle, BYTE(interface_number), buf, byref(length), BOOL(1)
)
self._check_status(status, f"GetDeviceInterfaceString(ifc={interface_number})")
return buf.value.decode('utf-8', errors='replace')
def get_max_power(self, handle: HANDLE) -> int:
power = BYTE()
status = self._lib.CP210x_GetMaxPower(handle, byref(power))
self._check_status(status, "GetMaxPower")
return power.value
def get_self_power(self, handle: HANDLE) -> bool:
self_power = BOOL()
status = self._lib.CP210x_GetSelfPower(handle, byref(self_power))
self._check_status(status, "GetSelfPower")
return bool(self_power.value)
def get_device_version(self, handle: HANDLE) -> int:
version = WORD()
status = self._lib.CP210x_GetDeviceVersion(handle, byref(version))
self._check_status(status, "GetDeviceVersion")
return version.value
def get_flush_buffer_config(self, handle: HANDLE) -> int:
config = WORD()
status = self._lib.CP210x_GetFlushBufferConfig(handle, byref(config))
self._check_status(status, "GetFlushBufferConfig")
return config.value
def get_device_mode(self, handle: HANDLE) -> tuple[int, int]:
eci = BYTE()
sci = BYTE()
status = self._lib.CP210x_GetDeviceMode(handle, byref(eci), byref(sci))
self._check_status(status, "GetDeviceMode")
return eci.value, sci.value
def get_lock_value(self, handle: HANDLE) -> int:
lock = BYTE()
status = self._lib.CP210x_GetLockValue(handle, byref(lock))
self._check_status(status, "GetLockValue")
return lock.value
def get_firmware_version(self, handle: HANDLE) -> tuple[int, int, int]:
fw = FIRMWARE_T()
status = self._lib.CP210x_GetFirmwareVersion(handle, byref(fw))
self._check_status(status, "GetFirmwareVersion")
return fw.major, fw.minor, fw.build
# --- Scalar setters ---
def set_vid(self, handle: HANDLE, vid: int):
status = self._lib.CP210x_SetVid(handle, WORD(vid))
self._check_status(status, "SetVid")
def set_pid(self, handle: HANDLE, pid: int):
status = self._lib.CP210x_SetPid(handle, WORD(pid))
self._check_status(status, "SetPid")
def set_manufacturer_string(self, handle: HANDLE, manufacturer: str):
data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN]
status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetManufacturerString")
def set_product_string(self, handle: HANDLE, product: str):
data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN]
status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetProductString")
def set_serial_number(self, handle: HANDLE, serial: str):
data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN]
status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetSerialNumber")
def set_interface_string(self, handle: HANDLE, interface_number: int, value: str):
data = value.encode('utf-8')[:CP2108_MAX_INTERFACE_STRLEN]
status = self._lib.CP210x_SetInterfaceString(
handle, BYTE(interface_number), data, BYTE(len(data)), BOOL(1)
)
self._check_status(status, f"SetInterfaceString(ifc={interface_number})")
def set_max_power(self, handle: HANDLE, power_2ma: int):
if power_2ma > 250:
raise ValueError("Max power cannot exceed 250 (500mA)")
status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma))
self._check_status(status, "SetMaxPower")
def set_self_power(self, handle: HANDLE, self_powered: bool):
status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0))
self._check_status(status, "SetSelfPower")
def set_device_version(self, handle: HANDLE, version: int):
status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version))
self._check_status(status, "SetDeviceVersion")
def set_flush_buffer_config(self, handle: HANDLE, config: int):
status = self._lib.CP210x_SetFlushBufferConfig(handle, WORD(config))
self._check_status(status, "SetFlushBufferConfig")
def set_device_mode(self, handle: HANDLE, eci_mode: int, sci_mode: int):
status = self._lib.CP210x_SetDeviceMode(handle, BYTE(eci_mode), BYTE(sci_mode))
self._check_status(status, "SetDeviceMode")
def lock_device(self, handle: HANDLE):
status = self._lib.CP210x_SetLockValue(handle)
self._check_status(status, "SetLockValue")
# --- Struct-based configs ---
def get_baud_rate_config(self, handle: HANDLE) -> list[dict]:
data = BAUD_CONFIG_DATA()
status = self._lib.CP210x_GetBaudRateConfig(handle, data)
self._check_status(status, "GetBaudRateConfig")
return [
{
"index": i,
"baud_gen": data[i].BaudGen,
"timer0_reload": data[i].Timer0Reload,
"prescaler": data[i].Prescaler,
"baud_rate": data[i].BaudRate,
}
for i in range(NUM_BAUD_CONFIGS)
]
def set_baud_rate_config(self, handle: HANDLE, configs: list[dict]):
if len(configs) != NUM_BAUD_CONFIGS:
raise ValueError(f"Expected {NUM_BAUD_CONFIGS} baud configs, got {len(configs)}")
data = BAUD_CONFIG_DATA()
for i, cfg in enumerate(configs):
data[i].BaudGen = cfg["baud_gen"]
data[i].Timer0Reload = cfg["timer0_reload"]
data[i].Prescaler = cfg["prescaler"]
data[i]._pad = 0
data[i].BaudRate = cfg["baud_rate"]
status = self._lib.CP210x_SetBaudRateConfig(handle, data)
self._check_status(status, "SetBaudRateConfig")
def get_port_config(self, handle: HANDLE) -> dict:
cfg = PORT_CONFIG()
status = self._lib.CP210x_GetPortConfig(handle, byref(cfg))
self._check_status(status, "GetPortConfig")
return {
"mode": cfg.Mode,
"reset_latch": cfg.Reset_Latch,
"suspend_latch": cfg.Suspend_Latch,
"enhanced_fxn": cfg.EnhancedFxn,
}
def set_port_config(self, handle: HANDLE, mode: int, reset_latch: int, suspend_latch: int, enhanced_fxn: int):
cfg = PORT_CONFIG()
cfg.Mode = mode
cfg.Reset_Latch = reset_latch
cfg.Suspend_Latch = suspend_latch
cfg.EnhancedFxn = enhanced_fxn
status = self._lib.CP210x_SetPortConfig(handle, byref(cfg))
self._check_status(status, "SetPortConfig")
def get_dual_port_config(self, handle: HANDLE) -> dict:
cfg = DUAL_PORT_CONFIG()
status = self._lib.CP210x_GetDualPortConfig(handle, byref(cfg))
self._check_status(status, "GetDualPortConfig")
return {
"mode": cfg.Mode,
"reset_latch": cfg.Reset_Latch,
"suspend_latch": cfg.Suspend_Latch,
"enhanced_fxn_eci": cfg.EnhancedFxn_ECI,
"enhanced_fxn_sci": cfg.EnhancedFxn_SCI,
"enhanced_fxn_device": cfg.EnhancedFxn_Device,
}
def set_dual_port_config(self, handle: HANDLE, mode: int, reset_latch: int,
suspend_latch: int, enhanced_fxn_eci: int,
enhanced_fxn_sci: int, enhanced_fxn_device: int):
cfg = DUAL_PORT_CONFIG()
cfg.Mode = mode
cfg.Reset_Latch = reset_latch
cfg.Suspend_Latch = suspend_latch
cfg.EnhancedFxn_ECI = enhanced_fxn_eci
cfg.EnhancedFxn_SCI = enhanced_fxn_sci
cfg.EnhancedFxn_Device = enhanced_fxn_device
status = self._lib.CP210x_SetDualPortConfig(handle, byref(cfg))
self._check_status(status, "SetDualPortConfig")
def _quad_port_state_to_dict(self, qps: QUAD_PORT_STATE) -> dict:
return {
f"pb{i}": {"mode": getattr(qps, f"Mode_PB{i}"),
"low_power": getattr(qps, f"LowPower_PB{i}"),
"latch": getattr(qps, f"Latch_PB{i}")}
for i in range(5)
}
def get_quad_port_config(self, handle: HANDLE) -> dict:
cfg = QUAD_PORT_CONFIG()
status = self._lib.CP210x_GetQuadPortConfig(handle, byref(cfg))
self._check_status(status, "GetQuadPortConfig")
return {
"reset_latch": self._quad_port_state_to_dict(cfg.Reset_Latch),
"suspend_latch": self._quad_port_state_to_dict(cfg.Suspend_Latch),
"ip_delay": [cfg.IPDelay_IFC0, cfg.IPDelay_IFC1, cfg.IPDelay_IFC2, cfg.IPDelay_IFC3],
"enhanced_fxn": [cfg.EnhancedFxn_IFC0, cfg.EnhancedFxn_IFC1,
cfg.EnhancedFxn_IFC2, cfg.EnhancedFxn_IFC3],
"enhanced_fxn_device": cfg.EnhancedFxn_Device,
"ext_clk_freq": [cfg.ExtClk0Freq, cfg.ExtClk1Freq,
cfg.ExtClk2Freq, cfg.ExtClk3Freq],
}
def set_quad_port_config(self, handle: HANDLE, config_dict: dict):
cfg = QUAD_PORT_CONFIG()
for state_name in ("reset_latch", "suspend_latch"):
state = getattr(cfg, state_name.title().replace("_l", "_L"))
src = config_dict[state_name]
for i in range(5):
pb = src[f"pb{i}"]
setattr(state, f"Mode_PB{i}", pb["mode"])
setattr(state, f"LowPower_PB{i}", pb["low_power"])
setattr(state, f"Latch_PB{i}", pb["latch"])
for i in range(4):
setattr(cfg, f"IPDelay_IFC{i}", config_dict["ip_delay"][i])
setattr(cfg, f"EnhancedFxn_IFC{i}", config_dict["enhanced_fxn"][i])
cfg.EnhancedFxn_Device = config_dict["enhanced_fxn_device"]
for i in range(4):
setattr(cfg, f"ExtClk{i}Freq", config_dict["ext_clk_freq"][i])
status = self._lib.CP210x_SetQuadPortConfig(handle, byref(cfg))
self._check_status(status, "SetQuadPortConfig")
# --- Advanced / raw ---
def get_config(self, handle: HANDLE, size: int = 512) -> bytes:
buf = (BYTE * size)()
status = self._lib.CP210x_GetConfig(handle, buf, WORD(size))
self._check_status(status, "GetConfig")
return bytes(buf)
def set_config(self, handle: HANDLE, data: bytes):
buf = (BYTE * len(data))(*data)
status = self._lib.CP210x_SetConfig(handle, buf, WORD(len(data)))
self._check_status(status, "SetConfig")
def get_generic(self, handle: HANDLE, size: int = 512) -> bytes:
buf = (BYTE * size)()
status = self._lib.CP210x_GetGeneric(handle, buf, WORD(size))
self._check_status(status, "GetGeneric")
return bytes(buf)
def set_generic(self, handle: HANDLE, data: bytes):
buf = (BYTE * len(data))(*data)
status = self._lib.CP210x_SetGeneric(handle, buf, WORD(len(data)))
self._check_status(status, "SetGeneric")
def create_hex_file(self, handle: HANDLE, filename: str):
status = self._lib.CP210x_CreateHexFile(handle, filename.encode('utf-8'))
self._check_status(status, "CreateHexFile")
def update_firmware(self, handle: HANDLE):
status = self._lib.CP210x_UpdateFirmware(handle)
self._check_status(status, "UpdateFirmware")
def reset(self, handle: HANDLE):
status = self._lib.CP210x_Reset(handle)
self._check_status(status, "Reset")
class CP210xDevice:
"""Context manager for safe device access."""
def __init__(self, lib: CP210xLibrary, device_index: int):
self.lib = lib
self.device_index = device_index
self.handle = None
self._part_code: int | None = None
def __enter__(self):
self.handle = self.lib.open(self.device_index)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.handle:
self.lib.close(self.handle)
self.handle = None
return False
def _get_part_code(self) -> int:
if self._part_code is None:
self._part_code = self.lib.get_part_number(self.handle)[0]
return self._part_code
def _require_part(self, allowed: set[int], feature: str):
code = self._get_part_code()
if code not in allowed:
name = PART_NUMBERS.get(code, f"0x{code:02X}")
allowed_names = ", ".join(PART_NUMBERS.get(c, f"0x{c:02X}") for c in sorted(allowed))
raise CP210xError(
CP210x_FUNCTION_NOT_SUPPORTED,
f"{feature} not supported on {name} (requires {allowed_names})",
)
@property
def part_number(self) -> tuple[int, str]:
return self.lib.get_part_number(self.handle)
@property
def vid(self) -> int:
return self.lib.get_device_vid(self.handle)
@property
def pid(self) -> int:
return self.lib.get_device_pid(self.handle)
@property
def manufacturer(self) -> str:
return self.lib.get_manufacturer_string(self.handle)
@manufacturer.setter
def manufacturer(self, value: str):
self.lib.set_manufacturer_string(self.handle, value)
@property
def product(self) -> str:
return self.lib.get_product_string_device(self.handle)
@product.setter
def product(self, value: str):
self.lib.set_product_string(self.handle, value)
@property
def serial_number(self) -> str:
return self.lib.get_serial_number(self.handle)
@serial_number.setter
def serial_number(self, value: str):
self.lib.set_serial_number(self.handle, value)
@property
def max_power_ma(self) -> int:
return self.lib.get_max_power(self.handle) * 2
@max_power_ma.setter
def max_power_ma(self, value: int):
self.lib.set_max_power(self.handle, value // 2)
@property
def self_powered(self) -> bool:
return self.lib.get_self_power(self.handle)
@self_powered.setter
def self_powered(self, value: bool):
self.lib.set_self_power(self.handle, value)
@property
def device_version(self) -> str:
v = self.lib.get_device_version(self.handle)
return f"{(v >> 8) & 0xFF}.{v & 0xFF:02d}"
@device_version.setter
def device_version(self, value: int):
self.lib.set_device_version(self.handle, value)
@property
def is_locked(self) -> bool:
return self.lib.get_lock_value(self.handle) != 0
@property
def firmware_version(self) -> str | None:
"""Firmware version string (CP2102N only). Returns None if not supported."""
try:
major, minor, build = self.lib.get_firmware_version(self.handle)
return f"{major}.{minor}.{build}"
except CP210xError:
return None
@property
def flush_buffer_config(self) -> int | None:
"""Raw flush buffer config word. Returns None if not supported."""
try:
return self.lib.get_flush_buffer_config(self.handle)
except CP210xError:
return None
@property
def device_mode(self) -> tuple[int, int] | None:
"""(ECI mode, SCI mode) for CP2105. Returns None if not supported."""
try:
return self.lib.get_device_mode(self.handle)
except CP210xError:
return None
def get_interface_string(self, interface_number: int) -> str:
self._require_part(PARTS_WITH_INTERFACE_STRING, "Interface strings")
return self.lib.get_interface_string(self.handle, interface_number)
def set_interface_string(self, interface_number: int, value: str):
self._require_part(PARTS_WITH_INTERFACE_STRING, "Interface strings")
self.lib.set_interface_string(self.handle, interface_number, value)
def reset(self):
self.lib.reset(self.handle)

View File

@ -1,957 +0,0 @@
"""FastMCP server for CP210x device customization."""
import subprocess
from fastmcp import Context, FastMCP
from fastmcp.server.elicitation import AcceptedElicitation
from .bindings import (
ENHANCED_FXN_FLAGS,
FLUSH_FLAGS_CP2104,
FLUSH_FLAGS_CP2105,
PARTS_CP2102N,
PARTS_WITH_DEVICE_MODE,
PARTS_WITH_DUAL_PORT_CONFIG,
PARTS_WITH_FLUSH_CONFIG,
PARTS_WITH_PORT_CONFIG,
PARTS_WITH_QUAD_PORT_CONFIG,
PORT_PIN_FLAGS,
CP210xDevice,
CP210xError,
CP210xLibrary,
decode_bitmask,
encode_bitmask,
)
mcp = FastMCP(
"cp210x",
instructions="CP210x USB-UART bridge customization - read/write product strings, serial numbers, and device configuration",
)
_lib: CP210xLibrary | None = None
def get_lib() -> CP210xLibrary:
global _lib
if _lib is None:
_lib = CP210xLibrary()
return _lib
async def confirm_write(ctx: Context, message: str) -> bool:
"""Normal-tier confirmation: falls back to proceeding if elicitation unavailable."""
try:
result = await ctx.elicit(message, ["Confirm", "Cancel"])
return isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
return True
async def strict_confirm(ctx: Context, message: str) -> dict | None:
"""Strict-tier confirmation: returns error dict if elicitation unavailable."""
if not ctx:
return {
"error": "This operation requires interactive confirmation",
"message": "Too dangerous to proceed without explicit user consent.",
}
try:
result = await ctx.elicit(message, ["Confirm", "Cancel"])
confirmed = isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
return {
"error": "This operation requires elicitation support",
"message": "Your MCP client does not support elicitation. "
"This operation is too dangerous to proceed without explicit confirmation.",
}
if not confirmed:
return {"cancelled": True, "message": "Operation cancelled by user"}
return None
# ===========================================================================
# Read-only tools (no confirmation needed)
# ===========================================================================
@mcp.tool()
def list_devices() -> list[dict]:
"""List all connected CP210x devices.
Returns a list of devices with their index, description, and serial number.
Use the index to reference a specific device in other tools.
"""
lib = get_lib()
num_devices = lib.get_num_devices()
devices = []
for i in range(num_devices):
try:
desc = lib.get_product_string(i, flag=1)
serial = lib.get_product_string(i, flag=0)
devices.append({
"index": i,
"description": desc,
"serial_number": serial,
})
except CP210xError as e:
devices.append({"index": i, "error": str(e)})
return devices
@mcp.tool()
def get_device_info(device_index: int = 0) -> dict:
"""Get detailed information about a CP210x device.
Args:
device_index: Zero-based index of the device (default: 0 for first device)
Returns full device details including part number, VID/PID, strings, and power settings.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
info = {
"part_number": part_name,
"part_code": f"0x{part_code:02X}",
"vid": f"0x{dev.vid:04X}",
"pid": f"0x{dev.pid:04X}",
"manufacturer": dev.manufacturer,
"product": dev.product,
"serial_number": dev.serial_number,
"device_version": dev.device_version,
"max_power_ma": dev.max_power_ma,
"self_powered": dev.self_powered,
"is_locked": dev.is_locked,
}
# Conditionally add part-specific fields
fw = dev.firmware_version
if fw is not None:
info["firmware_version"] = fw
flush = dev.flush_buffer_config
if flush is not None:
# Decode to named flags based on part
if part_code in PARTS_WITH_FLUSH_CONFIG:
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
info["flush_buffer_config"] = decode_bitmask(flush, flag_map)
else:
info["flush_buffer_config_raw"] = f"0x{flush:04X}"
mode = dev.device_mode
if mode is not None:
info["device_mode"] = {"eci": mode[0], "sci": mode[1]}
return info
@mcp.tool()
def get_firmware_version(device_index: int = 0) -> dict:
"""Get firmware version (CP2102N only).
Args:
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
fw = dev.firmware_version
if fw is None:
part_code, part_name = dev.part_number
return {"error": f"Firmware version not available on {part_name}",
"note": "Only CP2102N variants support this"}
return {"firmware_version": fw}
@mcp.tool()
def get_flush_buffer_config(device_index: int = 0) -> dict:
"""Get flush buffer configuration (CP2104/CP2105/CP2108).
Args:
device_index: Zero-based device index (default: 0)
Returns named flags showing which buffers are flushed on open/close.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
if part_code not in PARTS_WITH_FLUSH_CONFIG:
return {"error": f"Flush buffer config not available on {part_name}",
"supported": "CP2104, CP2105, CP2108"}
raw = lib.get_flush_buffer_config(dev.handle)
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
return {
"raw": f"0x{raw:04X}",
"flags": decode_bitmask(raw, flag_map),
}
@mcp.tool()
def get_device_mode(device_index: int = 0) -> dict:
"""Get device mode (CP2105 only).
Args:
device_index: Zero-based device index (default: 0)
Returns ECI and SCI interface modes.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
if part_code not in PARTS_WITH_DEVICE_MODE:
return {"error": f"Device mode not available on {part_name}",
"supported": "CP2105"}
eci, sci = lib.get_device_mode(dev.handle)
return {"eci_mode": eci, "sci_mode": sci}
@mcp.tool()
def get_interface_string(interface_number: int, device_index: int = 0) -> dict:
"""Get USB interface string (CP2105/CP2108 only).
Args:
interface_number: Interface index (0 or 1 for CP2105, 0-3 for CP2108)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
try:
value = dev.get_interface_string(interface_number)
return {"interface": interface_number, "value": value}
except CP210xError as e:
return {"error": str(e)}
@mcp.tool()
def get_baud_rate_config(device_index: int = 0) -> dict:
"""Get the baud rate alias configuration table (32 entries).
Args:
device_index: Zero-based device index (default: 0)
Returns the full 32-entry baud rate alias table showing how standard
baud rates map to timer register values.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
configs = lib.get_baud_rate_config(dev.handle)
return {"entries": configs}
@mcp.tool()
def get_port_config(device_index: int = 0) -> dict:
"""Get GPIO port configuration (auto-detects CP2103/4/5/8).
Args:
device_index: Zero-based device index (default: 0)
Returns pin modes, latch values, and enhanced function settings.
Auto-dispatches to the correct struct based on part number.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
if part_code in PARTS_WITH_PORT_CONFIG:
raw = lib.get_port_config(dev.handle)
return {
"type": "single",
"part": part_name,
"mode": decode_bitmask(raw["mode"], PORT_PIN_FLAGS),
"reset_latch": decode_bitmask(raw["reset_latch"], PORT_PIN_FLAGS),
"suspend_latch": decode_bitmask(raw["suspend_latch"], PORT_PIN_FLAGS),
"enhanced_fxn": decode_bitmask(raw["enhanced_fxn"], ENHANCED_FXN_FLAGS),
"raw": raw,
}
elif part_code in PARTS_WITH_DUAL_PORT_CONFIG:
raw = lib.get_dual_port_config(dev.handle)
return {"type": "dual", "part": part_name, **raw}
elif part_code in PARTS_WITH_QUAD_PORT_CONFIG:
raw = lib.get_quad_port_config(dev.handle)
return {"type": "quad", "part": part_name, **raw}
else:
return {"error": f"Port config not available on {part_name}",
"supported": "CP2103, CP2104, CP2105, CP2108"}
@mcp.tool()
def get_raw_config(device_index: int = 0, size: int = 512) -> dict:
"""Read raw EPROM configuration blob.
Args:
device_index: Zero-based device index (default: 0)
size: Number of bytes to read (default: 512)
Returns hex-encoded config data.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
data = lib.get_config(dev.handle, size)
return {"hex": data.hex(), "size": len(data)}
@mcp.tool()
def create_hex_file(filename: str, device_index: int = 0) -> dict:
"""Dump device config to Intel HEX file.
Args:
filename: Output file path for the .hex file
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
lib.create_hex_file(dev.handle, filename)
return {"success": True, "filename": filename}
# ===========================================================================
# Normal-tier write tools (elicit → fallback to proceed)
# ===========================================================================
@mcp.tool()
async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB product string (device name shown to host).
Args:
product: New product string (max 126 characters)
device_index: Zero-based device index (default: 0)
Returns the updated device info. Device may need to be re-plugged for
changes to appear on the USB host.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_product = dev.product
new_value = product[:126]
if ctx and not await confirm_write(
ctx,
f"Write product string to CP210x OTP EPROM?\n\n"
f" Old: {old_product}\n New: {new_value}\n\n"
f"OTP writes are limited — this cannot be undone easily.",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.product = product
return {"success": True, "old_value": old_product, "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host"}
@mcp.tool()
async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB manufacturer string.
Args:
manufacturer: New manufacturer string (max 45 characters)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_mfr = dev.manufacturer
new_value = manufacturer[:45]
if ctx and not await confirm_write(
ctx,
f"Write manufacturer string to CP210x OTP EPROM?\n\n"
f" Old: {old_mfr}\n New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.manufacturer = manufacturer
return {"success": True, "old_value": old_mfr, "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host"}
@mcp.tool()
async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB serial number.
Args:
serial_number: New serial number (max 63 characters)
device_index: Zero-based device index (default: 0)
Note: Changing serial number may affect udev rules that match on serial.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_serial = dev.serial_number
new_value = serial_number[:63]
if ctx and not await confirm_write(
ctx,
f"Write serial number to CP210x OTP EPROM?\n\n"
f" Old: {old_serial}\n New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.serial_number = serial_number
return {"success": True, "old_value": old_serial, "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host"}
@mcp.tool()
async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the maximum USB power draw in milliamps.
Args:
max_power_ma: Max power in mA (0-500, will be rounded to nearest 2mA)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
if max_power_ma < 0 or max_power_ma > 500:
return {"error": "max_power_ma must be 0-500"}
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_power = dev.max_power_ma
actual_value = (max_power_ma // 2) * 2
if ctx and not await confirm_write(
ctx,
f"Write max power to CP210x OTP EPROM?\n\n"
f" Old: {old_power} mA\n New: {actual_value} mA",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.max_power_ma = max_power_ma
return {"success": True, "old_value_ma": old_power, "new_value_ma": actual_value}
@mcp.tool()
async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Context = None) -> dict:
"""Set whether device reports as self-powered or bus-powered.
Args:
self_powered: True for self-powered, False for bus-powered
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_value = dev.self_powered
mode = "self-powered" if self_powered else "bus-powered"
if ctx and not await confirm_write(
ctx,
f"Write power mode to CP210x OTP EPROM?\n\n"
f" Old: {'self-powered' if old_value else 'bus-powered'}\n New: {mode}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.self_powered = self_powered
return {"success": True, "old_value": old_value, "new_value": self_powered}
@mcp.tool()
async def set_device_version(version: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set device version (bcdDevice field).
Args:
version: BCD version number (e.g., 0x0100 for 1.00)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_version = dev.device_version
if ctx and not await confirm_write(
ctx,
f"Write device version to CP210x OTP EPROM?\n\n"
f" Old: {old_version}\n New: 0x{version:04X}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.device_version = version
new_ver = f"{(version >> 8) & 0xFF}.{version & 0xFF:02d}"
return {"success": True, "old_value": old_version, "new_value": new_ver}
@mcp.tool()
async def set_interface_string(
interface_number: int, value: str, device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set USB interface string (CP2105/CP2108 only).
Args:
interface_number: Interface index (0 or 1 for CP2105, 0-3 for CP2108)
value: New interface string
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
try:
old_value = dev.get_interface_string(interface_number)
except CP210xError as e:
return {"error": str(e)}
if ctx and not await confirm_write(
ctx,
f"Write interface {interface_number} string to CP210x OTP EPROM?\n\n"
f" Old: {old_value}\n New: {value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.set_interface_string(interface_number, value)
return {"success": True, "interface": interface_number,
"old_value": old_value, "new_value": value}
@mcp.tool()
async def set_flush_buffer_config(
flags: dict, device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set flush buffer configuration (CP2104/CP2105/CP2108).
Args:
flags: Dict of flag_name -> bool. Flag names depend on part:
CP2104: open_tx, open_rx, close_tx, close_rx
CP2105: open_tx_sci, open_rx_sci, close_tx_sci, close_rx_sci,
open_tx_eci, open_rx_eci, close_tx_eci, close_rx_eci
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
part_code, part_name = dev.part_number
if part_code not in PARTS_WITH_FLUSH_CONFIG:
return {"error": f"Flush buffer config not available on {part_name}"}
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
new_value = encode_bitmask(flags, flag_map)
old_raw = lib.get_flush_buffer_config(dev.handle)
if ctx and not await confirm_write(
ctx,
f"Write flush buffer config to CP210x?\n\n"
f" Old: 0x{old_raw:04X}\n New: 0x{new_value:04X}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
lib.set_flush_buffer_config(dev.handle, new_value)
return {"success": True, "old_raw": f"0x{old_raw:04X}",
"new_raw": f"0x{new_value:04X}", "flags": flags}
@mcp.tool()
async def set_device_mode(
eci_mode: int, sci_mode: int, device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set device mode (CP2105 only).
Args:
eci_mode: Enhanced interface mode byte
sci_mode: Standard interface mode byte
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
part_code, part_name = dev.part_number
if part_code not in PARTS_WITH_DEVICE_MODE:
return {"error": f"Device mode not available on {part_name}"}
old_eci, old_sci = lib.get_device_mode(dev.handle)
if ctx and not await confirm_write(
ctx,
f"Write device mode to CP210x?\n\n"
f" Old ECI: 0x{old_eci:02X}, SCI: 0x{old_sci:02X}\n"
f" New ECI: 0x{eci_mode:02X}, SCI: 0x{sci_mode:02X}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
lib.set_device_mode(dev.handle, eci_mode, sci_mode)
return {"success": True,
"old": {"eci": old_eci, "sci": old_sci},
"new": {"eci": eci_mode, "sci": sci_mode}}
@mcp.tool()
async def set_baud_rate_config(
entries: list[dict], device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set the full 32-entry baud rate alias table.
Args:
entries: List of 32 dicts, each with baud_gen, timer0_reload, prescaler, baud_rate
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
if len(entries) != 32:
return {"error": f"Expected 32 baud config entries, got {len(entries)}"}
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
if ctx and not await confirm_write(
ctx,
"Write complete baud rate table (32 entries) to CP210x OTP EPROM?",
):
return {"cancelled": True, "message": "Write cancelled by user"}
lib.set_baud_rate_config(dev.handle, entries)
return {"success": True, "entries_written": 32}
@mcp.tool()
async def set_baud_rate_alias(
index: int, baud_rate: int, baud_gen: int, timer0_reload: int, prescaler: int,
device_index: int = 0, ctx: Context = None,
) -> dict:
"""Modify a single baud rate alias entry (read-modify-write).
Args:
index: Entry index (0-31)
baud_rate: Target baud rate
baud_gen: Baud rate generator register value
timer0_reload: Timer0 reload register value
prescaler: Prescaler value
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
if not 0 <= index < 32:
return {"error": "Index must be 0-31"}
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
current = lib.get_baud_rate_config(dev.handle)
old_entry = current[index].copy()
current[index] = {
"index": index,
"baud_gen": baud_gen,
"timer0_reload": timer0_reload,
"prescaler": prescaler,
"baud_rate": baud_rate,
}
if ctx and not await confirm_write(
ctx,
f"Modify baud rate alias #{index}?\n\n"
f" Old: {old_entry['baud_rate']} baud\n"
f" New: {baud_rate} baud",
):
return {"cancelled": True, "message": "Write cancelled by user"}
lib.set_baud_rate_config(dev.handle, current)
return {"success": True, "index": index,
"old_entry": old_entry, "new_baud_rate": baud_rate}
@mcp.tool()
async def set_port_config(
config: dict, device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set GPIO port configuration (auto-detects CP2103/4/5/8).
Args:
config: Port configuration dict. Structure depends on part type:
Single (CP2103/4): {mode, reset_latch, suspend_latch, enhanced_fxn}
- Values can be raw integers or named-flag dicts
Dual (CP2105): {mode, reset_latch, suspend_latch,
enhanced_fxn_eci, enhanced_fxn_sci, enhanced_fxn_device}
Quad (CP2108): Full quad port config dict (see get_port_config output)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
part_code, part_name = dev.part_number
if ctx and not await confirm_write(
ctx,
f"Write port configuration to {part_name} OTP EPROM?",
):
return {"cancelled": True, "message": "Write cancelled by user"}
if part_code in PARTS_WITH_PORT_CONFIG:
# Allow either raw ints or named-flag dicts
mode = config.get("mode", 0)
if isinstance(mode, dict):
mode = encode_bitmask(mode, PORT_PIN_FLAGS)
reset = config.get("reset_latch", 0)
if isinstance(reset, dict):
reset = encode_bitmask(reset, PORT_PIN_FLAGS)
suspend = config.get("suspend_latch", 0)
if isinstance(suspend, dict):
suspend = encode_bitmask(suspend, PORT_PIN_FLAGS)
ef = config.get("enhanced_fxn", 0)
if isinstance(ef, dict):
ef = encode_bitmask(ef, ENHANCED_FXN_FLAGS)
lib.set_port_config(dev.handle, mode, reset, suspend, ef)
return {"success": True, "type": "single", "part": part_name}
elif part_code in PARTS_WITH_DUAL_PORT_CONFIG:
lib.set_dual_port_config(
dev.handle,
config["mode"], config["reset_latch"], config["suspend_latch"],
config["enhanced_fxn_eci"], config["enhanced_fxn_sci"],
config["enhanced_fxn_device"],
)
return {"success": True, "type": "dual", "part": part_name}
elif part_code in PARTS_WITH_QUAD_PORT_CONFIG:
lib.set_quad_port_config(dev.handle, config)
return {"success": True, "type": "quad", "part": part_name}
else:
return {"error": f"Port config not available on {part_name}"}
@mcp.tool()
def reset_device(device_index: int = 0) -> dict:
"""Reset the CP210x device (USB disconnect/reconnect).
Args:
device_index: Zero-based device index (default: 0)
This triggers a USB re-enumeration, which will make any programmed
changes visible to the USB host.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
dev.reset()
return {"success": True, "note": "Device has been reset. It may take a moment to re-enumerate."}
@mcp.tool()
async def setup_udev_rule(
device_index: int = 0,
symlink_name: str | None = None,
ctx: Context = None,
) -> dict:
"""Create a udev rule for stable /dev/ symlink for a CP210x device.
Generates a rule that matches the device's product string and creates
a persistent symlink (e.g., /dev/rylr998-0D27) that survives re-plugs
and port reordering.
Args:
device_index: Zero-based device index (default: 0)
symlink_name: Custom symlink name. If not provided, auto-generates
from the product string (e.g., "RYLR998 ...0D27" -> "rylr998-0D27")
Requires sudo for rule installation.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
product = dev.product
vid = dev.vid
pid = dev.pid
if not product or product == "CP2102 USB to UART Bridge Controller":
return {
"error": "Device has default product string",
"message": "Customize the product string first to create a unique match rule.",
}
if not symlink_name:
parts = product.split()
if len(parts) >= 2:
prefix = parts[0].lower()
suffix = parts[-1][-4:]
symlink_name = f"{prefix}-{suffix}"
else:
symlink_name = product.lower().replace(" ", "-")[:32]
match_suffix = product[-4:]
rule = (
f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", '
f'ATTRS{{idProduct}}=="{pid:04x}", '
f'ATTRS{{product}}=="*{match_suffix}", '
f'SYMLINK+="{symlink_name}"'
)
rules_path = f"/usr/lib/udev/rules.d/99-cp210x-{symlink_name}.rules"
rule_content = (
f"# Auto-generated by mcp210x-uart for: {product}\n"
f"# Creates /dev/{symlink_name} symlink\n"
f"{rule}\n"
)
install_msg = (
f"Install udev rule for stable device symlink?\n\n"
f" Device: {product}\n"
f" Symlink: /dev/{symlink_name}\n"
f" File: {rules_path}\n\n"
f"Requires sudo to install."
)
if ctx and not await confirm_write(ctx, install_msg):
return {"cancelled": True, "rule": rule_content,
"message": "Rule not installed. You can install it manually."}
try:
result = subprocess.run(
["sudo", "tee", rules_path],
input=rule_content, capture_output=True, text=True, timeout=30,
)
if result.returncode != 0:
return {"error": f"Failed to install rule: {result.stderr.strip()}",
"rule": rule_content, "message": "Install manually with: sudo tee " + rules_path}
subprocess.run(["sudo", "udevadm", "control", "--reload-rules"],
capture_output=True, timeout=10)
subprocess.run(["sudo", "udevadm", "trigger"],
capture_output=True, timeout=10)
except subprocess.TimeoutExpired:
return {"error": "Sudo timed out — may need interactive password",
"rule": rule_content, "message": "Install manually with: sudo tee " + rules_path}
return {"success": True, "symlink": f"/dev/{symlink_name}",
"rules_file": rules_path, "rule": rule_content,
"note": "Symlink will appear after device re-plug or udevadm trigger"}
# ===========================================================================
# Strict-tier write tools (hard-refuse without elicitation)
# ===========================================================================
@mcp.tool()
async def set_vid(vid: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB Vendor ID (DANGEROUS — can break driver matching).
Args:
vid: New USB Vendor ID (16-bit, e.g., 0x10C4 for Silicon Labs)
device_index: Zero-based device index (default: 0)
WARNING: Changing VID can prevent the device from being recognized by
standard CP210x drivers. Only use if you have custom drivers.
"""
lib = get_lib()
err = await strict_confirm(
ctx,
f"Write USB Vendor ID to CP210x OTP EPROM?\n\n"
f" New VID: 0x{vid:04X}\n\n"
f"WARNING: Changing VID can prevent the device from being recognized\n"
f"by standard CP210x drivers. This is PERMANENT.",
)
if err:
return err
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_vid = dev.vid
lib.set_vid(dev.handle, vid)
return {"success": True, "old_vid": f"0x{old_vid:04X}", "new_vid": f"0x{vid:04X}"}
@mcp.tool()
async def set_pid(pid: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB Product ID (DANGEROUS — can break driver matching).
Args:
pid: New USB Product ID (16-bit, e.g., 0xEA60)
device_index: Zero-based device index (default: 0)
WARNING: Changing PID can prevent the device from being recognized by
standard CP210x drivers.
"""
lib = get_lib()
err = await strict_confirm(
ctx,
f"Write USB Product ID to CP210x OTP EPROM?\n\n"
f" New PID: 0x{pid:04X}\n\n"
f"WARNING: Changing PID can prevent the device from being recognized\n"
f"by standard CP210x drivers. This is PERMANENT.",
)
if err:
return err
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_pid = dev.pid
lib.set_pid(dev.handle, pid)
return {"success": True, "old_pid": f"0x{old_pid:04X}", "new_pid": f"0x{pid:04X}"}
@mcp.tool()
async def set_raw_config(hex_data: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Write raw configuration blob to device EPROM (DANGEROUS).
Args:
hex_data: Hex-encoded config data
device_index: Zero-based device index (default: 0)
WARNING: Writing invalid config data can brick the device.
Use get_raw_config to read current config first.
"""
lib = get_lib()
try:
data = bytes.fromhex(hex_data)
except ValueError:
return {"error": "Invalid hex string"}
err = await strict_confirm(
ctx,
f"Write {len(data)} bytes of raw config to CP210x EPROM?\n\n"
f"WARNING: Writing invalid config data can brick the device.\n"
f"This is PERMANENT and cannot be undone.",
)
if err:
return err
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
lib.set_config(dev.handle, data)
return {"success": True, "bytes_written": len(data)}
@mcp.tool()
async def update_firmware(device_index: int = 0, ctx: Context = None) -> dict:
"""Update device firmware (CP2102N only, DANGEROUS).
Args:
device_index: Zero-based device index (default: 0)
WARNING: Firmware update failure can brick the device.
"""
lib = get_lib()
err = await strict_confirm(
ctx,
"Update CP2102N firmware?\n\n"
"WARNING: Firmware update failure can BRICK the device.\n"
"Ensure stable USB connection and power during update.",
)
if err:
return err
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
if part_code not in PARTS_CP2102N:
return {"error": f"Firmware update not available on {part_name}",
"supported": "CP2102N variants only"}
lib.update_firmware(dev.handle)
return {"success": True, "note": "Firmware updated. Device may need reset."}
@mcp.tool()
async def lock_device(device_index: int = 0, ctx: Context = None) -> dict:
"""PERMANENTLY lock the device to prevent further customization.
Args:
device_index: Zero-based device index (default: 0)
WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's
configuration cannot be changed. The device will still function normally,
but strings, power settings, etc. cannot be modified.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is already locked"}
part_code, part_name = dev.part_number
err = await strict_confirm(
ctx,
f"PERMANENTLY lock this {part_name}?\n\n"
f" Product: {dev.product}\n"
f" Serial: {dev.serial_number}\n\n"
f"THIS CANNOT BE UNDONE. The device configuration\n"
f"will be frozen forever.",
)
if err:
return err
lib.lock_device(dev.handle)
return {"success": True,
"warning": "Device is now PERMANENTLY locked. Configuration cannot be changed."}
def main():
"""Entry point for the MCP server."""
mcp.run()
if __name__ == "__main__":
main()

10
uv.lock generated
View File

@ -780,8 +780,8 @@ wheels = [
] ]
[[package]] [[package]]
name = "mcp210x-uart" name = "mcp210x"
version = "0.2.0" version = "0.1.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "fastmcp" }, { name = "fastmcp" },
@ -1191,11 +1191,11 @@ wheels = [
[[package]] [[package]]
name = "pyjwt" name = "pyjwt"
version = "2.11.0" version = "2.10.1"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/5c/5a/b46fa56bf322901eee5b0454a34343cdbdae202cd421775a8ee4e42fd519/pyjwt-2.11.0.tar.gz", hash = "sha256:35f95c1f0fbe5d5ba6e43f00271c275f7a1a4db1dab27bf708073b75318ea623", size = 98019, upload-time = "2026-01-30T19:59:55.694Z" } sdist = { url = "https://files.pythonhosted.org/packages/e7/46/bd74733ff231675599650d3e47f361794b22ef3e3770998dda30d3b63726/pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953", size = 87785, upload-time = "2024-11-28T03:43:29.933Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/6f/01/c26ce75ba460d5cd503da9e13b21a33804d38c2165dec7b716d06b13010c/pyjwt-2.11.0-py3-none-any.whl", hash = "sha256:94a6bde30eb5c8e04fee991062b534071fd1439ef58d2adc9ccb823e7bcd0469", size = 28224, upload-time = "2026-01-30T19:59:54.539Z" }, { url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997, upload-time = "2024-11-28T03:43:27.893Z" },
] ]
[package.optional-dependencies] [package.optional-dependencies]