Compare commits

...

3 Commits

Author SHA1 Message Date
07d6ee6ff1 Update README for full API coverage and safety tiers
Tool table split by safety tier (none/normal/strict), new
part-number support matrix, updated project structure.
2026-01-31 09:47:35 -07:00
c0bedde54b Add full CP210x API coverage: 29 tools, all part families
Complete ctypes bindings for all 45 libcp210xmanufacturing functions
including structs (BAUD_CONFIG, PORT_CONFIG, DUAL_PORT_CONFIG,
QUAD_PORT_CONFIG, FIRMWARE_T), part-number gating, bitmask helpers,
and three-tier safety model (none/normal/strict elicitation).

New tools: set_vid, set_pid, get/set_interface_string,
get/set_flush_buffer_config, get/set_device_mode,
get_firmware_version, set_device_version, get/set_baud_rate_config,
set_baud_rate_alias, get/set_port_config, get/set_raw_config,
create_hex_file, update_firmware.
2026-01-31 09:46:43 -07:00
80220f5728 Rename project from mcp210x to mcp210x-uart
Renames package, module directory, script entry point, URLs,
Gitea repo, and git remote to mcp210x-uart.
2026-01-31 09:40:20 -07:00
10 changed files with 1986 additions and 979 deletions

View File

@ -7,7 +7,7 @@
"run",
"--directory",
"/home/rpm/claude/lora/cp2102-uart",
"mcp210x"
"mcp210x-uart"
],
"env": {}
}

113
README.md
View File

@ -1,10 +1,10 @@
# mcp210x
# mcp210x-uart
It's MCP. It's CP210x. It was right there the whole time.
An MCP server for customizing Silicon Labs CP210x USB-UART bridge devices — product strings, serial numbers, power config, udev rules, and device locking — through natural language in Claude Code.
An MCP server for customizing Silicon Labs CP210x USB-UART bridge devices — product strings, serial numbers, power config, GPIO port config, baud rate tables, udev rules, and device locking — through natural language in Claude Code.
Built on [FastMCP](https://gofastmcp.com/) with Python ctypes bindings to Silicon Labs' native `libcp210xmanufacturing` library.
Built on [FastMCP](https://gofastmcp.com/) with Python ctypes bindings to all 45 functions in Silicon Labs' native `libcp210xmanufacturing` library. Covers the full CP210x family: CP2101, CP2102, CP2102N, CP2103, CP2104, CP2105, CP2108, and CP2109.
## The problem
@ -32,26 +32,78 @@ Two devices found:
Each device gets a unique product string baked into its USB descriptor EPROM. Udev rules match on that string to create stable symlinks. Devices survive reboots, port reordering, and hub changes.
## Features
## Tools
- **List and inspect** connected CP210x devices (part number, VID/PID, strings, power, lock state)
- **Write USB descriptors** — product string, manufacturer, serial number
- **Configure power** — max current draw, self-powered vs bus-powered
- **Generate udev rules** — stable `/dev/` symlinks based on product string
- **Reset device** — trigger USB re-enumeration after changes
- **Lock device** — permanently freeze configuration (with strict confirmation gate)
### Read-only (no confirmation)
| Tool | Description |
|------|-------------|
| `list_devices` | List all connected CP210x devices |
| `get_device_info` | Full device details — part number, VID/PID, strings, power, lock state, firmware version (CP2102N), flush config, device mode |
| `get_firmware_version` | Firmware version (CP2102N only) |
| `get_flush_buffer_config` | Flush buffer configuration (CP2104/CP2105/CP2108) |
| `get_device_mode` | Device mode — ECI/SCI assignment (CP2105 only) |
| `get_interface_string` | USB interface string (CP2105/CP2108 only) |
| `get_baud_rate_config` | Baud rate alias table (32 entries) |
| `get_port_config` | GPIO port configuration (auto-detects CP2103/4/5/8) |
| `get_raw_config` | Raw EPROM configuration blob (hex string) |
| `create_hex_file` | Dump device config to Intel HEX file |
| `reset_device` | USB disconnect/reconnect to apply changes |
### Normal writes (elicitation → fallback to proceed)
| Tool | Description |
|------|-------------|
| `set_product_string` | Write USB product string (max 126 chars) |
| `set_manufacturer_string` | Write USB manufacturer string (max 45 chars) |
| `set_serial_number` | Write USB serial number (max 63 chars) |
| `set_max_power` | Set max USB power draw in mA (0500, rounded to nearest 2) |
| `set_self_powered` | Toggle self-powered vs bus-powered reporting |
| `set_device_version` | Set device version (bcdDevice field) |
| `set_interface_string` | Set USB interface string (CP2105/CP2108 only) |
| `set_flush_buffer_config` | Set flush buffer configuration (CP2104/CP2105/CP2108) |
| `set_device_mode` | Set device mode (CP2105 only) |
| `set_baud_rate_config` | Set full 32-entry baud rate alias table |
| `set_baud_rate_alias` | Modify a single baud rate alias entry (read-modify-write) |
| `set_port_config` | Set GPIO port configuration (auto-detects CP2103/4/5/8) |
| `setup_udev_rule` | Generate and install a udev rule for a stable `/dev/` symlink |
### Strict writes (elicitation required — hard-refuses without it)
| Tool | Description |
|------|-------------|
| `set_vid` | Set USB Vendor ID (can break driver matching) |
| `set_pid` | Set USB Product ID (can break driver matching) |
| `set_raw_config` | Write raw configuration blob to EPROM |
| `update_firmware` | Update device firmware (CP2102N only) |
| `lock_device` | Permanently freeze device configuration |
## Safety model
CP210x descriptor EPROM is one-time-programmable with limited write cycles. Writes can't be undone. Locks are permanent. The server enforces a tiered confirmation model:
| Operation | Confirmation |
|-----------|-------------|
| Reads | None |
| Writes (strings, power) | MCP elicitation if client supports it; proceeds otherwise |
| Lock | Elicitation **required**; hard-refuses without it |
| Tier | Behavior | Tools |
|------|----------|-------|
| **None** | No confirmation | All `get_*`, `list_devices`, `reset_device`, `create_hex_file` |
| **Normal** | Elicitation if client supports it; proceeds otherwise | String setters, power, baud config, port config, `setup_udev_rule` |
| **Strict** | Elicitation **required**; returns error without it | `set_vid`, `set_pid`, `set_raw_config`, `update_firmware`, `lock_device` |
The lock gate isn't just a warning — it returns an error and does not proceed if the MCP client can't present a confirmation dialog.
The strict gate isn't a warning — it returns an error and does not proceed if the MCP client can't present a confirmation dialog.
## Part-number support
Different CP210x parts expose different features. The server auto-gates tools to supported parts:
| Feature | Supported parts |
|---------|----------------|
| Core (strings, power, baud) | All CP210x |
| Flush buffer config | CP2104, CP2105, CP2108 |
| Device mode | CP2105 |
| Interface strings | CP2105, CP2108 |
| Port config | CP2103, CP2104 |
| Dual port config | CP2105 |
| Quad port config | CP2108 |
| Firmware version/update | CP2102N |
## Requirements
@ -93,30 +145,15 @@ uv tool install .
### 3. Claude Code
```bash
claude mcp add cp210x -- uvx mcp210x
claude mcp add cp210x -- uvx mcp210x-uart
```
For development (runs from source):
```bash
claude mcp add cp210x-local -- uv run --directory /path/to/this-repo mcp210x
claude mcp add cp210x-local -- uv run --directory /path/to/this-repo mcp210x-uart
```
## Tools
| Tool | Description |
|------|-------------|
| `list_devices` | List connected CP210x devices with description and serial |
| `get_device_info` | Full device details — part number, VID/PID, strings, power, lock state |
| `set_product_string` | Write USB product string (max 126 chars) |
| `set_manufacturer_string` | Write USB manufacturer string (max 45 chars) |
| `set_serial_number` | Write USB serial number (max 63 chars) |
| `set_max_power` | Set max USB power draw in mA (0-500, rounded to nearest 2) |
| `set_self_powered` | Toggle self-powered vs bus-powered reporting |
| `reset_device` | USB disconnect/reconnect to apply changes |
| `lock_device` | Permanently freeze device configuration |
| `setup_udev_rule` | Generate and install a udev rule for a stable `/dev/` symlink |
## Architecture
```
@ -137,10 +174,10 @@ The native library uses **libusb** for device access, separate from the kernel's
## Project structure
```
mcp210x/
├── src/mcp210x/
│ ├── server.py # FastMCP tool definitions and elicitation logic
│ ├── bindings.py # ctypes wrapper for libcp210xmanufacturing.so
mcp210x-uart/
├── src/mcp210x_uart/
│ ├── server.py # FastMCP tool definitions, elicitation, part-number dispatch
│ ├── bindings.py # ctypes: structs, prototypes, bitmask helpers, device wrapper
│ └── __init__.py
├── aur/cp210xmanufacturing/
│ ├── PKGBUILD # Arch Linux package for the native library
@ -152,7 +189,7 @@ mcp210x/
## Complementary tools
This server handles **device customization** (USB descriptors, power config). For **serial communication** (sending/receiving data over UART), use [mcserial](https://git.supported.systems/MCP/mcserial).
This server handles **device customization** (USB descriptors, power config, GPIO, baud tables). For **serial communication** (sending/receiving data over UART), use [mcserial](https://git.supported.systems/MCP/mcserial).
## Reference

View File

@ -1,6 +1,6 @@
[project]
name = "mcp210x"
version = "0.1.0"
name = "mcp210x-uart"
version = "0.2.0"
description = "MCP server for CP210x USB-UART bridge customization"
readme = "README.md"
requires-python = ">=3.10"
@ -31,18 +31,18 @@ dev = [
]
[project.scripts]
mcp210x = "mcp210x:main"
mcp210x-uart = "mcp210x_uart:main"
[project.urls]
Homepage = "https://forge.supported.systems/MCP/mcp210x"
Repository = "https://forge.supported.systems/MCP/mcp210x"
Homepage = "https://forge.supported.systems/MCP/mcp210x-uart"
Repository = "https://forge.supported.systems/MCP/mcp210x-uart"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mcp210x"]
packages = ["src/mcp210x_uart"]
[tool.ruff]
line-length = 100

View File

@ -1,5 +0,0 @@
"""mcp210x - MCP server for Silicon Labs CP210x USB-UART bridge customization."""
from .server import mcp, main
__all__ = ["mcp", "main"]

View File

@ -1,441 +0,0 @@
"""Low-level ctypes bindings for libcp210xmanufacturing."""
import ctypes
from ctypes import c_int, c_uint, c_ushort, c_ubyte, c_char_p, c_void_p, POINTER, byref
from pathlib import Path
from typing import Optional
# Type aliases matching the C library
CP210x_STATUS = c_int
HANDLE = c_void_p
DWORD = c_uint
WORD = c_ushort
BYTE = c_ubyte
BOOL = c_int
# Return codes
CP210x_SUCCESS = 0x00
CP210x_DEVICE_NOT_FOUND = 0xFF
CP210x_INVALID_HANDLE = 0x01
CP210x_INVALID_PARAMETER = 0x02
CP210x_DEVICE_IO_FAILED = 0x03
CP210x_FUNCTION_NOT_SUPPORTED = 0x04
CP210x_GLOBAL_DATA_ERROR = 0x05
CP210x_FILE_ERROR = 0x06
CP210x_COMMAND_FAILED = 0x08
CP210x_INVALID_ACCESS_TYPE = 0x09
# Part numbers
PART_NUMBERS = {
0x01: "CP2101",
0x02: "CP2102",
0x03: "CP2103",
0x04: "CP2104",
0x05: "CP2105",
0x08: "CP2108",
0x09: "CP2109",
0x20: "CP2102N-QFN28",
0x21: "CP2102N-QFN24",
0x22: "CP2102N-QFN20",
}
# Buffer sizes
MAX_DEVICE_STRLEN = 256
MAX_MANUFACTURER_STRLEN = 45
MAX_PRODUCT_STRLEN = 126
MAX_SERIAL_STRLEN = 63
# GetProductString flags
RETURN_SERIAL_NUMBER = 0x00
RETURN_DESCRIPTION = 0x01
RETURN_FULL_PATH = 0x02
class CP210xError(Exception):
"""Exception raised for CP210x library errors."""
ERROR_MESSAGES = {
CP210x_DEVICE_NOT_FOUND: "Device not found",
CP210x_INVALID_HANDLE: "Invalid handle",
CP210x_INVALID_PARAMETER: "Invalid parameter",
CP210x_DEVICE_IO_FAILED: "Device I/O failed",
CP210x_FUNCTION_NOT_SUPPORTED: "Function not supported",
CP210x_GLOBAL_DATA_ERROR: "Global data error",
CP210x_FILE_ERROR: "File error",
CP210x_COMMAND_FAILED: "Command failed",
CP210x_INVALID_ACCESS_TYPE: "Invalid access type",
}
def __init__(self, status: int, context: str = ""):
self.status = status
msg = self.ERROR_MESSAGES.get(status, f"Unknown error 0x{status:02X}")
if context:
msg = f"{context}: {msg}"
super().__init__(msg)
class CP210xLibrary:
"""Wrapper for the CP210x manufacturing library."""
def __init__(self, lib_path: Optional[str] = None):
"""Load the CP210x manufacturing library.
Args:
lib_path: Path to libcp210xmanufacturing.so, or None to search default paths.
"""
if lib_path:
self._lib = ctypes.CDLL(lib_path)
else:
# Search common locations
search_paths = [
"/usr/lib/libcp210xmanufacturing.so",
"/usr/local/lib/libcp210xmanufacturing.so",
str(Path(__file__).parent.parent.parent / "AN721SW/Linux/LibrarySourcePackages/cp210xmanufacturing/build/lib/x86_64/libcp210xmanufacturing.so"),
]
for path in search_paths:
if Path(path).exists():
self._lib = ctypes.CDLL(path)
break
else:
raise FileNotFoundError(
"libcp210xmanufacturing.so not found. Install with: "
"cd aur/cp210xmanufacturing && makepkg -si"
)
self._setup_functions()
def _setup_functions(self):
"""Set up function prototypes."""
# CP210x_GetNumDevices
self._lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)]
self._lib.CP210x_GetNumDevices.restype = CP210x_STATUS
# CP210x_GetProductString
self._lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD]
self._lib.CP210x_GetProductString.restype = CP210x_STATUS
# CP210x_Open
self._lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)]
self._lib.CP210x_Open.restype = CP210x_STATUS
# CP210x_Close
self._lib.CP210x_Close.argtypes = [HANDLE]
self._lib.CP210x_Close.restype = CP210x_STATUS
# CP210x_GetPartNumber
self._lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)]
self._lib.CP210x_GetPartNumber.restype = CP210x_STATUS
# CP210x_GetDeviceVid
self._lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)]
self._lib.CP210x_GetDeviceVid.restype = CP210x_STATUS
# CP210x_GetDevicePid
self._lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)]
self._lib.CP210x_GetDevicePid.restype = CP210x_STATUS
# CP210x_GetDeviceManufacturerString
self._lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
self._lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS
# CP210x_GetDeviceProductString
self._lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
self._lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS
# CP210x_GetDeviceSerialNumber
self._lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
self._lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS
# CP210x_SetManufacturerString
self._lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
self._lib.CP210x_SetManufacturerString.restype = CP210x_STATUS
# CP210x_SetProductString
self._lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
self._lib.CP210x_SetProductString.restype = CP210x_STATUS
# CP210x_SetSerialNumber
self._lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
self._lib.CP210x_SetSerialNumber.restype = CP210x_STATUS
# CP210x_GetMaxPower
self._lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)]
self._lib.CP210x_GetMaxPower.restype = CP210x_STATUS
# CP210x_SetMaxPower
self._lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE]
self._lib.CP210x_SetMaxPower.restype = CP210x_STATUS
# CP210x_GetSelfPower
self._lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)]
self._lib.CP210x_GetSelfPower.restype = CP210x_STATUS
# CP210x_SetSelfPower
self._lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL]
self._lib.CP210x_SetSelfPower.restype = CP210x_STATUS
# CP210x_GetDeviceVersion
self._lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)]
self._lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS
# CP210x_SetDeviceVersion
self._lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD]
self._lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS
# CP210x_GetLockValue
self._lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)]
self._lib.CP210x_GetLockValue.restype = CP210x_STATUS
# CP210x_SetLockValue
self._lib.CP210x_SetLockValue.argtypes = [HANDLE]
self._lib.CP210x_SetLockValue.restype = CP210x_STATUS
# CP210x_Reset
self._lib.CP210x_Reset.argtypes = [HANDLE]
self._lib.CP210x_Reset.restype = CP210x_STATUS
def _check_status(self, status: int, context: str = ""):
"""Raise exception if status indicates error."""
if status != CP210x_SUCCESS:
raise CP210xError(status, context)
def get_num_devices(self) -> int:
"""Get the number of connected CP210x devices."""
num = DWORD()
status = self._lib.CP210x_GetNumDevices(byref(num))
self._check_status(status, "GetNumDevices")
return num.value
def get_product_string(self, device_index: int, flag: int = RETURN_DESCRIPTION) -> str:
"""Get device string without opening the device.
Args:
device_index: Zero-based device index
flag: What to return - RETURN_SERIAL_NUMBER, RETURN_DESCRIPTION, or RETURN_FULL_PATH
"""
buf = ctypes.create_string_buffer(MAX_DEVICE_STRLEN)
status = self._lib.CP210x_GetProductString(DWORD(device_index), buf, DWORD(flag))
self._check_status(status, "GetProductString")
return buf.value.decode('utf-8', errors='replace')
def open(self, device_index: int) -> HANDLE:
"""Open a device by index."""
handle = HANDLE()
status = self._lib.CP210x_Open(DWORD(device_index), byref(handle))
self._check_status(status, f"Open device {device_index}")
return handle
def close(self, handle: HANDLE):
"""Close an open device."""
status = self._lib.CP210x_Close(handle)
self._check_status(status, "Close")
def get_part_number(self, handle: HANDLE) -> tuple[int, str]:
"""Get the part number of an open device.
Returns:
Tuple of (part_number_code, part_name)
"""
part = BYTE()
status = self._lib.CP210x_GetPartNumber(handle, byref(part))
self._check_status(status, "GetPartNumber")
return part.value, PART_NUMBERS.get(part.value, f"Unknown (0x{part.value:02X})")
def get_device_vid(self, handle: HANDLE) -> int:
"""Get the USB Vendor ID."""
vid = WORD()
status = self._lib.CP210x_GetDeviceVid(handle, byref(vid))
self._check_status(status, "GetDeviceVid")
return vid.value
def get_device_pid(self, handle: HANDLE) -> int:
"""Get the USB Product ID."""
pid = WORD()
status = self._lib.CP210x_GetDevicePid(handle, byref(pid))
self._check_status(status, "GetDevicePid")
return pid.value
def get_manufacturer_string(self, handle: HANDLE) -> str:
"""Get the manufacturer string."""
buf = ctypes.create_string_buffer(MAX_MANUFACTURER_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceManufacturerString(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceManufacturerString")
return buf.value.decode('utf-8', errors='replace')
def get_product_string_device(self, handle: HANDLE) -> str:
"""Get the product string from an open device."""
buf = ctypes.create_string_buffer(MAX_PRODUCT_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceProductString(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceProductString")
return buf.value.decode('utf-8', errors='replace')
def get_serial_number(self, handle: HANDLE) -> str:
"""Get the serial number."""
buf = ctypes.create_string_buffer(MAX_SERIAL_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceSerialNumber(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceSerialNumber")
return buf.value.decode('utf-8', errors='replace')
def set_manufacturer_string(self, handle: HANDLE, manufacturer: str):
"""Set the manufacturer string (max 45 chars)."""
data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN]
status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetManufacturerString")
def set_product_string(self, handle: HANDLE, product: str):
"""Set the product string (max 126 chars)."""
data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN]
status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetProductString")
def set_serial_number(self, handle: HANDLE, serial: str):
"""Set the serial number (max 63 chars)."""
data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN]
status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetSerialNumber")
def get_max_power(self, handle: HANDLE) -> int:
"""Get max power in units of 2mA (multiply by 2 for mA)."""
power = BYTE()
status = self._lib.CP210x_GetMaxPower(handle, byref(power))
self._check_status(status, "GetMaxPower")
return power.value
def set_max_power(self, handle: HANDLE, power_2ma: int):
"""Set max power in units of 2mA (e.g., 50 = 100mA)."""
if power_2ma > 250:
raise ValueError("Max power cannot exceed 250 (500mA)")
status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma))
self._check_status(status, "SetMaxPower")
def get_self_power(self, handle: HANDLE) -> bool:
"""Check if device is self-powered."""
self_power = BOOL()
status = self._lib.CP210x_GetSelfPower(handle, byref(self_power))
self._check_status(status, "GetSelfPower")
return bool(self_power.value)
def set_self_power(self, handle: HANDLE, self_powered: bool):
"""Set whether device is self-powered."""
status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0))
self._check_status(status, "SetSelfPower")
def get_device_version(self, handle: HANDLE) -> int:
"""Get device version (bcdDevice)."""
version = WORD()
status = self._lib.CP210x_GetDeviceVersion(handle, byref(version))
self._check_status(status, "GetDeviceVersion")
return version.value
def set_device_version(self, handle: HANDLE, version: int):
"""Set device version (bcdDevice)."""
status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version))
self._check_status(status, "SetDeviceVersion")
def get_lock_value(self, handle: HANDLE) -> int:
"""Get lock value (0 = unlocked, 1-255 = locked)."""
lock = BYTE()
status = self._lib.CP210x_GetLockValue(handle, byref(lock))
self._check_status(status, "GetLockValue")
return lock.value
def lock_device(self, handle: HANDLE):
"""Lock the device (PERMANENT - cannot be undone!)."""
status = self._lib.CP210x_SetLockValue(handle)
self._check_status(status, "SetLockValue")
def reset(self, handle: HANDLE):
"""Reset the device (USB disconnect/reconnect)."""
status = self._lib.CP210x_Reset(handle)
self._check_status(status, "Reset")
# Convenience context manager for device access
class CP210xDevice:
"""Context manager for safe device access."""
def __init__(self, lib: CP210xLibrary, device_index: int):
self.lib = lib
self.device_index = device_index
self.handle = None
def __enter__(self):
self.handle = self.lib.open(self.device_index)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.handle:
self.lib.close(self.handle)
self.handle = None
return False
@property
def part_number(self) -> tuple[int, str]:
return self.lib.get_part_number(self.handle)
@property
def vid(self) -> int:
return self.lib.get_device_vid(self.handle)
@property
def pid(self) -> int:
return self.lib.get_device_pid(self.handle)
@property
def manufacturer(self) -> str:
return self.lib.get_manufacturer_string(self.handle)
@manufacturer.setter
def manufacturer(self, value: str):
self.lib.set_manufacturer_string(self.handle, value)
@property
def product(self) -> str:
return self.lib.get_product_string_device(self.handle)
@product.setter
def product(self, value: str):
self.lib.set_product_string(self.handle, value)
@property
def serial_number(self) -> str:
return self.lib.get_serial_number(self.handle)
@serial_number.setter
def serial_number(self, value: str):
self.lib.set_serial_number(self.handle, value)
@property
def max_power_ma(self) -> int:
"""Max power in milliamps."""
return self.lib.get_max_power(self.handle) * 2
@max_power_ma.setter
def max_power_ma(self, value: int):
"""Set max power in milliamps (will be rounded down to nearest 2mA)."""
self.lib.set_max_power(self.handle, value // 2)
@property
def self_powered(self) -> bool:
return self.lib.get_self_power(self.handle)
@self_powered.setter
def self_powered(self, value: bool):
self.lib.set_self_power(self.handle, value)
@property
def device_version(self) -> str:
"""Device version as BCD string (e.g., '1.00')."""
v = self.lib.get_device_version(self.handle)
return f"{(v >> 8) & 0xFF}.{v & 0xFF:02d}"
@property
def is_locked(self) -> bool:
return self.lib.get_lock_value(self.handle) != 0
def reset(self):
"""Reset the device."""
self.lib.reset(self.handle)

View File

@ -1,483 +0,0 @@
"""FastMCP server for CP210x device customization."""
import subprocess
from typing import Optional
from fastmcp import FastMCP, Context
from fastmcp.server.elicitation import AcceptedElicitation
from .bindings import CP210xLibrary, CP210xDevice, CP210xError, PART_NUMBERS
mcp = FastMCP(
"cp210x",
instructions="CP210x USB-UART bridge customization - read/write product strings, serial numbers, and device configuration",
)
# Lazy-loaded library instance
_lib: Optional[CP210xLibrary] = None
def get_lib() -> CP210xLibrary:
"""Get or create the library instance."""
global _lib
if _lib is None:
_lib = CP210xLibrary()
return _lib
async def confirm_write(ctx: Context, message: str) -> bool:
"""Ask user to confirm a write operation via elicitation.
Falls back to proceeding without confirmation if the client
doesn't support elicitation.
"""
try:
result = await ctx.elicit(message, ["Confirm", "Cancel"])
return isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
# Client doesn't support elicitation — proceed as usual
return True
@mcp.tool()
def list_devices() -> list[dict]:
"""List all connected CP210x devices.
Returns a list of devices with their index, description, and serial number.
Use the index to reference a specific device in other tools.
"""
lib = get_lib()
num_devices = lib.get_num_devices()
devices = []
for i in range(num_devices):
try:
desc = lib.get_product_string(i, flag=1) # RETURN_DESCRIPTION
serial = lib.get_product_string(i, flag=0) # RETURN_SERIAL_NUMBER
devices.append({
"index": i,
"description": desc,
"serial_number": serial,
})
except CP210xError as e:
devices.append({
"index": i,
"error": str(e),
})
return devices
@mcp.tool()
def get_device_info(device_index: int = 0) -> dict:
"""Get detailed information about a CP210x device.
Args:
device_index: Zero-based index of the device (default: 0 for first device)
Returns full device details including part number, VID/PID, strings, and power settings.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
return {
"part_number": part_name,
"part_code": f"0x{part_code:02X}",
"vid": f"0x{dev.vid:04X}",
"pid": f"0x{dev.pid:04X}",
"manufacturer": dev.manufacturer,
"product": dev.product,
"serial_number": dev.serial_number,
"device_version": dev.device_version,
"max_power_ma": dev.max_power_ma,
"self_powered": dev.self_powered,
"is_locked": dev.is_locked,
}
@mcp.tool()
async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB product string (device name shown to host).
Args:
product: New product string (max 126 characters)
device_index: Zero-based device index (default: 0)
Returns the updated device info. Device may need to be re-plugged for
changes to appear on the USB host.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_product = dev.product
new_value = product[:126]
if ctx and not await confirm_write(
ctx,
f"Write product string to CP210x OTP EPROM?\n\n"
f" Old: {old_product}\n"
f" New: {new_value}\n\n"
f"OTP writes are limited — this cannot be undone easily.",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.product = product
return {
"success": True,
"old_value": old_product,
"new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host",
}
@mcp.tool()
async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB manufacturer string.
Args:
manufacturer: New manufacturer string (max 45 characters)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_mfr = dev.manufacturer
new_value = manufacturer[:45]
if ctx and not await confirm_write(
ctx,
f"Write manufacturer string to CP210x OTP EPROM?\n\n"
f" Old: {old_mfr}\n"
f" New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.manufacturer = manufacturer
return {
"success": True,
"old_value": old_mfr,
"new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host",
}
@mcp.tool()
async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB serial number.
Args:
serial_number: New serial number (max 63 characters)
device_index: Zero-based device index (default: 0)
Note: Changing serial number may affect udev rules that match on serial.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_serial = dev.serial_number
new_value = serial_number[:63]
if ctx and not await confirm_write(
ctx,
f"Write serial number to CP210x OTP EPROM?\n\n"
f" Old: {old_serial}\n"
f" New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.serial_number = serial_number
return {
"success": True,
"old_value": old_serial,
"new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host",
}
@mcp.tool()
async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the maximum USB power draw in milliamps.
Args:
max_power_ma: Max power in mA (0-500, will be rounded to nearest 2mA)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
if max_power_ma < 0 or max_power_ma > 500:
return {"error": "max_power_ma must be 0-500"}
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_power = dev.max_power_ma
actual_value = (max_power_ma // 2) * 2
if ctx and not await confirm_write(
ctx,
f"Write max power to CP210x OTP EPROM?\n\n"
f" Old: {old_power} mA\n"
f" New: {actual_value} mA",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.max_power_ma = max_power_ma
return {
"success": True,
"old_value_ma": old_power,
"new_value_ma": actual_value,
}
@mcp.tool()
async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Context = None) -> dict:
"""Set whether device reports as self-powered or bus-powered.
Args:
self_powered: True for self-powered, False for bus-powered
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_value = dev.self_powered
mode = "self-powered" if self_powered else "bus-powered"
if ctx and not await confirm_write(
ctx,
f"Write power mode to CP210x OTP EPROM?\n\n"
f" Old: {'self-powered' if old_value else 'bus-powered'}\n"
f" New: {mode}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.self_powered = self_powered
return {
"success": True,
"old_value": old_value,
"new_value": self_powered,
}
@mcp.tool()
def reset_device(device_index: int = 0) -> dict:
"""Reset the CP210x device (USB disconnect/reconnect).
Args:
device_index: Zero-based device index (default: 0)
This triggers a USB re-enumeration, which will make any programmed
changes visible to the USB host.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
dev.reset()
return {
"success": True,
"note": "Device has been reset. It may take a moment to re-enumerate.",
}
@mcp.tool()
async def lock_device(device_index: int = 0, ctx: Context = None) -> dict:
"""PERMANENTLY lock the device to prevent further customization.
Args:
device_index: Zero-based device index (default: 0)
WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's
configuration cannot be changed. The device will still function normally,
but strings, power settings, etc. cannot be modified.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is already locked"}
part_code, part_name = dev.part_number
info = (
f"PERMANENTLY lock this {part_name}?\n\n"
f" Product: {dev.product}\n"
f" Serial: {dev.serial_number}\n\n"
f"THIS CANNOT BE UNDONE. The device configuration\n"
f"will be frozen forever."
)
if not ctx:
return {
"error": "Lock requires interactive confirmation",
"message": "This operation is too dangerous without user confirmation.",
}
# Strict confirmation — do NOT fall back to True on failure.
# Unlike regular writes, an irreversible lock must get explicit consent.
try:
result = await ctx.elicit(info, ["Confirm", "Cancel"])
confirmed = isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
return {
"error": "Lock requires elicitation support",
"message": "Your MCP client does not support elicitation. "
"Lock is too dangerous to proceed without explicit confirmation.",
}
if not confirmed:
return {"cancelled": True, "message": "Lock cancelled by user"}
lib.lock_device(dev.handle)
return {
"success": True,
"warning": "Device is now PERMANENTLY locked. Configuration cannot be changed.",
}
@mcp.tool()
async def setup_udev_rule(
device_index: int = 0,
symlink_name: Optional[str] = None,
ctx: Context = None,
) -> dict:
"""Create a udev rule for stable /dev/ symlink for a CP210x device.
Generates a rule that matches the device's product string and creates
a persistent symlink (e.g., /dev/rylr998-0D27) that survives re-plugs
and port reordering.
Args:
device_index: Zero-based device index (default: 0)
symlink_name: Custom symlink name. If not provided, auto-generates
from the product string (e.g., "RYLR998 ...0D27" -> "rylr998-0D27")
Requires sudo for rule installation.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
product = dev.product
vid = dev.vid
pid = dev.pid
if not product or product == "CP2102 USB to UART Bridge Controller":
return {
"error": "Device has default product string",
"message": "Customize the product string first to create a unique match rule.",
}
# Auto-generate symlink name from product string
if not symlink_name:
# "RYLR998 0033001104645C0B00000D27" -> "rylr998-0D27"
parts = product.split()
if len(parts) >= 2:
prefix = parts[0].lower()
suffix = parts[-1][-4:] # Last 4 chars of EUI/address
symlink_name = f"{prefix}-{suffix}"
else:
symlink_name = product.lower().replace(" ", "-")[:32]
# Build the udev rule
# Use glob pattern on product string to match the unique suffix
match_suffix = product[-4:]
rule = (
f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", '
f'ATTRS{{idProduct}}=="{pid:04x}", '
f'ATTRS{{product}}=="*{match_suffix}", '
f'SYMLINK+="{symlink_name}"'
)
rules_path = f"/usr/lib/udev/rules.d/99-cp210x-{symlink_name}.rules"
rule_content = (
f"# Auto-generated by mcp210x for: {product}\n"
f"# Creates /dev/{symlink_name} symlink\n"
f"{rule}\n"
)
# Elicit confirmation before installing
install_msg = (
f"Install udev rule for stable device symlink?\n\n"
f" Device: {product}\n"
f" Symlink: /dev/{symlink_name}\n"
f" File: {rules_path}\n\n"
f"Requires sudo to install."
)
if ctx and not await confirm_write(ctx, install_msg):
return {
"cancelled": True,
"rule": rule_content,
"message": "Rule not installed. You can install it manually.",
}
# Install the rule
try:
result = subprocess.run(
["sudo", "tee", rules_path],
input=rule_content,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return {
"error": f"Failed to install rule: {result.stderr.strip()}",
"rule": rule_content,
"message": "Install manually with: sudo tee " + rules_path,
}
# Reload udev rules
subprocess.run(
["sudo", "udevadm", "control", "--reload-rules"],
capture_output=True,
timeout=10,
)
subprocess.run(
["sudo", "udevadm", "trigger"],
capture_output=True,
timeout=10,
)
except subprocess.TimeoutExpired:
return {
"error": "Sudo timed out — may need interactive password",
"rule": rule_content,
"message": "Install manually with: sudo tee " + rules_path,
}
return {
"success": True,
"symlink": f"/dev/{symlink_name}",
"rules_file": rules_path,
"rule": rule_content,
"note": "Symlink will appear after device re-plug or udevadm trigger",
}
def main():
"""Entry point for the MCP server."""
mcp.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
"""mcp210x-uart - MCP server for Silicon Labs CP210x USB-UART bridge customization."""
from .server import main, mcp
__all__ = ["mcp", "main"]

View File

@ -0,0 +1,937 @@
"""Low-level ctypes bindings for libcp210xmanufacturing."""
import ctypes
from ctypes import POINTER, Structure, byref, c_char_p, c_int, c_ubyte, c_uint, c_ushort, c_void_p
from pathlib import Path
# Type aliases matching the C library
CP210x_STATUS = c_int
HANDLE = c_void_p
DWORD = c_uint
WORD = c_ushort
BYTE = c_ubyte
BOOL = c_int
# Return codes
CP210x_SUCCESS = 0x00
CP210x_DEVICE_NOT_FOUND = 0xFF
CP210x_INVALID_HANDLE = 0x01
CP210x_INVALID_PARAMETER = 0x02
CP210x_DEVICE_IO_FAILED = 0x03
CP210x_FUNCTION_NOT_SUPPORTED = 0x04
CP210x_GLOBAL_DATA_ERROR = 0x05
CP210x_FILE_ERROR = 0x06
CP210x_COMMAND_FAILED = 0x08
CP210x_INVALID_ACCESS_TYPE = 0x09
# Part numbers
PART_NUMBERS = {
0x01: "CP2101",
0x02: "CP2102",
0x03: "CP2103",
0x04: "CP2104",
0x05: "CP2105",
0x08: "CP2108",
0x09: "CP2109",
0x20: "CP2102N-QFN28",
0x21: "CP2102N-QFN24",
0x22: "CP2102N-QFN20",
}
# Part number groups for feature gating
PARTS_CP2102N = {0x20, 0x21, 0x22}
PARTS_WITH_FLUSH_CONFIG = {0x04, 0x05, 0x08} # CP2104, CP2105, CP2108
PARTS_WITH_DEVICE_MODE = {0x05} # CP2105 only
PARTS_WITH_INTERFACE_STRING = {0x05, 0x08} # CP2105, CP2108
PARTS_WITH_PORT_CONFIG = {0x03, 0x04} # CP2103, CP2104
PARTS_WITH_DUAL_PORT_CONFIG = {0x05} # CP2105
PARTS_WITH_QUAD_PORT_CONFIG = {0x08} # CP2108
# Buffer sizes
MAX_DEVICE_STRLEN = 256
MAX_MANUFACTURER_STRLEN = 45
MAX_PRODUCT_STRLEN = 126
MAX_SERIAL_STRLEN = 63
CP2105_MAX_INTERFACE_STRLEN = 32
CP2108_MAX_INTERFACE_STRLEN = 126
# GetProductString flags
RETURN_SERIAL_NUMBER = 0x00
RETURN_DESCRIPTION = 0x01
RETURN_FULL_PATH = 0x02
# Flush buffer config flags (CP2104)
FC_OPEN_TX = 0x01
FC_OPEN_RX = 0x02
FC_CLOSE_TX = 0x04
FC_CLOSE_RX = 0x08
# CP2105 standard port
FC_OPEN_TX_SCI = FC_OPEN_TX
FC_OPEN_RX_SCI = FC_OPEN_RX
FC_CLOSE_TX_SCI = FC_CLOSE_TX
FC_CLOSE_RX_SCI = FC_CLOSE_RX
# CP2105 enhanced port
FC_OPEN_TX_ECI = 0x10
FC_OPEN_RX_ECI = 0x20
FC_CLOSE_TX_ECI = 0x40
FC_CLOSE_RX_ECI = 0x80
# CP2108 per-interface
FC_OPEN_TX_IFC0 = 0x0001
FC_OPEN_RX_IFC0 = 0x0002
FC_CLOSE_TX_IFC0 = 0x0004
FC_CLOSE_RX_IFC0 = 0x0008
FC_OPEN_TX_IFC1 = 0x0010
FC_OPEN_RX_IFC1 = 0x0020
FC_CLOSE_TX_IFC1 = 0x0040
FC_CLOSE_RX_IFC1 = 0x0080
FC_OPEN_TX_IFC2 = 0x0100
FC_OPEN_RX_IFC2 = 0x0200
FC_CLOSE_TX_IFC2 = 0x0400
FC_CLOSE_RX_IFC2 = 0x0800
FC_OPEN_TX_IFC3 = 0x1000
FC_OPEN_RX_IFC3 = 0x2000
FC_CLOSE_TX_IFC3 = 0x4000
FC_CLOSE_RX_IFC3 = 0x8000
# Baud rate config
NUM_BAUD_CONFIGS = 32
BAUD_CONFIG_SIZE = 10
# Port config pin flags (CP2103/CP2104)
PORT_RI_ON = 0x0001
PORT_DCD_ON = 0x0002
PORT_DTR_ON = 0x0004
PORT_DSR_ON = 0x0008
PORT_TXD_ON = 0x0010
PORT_RXD_ON = 0x0020
PORT_RTS_ON = 0x0040
PORT_CTS_ON = 0x0080
PORT_GPIO_0_ON = 0x0100
PORT_GPIO_1_ON = 0x0200
PORT_GPIO_2_ON = 0x0400
PORT_GPIO_3_ON = 0x0800
PORT_SUSPEND_ON = 0x4000
PORT_SUSPEND_BAR_ON = 0x8000
# Enhanced function flags (CP2103/CP2104)
EF_GPIO_0_TXLED = 0x01
EF_GPIO_1_RXLED = 0x02
EF_GPIO_2_RS485 = 0x04
EF_RS485_INVERT = 0x08
EF_WEAKPULLUP = 0x10
EF_RESERVED_1 = 0x20
EF_SERIAL_DYNAMIC_SUSPEND = 0x40
EF_GPIO_DYNAMIC_SUSPEND = 0x80
# Flush buffer flag names for human-readable output
FLUSH_FLAGS_CP2104 = {
FC_OPEN_TX: "open_tx",
FC_OPEN_RX: "open_rx",
FC_CLOSE_TX: "close_tx",
FC_CLOSE_RX: "close_rx",
}
FLUSH_FLAGS_CP2105 = {
FC_OPEN_TX_SCI: "open_tx_sci",
FC_OPEN_RX_SCI: "open_rx_sci",
FC_CLOSE_TX_SCI: "close_tx_sci",
FC_CLOSE_RX_SCI: "close_rx_sci",
FC_OPEN_TX_ECI: "open_tx_eci",
FC_OPEN_RX_ECI: "open_rx_eci",
FC_CLOSE_TX_ECI: "close_tx_eci",
FC_CLOSE_RX_ECI: "close_rx_eci",
}
ENHANCED_FXN_FLAGS = {
EF_GPIO_0_TXLED: "gpio0_txled",
EF_GPIO_1_RXLED: "gpio1_rxled",
EF_GPIO_2_RS485: "gpio2_rs485",
EF_RS485_INVERT: "rs485_invert",
EF_WEAKPULLUP: "weak_pullup",
EF_SERIAL_DYNAMIC_SUSPEND: "serial_dynamic_suspend",
EF_GPIO_DYNAMIC_SUSPEND: "gpio_dynamic_suspend",
}
PORT_PIN_FLAGS = {
PORT_RI_ON: "RI",
PORT_DCD_ON: "DCD",
PORT_DTR_ON: "DTR",
PORT_DSR_ON: "DSR",
PORT_TXD_ON: "TXD",
PORT_RXD_ON: "RXD",
PORT_RTS_ON: "RTS",
PORT_CTS_ON: "CTS",
PORT_GPIO_0_ON: "GPIO0",
PORT_GPIO_1_ON: "GPIO1",
PORT_GPIO_2_ON: "GPIO2",
PORT_GPIO_3_ON: "GPIO3",
PORT_SUSPEND_ON: "SUSPEND",
PORT_SUSPEND_BAR_ON: "SUSPEND_BAR",
}
def decode_bitmask(value: int, flag_map: dict) -> dict[str, bool]:
"""Decode a bitmask into a dict of named flags."""
return {name: bool(value & bit) for bit, name in flag_map.items()}
def encode_bitmask(flags: dict[str, bool], flag_map: dict) -> int:
"""Encode a dict of named flags back into a bitmask."""
name_to_bit = {name: bit for bit, name in flag_map.items()}
value = 0
for name, enabled in flags.items():
if name in name_to_bit and enabled:
value |= name_to_bit[name]
return value
# --- ctypes Structures ---
class BAUD_CONFIG(Structure):
_pack_ = 1
_fields_ = [
("BaudGen", WORD),
("Timer0Reload", WORD),
("Prescaler", BYTE),
("_pad", BYTE),
("BaudRate", DWORD),
]
assert ctypes.sizeof(BAUD_CONFIG) == BAUD_CONFIG_SIZE
BAUD_CONFIG_DATA = BAUD_CONFIG * NUM_BAUD_CONFIGS
class PORT_CONFIG(Structure):
_fields_ = [
("Mode", WORD),
("Reset_Latch", WORD),
("Suspend_Latch", WORD),
("EnhancedFxn", BYTE),
]
class DUAL_PORT_CONFIG(Structure):
_fields_ = [
("Mode", WORD),
("Reset_Latch", WORD),
("Suspend_Latch", WORD),
("EnhancedFxn_ECI", BYTE),
("EnhancedFxn_SCI", BYTE),
("EnhancedFxn_Device", BYTE),
]
class QUAD_PORT_STATE(Structure):
_fields_ = [
("Mode_PB0", WORD),
("Mode_PB1", WORD),
("Mode_PB2", WORD),
("Mode_PB3", WORD),
("Mode_PB4", WORD),
("LowPower_PB0", WORD),
("LowPower_PB1", WORD),
("LowPower_PB2", WORD),
("LowPower_PB3", WORD),
("LowPower_PB4", WORD),
("Latch_PB0", WORD),
("Latch_PB1", WORD),
("Latch_PB2", WORD),
("Latch_PB3", WORD),
("Latch_PB4", WORD),
]
class QUAD_PORT_CONFIG(Structure):
_fields_ = [
("Reset_Latch", QUAD_PORT_STATE),
("Suspend_Latch", QUAD_PORT_STATE),
("IPDelay_IFC0", BYTE),
("IPDelay_IFC1", BYTE),
("IPDelay_IFC2", BYTE),
("IPDelay_IFC3", BYTE),
("EnhancedFxn_IFC0", BYTE),
("EnhancedFxn_IFC1", BYTE),
("EnhancedFxn_IFC2", BYTE),
("EnhancedFxn_IFC3", BYTE),
("EnhancedFxn_Device", BYTE),
("ExtClk0Freq", BYTE),
("ExtClk1Freq", BYTE),
("ExtClk2Freq", BYTE),
("ExtClk3Freq", BYTE),
]
class FIRMWARE_T(Structure):
_fields_ = [
("major", BYTE),
("minor", BYTE),
("build", BYTE),
]
class CP210xError(Exception):
"""Exception raised for CP210x library errors."""
ERROR_MESSAGES = {
CP210x_DEVICE_NOT_FOUND: "Device not found",
CP210x_INVALID_HANDLE: "Invalid handle",
CP210x_INVALID_PARAMETER: "Invalid parameter",
CP210x_DEVICE_IO_FAILED: "Device I/O failed",
CP210x_FUNCTION_NOT_SUPPORTED: "Function not supported",
CP210x_GLOBAL_DATA_ERROR: "Global data error",
CP210x_FILE_ERROR: "File error",
CP210x_COMMAND_FAILED: "Command failed",
CP210x_INVALID_ACCESS_TYPE: "Invalid access type",
}
def __init__(self, status: int, context: str = ""):
self.status = status
msg = self.ERROR_MESSAGES.get(status, f"Unknown error 0x{status:02X}")
if context:
msg = f"{context}: {msg}"
super().__init__(msg)
class CP210xLibrary:
"""Wrapper for the CP210x manufacturing library."""
def __init__(self, lib_path: str | None = None):
if lib_path:
self._lib = ctypes.CDLL(lib_path)
else:
search_paths = [
"/usr/lib/libcp210xmanufacturing.so",
"/usr/local/lib/libcp210xmanufacturing.so",
str(Path(__file__).parent.parent.parent / "AN721SW/Linux/LibrarySourcePackages/cp210xmanufacturing/build/lib/x86_64/libcp210xmanufacturing.so"),
]
for path in search_paths:
if Path(path).exists():
self._lib = ctypes.CDLL(path)
break
else:
raise FileNotFoundError(
"libcp210xmanufacturing.so not found. Install with: "
"cd aur/cp210xmanufacturing && makepkg -si"
)
self._setup_functions()
def _setup_functions(self):
"""Set up function prototypes for all library functions."""
lib = self._lib
# --- Device enumeration ---
lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)]
lib.CP210x_GetNumDevices.restype = CP210x_STATUS
lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD]
lib.CP210x_GetProductString.restype = CP210x_STATUS
lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)]
lib.CP210x_Open.restype = CP210x_STATUS
lib.CP210x_Close.argtypes = [HANDLE]
lib.CP210x_Close.restype = CP210x_STATUS
# --- Getters (scalars) ---
lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)]
lib.CP210x_GetPartNumber.restype = CP210x_STATUS
lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)]
lib.CP210x_GetDeviceVid.restype = CP210x_STATUS
lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)]
lib.CP210x_GetDevicePid.restype = CP210x_STATUS
lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS
lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS
lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS
lib.CP210x_GetDeviceInterfaceString.argtypes = [HANDLE, BYTE, c_void_p, POINTER(BYTE), BOOL]
lib.CP210x_GetDeviceInterfaceString.restype = CP210x_STATUS
lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)]
lib.CP210x_GetMaxPower.restype = CP210x_STATUS
lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)]
lib.CP210x_GetSelfPower.restype = CP210x_STATUS
lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)]
lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS
lib.CP210x_GetFlushBufferConfig.argtypes = [HANDLE, POINTER(WORD)]
lib.CP210x_GetFlushBufferConfig.restype = CP210x_STATUS
lib.CP210x_GetDeviceMode.argtypes = [HANDLE, POINTER(BYTE), POINTER(BYTE)]
lib.CP210x_GetDeviceMode.restype = CP210x_STATUS
lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)]
lib.CP210x_GetLockValue.restype = CP210x_STATUS
lib.CP210x_GetFirmwareVersion.argtypes = [HANDLE, POINTER(FIRMWARE_T)]
lib.CP210x_GetFirmwareVersion.restype = CP210x_STATUS
# --- Setters (scalars) ---
lib.CP210x_SetVid.argtypes = [HANDLE, WORD]
lib.CP210x_SetVid.restype = CP210x_STATUS
lib.CP210x_SetPid.argtypes = [HANDLE, WORD]
lib.CP210x_SetPid.restype = CP210x_STATUS
lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
lib.CP210x_SetManufacturerString.restype = CP210x_STATUS
lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
lib.CP210x_SetProductString.restype = CP210x_STATUS
lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
lib.CP210x_SetSerialNumber.restype = CP210x_STATUS
lib.CP210x_SetInterfaceString.argtypes = [HANDLE, BYTE, c_void_p, BYTE, BOOL]
lib.CP210x_SetInterfaceString.restype = CP210x_STATUS
lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE]
lib.CP210x_SetMaxPower.restype = CP210x_STATUS
lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL]
lib.CP210x_SetSelfPower.restype = CP210x_STATUS
lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD]
lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS
lib.CP210x_SetFlushBufferConfig.argtypes = [HANDLE, WORD]
lib.CP210x_SetFlushBufferConfig.restype = CP210x_STATUS
lib.CP210x_SetDeviceMode.argtypes = [HANDLE, BYTE, BYTE]
lib.CP210x_SetDeviceMode.restype = CP210x_STATUS
lib.CP210x_SetLockValue.argtypes = [HANDLE]
lib.CP210x_SetLockValue.restype = CP210x_STATUS
# --- Struct-based configs ---
lib.CP210x_GetBaudRateConfig.argtypes = [HANDLE, POINTER(BAUD_CONFIG)]
lib.CP210x_GetBaudRateConfig.restype = CP210x_STATUS
lib.CP210x_SetBaudRateConfig.argtypes = [HANDLE, POINTER(BAUD_CONFIG)]
lib.CP210x_SetBaudRateConfig.restype = CP210x_STATUS
lib.CP210x_GetPortConfig.argtypes = [HANDLE, POINTER(PORT_CONFIG)]
lib.CP210x_GetPortConfig.restype = CP210x_STATUS
lib.CP210x_SetPortConfig.argtypes = [HANDLE, POINTER(PORT_CONFIG)]
lib.CP210x_SetPortConfig.restype = CP210x_STATUS
lib.CP210x_GetDualPortConfig.argtypes = [HANDLE, POINTER(DUAL_PORT_CONFIG)]
lib.CP210x_GetDualPortConfig.restype = CP210x_STATUS
lib.CP210x_SetDualPortConfig.argtypes = [HANDLE, POINTER(DUAL_PORT_CONFIG)]
lib.CP210x_SetDualPortConfig.restype = CP210x_STATUS
lib.CP210x_GetQuadPortConfig.argtypes = [HANDLE, POINTER(QUAD_PORT_CONFIG)]
lib.CP210x_GetQuadPortConfig.restype = CP210x_STATUS
lib.CP210x_SetQuadPortConfig.argtypes = [HANDLE, POINTER(QUAD_PORT_CONFIG)]
lib.CP210x_SetQuadPortConfig.restype = CP210x_STATUS
# --- Advanced / raw ---
lib.CP210x_GetConfig.argtypes = [HANDLE, POINTER(BYTE), WORD]
lib.CP210x_GetConfig.restype = CP210x_STATUS
lib.CP210x_SetConfig.argtypes = [HANDLE, POINTER(BYTE), WORD]
lib.CP210x_SetConfig.restype = CP210x_STATUS
lib.CP210x_GetGeneric.argtypes = [HANDLE, POINTER(BYTE), WORD]
lib.CP210x_GetGeneric.restype = CP210x_STATUS
lib.CP210x_SetGeneric.argtypes = [HANDLE, POINTER(BYTE), WORD]
lib.CP210x_SetGeneric.restype = CP210x_STATUS
lib.CP210x_CreateHexFile.argtypes = [HANDLE, c_char_p]
lib.CP210x_CreateHexFile.restype = CP210x_STATUS
lib.CP210x_UpdateFirmware.argtypes = [HANDLE]
lib.CP210x_UpdateFirmware.restype = CP210x_STATUS
lib.CP210x_Reset.argtypes = [HANDLE]
lib.CP210x_Reset.restype = CP210x_STATUS
def _check_status(self, status: int, context: str = ""):
if status != CP210x_SUCCESS:
raise CP210xError(status, context)
# --- Device enumeration ---
def get_num_devices(self) -> int:
num = DWORD()
status = self._lib.CP210x_GetNumDevices(byref(num))
self._check_status(status, "GetNumDevices")
return num.value
def get_product_string(self, device_index: int, flag: int = RETURN_DESCRIPTION) -> str:
buf = ctypes.create_string_buffer(MAX_DEVICE_STRLEN)
status = self._lib.CP210x_GetProductString(DWORD(device_index), buf, DWORD(flag))
self._check_status(status, "GetProductString")
return buf.value.decode('utf-8', errors='replace')
def open(self, device_index: int) -> HANDLE:
handle = HANDLE()
status = self._lib.CP210x_Open(DWORD(device_index), byref(handle))
self._check_status(status, f"Open device {device_index}")
return handle
def close(self, handle: HANDLE):
status = self._lib.CP210x_Close(handle)
self._check_status(status, "Close")
# --- Scalar getters ---
def get_part_number(self, handle: HANDLE) -> tuple[int, str]:
part = BYTE()
status = self._lib.CP210x_GetPartNumber(handle, byref(part))
self._check_status(status, "GetPartNumber")
return part.value, PART_NUMBERS.get(part.value, f"Unknown (0x{part.value:02X})")
def get_device_vid(self, handle: HANDLE) -> int:
vid = WORD()
status = self._lib.CP210x_GetDeviceVid(handle, byref(vid))
self._check_status(status, "GetDeviceVid")
return vid.value
def get_device_pid(self, handle: HANDLE) -> int:
pid = WORD()
status = self._lib.CP210x_GetDevicePid(handle, byref(pid))
self._check_status(status, "GetDevicePid")
return pid.value
def get_manufacturer_string(self, handle: HANDLE) -> str:
buf = ctypes.create_string_buffer(MAX_MANUFACTURER_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceManufacturerString(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceManufacturerString")
return buf.value.decode('utf-8', errors='replace')
def get_product_string_device(self, handle: HANDLE) -> str:
buf = ctypes.create_string_buffer(MAX_PRODUCT_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceProductString(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceProductString")
return buf.value.decode('utf-8', errors='replace')
def get_serial_number(self, handle: HANDLE) -> str:
buf = ctypes.create_string_buffer(MAX_SERIAL_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceSerialNumber(handle, buf, byref(length), BOOL(1))
self._check_status(status, "GetDeviceSerialNumber")
return buf.value.decode('utf-8', errors='replace')
def get_interface_string(self, handle: HANDLE, interface_number: int) -> str:
buf = ctypes.create_string_buffer(CP2108_MAX_INTERFACE_STRLEN)
length = BYTE()
status = self._lib.CP210x_GetDeviceInterfaceString(
handle, BYTE(interface_number), buf, byref(length), BOOL(1)
)
self._check_status(status, f"GetDeviceInterfaceString(ifc={interface_number})")
return buf.value.decode('utf-8', errors='replace')
def get_max_power(self, handle: HANDLE) -> int:
power = BYTE()
status = self._lib.CP210x_GetMaxPower(handle, byref(power))
self._check_status(status, "GetMaxPower")
return power.value
def get_self_power(self, handle: HANDLE) -> bool:
self_power = BOOL()
status = self._lib.CP210x_GetSelfPower(handle, byref(self_power))
self._check_status(status, "GetSelfPower")
return bool(self_power.value)
def get_device_version(self, handle: HANDLE) -> int:
version = WORD()
status = self._lib.CP210x_GetDeviceVersion(handle, byref(version))
self._check_status(status, "GetDeviceVersion")
return version.value
def get_flush_buffer_config(self, handle: HANDLE) -> int:
config = WORD()
status = self._lib.CP210x_GetFlushBufferConfig(handle, byref(config))
self._check_status(status, "GetFlushBufferConfig")
return config.value
def get_device_mode(self, handle: HANDLE) -> tuple[int, int]:
eci = BYTE()
sci = BYTE()
status = self._lib.CP210x_GetDeviceMode(handle, byref(eci), byref(sci))
self._check_status(status, "GetDeviceMode")
return eci.value, sci.value
def get_lock_value(self, handle: HANDLE) -> int:
lock = BYTE()
status = self._lib.CP210x_GetLockValue(handle, byref(lock))
self._check_status(status, "GetLockValue")
return lock.value
def get_firmware_version(self, handle: HANDLE) -> tuple[int, int, int]:
fw = FIRMWARE_T()
status = self._lib.CP210x_GetFirmwareVersion(handle, byref(fw))
self._check_status(status, "GetFirmwareVersion")
return fw.major, fw.minor, fw.build
# --- Scalar setters ---
def set_vid(self, handle: HANDLE, vid: int):
status = self._lib.CP210x_SetVid(handle, WORD(vid))
self._check_status(status, "SetVid")
def set_pid(self, handle: HANDLE, pid: int):
status = self._lib.CP210x_SetPid(handle, WORD(pid))
self._check_status(status, "SetPid")
def set_manufacturer_string(self, handle: HANDLE, manufacturer: str):
data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN]
status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetManufacturerString")
def set_product_string(self, handle: HANDLE, product: str):
data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN]
status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetProductString")
def set_serial_number(self, handle: HANDLE, serial: str):
data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN]
status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1))
self._check_status(status, "SetSerialNumber")
def set_interface_string(self, handle: HANDLE, interface_number: int, value: str):
data = value.encode('utf-8')[:CP2108_MAX_INTERFACE_STRLEN]
status = self._lib.CP210x_SetInterfaceString(
handle, BYTE(interface_number), data, BYTE(len(data)), BOOL(1)
)
self._check_status(status, f"SetInterfaceString(ifc={interface_number})")
def set_max_power(self, handle: HANDLE, power_2ma: int):
if power_2ma > 250:
raise ValueError("Max power cannot exceed 250 (500mA)")
status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma))
self._check_status(status, "SetMaxPower")
def set_self_power(self, handle: HANDLE, self_powered: bool):
status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0))
self._check_status(status, "SetSelfPower")
def set_device_version(self, handle: HANDLE, version: int):
status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version))
self._check_status(status, "SetDeviceVersion")
def set_flush_buffer_config(self, handle: HANDLE, config: int):
status = self._lib.CP210x_SetFlushBufferConfig(handle, WORD(config))
self._check_status(status, "SetFlushBufferConfig")
def set_device_mode(self, handle: HANDLE, eci_mode: int, sci_mode: int):
status = self._lib.CP210x_SetDeviceMode(handle, BYTE(eci_mode), BYTE(sci_mode))
self._check_status(status, "SetDeviceMode")
def lock_device(self, handle: HANDLE):
status = self._lib.CP210x_SetLockValue(handle)
self._check_status(status, "SetLockValue")
# --- Struct-based configs ---
def get_baud_rate_config(self, handle: HANDLE) -> list[dict]:
data = BAUD_CONFIG_DATA()
status = self._lib.CP210x_GetBaudRateConfig(handle, data)
self._check_status(status, "GetBaudRateConfig")
return [
{
"index": i,
"baud_gen": data[i].BaudGen,
"timer0_reload": data[i].Timer0Reload,
"prescaler": data[i].Prescaler,
"baud_rate": data[i].BaudRate,
}
for i in range(NUM_BAUD_CONFIGS)
]
def set_baud_rate_config(self, handle: HANDLE, configs: list[dict]):
if len(configs) != NUM_BAUD_CONFIGS:
raise ValueError(f"Expected {NUM_BAUD_CONFIGS} baud configs, got {len(configs)}")
data = BAUD_CONFIG_DATA()
for i, cfg in enumerate(configs):
data[i].BaudGen = cfg["baud_gen"]
data[i].Timer0Reload = cfg["timer0_reload"]
data[i].Prescaler = cfg["prescaler"]
data[i]._pad = 0
data[i].BaudRate = cfg["baud_rate"]
status = self._lib.CP210x_SetBaudRateConfig(handle, data)
self._check_status(status, "SetBaudRateConfig")
def get_port_config(self, handle: HANDLE) -> dict:
cfg = PORT_CONFIG()
status = self._lib.CP210x_GetPortConfig(handle, byref(cfg))
self._check_status(status, "GetPortConfig")
return {
"mode": cfg.Mode,
"reset_latch": cfg.Reset_Latch,
"suspend_latch": cfg.Suspend_Latch,
"enhanced_fxn": cfg.EnhancedFxn,
}
def set_port_config(self, handle: HANDLE, mode: int, reset_latch: int, suspend_latch: int, enhanced_fxn: int):
cfg = PORT_CONFIG()
cfg.Mode = mode
cfg.Reset_Latch = reset_latch
cfg.Suspend_Latch = suspend_latch
cfg.EnhancedFxn = enhanced_fxn
status = self._lib.CP210x_SetPortConfig(handle, byref(cfg))
self._check_status(status, "SetPortConfig")
def get_dual_port_config(self, handle: HANDLE) -> dict:
cfg = DUAL_PORT_CONFIG()
status = self._lib.CP210x_GetDualPortConfig(handle, byref(cfg))
self._check_status(status, "GetDualPortConfig")
return {
"mode": cfg.Mode,
"reset_latch": cfg.Reset_Latch,
"suspend_latch": cfg.Suspend_Latch,
"enhanced_fxn_eci": cfg.EnhancedFxn_ECI,
"enhanced_fxn_sci": cfg.EnhancedFxn_SCI,
"enhanced_fxn_device": cfg.EnhancedFxn_Device,
}
def set_dual_port_config(self, handle: HANDLE, mode: int, reset_latch: int,
suspend_latch: int, enhanced_fxn_eci: int,
enhanced_fxn_sci: int, enhanced_fxn_device: int):
cfg = DUAL_PORT_CONFIG()
cfg.Mode = mode
cfg.Reset_Latch = reset_latch
cfg.Suspend_Latch = suspend_latch
cfg.EnhancedFxn_ECI = enhanced_fxn_eci
cfg.EnhancedFxn_SCI = enhanced_fxn_sci
cfg.EnhancedFxn_Device = enhanced_fxn_device
status = self._lib.CP210x_SetDualPortConfig(handle, byref(cfg))
self._check_status(status, "SetDualPortConfig")
def _quad_port_state_to_dict(self, qps: QUAD_PORT_STATE) -> dict:
return {
f"pb{i}": {"mode": getattr(qps, f"Mode_PB{i}"),
"low_power": getattr(qps, f"LowPower_PB{i}"),
"latch": getattr(qps, f"Latch_PB{i}")}
for i in range(5)
}
def get_quad_port_config(self, handle: HANDLE) -> dict:
cfg = QUAD_PORT_CONFIG()
status = self._lib.CP210x_GetQuadPortConfig(handle, byref(cfg))
self._check_status(status, "GetQuadPortConfig")
return {
"reset_latch": self._quad_port_state_to_dict(cfg.Reset_Latch),
"suspend_latch": self._quad_port_state_to_dict(cfg.Suspend_Latch),
"ip_delay": [cfg.IPDelay_IFC0, cfg.IPDelay_IFC1, cfg.IPDelay_IFC2, cfg.IPDelay_IFC3],
"enhanced_fxn": [cfg.EnhancedFxn_IFC0, cfg.EnhancedFxn_IFC1,
cfg.EnhancedFxn_IFC2, cfg.EnhancedFxn_IFC3],
"enhanced_fxn_device": cfg.EnhancedFxn_Device,
"ext_clk_freq": [cfg.ExtClk0Freq, cfg.ExtClk1Freq,
cfg.ExtClk2Freq, cfg.ExtClk3Freq],
}
def set_quad_port_config(self, handle: HANDLE, config_dict: dict):
cfg = QUAD_PORT_CONFIG()
for state_name in ("reset_latch", "suspend_latch"):
state = getattr(cfg, state_name.title().replace("_l", "_L"))
src = config_dict[state_name]
for i in range(5):
pb = src[f"pb{i}"]
setattr(state, f"Mode_PB{i}", pb["mode"])
setattr(state, f"LowPower_PB{i}", pb["low_power"])
setattr(state, f"Latch_PB{i}", pb["latch"])
for i in range(4):
setattr(cfg, f"IPDelay_IFC{i}", config_dict["ip_delay"][i])
setattr(cfg, f"EnhancedFxn_IFC{i}", config_dict["enhanced_fxn"][i])
cfg.EnhancedFxn_Device = config_dict["enhanced_fxn_device"]
for i in range(4):
setattr(cfg, f"ExtClk{i}Freq", config_dict["ext_clk_freq"][i])
status = self._lib.CP210x_SetQuadPortConfig(handle, byref(cfg))
self._check_status(status, "SetQuadPortConfig")
# --- Advanced / raw ---
def get_config(self, handle: HANDLE, size: int = 512) -> bytes:
buf = (BYTE * size)()
status = self._lib.CP210x_GetConfig(handle, buf, WORD(size))
self._check_status(status, "GetConfig")
return bytes(buf)
def set_config(self, handle: HANDLE, data: bytes):
buf = (BYTE * len(data))(*data)
status = self._lib.CP210x_SetConfig(handle, buf, WORD(len(data)))
self._check_status(status, "SetConfig")
def get_generic(self, handle: HANDLE, size: int = 512) -> bytes:
buf = (BYTE * size)()
status = self._lib.CP210x_GetGeneric(handle, buf, WORD(size))
self._check_status(status, "GetGeneric")
return bytes(buf)
def set_generic(self, handle: HANDLE, data: bytes):
buf = (BYTE * len(data))(*data)
status = self._lib.CP210x_SetGeneric(handle, buf, WORD(len(data)))
self._check_status(status, "SetGeneric")
def create_hex_file(self, handle: HANDLE, filename: str):
status = self._lib.CP210x_CreateHexFile(handle, filename.encode('utf-8'))
self._check_status(status, "CreateHexFile")
def update_firmware(self, handle: HANDLE):
status = self._lib.CP210x_UpdateFirmware(handle)
self._check_status(status, "UpdateFirmware")
def reset(self, handle: HANDLE):
status = self._lib.CP210x_Reset(handle)
self._check_status(status, "Reset")
class CP210xDevice:
"""Context manager for safe device access."""
def __init__(self, lib: CP210xLibrary, device_index: int):
self.lib = lib
self.device_index = device_index
self.handle = None
self._part_code: int | None = None
def __enter__(self):
self.handle = self.lib.open(self.device_index)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.handle:
self.lib.close(self.handle)
self.handle = None
return False
def _get_part_code(self) -> int:
if self._part_code is None:
self._part_code = self.lib.get_part_number(self.handle)[0]
return self._part_code
def _require_part(self, allowed: set[int], feature: str):
code = self._get_part_code()
if code not in allowed:
name = PART_NUMBERS.get(code, f"0x{code:02X}")
allowed_names = ", ".join(PART_NUMBERS.get(c, f"0x{c:02X}") for c in sorted(allowed))
raise CP210xError(
CP210x_FUNCTION_NOT_SUPPORTED,
f"{feature} not supported on {name} (requires {allowed_names})",
)
@property
def part_number(self) -> tuple[int, str]:
return self.lib.get_part_number(self.handle)
@property
def vid(self) -> int:
return self.lib.get_device_vid(self.handle)
@property
def pid(self) -> int:
return self.lib.get_device_pid(self.handle)
@property
def manufacturer(self) -> str:
return self.lib.get_manufacturer_string(self.handle)
@manufacturer.setter
def manufacturer(self, value: str):
self.lib.set_manufacturer_string(self.handle, value)
@property
def product(self) -> str:
return self.lib.get_product_string_device(self.handle)
@product.setter
def product(self, value: str):
self.lib.set_product_string(self.handle, value)
@property
def serial_number(self) -> str:
return self.lib.get_serial_number(self.handle)
@serial_number.setter
def serial_number(self, value: str):
self.lib.set_serial_number(self.handle, value)
@property
def max_power_ma(self) -> int:
return self.lib.get_max_power(self.handle) * 2
@max_power_ma.setter
def max_power_ma(self, value: int):
self.lib.set_max_power(self.handle, value // 2)
@property
def self_powered(self) -> bool:
return self.lib.get_self_power(self.handle)
@self_powered.setter
def self_powered(self, value: bool):
self.lib.set_self_power(self.handle, value)
@property
def device_version(self) -> str:
v = self.lib.get_device_version(self.handle)
return f"{(v >> 8) & 0xFF}.{v & 0xFF:02d}"
@device_version.setter
def device_version(self, value: int):
self.lib.set_device_version(self.handle, value)
@property
def is_locked(self) -> bool:
return self.lib.get_lock_value(self.handle) != 0
@property
def firmware_version(self) -> str | None:
"""Firmware version string (CP2102N only). Returns None if not supported."""
try:
major, minor, build = self.lib.get_firmware_version(self.handle)
return f"{major}.{minor}.{build}"
except CP210xError:
return None
@property
def flush_buffer_config(self) -> int | None:
"""Raw flush buffer config word. Returns None if not supported."""
try:
return self.lib.get_flush_buffer_config(self.handle)
except CP210xError:
return None
@property
def device_mode(self) -> tuple[int, int] | None:
"""(ECI mode, SCI mode) for CP2105. Returns None if not supported."""
try:
return self.lib.get_device_mode(self.handle)
except CP210xError:
return None
def get_interface_string(self, interface_number: int) -> str:
self._require_part(PARTS_WITH_INTERFACE_STRING, "Interface strings")
return self.lib.get_interface_string(self.handle, interface_number)
def set_interface_string(self, interface_number: int, value: str):
self._require_part(PARTS_WITH_INTERFACE_STRING, "Interface strings")
self.lib.set_interface_string(self.handle, interface_number, value)
def reset(self):
self.lib.reset(self.handle)

957
src/mcp210x_uart/server.py Normal file
View File

@ -0,0 +1,957 @@
"""FastMCP server for CP210x device customization."""
import subprocess
from fastmcp import Context, FastMCP
from fastmcp.server.elicitation import AcceptedElicitation
from .bindings import (
ENHANCED_FXN_FLAGS,
FLUSH_FLAGS_CP2104,
FLUSH_FLAGS_CP2105,
PARTS_CP2102N,
PARTS_WITH_DEVICE_MODE,
PARTS_WITH_DUAL_PORT_CONFIG,
PARTS_WITH_FLUSH_CONFIG,
PARTS_WITH_PORT_CONFIG,
PARTS_WITH_QUAD_PORT_CONFIG,
PORT_PIN_FLAGS,
CP210xDevice,
CP210xError,
CP210xLibrary,
decode_bitmask,
encode_bitmask,
)
mcp = FastMCP(
"cp210x",
instructions="CP210x USB-UART bridge customization - read/write product strings, serial numbers, and device configuration",
)
_lib: CP210xLibrary | None = None
def get_lib() -> CP210xLibrary:
global _lib
if _lib is None:
_lib = CP210xLibrary()
return _lib
async def confirm_write(ctx: Context, message: str) -> bool:
"""Normal-tier confirmation: falls back to proceeding if elicitation unavailable."""
try:
result = await ctx.elicit(message, ["Confirm", "Cancel"])
return isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
return True
async def strict_confirm(ctx: Context, message: str) -> dict | None:
"""Strict-tier confirmation: returns error dict if elicitation unavailable."""
if not ctx:
return {
"error": "This operation requires interactive confirmation",
"message": "Too dangerous to proceed without explicit user consent.",
}
try:
result = await ctx.elicit(message, ["Confirm", "Cancel"])
confirmed = isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
return {
"error": "This operation requires elicitation support",
"message": "Your MCP client does not support elicitation. "
"This operation is too dangerous to proceed without explicit confirmation.",
}
if not confirmed:
return {"cancelled": True, "message": "Operation cancelled by user"}
return None
# ===========================================================================
# Read-only tools (no confirmation needed)
# ===========================================================================
@mcp.tool()
def list_devices() -> list[dict]:
"""List all connected CP210x devices.
Returns a list of devices with their index, description, and serial number.
Use the index to reference a specific device in other tools.
"""
lib = get_lib()
num_devices = lib.get_num_devices()
devices = []
for i in range(num_devices):
try:
desc = lib.get_product_string(i, flag=1)
serial = lib.get_product_string(i, flag=0)
devices.append({
"index": i,
"description": desc,
"serial_number": serial,
})
except CP210xError as e:
devices.append({"index": i, "error": str(e)})
return devices
@mcp.tool()
def get_device_info(device_index: int = 0) -> dict:
"""Get detailed information about a CP210x device.
Args:
device_index: Zero-based index of the device (default: 0 for first device)
Returns full device details including part number, VID/PID, strings, and power settings.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
info = {
"part_number": part_name,
"part_code": f"0x{part_code:02X}",
"vid": f"0x{dev.vid:04X}",
"pid": f"0x{dev.pid:04X}",
"manufacturer": dev.manufacturer,
"product": dev.product,
"serial_number": dev.serial_number,
"device_version": dev.device_version,
"max_power_ma": dev.max_power_ma,
"self_powered": dev.self_powered,
"is_locked": dev.is_locked,
}
# Conditionally add part-specific fields
fw = dev.firmware_version
if fw is not None:
info["firmware_version"] = fw
flush = dev.flush_buffer_config
if flush is not None:
# Decode to named flags based on part
if part_code in PARTS_WITH_FLUSH_CONFIG:
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
info["flush_buffer_config"] = decode_bitmask(flush, flag_map)
else:
info["flush_buffer_config_raw"] = f"0x{flush:04X}"
mode = dev.device_mode
if mode is not None:
info["device_mode"] = {"eci": mode[0], "sci": mode[1]}
return info
@mcp.tool()
def get_firmware_version(device_index: int = 0) -> dict:
"""Get firmware version (CP2102N only).
Args:
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
fw = dev.firmware_version
if fw is None:
part_code, part_name = dev.part_number
return {"error": f"Firmware version not available on {part_name}",
"note": "Only CP2102N variants support this"}
return {"firmware_version": fw}
@mcp.tool()
def get_flush_buffer_config(device_index: int = 0) -> dict:
"""Get flush buffer configuration (CP2104/CP2105/CP2108).
Args:
device_index: Zero-based device index (default: 0)
Returns named flags showing which buffers are flushed on open/close.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
if part_code not in PARTS_WITH_FLUSH_CONFIG:
return {"error": f"Flush buffer config not available on {part_name}",
"supported": "CP2104, CP2105, CP2108"}
raw = lib.get_flush_buffer_config(dev.handle)
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
return {
"raw": f"0x{raw:04X}",
"flags": decode_bitmask(raw, flag_map),
}
@mcp.tool()
def get_device_mode(device_index: int = 0) -> dict:
"""Get device mode (CP2105 only).
Args:
device_index: Zero-based device index (default: 0)
Returns ECI and SCI interface modes.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
if part_code not in PARTS_WITH_DEVICE_MODE:
return {"error": f"Device mode not available on {part_name}",
"supported": "CP2105"}
eci, sci = lib.get_device_mode(dev.handle)
return {"eci_mode": eci, "sci_mode": sci}
@mcp.tool()
def get_interface_string(interface_number: int, device_index: int = 0) -> dict:
"""Get USB interface string (CP2105/CP2108 only).
Args:
interface_number: Interface index (0 or 1 for CP2105, 0-3 for CP2108)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
try:
value = dev.get_interface_string(interface_number)
return {"interface": interface_number, "value": value}
except CP210xError as e:
return {"error": str(e)}
@mcp.tool()
def get_baud_rate_config(device_index: int = 0) -> dict:
"""Get the baud rate alias configuration table (32 entries).
Args:
device_index: Zero-based device index (default: 0)
Returns the full 32-entry baud rate alias table showing how standard
baud rates map to timer register values.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
configs = lib.get_baud_rate_config(dev.handle)
return {"entries": configs}
@mcp.tool()
def get_port_config(device_index: int = 0) -> dict:
"""Get GPIO port configuration (auto-detects CP2103/4/5/8).
Args:
device_index: Zero-based device index (default: 0)
Returns pin modes, latch values, and enhanced function settings.
Auto-dispatches to the correct struct based on part number.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
if part_code in PARTS_WITH_PORT_CONFIG:
raw = lib.get_port_config(dev.handle)
return {
"type": "single",
"part": part_name,
"mode": decode_bitmask(raw["mode"], PORT_PIN_FLAGS),
"reset_latch": decode_bitmask(raw["reset_latch"], PORT_PIN_FLAGS),
"suspend_latch": decode_bitmask(raw["suspend_latch"], PORT_PIN_FLAGS),
"enhanced_fxn": decode_bitmask(raw["enhanced_fxn"], ENHANCED_FXN_FLAGS),
"raw": raw,
}
elif part_code in PARTS_WITH_DUAL_PORT_CONFIG:
raw = lib.get_dual_port_config(dev.handle)
return {"type": "dual", "part": part_name, **raw}
elif part_code in PARTS_WITH_QUAD_PORT_CONFIG:
raw = lib.get_quad_port_config(dev.handle)
return {"type": "quad", "part": part_name, **raw}
else:
return {"error": f"Port config not available on {part_name}",
"supported": "CP2103, CP2104, CP2105, CP2108"}
@mcp.tool()
def get_raw_config(device_index: int = 0, size: int = 512) -> dict:
"""Read raw EPROM configuration blob.
Args:
device_index: Zero-based device index (default: 0)
size: Number of bytes to read (default: 512)
Returns hex-encoded config data.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
data = lib.get_config(dev.handle, size)
return {"hex": data.hex(), "size": len(data)}
@mcp.tool()
def create_hex_file(filename: str, device_index: int = 0) -> dict:
"""Dump device config to Intel HEX file.
Args:
filename: Output file path for the .hex file
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
lib.create_hex_file(dev.handle, filename)
return {"success": True, "filename": filename}
# ===========================================================================
# Normal-tier write tools (elicit → fallback to proceed)
# ===========================================================================
@mcp.tool()
async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB product string (device name shown to host).
Args:
product: New product string (max 126 characters)
device_index: Zero-based device index (default: 0)
Returns the updated device info. Device may need to be re-plugged for
changes to appear on the USB host.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_product = dev.product
new_value = product[:126]
if ctx and not await confirm_write(
ctx,
f"Write product string to CP210x OTP EPROM?\n\n"
f" Old: {old_product}\n New: {new_value}\n\n"
f"OTP writes are limited — this cannot be undone easily.",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.product = product
return {"success": True, "old_value": old_product, "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host"}
@mcp.tool()
async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB manufacturer string.
Args:
manufacturer: New manufacturer string (max 45 characters)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_mfr = dev.manufacturer
new_value = manufacturer[:45]
if ctx and not await confirm_write(
ctx,
f"Write manufacturer string to CP210x OTP EPROM?\n\n"
f" Old: {old_mfr}\n New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.manufacturer = manufacturer
return {"success": True, "old_value": old_mfr, "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host"}
@mcp.tool()
async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB serial number.
Args:
serial_number: New serial number (max 63 characters)
device_index: Zero-based device index (default: 0)
Note: Changing serial number may affect udev rules that match on serial.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_serial = dev.serial_number
new_value = serial_number[:63]
if ctx and not await confirm_write(
ctx,
f"Write serial number to CP210x OTP EPROM?\n\n"
f" Old: {old_serial}\n New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.serial_number = serial_number
return {"success": True, "old_value": old_serial, "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host"}
@mcp.tool()
async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the maximum USB power draw in milliamps.
Args:
max_power_ma: Max power in mA (0-500, will be rounded to nearest 2mA)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
if max_power_ma < 0 or max_power_ma > 500:
return {"error": "max_power_ma must be 0-500"}
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_power = dev.max_power_ma
actual_value = (max_power_ma // 2) * 2
if ctx and not await confirm_write(
ctx,
f"Write max power to CP210x OTP EPROM?\n\n"
f" Old: {old_power} mA\n New: {actual_value} mA",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.max_power_ma = max_power_ma
return {"success": True, "old_value_ma": old_power, "new_value_ma": actual_value}
@mcp.tool()
async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Context = None) -> dict:
"""Set whether device reports as self-powered or bus-powered.
Args:
self_powered: True for self-powered, False for bus-powered
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_value = dev.self_powered
mode = "self-powered" if self_powered else "bus-powered"
if ctx and not await confirm_write(
ctx,
f"Write power mode to CP210x OTP EPROM?\n\n"
f" Old: {'self-powered' if old_value else 'bus-powered'}\n New: {mode}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.self_powered = self_powered
return {"success": True, "old_value": old_value, "new_value": self_powered}
@mcp.tool()
async def set_device_version(version: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set device version (bcdDevice field).
Args:
version: BCD version number (e.g., 0x0100 for 1.00)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_version = dev.device_version
if ctx and not await confirm_write(
ctx,
f"Write device version to CP210x OTP EPROM?\n\n"
f" Old: {old_version}\n New: 0x{version:04X}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.device_version = version
new_ver = f"{(version >> 8) & 0xFF}.{version & 0xFF:02d}"
return {"success": True, "old_value": old_version, "new_value": new_ver}
@mcp.tool()
async def set_interface_string(
interface_number: int, value: str, device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set USB interface string (CP2105/CP2108 only).
Args:
interface_number: Interface index (0 or 1 for CP2105, 0-3 for CP2108)
value: New interface string
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
try:
old_value = dev.get_interface_string(interface_number)
except CP210xError as e:
return {"error": str(e)}
if ctx and not await confirm_write(
ctx,
f"Write interface {interface_number} string to CP210x OTP EPROM?\n\n"
f" Old: {old_value}\n New: {value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.set_interface_string(interface_number, value)
return {"success": True, "interface": interface_number,
"old_value": old_value, "new_value": value}
@mcp.tool()
async def set_flush_buffer_config(
flags: dict, device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set flush buffer configuration (CP2104/CP2105/CP2108).
Args:
flags: Dict of flag_name -> bool. Flag names depend on part:
CP2104: open_tx, open_rx, close_tx, close_rx
CP2105: open_tx_sci, open_rx_sci, close_tx_sci, close_rx_sci,
open_tx_eci, open_rx_eci, close_tx_eci, close_rx_eci
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
part_code, part_name = dev.part_number
if part_code not in PARTS_WITH_FLUSH_CONFIG:
return {"error": f"Flush buffer config not available on {part_name}"}
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
new_value = encode_bitmask(flags, flag_map)
old_raw = lib.get_flush_buffer_config(dev.handle)
if ctx and not await confirm_write(
ctx,
f"Write flush buffer config to CP210x?\n\n"
f" Old: 0x{old_raw:04X}\n New: 0x{new_value:04X}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
lib.set_flush_buffer_config(dev.handle, new_value)
return {"success": True, "old_raw": f"0x{old_raw:04X}",
"new_raw": f"0x{new_value:04X}", "flags": flags}
@mcp.tool()
async def set_device_mode(
eci_mode: int, sci_mode: int, device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set device mode (CP2105 only).
Args:
eci_mode: Enhanced interface mode byte
sci_mode: Standard interface mode byte
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
part_code, part_name = dev.part_number
if part_code not in PARTS_WITH_DEVICE_MODE:
return {"error": f"Device mode not available on {part_name}"}
old_eci, old_sci = lib.get_device_mode(dev.handle)
if ctx and not await confirm_write(
ctx,
f"Write device mode to CP210x?\n\n"
f" Old ECI: 0x{old_eci:02X}, SCI: 0x{old_sci:02X}\n"
f" New ECI: 0x{eci_mode:02X}, SCI: 0x{sci_mode:02X}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
lib.set_device_mode(dev.handle, eci_mode, sci_mode)
return {"success": True,
"old": {"eci": old_eci, "sci": old_sci},
"new": {"eci": eci_mode, "sci": sci_mode}}
@mcp.tool()
async def set_baud_rate_config(
entries: list[dict], device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set the full 32-entry baud rate alias table.
Args:
entries: List of 32 dicts, each with baud_gen, timer0_reload, prescaler, baud_rate
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
if len(entries) != 32:
return {"error": f"Expected 32 baud config entries, got {len(entries)}"}
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
if ctx and not await confirm_write(
ctx,
"Write complete baud rate table (32 entries) to CP210x OTP EPROM?",
):
return {"cancelled": True, "message": "Write cancelled by user"}
lib.set_baud_rate_config(dev.handle, entries)
return {"success": True, "entries_written": 32}
@mcp.tool()
async def set_baud_rate_alias(
index: int, baud_rate: int, baud_gen: int, timer0_reload: int, prescaler: int,
device_index: int = 0, ctx: Context = None,
) -> dict:
"""Modify a single baud rate alias entry (read-modify-write).
Args:
index: Entry index (0-31)
baud_rate: Target baud rate
baud_gen: Baud rate generator register value
timer0_reload: Timer0 reload register value
prescaler: Prescaler value
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
if not 0 <= index < 32:
return {"error": "Index must be 0-31"}
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
current = lib.get_baud_rate_config(dev.handle)
old_entry = current[index].copy()
current[index] = {
"index": index,
"baud_gen": baud_gen,
"timer0_reload": timer0_reload,
"prescaler": prescaler,
"baud_rate": baud_rate,
}
if ctx and not await confirm_write(
ctx,
f"Modify baud rate alias #{index}?\n\n"
f" Old: {old_entry['baud_rate']} baud\n"
f" New: {baud_rate} baud",
):
return {"cancelled": True, "message": "Write cancelled by user"}
lib.set_baud_rate_config(dev.handle, current)
return {"success": True, "index": index,
"old_entry": old_entry, "new_baud_rate": baud_rate}
@mcp.tool()
async def set_port_config(
config: dict, device_index: int = 0, ctx: Context = None,
) -> dict:
"""Set GPIO port configuration (auto-detects CP2103/4/5/8).
Args:
config: Port configuration dict. Structure depends on part type:
Single (CP2103/4): {mode, reset_latch, suspend_latch, enhanced_fxn}
- Values can be raw integers or named-flag dicts
Dual (CP2105): {mode, reset_latch, suspend_latch,
enhanced_fxn_eci, enhanced_fxn_sci, enhanced_fxn_device}
Quad (CP2108): Full quad port config dict (see get_port_config output)
device_index: Zero-based device index (default: 0)
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
part_code, part_name = dev.part_number
if ctx and not await confirm_write(
ctx,
f"Write port configuration to {part_name} OTP EPROM?",
):
return {"cancelled": True, "message": "Write cancelled by user"}
if part_code in PARTS_WITH_PORT_CONFIG:
# Allow either raw ints or named-flag dicts
mode = config.get("mode", 0)
if isinstance(mode, dict):
mode = encode_bitmask(mode, PORT_PIN_FLAGS)
reset = config.get("reset_latch", 0)
if isinstance(reset, dict):
reset = encode_bitmask(reset, PORT_PIN_FLAGS)
suspend = config.get("suspend_latch", 0)
if isinstance(suspend, dict):
suspend = encode_bitmask(suspend, PORT_PIN_FLAGS)
ef = config.get("enhanced_fxn", 0)
if isinstance(ef, dict):
ef = encode_bitmask(ef, ENHANCED_FXN_FLAGS)
lib.set_port_config(dev.handle, mode, reset, suspend, ef)
return {"success": True, "type": "single", "part": part_name}
elif part_code in PARTS_WITH_DUAL_PORT_CONFIG:
lib.set_dual_port_config(
dev.handle,
config["mode"], config["reset_latch"], config["suspend_latch"],
config["enhanced_fxn_eci"], config["enhanced_fxn_sci"],
config["enhanced_fxn_device"],
)
return {"success": True, "type": "dual", "part": part_name}
elif part_code in PARTS_WITH_QUAD_PORT_CONFIG:
lib.set_quad_port_config(dev.handle, config)
return {"success": True, "type": "quad", "part": part_name}
else:
return {"error": f"Port config not available on {part_name}"}
@mcp.tool()
def reset_device(device_index: int = 0) -> dict:
"""Reset the CP210x device (USB disconnect/reconnect).
Args:
device_index: Zero-based device index (default: 0)
This triggers a USB re-enumeration, which will make any programmed
changes visible to the USB host.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
dev.reset()
return {"success": True, "note": "Device has been reset. It may take a moment to re-enumerate."}
@mcp.tool()
async def setup_udev_rule(
device_index: int = 0,
symlink_name: str | None = None,
ctx: Context = None,
) -> dict:
"""Create a udev rule for stable /dev/ symlink for a CP210x device.
Generates a rule that matches the device's product string and creates
a persistent symlink (e.g., /dev/rylr998-0D27) that survives re-plugs
and port reordering.
Args:
device_index: Zero-based device index (default: 0)
symlink_name: Custom symlink name. If not provided, auto-generates
from the product string (e.g., "RYLR998 ...0D27" -> "rylr998-0D27")
Requires sudo for rule installation.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
product = dev.product
vid = dev.vid
pid = dev.pid
if not product or product == "CP2102 USB to UART Bridge Controller":
return {
"error": "Device has default product string",
"message": "Customize the product string first to create a unique match rule.",
}
if not symlink_name:
parts = product.split()
if len(parts) >= 2:
prefix = parts[0].lower()
suffix = parts[-1][-4:]
symlink_name = f"{prefix}-{suffix}"
else:
symlink_name = product.lower().replace(" ", "-")[:32]
match_suffix = product[-4:]
rule = (
f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", '
f'ATTRS{{idProduct}}=="{pid:04x}", '
f'ATTRS{{product}}=="*{match_suffix}", '
f'SYMLINK+="{symlink_name}"'
)
rules_path = f"/usr/lib/udev/rules.d/99-cp210x-{symlink_name}.rules"
rule_content = (
f"# Auto-generated by mcp210x-uart for: {product}\n"
f"# Creates /dev/{symlink_name} symlink\n"
f"{rule}\n"
)
install_msg = (
f"Install udev rule for stable device symlink?\n\n"
f" Device: {product}\n"
f" Symlink: /dev/{symlink_name}\n"
f" File: {rules_path}\n\n"
f"Requires sudo to install."
)
if ctx and not await confirm_write(ctx, install_msg):
return {"cancelled": True, "rule": rule_content,
"message": "Rule not installed. You can install it manually."}
try:
result = subprocess.run(
["sudo", "tee", rules_path],
input=rule_content, capture_output=True, text=True, timeout=30,
)
if result.returncode != 0:
return {"error": f"Failed to install rule: {result.stderr.strip()}",
"rule": rule_content, "message": "Install manually with: sudo tee " + rules_path}
subprocess.run(["sudo", "udevadm", "control", "--reload-rules"],
capture_output=True, timeout=10)
subprocess.run(["sudo", "udevadm", "trigger"],
capture_output=True, timeout=10)
except subprocess.TimeoutExpired:
return {"error": "Sudo timed out — may need interactive password",
"rule": rule_content, "message": "Install manually with: sudo tee " + rules_path}
return {"success": True, "symlink": f"/dev/{symlink_name}",
"rules_file": rules_path, "rule": rule_content,
"note": "Symlink will appear after device re-plug or udevadm trigger"}
# ===========================================================================
# Strict-tier write tools (hard-refuse without elicitation)
# ===========================================================================
@mcp.tool()
async def set_vid(vid: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB Vendor ID (DANGEROUS — can break driver matching).
Args:
vid: New USB Vendor ID (16-bit, e.g., 0x10C4 for Silicon Labs)
device_index: Zero-based device index (default: 0)
WARNING: Changing VID can prevent the device from being recognized by
standard CP210x drivers. Only use if you have custom drivers.
"""
lib = get_lib()
err = await strict_confirm(
ctx,
f"Write USB Vendor ID to CP210x OTP EPROM?\n\n"
f" New VID: 0x{vid:04X}\n\n"
f"WARNING: Changing VID can prevent the device from being recognized\n"
f"by standard CP210x drivers. This is PERMANENT.",
)
if err:
return err
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_vid = dev.vid
lib.set_vid(dev.handle, vid)
return {"success": True, "old_vid": f"0x{old_vid:04X}", "new_vid": f"0x{vid:04X}"}
@mcp.tool()
async def set_pid(pid: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB Product ID (DANGEROUS — can break driver matching).
Args:
pid: New USB Product ID (16-bit, e.g., 0xEA60)
device_index: Zero-based device index (default: 0)
WARNING: Changing PID can prevent the device from being recognized by
standard CP210x drivers.
"""
lib = get_lib()
err = await strict_confirm(
ctx,
f"Write USB Product ID to CP210x OTP EPROM?\n\n"
f" New PID: 0x{pid:04X}\n\n"
f"WARNING: Changing PID can prevent the device from being recognized\n"
f"by standard CP210x drivers. This is PERMANENT.",
)
if err:
return err
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
old_pid = dev.pid
lib.set_pid(dev.handle, pid)
return {"success": True, "old_pid": f"0x{old_pid:04X}", "new_pid": f"0x{pid:04X}"}
@mcp.tool()
async def set_raw_config(hex_data: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Write raw configuration blob to device EPROM (DANGEROUS).
Args:
hex_data: Hex-encoded config data
device_index: Zero-based device index (default: 0)
WARNING: Writing invalid config data can brick the device.
Use get_raw_config to read current config first.
"""
lib = get_lib()
try:
data = bytes.fromhex(hex_data)
except ValueError:
return {"error": "Invalid hex string"}
err = await strict_confirm(
ctx,
f"Write {len(data)} bytes of raw config to CP210x EPROM?\n\n"
f"WARNING: Writing invalid config data can brick the device.\n"
f"This is PERMANENT and cannot be undone.",
)
if err:
return err
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is locked and cannot be modified"}
lib.set_config(dev.handle, data)
return {"success": True, "bytes_written": len(data)}
@mcp.tool()
async def update_firmware(device_index: int = 0, ctx: Context = None) -> dict:
"""Update device firmware (CP2102N only, DANGEROUS).
Args:
device_index: Zero-based device index (default: 0)
WARNING: Firmware update failure can brick the device.
"""
lib = get_lib()
err = await strict_confirm(
ctx,
"Update CP2102N firmware?\n\n"
"WARNING: Firmware update failure can BRICK the device.\n"
"Ensure stable USB connection and power during update.",
)
if err:
return err
with CP210xDevice(lib, device_index) as dev:
part_code, part_name = dev.part_number
if part_code not in PARTS_CP2102N:
return {"error": f"Firmware update not available on {part_name}",
"supported": "CP2102N variants only"}
lib.update_firmware(dev.handle)
return {"success": True, "note": "Firmware updated. Device may need reset."}
@mcp.tool()
async def lock_device(device_index: int = 0, ctx: Context = None) -> dict:
"""PERMANENTLY lock the device to prevent further customization.
Args:
device_index: Zero-based device index (default: 0)
WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's
configuration cannot be changed. The device will still function normally,
but strings, power settings, etc. cannot be modified.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
if dev.is_locked:
return {"error": "Device is already locked"}
part_code, part_name = dev.part_number
err = await strict_confirm(
ctx,
f"PERMANENTLY lock this {part_name}?\n\n"
f" Product: {dev.product}\n"
f" Serial: {dev.serial_number}\n\n"
f"THIS CANNOT BE UNDONE. The device configuration\n"
f"will be frozen forever.",
)
if err:
return err
lib.lock_device(dev.handle)
return {"success": True,
"warning": "Device is now PERMANENTLY locked. Configuration cannot be changed."}
def main():
"""Entry point for the MCP server."""
mcp.run()
if __name__ == "__main__":
main()

10
uv.lock generated
View File

@ -780,8 +780,8 @@ wheels = [
]
[[package]]
name = "mcp210x"
version = "0.1.0"
name = "mcp210x-uart"
version = "0.2.0"
source = { editable = "." }
dependencies = [
{ name = "fastmcp" },
@ -1191,11 +1191,11 @@ wheels = [
[[package]]
name = "pyjwt"
version = "2.10.1"
version = "2.11.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e7/46/bd74733ff231675599650d3e47f361794b22ef3e3770998dda30d3b63726/pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953", size = 87785, upload-time = "2024-11-28T03:43:29.933Z" }
sdist = { url = "https://files.pythonhosted.org/packages/5c/5a/b46fa56bf322901eee5b0454a34343cdbdae202cd421775a8ee4e42fd519/pyjwt-2.11.0.tar.gz", hash = "sha256:35f95c1f0fbe5d5ba6e43f00271c275f7a1a4db1dab27bf708073b75318ea623", size = 98019, upload-time = "2026-01-30T19:59:55.694Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997, upload-time = "2024-11-28T03:43:27.893Z" },
{ url = "https://files.pythonhosted.org/packages/6f/01/c26ce75ba460d5cd503da9e13b21a33804d38c2165dec7b716d06b13010c/pyjwt-2.11.0-py3-none-any.whl", hash = "sha256:94a6bde30eb5c8e04fee991062b534071fd1439ef58d2adc9ccb823e7bcd0469", size = 28224, upload-time = "2026-01-30T19:59:54.539Z" },
]
[package.optional-dependencies]