Compare commits
3 Commits
a28eed3849
...
07d6ee6ff1
| Author | SHA1 | Date | |
|---|---|---|---|
| 07d6ee6ff1 | |||
| c0bedde54b | |||
| 80220f5728 |
@ -7,7 +7,7 @@
|
||||
"run",
|
||||
"--directory",
|
||||
"/home/rpm/claude/lora/cp2102-uart",
|
||||
"mcp210x"
|
||||
"mcp210x-uart"
|
||||
],
|
||||
"env": {}
|
||||
}
|
||||
|
||||
113
README.md
113
README.md
@ -1,10 +1,10 @@
|
||||
# mcp210x
|
||||
# mcp210x-uart
|
||||
|
||||
It's MCP. It's CP210x. It was right there the whole time.
|
||||
|
||||
An MCP server for customizing Silicon Labs CP210x USB-UART bridge devices — product strings, serial numbers, power config, udev rules, and device locking — through natural language in Claude Code.
|
||||
An MCP server for customizing Silicon Labs CP210x USB-UART bridge devices — product strings, serial numbers, power config, GPIO port config, baud rate tables, udev rules, and device locking — through natural language in Claude Code.
|
||||
|
||||
Built on [FastMCP](https://gofastmcp.com/) with Python ctypes bindings to Silicon Labs' native `libcp210xmanufacturing` library.
|
||||
Built on [FastMCP](https://gofastmcp.com/) with Python ctypes bindings to all 45 functions in Silicon Labs' native `libcp210xmanufacturing` library. Covers the full CP210x family: CP2101, CP2102, CP2102N, CP2103, CP2104, CP2105, CP2108, and CP2109.
|
||||
|
||||
## The problem
|
||||
|
||||
@ -32,26 +32,78 @@ Two devices found:
|
||||
|
||||
Each device gets a unique product string baked into its USB descriptor EPROM. Udev rules match on that string to create stable symlinks. Devices survive reboots, port reordering, and hub changes.
|
||||
|
||||
## Features
|
||||
## Tools
|
||||
|
||||
- **List and inspect** connected CP210x devices (part number, VID/PID, strings, power, lock state)
|
||||
- **Write USB descriptors** — product string, manufacturer, serial number
|
||||
- **Configure power** — max current draw, self-powered vs bus-powered
|
||||
- **Generate udev rules** — stable `/dev/` symlinks based on product string
|
||||
- **Reset device** — trigger USB re-enumeration after changes
|
||||
- **Lock device** — permanently freeze configuration (with strict confirmation gate)
|
||||
### Read-only (no confirmation)
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `list_devices` | List all connected CP210x devices |
|
||||
| `get_device_info` | Full device details — part number, VID/PID, strings, power, lock state, firmware version (CP2102N), flush config, device mode |
|
||||
| `get_firmware_version` | Firmware version (CP2102N only) |
|
||||
| `get_flush_buffer_config` | Flush buffer configuration (CP2104/CP2105/CP2108) |
|
||||
| `get_device_mode` | Device mode — ECI/SCI assignment (CP2105 only) |
|
||||
| `get_interface_string` | USB interface string (CP2105/CP2108 only) |
|
||||
| `get_baud_rate_config` | Baud rate alias table (32 entries) |
|
||||
| `get_port_config` | GPIO port configuration (auto-detects CP2103/4/5/8) |
|
||||
| `get_raw_config` | Raw EPROM configuration blob (hex string) |
|
||||
| `create_hex_file` | Dump device config to Intel HEX file |
|
||||
| `reset_device` | USB disconnect/reconnect to apply changes |
|
||||
|
||||
### Normal writes (elicitation → fallback to proceed)
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `set_product_string` | Write USB product string (max 126 chars) |
|
||||
| `set_manufacturer_string` | Write USB manufacturer string (max 45 chars) |
|
||||
| `set_serial_number` | Write USB serial number (max 63 chars) |
|
||||
| `set_max_power` | Set max USB power draw in mA (0–500, rounded to nearest 2) |
|
||||
| `set_self_powered` | Toggle self-powered vs bus-powered reporting |
|
||||
| `set_device_version` | Set device version (bcdDevice field) |
|
||||
| `set_interface_string` | Set USB interface string (CP2105/CP2108 only) |
|
||||
| `set_flush_buffer_config` | Set flush buffer configuration (CP2104/CP2105/CP2108) |
|
||||
| `set_device_mode` | Set device mode (CP2105 only) |
|
||||
| `set_baud_rate_config` | Set full 32-entry baud rate alias table |
|
||||
| `set_baud_rate_alias` | Modify a single baud rate alias entry (read-modify-write) |
|
||||
| `set_port_config` | Set GPIO port configuration (auto-detects CP2103/4/5/8) |
|
||||
| `setup_udev_rule` | Generate and install a udev rule for a stable `/dev/` symlink |
|
||||
|
||||
### Strict writes (elicitation required — hard-refuses without it)
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `set_vid` | Set USB Vendor ID (can break driver matching) |
|
||||
| `set_pid` | Set USB Product ID (can break driver matching) |
|
||||
| `set_raw_config` | Write raw configuration blob to EPROM |
|
||||
| `update_firmware` | Update device firmware (CP2102N only) |
|
||||
| `lock_device` | Permanently freeze device configuration |
|
||||
|
||||
## Safety model
|
||||
|
||||
CP210x descriptor EPROM is one-time-programmable with limited write cycles. Writes can't be undone. Locks are permanent. The server enforces a tiered confirmation model:
|
||||
|
||||
| Operation | Confirmation |
|
||||
|-----------|-------------|
|
||||
| Reads | None |
|
||||
| Writes (strings, power) | MCP elicitation if client supports it; proceeds otherwise |
|
||||
| Lock | Elicitation **required**; hard-refuses without it |
|
||||
| Tier | Behavior | Tools |
|
||||
|------|----------|-------|
|
||||
| **None** | No confirmation | All `get_*`, `list_devices`, `reset_device`, `create_hex_file` |
|
||||
| **Normal** | Elicitation if client supports it; proceeds otherwise | String setters, power, baud config, port config, `setup_udev_rule` |
|
||||
| **Strict** | Elicitation **required**; returns error without it | `set_vid`, `set_pid`, `set_raw_config`, `update_firmware`, `lock_device` |
|
||||
|
||||
The lock gate isn't just a warning — it returns an error and does not proceed if the MCP client can't present a confirmation dialog.
|
||||
The strict gate isn't a warning — it returns an error and does not proceed if the MCP client can't present a confirmation dialog.
|
||||
|
||||
## Part-number support
|
||||
|
||||
Different CP210x parts expose different features. The server auto-gates tools to supported parts:
|
||||
|
||||
| Feature | Supported parts |
|
||||
|---------|----------------|
|
||||
| Core (strings, power, baud) | All CP210x |
|
||||
| Flush buffer config | CP2104, CP2105, CP2108 |
|
||||
| Device mode | CP2105 |
|
||||
| Interface strings | CP2105, CP2108 |
|
||||
| Port config | CP2103, CP2104 |
|
||||
| Dual port config | CP2105 |
|
||||
| Quad port config | CP2108 |
|
||||
| Firmware version/update | CP2102N |
|
||||
|
||||
## Requirements
|
||||
|
||||
@ -93,30 +145,15 @@ uv tool install .
|
||||
### 3. Claude Code
|
||||
|
||||
```bash
|
||||
claude mcp add cp210x -- uvx mcp210x
|
||||
claude mcp add cp210x -- uvx mcp210x-uart
|
||||
```
|
||||
|
||||
For development (runs from source):
|
||||
|
||||
```bash
|
||||
claude mcp add cp210x-local -- uv run --directory /path/to/this-repo mcp210x
|
||||
claude mcp add cp210x-local -- uv run --directory /path/to/this-repo mcp210x-uart
|
||||
```
|
||||
|
||||
## Tools
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `list_devices` | List connected CP210x devices with description and serial |
|
||||
| `get_device_info` | Full device details — part number, VID/PID, strings, power, lock state |
|
||||
| `set_product_string` | Write USB product string (max 126 chars) |
|
||||
| `set_manufacturer_string` | Write USB manufacturer string (max 45 chars) |
|
||||
| `set_serial_number` | Write USB serial number (max 63 chars) |
|
||||
| `set_max_power` | Set max USB power draw in mA (0-500, rounded to nearest 2) |
|
||||
| `set_self_powered` | Toggle self-powered vs bus-powered reporting |
|
||||
| `reset_device` | USB disconnect/reconnect to apply changes |
|
||||
| `lock_device` | Permanently freeze device configuration |
|
||||
| `setup_udev_rule` | Generate and install a udev rule for a stable `/dev/` symlink |
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
@ -137,10 +174,10 @@ The native library uses **libusb** for device access, separate from the kernel's
|
||||
## Project structure
|
||||
|
||||
```
|
||||
mcp210x/
|
||||
├── src/mcp210x/
|
||||
│ ├── server.py # FastMCP tool definitions and elicitation logic
|
||||
│ ├── bindings.py # ctypes wrapper for libcp210xmanufacturing.so
|
||||
mcp210x-uart/
|
||||
├── src/mcp210x_uart/
|
||||
│ ├── server.py # FastMCP tool definitions, elicitation, part-number dispatch
|
||||
│ ├── bindings.py # ctypes: structs, prototypes, bitmask helpers, device wrapper
|
||||
│ └── __init__.py
|
||||
├── aur/cp210xmanufacturing/
|
||||
│ ├── PKGBUILD # Arch Linux package for the native library
|
||||
@ -152,7 +189,7 @@ mcp210x/
|
||||
|
||||
## Complementary tools
|
||||
|
||||
This server handles **device customization** (USB descriptors, power config). For **serial communication** (sending/receiving data over UART), use [mcserial](https://git.supported.systems/MCP/mcserial).
|
||||
This server handles **device customization** (USB descriptors, power config, GPIO, baud tables). For **serial communication** (sending/receiving data over UART), use [mcserial](https://git.supported.systems/MCP/mcserial).
|
||||
|
||||
## Reference
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "mcp210x"
|
||||
version = "0.1.0"
|
||||
name = "mcp210x-uart"
|
||||
version = "0.2.0"
|
||||
description = "MCP server for CP210x USB-UART bridge customization"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10"
|
||||
@ -31,18 +31,18 @@ dev = [
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
mcp210x = "mcp210x:main"
|
||||
mcp210x-uart = "mcp210x_uart:main"
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://forge.supported.systems/MCP/mcp210x"
|
||||
Repository = "https://forge.supported.systems/MCP/mcp210x"
|
||||
Homepage = "https://forge.supported.systems/MCP/mcp210x-uart"
|
||||
Repository = "https://forge.supported.systems/MCP/mcp210x-uart"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/mcp210x"]
|
||||
packages = ["src/mcp210x_uart"]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 100
|
||||
|
||||
@ -1,5 +0,0 @@
|
||||
"""mcp210x - MCP server for Silicon Labs CP210x USB-UART bridge customization."""
|
||||
|
||||
from .server import mcp, main
|
||||
|
||||
__all__ = ["mcp", "main"]
|
||||
@ -1,441 +0,0 @@
|
||||
"""Low-level ctypes bindings for libcp210xmanufacturing."""
|
||||
|
||||
import ctypes
|
||||
from ctypes import c_int, c_uint, c_ushort, c_ubyte, c_char_p, c_void_p, POINTER, byref
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# Type aliases matching the C library
|
||||
CP210x_STATUS = c_int
|
||||
HANDLE = c_void_p
|
||||
DWORD = c_uint
|
||||
WORD = c_ushort
|
||||
BYTE = c_ubyte
|
||||
BOOL = c_int
|
||||
|
||||
# Return codes
|
||||
CP210x_SUCCESS = 0x00
|
||||
CP210x_DEVICE_NOT_FOUND = 0xFF
|
||||
CP210x_INVALID_HANDLE = 0x01
|
||||
CP210x_INVALID_PARAMETER = 0x02
|
||||
CP210x_DEVICE_IO_FAILED = 0x03
|
||||
CP210x_FUNCTION_NOT_SUPPORTED = 0x04
|
||||
CP210x_GLOBAL_DATA_ERROR = 0x05
|
||||
CP210x_FILE_ERROR = 0x06
|
||||
CP210x_COMMAND_FAILED = 0x08
|
||||
CP210x_INVALID_ACCESS_TYPE = 0x09
|
||||
|
||||
# Part numbers
|
||||
PART_NUMBERS = {
|
||||
0x01: "CP2101",
|
||||
0x02: "CP2102",
|
||||
0x03: "CP2103",
|
||||
0x04: "CP2104",
|
||||
0x05: "CP2105",
|
||||
0x08: "CP2108",
|
||||
0x09: "CP2109",
|
||||
0x20: "CP2102N-QFN28",
|
||||
0x21: "CP2102N-QFN24",
|
||||
0x22: "CP2102N-QFN20",
|
||||
}
|
||||
|
||||
# Buffer sizes
|
||||
MAX_DEVICE_STRLEN = 256
|
||||
MAX_MANUFACTURER_STRLEN = 45
|
||||
MAX_PRODUCT_STRLEN = 126
|
||||
MAX_SERIAL_STRLEN = 63
|
||||
|
||||
# GetProductString flags
|
||||
RETURN_SERIAL_NUMBER = 0x00
|
||||
RETURN_DESCRIPTION = 0x01
|
||||
RETURN_FULL_PATH = 0x02
|
||||
|
||||
|
||||
class CP210xError(Exception):
|
||||
"""Exception raised for CP210x library errors."""
|
||||
|
||||
ERROR_MESSAGES = {
|
||||
CP210x_DEVICE_NOT_FOUND: "Device not found",
|
||||
CP210x_INVALID_HANDLE: "Invalid handle",
|
||||
CP210x_INVALID_PARAMETER: "Invalid parameter",
|
||||
CP210x_DEVICE_IO_FAILED: "Device I/O failed",
|
||||
CP210x_FUNCTION_NOT_SUPPORTED: "Function not supported",
|
||||
CP210x_GLOBAL_DATA_ERROR: "Global data error",
|
||||
CP210x_FILE_ERROR: "File error",
|
||||
CP210x_COMMAND_FAILED: "Command failed",
|
||||
CP210x_INVALID_ACCESS_TYPE: "Invalid access type",
|
||||
}
|
||||
|
||||
def __init__(self, status: int, context: str = ""):
|
||||
self.status = status
|
||||
msg = self.ERROR_MESSAGES.get(status, f"Unknown error 0x{status:02X}")
|
||||
if context:
|
||||
msg = f"{context}: {msg}"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class CP210xLibrary:
|
||||
"""Wrapper for the CP210x manufacturing library."""
|
||||
|
||||
def __init__(self, lib_path: Optional[str] = None):
|
||||
"""Load the CP210x manufacturing library.
|
||||
|
||||
Args:
|
||||
lib_path: Path to libcp210xmanufacturing.so, or None to search default paths.
|
||||
"""
|
||||
if lib_path:
|
||||
self._lib = ctypes.CDLL(lib_path)
|
||||
else:
|
||||
# Search common locations
|
||||
search_paths = [
|
||||
"/usr/lib/libcp210xmanufacturing.so",
|
||||
"/usr/local/lib/libcp210xmanufacturing.so",
|
||||
str(Path(__file__).parent.parent.parent / "AN721SW/Linux/LibrarySourcePackages/cp210xmanufacturing/build/lib/x86_64/libcp210xmanufacturing.so"),
|
||||
]
|
||||
for path in search_paths:
|
||||
if Path(path).exists():
|
||||
self._lib = ctypes.CDLL(path)
|
||||
break
|
||||
else:
|
||||
raise FileNotFoundError(
|
||||
"libcp210xmanufacturing.so not found. Install with: "
|
||||
"cd aur/cp210xmanufacturing && makepkg -si"
|
||||
)
|
||||
|
||||
self._setup_functions()
|
||||
|
||||
def _setup_functions(self):
|
||||
"""Set up function prototypes."""
|
||||
# CP210x_GetNumDevices
|
||||
self._lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)]
|
||||
self._lib.CP210x_GetNumDevices.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetProductString
|
||||
self._lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD]
|
||||
self._lib.CP210x_GetProductString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_Open
|
||||
self._lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)]
|
||||
self._lib.CP210x_Open.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_Close
|
||||
self._lib.CP210x_Close.argtypes = [HANDLE]
|
||||
self._lib.CP210x_Close.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetPartNumber
|
||||
self._lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
self._lib.CP210x_GetPartNumber.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceVid
|
||||
self._lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)]
|
||||
self._lib.CP210x_GetDeviceVid.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDevicePid
|
||||
self._lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)]
|
||||
self._lib.CP210x_GetDevicePid.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceManufacturerString
|
||||
self._lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
self._lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceProductString
|
||||
self._lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
self._lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceSerialNumber
|
||||
self._lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
self._lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetManufacturerString
|
||||
self._lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
self._lib.CP210x_SetManufacturerString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetProductString
|
||||
self._lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
self._lib.CP210x_SetProductString.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetSerialNumber
|
||||
self._lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
self._lib.CP210x_SetSerialNumber.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetMaxPower
|
||||
self._lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
self._lib.CP210x_GetMaxPower.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetMaxPower
|
||||
self._lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE]
|
||||
self._lib.CP210x_SetMaxPower.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetSelfPower
|
||||
self._lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)]
|
||||
self._lib.CP210x_GetSelfPower.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetSelfPower
|
||||
self._lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL]
|
||||
self._lib.CP210x_SetSelfPower.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetDeviceVersion
|
||||
self._lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)]
|
||||
self._lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetDeviceVersion
|
||||
self._lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD]
|
||||
self._lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_GetLockValue
|
||||
self._lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
self._lib.CP210x_GetLockValue.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_SetLockValue
|
||||
self._lib.CP210x_SetLockValue.argtypes = [HANDLE]
|
||||
self._lib.CP210x_SetLockValue.restype = CP210x_STATUS
|
||||
|
||||
# CP210x_Reset
|
||||
self._lib.CP210x_Reset.argtypes = [HANDLE]
|
||||
self._lib.CP210x_Reset.restype = CP210x_STATUS
|
||||
|
||||
def _check_status(self, status: int, context: str = ""):
|
||||
"""Raise exception if status indicates error."""
|
||||
if status != CP210x_SUCCESS:
|
||||
raise CP210xError(status, context)
|
||||
|
||||
def get_num_devices(self) -> int:
|
||||
"""Get the number of connected CP210x devices."""
|
||||
num = DWORD()
|
||||
status = self._lib.CP210x_GetNumDevices(byref(num))
|
||||
self._check_status(status, "GetNumDevices")
|
||||
return num.value
|
||||
|
||||
def get_product_string(self, device_index: int, flag: int = RETURN_DESCRIPTION) -> str:
|
||||
"""Get device string without opening the device.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index
|
||||
flag: What to return - RETURN_SERIAL_NUMBER, RETURN_DESCRIPTION, or RETURN_FULL_PATH
|
||||
"""
|
||||
buf = ctypes.create_string_buffer(MAX_DEVICE_STRLEN)
|
||||
status = self._lib.CP210x_GetProductString(DWORD(device_index), buf, DWORD(flag))
|
||||
self._check_status(status, "GetProductString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def open(self, device_index: int) -> HANDLE:
|
||||
"""Open a device by index."""
|
||||
handle = HANDLE()
|
||||
status = self._lib.CP210x_Open(DWORD(device_index), byref(handle))
|
||||
self._check_status(status, f"Open device {device_index}")
|
||||
return handle
|
||||
|
||||
def close(self, handle: HANDLE):
|
||||
"""Close an open device."""
|
||||
status = self._lib.CP210x_Close(handle)
|
||||
self._check_status(status, "Close")
|
||||
|
||||
def get_part_number(self, handle: HANDLE) -> tuple[int, str]:
|
||||
"""Get the part number of an open device.
|
||||
|
||||
Returns:
|
||||
Tuple of (part_number_code, part_name)
|
||||
"""
|
||||
part = BYTE()
|
||||
status = self._lib.CP210x_GetPartNumber(handle, byref(part))
|
||||
self._check_status(status, "GetPartNumber")
|
||||
return part.value, PART_NUMBERS.get(part.value, f"Unknown (0x{part.value:02X})")
|
||||
|
||||
def get_device_vid(self, handle: HANDLE) -> int:
|
||||
"""Get the USB Vendor ID."""
|
||||
vid = WORD()
|
||||
status = self._lib.CP210x_GetDeviceVid(handle, byref(vid))
|
||||
self._check_status(status, "GetDeviceVid")
|
||||
return vid.value
|
||||
|
||||
def get_device_pid(self, handle: HANDLE) -> int:
|
||||
"""Get the USB Product ID."""
|
||||
pid = WORD()
|
||||
status = self._lib.CP210x_GetDevicePid(handle, byref(pid))
|
||||
self._check_status(status, "GetDevicePid")
|
||||
return pid.value
|
||||
|
||||
def get_manufacturer_string(self, handle: HANDLE) -> str:
|
||||
"""Get the manufacturer string."""
|
||||
buf = ctypes.create_string_buffer(MAX_MANUFACTURER_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceManufacturerString(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceManufacturerString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def get_product_string_device(self, handle: HANDLE) -> str:
|
||||
"""Get the product string from an open device."""
|
||||
buf = ctypes.create_string_buffer(MAX_PRODUCT_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceProductString(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceProductString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def get_serial_number(self, handle: HANDLE) -> str:
|
||||
"""Get the serial number."""
|
||||
buf = ctypes.create_string_buffer(MAX_SERIAL_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceSerialNumber(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceSerialNumber")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def set_manufacturer_string(self, handle: HANDLE, manufacturer: str):
|
||||
"""Set the manufacturer string (max 45 chars)."""
|
||||
data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN]
|
||||
status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetManufacturerString")
|
||||
|
||||
def set_product_string(self, handle: HANDLE, product: str):
|
||||
"""Set the product string (max 126 chars)."""
|
||||
data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN]
|
||||
status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetProductString")
|
||||
|
||||
def set_serial_number(self, handle: HANDLE, serial: str):
|
||||
"""Set the serial number (max 63 chars)."""
|
||||
data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN]
|
||||
status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetSerialNumber")
|
||||
|
||||
def get_max_power(self, handle: HANDLE) -> int:
|
||||
"""Get max power in units of 2mA (multiply by 2 for mA)."""
|
||||
power = BYTE()
|
||||
status = self._lib.CP210x_GetMaxPower(handle, byref(power))
|
||||
self._check_status(status, "GetMaxPower")
|
||||
return power.value
|
||||
|
||||
def set_max_power(self, handle: HANDLE, power_2ma: int):
|
||||
"""Set max power in units of 2mA (e.g., 50 = 100mA)."""
|
||||
if power_2ma > 250:
|
||||
raise ValueError("Max power cannot exceed 250 (500mA)")
|
||||
status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma))
|
||||
self._check_status(status, "SetMaxPower")
|
||||
|
||||
def get_self_power(self, handle: HANDLE) -> bool:
|
||||
"""Check if device is self-powered."""
|
||||
self_power = BOOL()
|
||||
status = self._lib.CP210x_GetSelfPower(handle, byref(self_power))
|
||||
self._check_status(status, "GetSelfPower")
|
||||
return bool(self_power.value)
|
||||
|
||||
def set_self_power(self, handle: HANDLE, self_powered: bool):
|
||||
"""Set whether device is self-powered."""
|
||||
status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0))
|
||||
self._check_status(status, "SetSelfPower")
|
||||
|
||||
def get_device_version(self, handle: HANDLE) -> int:
|
||||
"""Get device version (bcdDevice)."""
|
||||
version = WORD()
|
||||
status = self._lib.CP210x_GetDeviceVersion(handle, byref(version))
|
||||
self._check_status(status, "GetDeviceVersion")
|
||||
return version.value
|
||||
|
||||
def set_device_version(self, handle: HANDLE, version: int):
|
||||
"""Set device version (bcdDevice)."""
|
||||
status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version))
|
||||
self._check_status(status, "SetDeviceVersion")
|
||||
|
||||
def get_lock_value(self, handle: HANDLE) -> int:
|
||||
"""Get lock value (0 = unlocked, 1-255 = locked)."""
|
||||
lock = BYTE()
|
||||
status = self._lib.CP210x_GetLockValue(handle, byref(lock))
|
||||
self._check_status(status, "GetLockValue")
|
||||
return lock.value
|
||||
|
||||
def lock_device(self, handle: HANDLE):
|
||||
"""Lock the device (PERMANENT - cannot be undone!)."""
|
||||
status = self._lib.CP210x_SetLockValue(handle)
|
||||
self._check_status(status, "SetLockValue")
|
||||
|
||||
def reset(self, handle: HANDLE):
|
||||
"""Reset the device (USB disconnect/reconnect)."""
|
||||
status = self._lib.CP210x_Reset(handle)
|
||||
self._check_status(status, "Reset")
|
||||
|
||||
|
||||
# Convenience context manager for device access
|
||||
class CP210xDevice:
|
||||
"""Context manager for safe device access."""
|
||||
|
||||
def __init__(self, lib: CP210xLibrary, device_index: int):
|
||||
self.lib = lib
|
||||
self.device_index = device_index
|
||||
self.handle = None
|
||||
|
||||
def __enter__(self):
|
||||
self.handle = self.lib.open(self.device_index)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if self.handle:
|
||||
self.lib.close(self.handle)
|
||||
self.handle = None
|
||||
return False
|
||||
|
||||
@property
|
||||
def part_number(self) -> tuple[int, str]:
|
||||
return self.lib.get_part_number(self.handle)
|
||||
|
||||
@property
|
||||
def vid(self) -> int:
|
||||
return self.lib.get_device_vid(self.handle)
|
||||
|
||||
@property
|
||||
def pid(self) -> int:
|
||||
return self.lib.get_device_pid(self.handle)
|
||||
|
||||
@property
|
||||
def manufacturer(self) -> str:
|
||||
return self.lib.get_manufacturer_string(self.handle)
|
||||
|
||||
@manufacturer.setter
|
||||
def manufacturer(self, value: str):
|
||||
self.lib.set_manufacturer_string(self.handle, value)
|
||||
|
||||
@property
|
||||
def product(self) -> str:
|
||||
return self.lib.get_product_string_device(self.handle)
|
||||
|
||||
@product.setter
|
||||
def product(self, value: str):
|
||||
self.lib.set_product_string(self.handle, value)
|
||||
|
||||
@property
|
||||
def serial_number(self) -> str:
|
||||
return self.lib.get_serial_number(self.handle)
|
||||
|
||||
@serial_number.setter
|
||||
def serial_number(self, value: str):
|
||||
self.lib.set_serial_number(self.handle, value)
|
||||
|
||||
@property
|
||||
def max_power_ma(self) -> int:
|
||||
"""Max power in milliamps."""
|
||||
return self.lib.get_max_power(self.handle) * 2
|
||||
|
||||
@max_power_ma.setter
|
||||
def max_power_ma(self, value: int):
|
||||
"""Set max power in milliamps (will be rounded down to nearest 2mA)."""
|
||||
self.lib.set_max_power(self.handle, value // 2)
|
||||
|
||||
@property
|
||||
def self_powered(self) -> bool:
|
||||
return self.lib.get_self_power(self.handle)
|
||||
|
||||
@self_powered.setter
|
||||
def self_powered(self, value: bool):
|
||||
self.lib.set_self_power(self.handle, value)
|
||||
|
||||
@property
|
||||
def device_version(self) -> str:
|
||||
"""Device version as BCD string (e.g., '1.00')."""
|
||||
v = self.lib.get_device_version(self.handle)
|
||||
return f"{(v >> 8) & 0xFF}.{v & 0xFF:02d}"
|
||||
|
||||
@property
|
||||
def is_locked(self) -> bool:
|
||||
return self.lib.get_lock_value(self.handle) != 0
|
||||
|
||||
def reset(self):
|
||||
"""Reset the device."""
|
||||
self.lib.reset(self.handle)
|
||||
@ -1,483 +0,0 @@
|
||||
"""FastMCP server for CP210x device customization."""
|
||||
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
|
||||
from fastmcp import FastMCP, Context
|
||||
from fastmcp.server.elicitation import AcceptedElicitation
|
||||
|
||||
from .bindings import CP210xLibrary, CP210xDevice, CP210xError, PART_NUMBERS
|
||||
|
||||
mcp = FastMCP(
|
||||
"cp210x",
|
||||
instructions="CP210x USB-UART bridge customization - read/write product strings, serial numbers, and device configuration",
|
||||
)
|
||||
|
||||
# Lazy-loaded library instance
|
||||
_lib: Optional[CP210xLibrary] = None
|
||||
|
||||
|
||||
def get_lib() -> CP210xLibrary:
|
||||
"""Get or create the library instance."""
|
||||
global _lib
|
||||
if _lib is None:
|
||||
_lib = CP210xLibrary()
|
||||
return _lib
|
||||
|
||||
|
||||
async def confirm_write(ctx: Context, message: str) -> bool:
|
||||
"""Ask user to confirm a write operation via elicitation.
|
||||
|
||||
Falls back to proceeding without confirmation if the client
|
||||
doesn't support elicitation.
|
||||
"""
|
||||
try:
|
||||
result = await ctx.elicit(message, ["Confirm", "Cancel"])
|
||||
return isinstance(result, AcceptedElicitation) and result.data == "Confirm"
|
||||
except Exception:
|
||||
# Client doesn't support elicitation — proceed as usual
|
||||
return True
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def list_devices() -> list[dict]:
|
||||
"""List all connected CP210x devices.
|
||||
|
||||
Returns a list of devices with their index, description, and serial number.
|
||||
Use the index to reference a specific device in other tools.
|
||||
"""
|
||||
lib = get_lib()
|
||||
num_devices = lib.get_num_devices()
|
||||
|
||||
devices = []
|
||||
for i in range(num_devices):
|
||||
try:
|
||||
desc = lib.get_product_string(i, flag=1) # RETURN_DESCRIPTION
|
||||
serial = lib.get_product_string(i, flag=0) # RETURN_SERIAL_NUMBER
|
||||
devices.append({
|
||||
"index": i,
|
||||
"description": desc,
|
||||
"serial_number": serial,
|
||||
})
|
||||
except CP210xError as e:
|
||||
devices.append({
|
||||
"index": i,
|
||||
"error": str(e),
|
||||
})
|
||||
|
||||
return devices
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_device_info(device_index: int = 0) -> dict:
|
||||
"""Get detailed information about a CP210x device.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based index of the device (default: 0 for first device)
|
||||
|
||||
Returns full device details including part number, VID/PID, strings, and power settings.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
part_code, part_name = dev.part_number
|
||||
return {
|
||||
"part_number": part_name,
|
||||
"part_code": f"0x{part_code:02X}",
|
||||
"vid": f"0x{dev.vid:04X}",
|
||||
"pid": f"0x{dev.pid:04X}",
|
||||
"manufacturer": dev.manufacturer,
|
||||
"product": dev.product,
|
||||
"serial_number": dev.serial_number,
|
||||
"device_version": dev.device_version,
|
||||
"max_power_ma": dev.max_power_ma,
|
||||
"self_powered": dev.self_powered,
|
||||
"is_locked": dev.is_locked,
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the USB product string (device name shown to host).
|
||||
|
||||
Args:
|
||||
product: New product string (max 126 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Returns the updated device info. Device may need to be re-plugged for
|
||||
changes to appear on the USB host.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_product = dev.product
|
||||
new_value = product[:126]
|
||||
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write product string to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_product}\n"
|
||||
f" New: {new_value}\n\n"
|
||||
f"OTP writes are limited — this cannot be undone easily.",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
|
||||
dev.product = product
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value": old_product,
|
||||
"new_value": new_value,
|
||||
"note": "Re-plug device for changes to take effect on USB host",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the USB manufacturer string.
|
||||
|
||||
Args:
|
||||
manufacturer: New manufacturer string (max 45 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_mfr = dev.manufacturer
|
||||
new_value = manufacturer[:45]
|
||||
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write manufacturer string to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_mfr}\n"
|
||||
f" New: {new_value}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
|
||||
dev.manufacturer = manufacturer
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value": old_mfr,
|
||||
"new_value": new_value,
|
||||
"note": "Re-plug device for changes to take effect on USB host",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the USB serial number.
|
||||
|
||||
Args:
|
||||
serial_number: New serial number (max 63 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Note: Changing serial number may affect udev rules that match on serial.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_serial = dev.serial_number
|
||||
new_value = serial_number[:63]
|
||||
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write serial number to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_serial}\n"
|
||||
f" New: {new_value}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
|
||||
dev.serial_number = serial_number
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value": old_serial,
|
||||
"new_value": new_value,
|
||||
"note": "Re-plug device for changes to take effect on USB host",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the maximum USB power draw in milliamps.
|
||||
|
||||
Args:
|
||||
max_power_ma: Max power in mA (0-500, will be rounded to nearest 2mA)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
if max_power_ma < 0 or max_power_ma > 500:
|
||||
return {"error": "max_power_ma must be 0-500"}
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_power = dev.max_power_ma
|
||||
actual_value = (max_power_ma // 2) * 2
|
||||
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write max power to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_power} mA\n"
|
||||
f" New: {actual_value} mA",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
|
||||
dev.max_power_ma = max_power_ma
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value_ma": old_power,
|
||||
"new_value_ma": actual_value,
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set whether device reports as self-powered or bus-powered.
|
||||
|
||||
Args:
|
||||
self_powered: True for self-powered, False for bus-powered
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
|
||||
old_value = dev.self_powered
|
||||
mode = "self-powered" if self_powered else "bus-powered"
|
||||
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write power mode to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {'self-powered' if old_value else 'bus-powered'}\n"
|
||||
f" New: {mode}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
|
||||
dev.self_powered = self_powered
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"old_value": old_value,
|
||||
"new_value": self_powered,
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def reset_device(device_index: int = 0) -> dict:
|
||||
"""Reset the CP210x device (USB disconnect/reconnect).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
This triggers a USB re-enumeration, which will make any programmed
|
||||
changes visible to the USB host.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
dev.reset()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"note": "Device has been reset. It may take a moment to re-enumerate.",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def lock_device(device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""PERMANENTLY lock the device to prevent further customization.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's
|
||||
configuration cannot be changed. The device will still function normally,
|
||||
but strings, power settings, etc. cannot be modified.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is already locked"}
|
||||
|
||||
part_code, part_name = dev.part_number
|
||||
info = (
|
||||
f"PERMANENTLY lock this {part_name}?\n\n"
|
||||
f" Product: {dev.product}\n"
|
||||
f" Serial: {dev.serial_number}\n\n"
|
||||
f"THIS CANNOT BE UNDONE. The device configuration\n"
|
||||
f"will be frozen forever."
|
||||
)
|
||||
|
||||
if not ctx:
|
||||
return {
|
||||
"error": "Lock requires interactive confirmation",
|
||||
"message": "This operation is too dangerous without user confirmation.",
|
||||
}
|
||||
|
||||
# Strict confirmation — do NOT fall back to True on failure.
|
||||
# Unlike regular writes, an irreversible lock must get explicit consent.
|
||||
try:
|
||||
result = await ctx.elicit(info, ["Confirm", "Cancel"])
|
||||
confirmed = isinstance(result, AcceptedElicitation) and result.data == "Confirm"
|
||||
except Exception:
|
||||
return {
|
||||
"error": "Lock requires elicitation support",
|
||||
"message": "Your MCP client does not support elicitation. "
|
||||
"Lock is too dangerous to proceed without explicit confirmation.",
|
||||
}
|
||||
|
||||
if not confirmed:
|
||||
return {"cancelled": True, "message": "Lock cancelled by user"}
|
||||
|
||||
lib.lock_device(dev.handle)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"warning": "Device is now PERMANENTLY locked. Configuration cannot be changed.",
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def setup_udev_rule(
|
||||
device_index: int = 0,
|
||||
symlink_name: Optional[str] = None,
|
||||
ctx: Context = None,
|
||||
) -> dict:
|
||||
"""Create a udev rule for stable /dev/ symlink for a CP210x device.
|
||||
|
||||
Generates a rule that matches the device's product string and creates
|
||||
a persistent symlink (e.g., /dev/rylr998-0D27) that survives re-plugs
|
||||
and port reordering.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
symlink_name: Custom symlink name. If not provided, auto-generates
|
||||
from the product string (e.g., "RYLR998 ...0D27" -> "rylr998-0D27")
|
||||
|
||||
Requires sudo for rule installation.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
product = dev.product
|
||||
vid = dev.vid
|
||||
pid = dev.pid
|
||||
|
||||
if not product or product == "CP2102 USB to UART Bridge Controller":
|
||||
return {
|
||||
"error": "Device has default product string",
|
||||
"message": "Customize the product string first to create a unique match rule.",
|
||||
}
|
||||
|
||||
# Auto-generate symlink name from product string
|
||||
if not symlink_name:
|
||||
# "RYLR998 0033001104645C0B00000D27" -> "rylr998-0D27"
|
||||
parts = product.split()
|
||||
if len(parts) >= 2:
|
||||
prefix = parts[0].lower()
|
||||
suffix = parts[-1][-4:] # Last 4 chars of EUI/address
|
||||
symlink_name = f"{prefix}-{suffix}"
|
||||
else:
|
||||
symlink_name = product.lower().replace(" ", "-")[:32]
|
||||
|
||||
# Build the udev rule
|
||||
# Use glob pattern on product string to match the unique suffix
|
||||
match_suffix = product[-4:]
|
||||
rule = (
|
||||
f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", '
|
||||
f'ATTRS{{idProduct}}=="{pid:04x}", '
|
||||
f'ATTRS{{product}}=="*{match_suffix}", '
|
||||
f'SYMLINK+="{symlink_name}"'
|
||||
)
|
||||
|
||||
rules_path = f"/usr/lib/udev/rules.d/99-cp210x-{symlink_name}.rules"
|
||||
rule_content = (
|
||||
f"# Auto-generated by mcp210x for: {product}\n"
|
||||
f"# Creates /dev/{symlink_name} symlink\n"
|
||||
f"{rule}\n"
|
||||
)
|
||||
|
||||
# Elicit confirmation before installing
|
||||
install_msg = (
|
||||
f"Install udev rule for stable device symlink?\n\n"
|
||||
f" Device: {product}\n"
|
||||
f" Symlink: /dev/{symlink_name}\n"
|
||||
f" File: {rules_path}\n\n"
|
||||
f"Requires sudo to install."
|
||||
)
|
||||
|
||||
if ctx and not await confirm_write(ctx, install_msg):
|
||||
return {
|
||||
"cancelled": True,
|
||||
"rule": rule_content,
|
||||
"message": "Rule not installed. You can install it manually.",
|
||||
}
|
||||
|
||||
# Install the rule
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["sudo", "tee", rules_path],
|
||||
input=rule_content,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return {
|
||||
"error": f"Failed to install rule: {result.stderr.strip()}",
|
||||
"rule": rule_content,
|
||||
"message": "Install manually with: sudo tee " + rules_path,
|
||||
}
|
||||
|
||||
# Reload udev rules
|
||||
subprocess.run(
|
||||
["sudo", "udevadm", "control", "--reload-rules"],
|
||||
capture_output=True,
|
||||
timeout=10,
|
||||
)
|
||||
subprocess.run(
|
||||
["sudo", "udevadm", "trigger"],
|
||||
capture_output=True,
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
return {
|
||||
"error": "Sudo timed out — may need interactive password",
|
||||
"rule": rule_content,
|
||||
"message": "Install manually with: sudo tee " + rules_path,
|
||||
}
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"symlink": f"/dev/{symlink_name}",
|
||||
"rules_file": rules_path,
|
||||
"rule": rule_content,
|
||||
"note": "Symlink will appear after device re-plug or udevadm trigger",
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
"""Entry point for the MCP server."""
|
||||
mcp.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
5
src/mcp210x_uart/__init__.py
Normal file
5
src/mcp210x_uart/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
"""mcp210x-uart - MCP server for Silicon Labs CP210x USB-UART bridge customization."""
|
||||
|
||||
from .server import main, mcp
|
||||
|
||||
__all__ = ["mcp", "main"]
|
||||
937
src/mcp210x_uart/bindings.py
Normal file
937
src/mcp210x_uart/bindings.py
Normal file
@ -0,0 +1,937 @@
|
||||
"""Low-level ctypes bindings for libcp210xmanufacturing."""
|
||||
|
||||
import ctypes
|
||||
from ctypes import POINTER, Structure, byref, c_char_p, c_int, c_ubyte, c_uint, c_ushort, c_void_p
|
||||
from pathlib import Path
|
||||
|
||||
# Type aliases matching the C library
|
||||
CP210x_STATUS = c_int
|
||||
HANDLE = c_void_p
|
||||
DWORD = c_uint
|
||||
WORD = c_ushort
|
||||
BYTE = c_ubyte
|
||||
BOOL = c_int
|
||||
|
||||
# Return codes
|
||||
CP210x_SUCCESS = 0x00
|
||||
CP210x_DEVICE_NOT_FOUND = 0xFF
|
||||
CP210x_INVALID_HANDLE = 0x01
|
||||
CP210x_INVALID_PARAMETER = 0x02
|
||||
CP210x_DEVICE_IO_FAILED = 0x03
|
||||
CP210x_FUNCTION_NOT_SUPPORTED = 0x04
|
||||
CP210x_GLOBAL_DATA_ERROR = 0x05
|
||||
CP210x_FILE_ERROR = 0x06
|
||||
CP210x_COMMAND_FAILED = 0x08
|
||||
CP210x_INVALID_ACCESS_TYPE = 0x09
|
||||
|
||||
# Part numbers
|
||||
PART_NUMBERS = {
|
||||
0x01: "CP2101",
|
||||
0x02: "CP2102",
|
||||
0x03: "CP2103",
|
||||
0x04: "CP2104",
|
||||
0x05: "CP2105",
|
||||
0x08: "CP2108",
|
||||
0x09: "CP2109",
|
||||
0x20: "CP2102N-QFN28",
|
||||
0x21: "CP2102N-QFN24",
|
||||
0x22: "CP2102N-QFN20",
|
||||
}
|
||||
|
||||
# Part number groups for feature gating
|
||||
PARTS_CP2102N = {0x20, 0x21, 0x22}
|
||||
PARTS_WITH_FLUSH_CONFIG = {0x04, 0x05, 0x08} # CP2104, CP2105, CP2108
|
||||
PARTS_WITH_DEVICE_MODE = {0x05} # CP2105 only
|
||||
PARTS_WITH_INTERFACE_STRING = {0x05, 0x08} # CP2105, CP2108
|
||||
PARTS_WITH_PORT_CONFIG = {0x03, 0x04} # CP2103, CP2104
|
||||
PARTS_WITH_DUAL_PORT_CONFIG = {0x05} # CP2105
|
||||
PARTS_WITH_QUAD_PORT_CONFIG = {0x08} # CP2108
|
||||
|
||||
# Buffer sizes
|
||||
MAX_DEVICE_STRLEN = 256
|
||||
MAX_MANUFACTURER_STRLEN = 45
|
||||
MAX_PRODUCT_STRLEN = 126
|
||||
MAX_SERIAL_STRLEN = 63
|
||||
CP2105_MAX_INTERFACE_STRLEN = 32
|
||||
CP2108_MAX_INTERFACE_STRLEN = 126
|
||||
|
||||
# GetProductString flags
|
||||
RETURN_SERIAL_NUMBER = 0x00
|
||||
RETURN_DESCRIPTION = 0x01
|
||||
RETURN_FULL_PATH = 0x02
|
||||
|
||||
# Flush buffer config flags (CP2104)
|
||||
FC_OPEN_TX = 0x01
|
||||
FC_OPEN_RX = 0x02
|
||||
FC_CLOSE_TX = 0x04
|
||||
FC_CLOSE_RX = 0x08
|
||||
# CP2105 standard port
|
||||
FC_OPEN_TX_SCI = FC_OPEN_TX
|
||||
FC_OPEN_RX_SCI = FC_OPEN_RX
|
||||
FC_CLOSE_TX_SCI = FC_CLOSE_TX
|
||||
FC_CLOSE_RX_SCI = FC_CLOSE_RX
|
||||
# CP2105 enhanced port
|
||||
FC_OPEN_TX_ECI = 0x10
|
||||
FC_OPEN_RX_ECI = 0x20
|
||||
FC_CLOSE_TX_ECI = 0x40
|
||||
FC_CLOSE_RX_ECI = 0x80
|
||||
# CP2108 per-interface
|
||||
FC_OPEN_TX_IFC0 = 0x0001
|
||||
FC_OPEN_RX_IFC0 = 0x0002
|
||||
FC_CLOSE_TX_IFC0 = 0x0004
|
||||
FC_CLOSE_RX_IFC0 = 0x0008
|
||||
FC_OPEN_TX_IFC1 = 0x0010
|
||||
FC_OPEN_RX_IFC1 = 0x0020
|
||||
FC_CLOSE_TX_IFC1 = 0x0040
|
||||
FC_CLOSE_RX_IFC1 = 0x0080
|
||||
FC_OPEN_TX_IFC2 = 0x0100
|
||||
FC_OPEN_RX_IFC2 = 0x0200
|
||||
FC_CLOSE_TX_IFC2 = 0x0400
|
||||
FC_CLOSE_RX_IFC2 = 0x0800
|
||||
FC_OPEN_TX_IFC3 = 0x1000
|
||||
FC_OPEN_RX_IFC3 = 0x2000
|
||||
FC_CLOSE_TX_IFC3 = 0x4000
|
||||
FC_CLOSE_RX_IFC3 = 0x8000
|
||||
|
||||
# Baud rate config
|
||||
NUM_BAUD_CONFIGS = 32
|
||||
BAUD_CONFIG_SIZE = 10
|
||||
|
||||
# Port config pin flags (CP2103/CP2104)
|
||||
PORT_RI_ON = 0x0001
|
||||
PORT_DCD_ON = 0x0002
|
||||
PORT_DTR_ON = 0x0004
|
||||
PORT_DSR_ON = 0x0008
|
||||
PORT_TXD_ON = 0x0010
|
||||
PORT_RXD_ON = 0x0020
|
||||
PORT_RTS_ON = 0x0040
|
||||
PORT_CTS_ON = 0x0080
|
||||
PORT_GPIO_0_ON = 0x0100
|
||||
PORT_GPIO_1_ON = 0x0200
|
||||
PORT_GPIO_2_ON = 0x0400
|
||||
PORT_GPIO_3_ON = 0x0800
|
||||
PORT_SUSPEND_ON = 0x4000
|
||||
PORT_SUSPEND_BAR_ON = 0x8000
|
||||
|
||||
# Enhanced function flags (CP2103/CP2104)
|
||||
EF_GPIO_0_TXLED = 0x01
|
||||
EF_GPIO_1_RXLED = 0x02
|
||||
EF_GPIO_2_RS485 = 0x04
|
||||
EF_RS485_INVERT = 0x08
|
||||
EF_WEAKPULLUP = 0x10
|
||||
EF_RESERVED_1 = 0x20
|
||||
EF_SERIAL_DYNAMIC_SUSPEND = 0x40
|
||||
EF_GPIO_DYNAMIC_SUSPEND = 0x80
|
||||
|
||||
# Flush buffer flag names for human-readable output
|
||||
FLUSH_FLAGS_CP2104 = {
|
||||
FC_OPEN_TX: "open_tx",
|
||||
FC_OPEN_RX: "open_rx",
|
||||
FC_CLOSE_TX: "close_tx",
|
||||
FC_CLOSE_RX: "close_rx",
|
||||
}
|
||||
|
||||
FLUSH_FLAGS_CP2105 = {
|
||||
FC_OPEN_TX_SCI: "open_tx_sci",
|
||||
FC_OPEN_RX_SCI: "open_rx_sci",
|
||||
FC_CLOSE_TX_SCI: "close_tx_sci",
|
||||
FC_CLOSE_RX_SCI: "close_rx_sci",
|
||||
FC_OPEN_TX_ECI: "open_tx_eci",
|
||||
FC_OPEN_RX_ECI: "open_rx_eci",
|
||||
FC_CLOSE_TX_ECI: "close_tx_eci",
|
||||
FC_CLOSE_RX_ECI: "close_rx_eci",
|
||||
}
|
||||
|
||||
ENHANCED_FXN_FLAGS = {
|
||||
EF_GPIO_0_TXLED: "gpio0_txled",
|
||||
EF_GPIO_1_RXLED: "gpio1_rxled",
|
||||
EF_GPIO_2_RS485: "gpio2_rs485",
|
||||
EF_RS485_INVERT: "rs485_invert",
|
||||
EF_WEAKPULLUP: "weak_pullup",
|
||||
EF_SERIAL_DYNAMIC_SUSPEND: "serial_dynamic_suspend",
|
||||
EF_GPIO_DYNAMIC_SUSPEND: "gpio_dynamic_suspend",
|
||||
}
|
||||
|
||||
PORT_PIN_FLAGS = {
|
||||
PORT_RI_ON: "RI",
|
||||
PORT_DCD_ON: "DCD",
|
||||
PORT_DTR_ON: "DTR",
|
||||
PORT_DSR_ON: "DSR",
|
||||
PORT_TXD_ON: "TXD",
|
||||
PORT_RXD_ON: "RXD",
|
||||
PORT_RTS_ON: "RTS",
|
||||
PORT_CTS_ON: "CTS",
|
||||
PORT_GPIO_0_ON: "GPIO0",
|
||||
PORT_GPIO_1_ON: "GPIO1",
|
||||
PORT_GPIO_2_ON: "GPIO2",
|
||||
PORT_GPIO_3_ON: "GPIO3",
|
||||
PORT_SUSPEND_ON: "SUSPEND",
|
||||
PORT_SUSPEND_BAR_ON: "SUSPEND_BAR",
|
||||
}
|
||||
|
||||
|
||||
def decode_bitmask(value: int, flag_map: dict) -> dict[str, bool]:
|
||||
"""Decode a bitmask into a dict of named flags."""
|
||||
return {name: bool(value & bit) for bit, name in flag_map.items()}
|
||||
|
||||
|
||||
def encode_bitmask(flags: dict[str, bool], flag_map: dict) -> int:
|
||||
"""Encode a dict of named flags back into a bitmask."""
|
||||
name_to_bit = {name: bit for bit, name in flag_map.items()}
|
||||
value = 0
|
||||
for name, enabled in flags.items():
|
||||
if name in name_to_bit and enabled:
|
||||
value |= name_to_bit[name]
|
||||
return value
|
||||
|
||||
|
||||
# --- ctypes Structures ---
|
||||
|
||||
class BAUD_CONFIG(Structure):
|
||||
_pack_ = 1
|
||||
_fields_ = [
|
||||
("BaudGen", WORD),
|
||||
("Timer0Reload", WORD),
|
||||
("Prescaler", BYTE),
|
||||
("_pad", BYTE),
|
||||
("BaudRate", DWORD),
|
||||
]
|
||||
|
||||
|
||||
assert ctypes.sizeof(BAUD_CONFIG) == BAUD_CONFIG_SIZE
|
||||
|
||||
BAUD_CONFIG_DATA = BAUD_CONFIG * NUM_BAUD_CONFIGS
|
||||
|
||||
|
||||
class PORT_CONFIG(Structure):
|
||||
_fields_ = [
|
||||
("Mode", WORD),
|
||||
("Reset_Latch", WORD),
|
||||
("Suspend_Latch", WORD),
|
||||
("EnhancedFxn", BYTE),
|
||||
]
|
||||
|
||||
|
||||
class DUAL_PORT_CONFIG(Structure):
|
||||
_fields_ = [
|
||||
("Mode", WORD),
|
||||
("Reset_Latch", WORD),
|
||||
("Suspend_Latch", WORD),
|
||||
("EnhancedFxn_ECI", BYTE),
|
||||
("EnhancedFxn_SCI", BYTE),
|
||||
("EnhancedFxn_Device", BYTE),
|
||||
]
|
||||
|
||||
|
||||
class QUAD_PORT_STATE(Structure):
|
||||
_fields_ = [
|
||||
("Mode_PB0", WORD),
|
||||
("Mode_PB1", WORD),
|
||||
("Mode_PB2", WORD),
|
||||
("Mode_PB3", WORD),
|
||||
("Mode_PB4", WORD),
|
||||
("LowPower_PB0", WORD),
|
||||
("LowPower_PB1", WORD),
|
||||
("LowPower_PB2", WORD),
|
||||
("LowPower_PB3", WORD),
|
||||
("LowPower_PB4", WORD),
|
||||
("Latch_PB0", WORD),
|
||||
("Latch_PB1", WORD),
|
||||
("Latch_PB2", WORD),
|
||||
("Latch_PB3", WORD),
|
||||
("Latch_PB4", WORD),
|
||||
]
|
||||
|
||||
|
||||
class QUAD_PORT_CONFIG(Structure):
|
||||
_fields_ = [
|
||||
("Reset_Latch", QUAD_PORT_STATE),
|
||||
("Suspend_Latch", QUAD_PORT_STATE),
|
||||
("IPDelay_IFC0", BYTE),
|
||||
("IPDelay_IFC1", BYTE),
|
||||
("IPDelay_IFC2", BYTE),
|
||||
("IPDelay_IFC3", BYTE),
|
||||
("EnhancedFxn_IFC0", BYTE),
|
||||
("EnhancedFxn_IFC1", BYTE),
|
||||
("EnhancedFxn_IFC2", BYTE),
|
||||
("EnhancedFxn_IFC3", BYTE),
|
||||
("EnhancedFxn_Device", BYTE),
|
||||
("ExtClk0Freq", BYTE),
|
||||
("ExtClk1Freq", BYTE),
|
||||
("ExtClk2Freq", BYTE),
|
||||
("ExtClk3Freq", BYTE),
|
||||
]
|
||||
|
||||
|
||||
class FIRMWARE_T(Structure):
|
||||
_fields_ = [
|
||||
("major", BYTE),
|
||||
("minor", BYTE),
|
||||
("build", BYTE),
|
||||
]
|
||||
|
||||
|
||||
class CP210xError(Exception):
|
||||
"""Exception raised for CP210x library errors."""
|
||||
|
||||
ERROR_MESSAGES = {
|
||||
CP210x_DEVICE_NOT_FOUND: "Device not found",
|
||||
CP210x_INVALID_HANDLE: "Invalid handle",
|
||||
CP210x_INVALID_PARAMETER: "Invalid parameter",
|
||||
CP210x_DEVICE_IO_FAILED: "Device I/O failed",
|
||||
CP210x_FUNCTION_NOT_SUPPORTED: "Function not supported",
|
||||
CP210x_GLOBAL_DATA_ERROR: "Global data error",
|
||||
CP210x_FILE_ERROR: "File error",
|
||||
CP210x_COMMAND_FAILED: "Command failed",
|
||||
CP210x_INVALID_ACCESS_TYPE: "Invalid access type",
|
||||
}
|
||||
|
||||
def __init__(self, status: int, context: str = ""):
|
||||
self.status = status
|
||||
msg = self.ERROR_MESSAGES.get(status, f"Unknown error 0x{status:02X}")
|
||||
if context:
|
||||
msg = f"{context}: {msg}"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class CP210xLibrary:
|
||||
"""Wrapper for the CP210x manufacturing library."""
|
||||
|
||||
def __init__(self, lib_path: str | None = None):
|
||||
if lib_path:
|
||||
self._lib = ctypes.CDLL(lib_path)
|
||||
else:
|
||||
search_paths = [
|
||||
"/usr/lib/libcp210xmanufacturing.so",
|
||||
"/usr/local/lib/libcp210xmanufacturing.so",
|
||||
str(Path(__file__).parent.parent.parent / "AN721SW/Linux/LibrarySourcePackages/cp210xmanufacturing/build/lib/x86_64/libcp210xmanufacturing.so"),
|
||||
]
|
||||
for path in search_paths:
|
||||
if Path(path).exists():
|
||||
self._lib = ctypes.CDLL(path)
|
||||
break
|
||||
else:
|
||||
raise FileNotFoundError(
|
||||
"libcp210xmanufacturing.so not found. Install with: "
|
||||
"cd aur/cp210xmanufacturing && makepkg -si"
|
||||
)
|
||||
|
||||
self._setup_functions()
|
||||
|
||||
def _setup_functions(self):
|
||||
"""Set up function prototypes for all library functions."""
|
||||
lib = self._lib
|
||||
|
||||
# --- Device enumeration ---
|
||||
lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)]
|
||||
lib.CP210x_GetNumDevices.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD]
|
||||
lib.CP210x_GetProductString.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)]
|
||||
lib.CP210x_Open.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_Close.argtypes = [HANDLE]
|
||||
lib.CP210x_Close.restype = CP210x_STATUS
|
||||
|
||||
# --- Getters (scalars) ---
|
||||
lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
lib.CP210x_GetPartNumber.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)]
|
||||
lib.CP210x_GetDeviceVid.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)]
|
||||
lib.CP210x_GetDevicePid.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL]
|
||||
lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDeviceInterfaceString.argtypes = [HANDLE, BYTE, c_void_p, POINTER(BYTE), BOOL]
|
||||
lib.CP210x_GetDeviceInterfaceString.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
lib.CP210x_GetMaxPower.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)]
|
||||
lib.CP210x_GetSelfPower.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)]
|
||||
lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetFlushBufferConfig.argtypes = [HANDLE, POINTER(WORD)]
|
||||
lib.CP210x_GetFlushBufferConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDeviceMode.argtypes = [HANDLE, POINTER(BYTE), POINTER(BYTE)]
|
||||
lib.CP210x_GetDeviceMode.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)]
|
||||
lib.CP210x_GetLockValue.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetFirmwareVersion.argtypes = [HANDLE, POINTER(FIRMWARE_T)]
|
||||
lib.CP210x_GetFirmwareVersion.restype = CP210x_STATUS
|
||||
|
||||
# --- Setters (scalars) ---
|
||||
lib.CP210x_SetVid.argtypes = [HANDLE, WORD]
|
||||
lib.CP210x_SetVid.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetPid.argtypes = [HANDLE, WORD]
|
||||
lib.CP210x_SetPid.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
lib.CP210x_SetManufacturerString.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
lib.CP210x_SetProductString.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL]
|
||||
lib.CP210x_SetSerialNumber.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetInterfaceString.argtypes = [HANDLE, BYTE, c_void_p, BYTE, BOOL]
|
||||
lib.CP210x_SetInterfaceString.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE]
|
||||
lib.CP210x_SetMaxPower.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL]
|
||||
lib.CP210x_SetSelfPower.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD]
|
||||
lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetFlushBufferConfig.argtypes = [HANDLE, WORD]
|
||||
lib.CP210x_SetFlushBufferConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetDeviceMode.argtypes = [HANDLE, BYTE, BYTE]
|
||||
lib.CP210x_SetDeviceMode.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetLockValue.argtypes = [HANDLE]
|
||||
lib.CP210x_SetLockValue.restype = CP210x_STATUS
|
||||
|
||||
# --- Struct-based configs ---
|
||||
lib.CP210x_GetBaudRateConfig.argtypes = [HANDLE, POINTER(BAUD_CONFIG)]
|
||||
lib.CP210x_GetBaudRateConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetBaudRateConfig.argtypes = [HANDLE, POINTER(BAUD_CONFIG)]
|
||||
lib.CP210x_SetBaudRateConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetPortConfig.argtypes = [HANDLE, POINTER(PORT_CONFIG)]
|
||||
lib.CP210x_GetPortConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetPortConfig.argtypes = [HANDLE, POINTER(PORT_CONFIG)]
|
||||
lib.CP210x_SetPortConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetDualPortConfig.argtypes = [HANDLE, POINTER(DUAL_PORT_CONFIG)]
|
||||
lib.CP210x_GetDualPortConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetDualPortConfig.argtypes = [HANDLE, POINTER(DUAL_PORT_CONFIG)]
|
||||
lib.CP210x_SetDualPortConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetQuadPortConfig.argtypes = [HANDLE, POINTER(QUAD_PORT_CONFIG)]
|
||||
lib.CP210x_GetQuadPortConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetQuadPortConfig.argtypes = [HANDLE, POINTER(QUAD_PORT_CONFIG)]
|
||||
lib.CP210x_SetQuadPortConfig.restype = CP210x_STATUS
|
||||
|
||||
# --- Advanced / raw ---
|
||||
lib.CP210x_GetConfig.argtypes = [HANDLE, POINTER(BYTE), WORD]
|
||||
lib.CP210x_GetConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetConfig.argtypes = [HANDLE, POINTER(BYTE), WORD]
|
||||
lib.CP210x_SetConfig.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_GetGeneric.argtypes = [HANDLE, POINTER(BYTE), WORD]
|
||||
lib.CP210x_GetGeneric.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_SetGeneric.argtypes = [HANDLE, POINTER(BYTE), WORD]
|
||||
lib.CP210x_SetGeneric.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_CreateHexFile.argtypes = [HANDLE, c_char_p]
|
||||
lib.CP210x_CreateHexFile.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_UpdateFirmware.argtypes = [HANDLE]
|
||||
lib.CP210x_UpdateFirmware.restype = CP210x_STATUS
|
||||
|
||||
lib.CP210x_Reset.argtypes = [HANDLE]
|
||||
lib.CP210x_Reset.restype = CP210x_STATUS
|
||||
|
||||
def _check_status(self, status: int, context: str = ""):
|
||||
if status != CP210x_SUCCESS:
|
||||
raise CP210xError(status, context)
|
||||
|
||||
# --- Device enumeration ---
|
||||
|
||||
def get_num_devices(self) -> int:
|
||||
num = DWORD()
|
||||
status = self._lib.CP210x_GetNumDevices(byref(num))
|
||||
self._check_status(status, "GetNumDevices")
|
||||
return num.value
|
||||
|
||||
def get_product_string(self, device_index: int, flag: int = RETURN_DESCRIPTION) -> str:
|
||||
buf = ctypes.create_string_buffer(MAX_DEVICE_STRLEN)
|
||||
status = self._lib.CP210x_GetProductString(DWORD(device_index), buf, DWORD(flag))
|
||||
self._check_status(status, "GetProductString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def open(self, device_index: int) -> HANDLE:
|
||||
handle = HANDLE()
|
||||
status = self._lib.CP210x_Open(DWORD(device_index), byref(handle))
|
||||
self._check_status(status, f"Open device {device_index}")
|
||||
return handle
|
||||
|
||||
def close(self, handle: HANDLE):
|
||||
status = self._lib.CP210x_Close(handle)
|
||||
self._check_status(status, "Close")
|
||||
|
||||
# --- Scalar getters ---
|
||||
|
||||
def get_part_number(self, handle: HANDLE) -> tuple[int, str]:
|
||||
part = BYTE()
|
||||
status = self._lib.CP210x_GetPartNumber(handle, byref(part))
|
||||
self._check_status(status, "GetPartNumber")
|
||||
return part.value, PART_NUMBERS.get(part.value, f"Unknown (0x{part.value:02X})")
|
||||
|
||||
def get_device_vid(self, handle: HANDLE) -> int:
|
||||
vid = WORD()
|
||||
status = self._lib.CP210x_GetDeviceVid(handle, byref(vid))
|
||||
self._check_status(status, "GetDeviceVid")
|
||||
return vid.value
|
||||
|
||||
def get_device_pid(self, handle: HANDLE) -> int:
|
||||
pid = WORD()
|
||||
status = self._lib.CP210x_GetDevicePid(handle, byref(pid))
|
||||
self._check_status(status, "GetDevicePid")
|
||||
return pid.value
|
||||
|
||||
def get_manufacturer_string(self, handle: HANDLE) -> str:
|
||||
buf = ctypes.create_string_buffer(MAX_MANUFACTURER_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceManufacturerString(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceManufacturerString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def get_product_string_device(self, handle: HANDLE) -> str:
|
||||
buf = ctypes.create_string_buffer(MAX_PRODUCT_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceProductString(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceProductString")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def get_serial_number(self, handle: HANDLE) -> str:
|
||||
buf = ctypes.create_string_buffer(MAX_SERIAL_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceSerialNumber(handle, buf, byref(length), BOOL(1))
|
||||
self._check_status(status, "GetDeviceSerialNumber")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def get_interface_string(self, handle: HANDLE, interface_number: int) -> str:
|
||||
buf = ctypes.create_string_buffer(CP2108_MAX_INTERFACE_STRLEN)
|
||||
length = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceInterfaceString(
|
||||
handle, BYTE(interface_number), buf, byref(length), BOOL(1)
|
||||
)
|
||||
self._check_status(status, f"GetDeviceInterfaceString(ifc={interface_number})")
|
||||
return buf.value.decode('utf-8', errors='replace')
|
||||
|
||||
def get_max_power(self, handle: HANDLE) -> int:
|
||||
power = BYTE()
|
||||
status = self._lib.CP210x_GetMaxPower(handle, byref(power))
|
||||
self._check_status(status, "GetMaxPower")
|
||||
return power.value
|
||||
|
||||
def get_self_power(self, handle: HANDLE) -> bool:
|
||||
self_power = BOOL()
|
||||
status = self._lib.CP210x_GetSelfPower(handle, byref(self_power))
|
||||
self._check_status(status, "GetSelfPower")
|
||||
return bool(self_power.value)
|
||||
|
||||
def get_device_version(self, handle: HANDLE) -> int:
|
||||
version = WORD()
|
||||
status = self._lib.CP210x_GetDeviceVersion(handle, byref(version))
|
||||
self._check_status(status, "GetDeviceVersion")
|
||||
return version.value
|
||||
|
||||
def get_flush_buffer_config(self, handle: HANDLE) -> int:
|
||||
config = WORD()
|
||||
status = self._lib.CP210x_GetFlushBufferConfig(handle, byref(config))
|
||||
self._check_status(status, "GetFlushBufferConfig")
|
||||
return config.value
|
||||
|
||||
def get_device_mode(self, handle: HANDLE) -> tuple[int, int]:
|
||||
eci = BYTE()
|
||||
sci = BYTE()
|
||||
status = self._lib.CP210x_GetDeviceMode(handle, byref(eci), byref(sci))
|
||||
self._check_status(status, "GetDeviceMode")
|
||||
return eci.value, sci.value
|
||||
|
||||
def get_lock_value(self, handle: HANDLE) -> int:
|
||||
lock = BYTE()
|
||||
status = self._lib.CP210x_GetLockValue(handle, byref(lock))
|
||||
self._check_status(status, "GetLockValue")
|
||||
return lock.value
|
||||
|
||||
def get_firmware_version(self, handle: HANDLE) -> tuple[int, int, int]:
|
||||
fw = FIRMWARE_T()
|
||||
status = self._lib.CP210x_GetFirmwareVersion(handle, byref(fw))
|
||||
self._check_status(status, "GetFirmwareVersion")
|
||||
return fw.major, fw.minor, fw.build
|
||||
|
||||
# --- Scalar setters ---
|
||||
|
||||
def set_vid(self, handle: HANDLE, vid: int):
|
||||
status = self._lib.CP210x_SetVid(handle, WORD(vid))
|
||||
self._check_status(status, "SetVid")
|
||||
|
||||
def set_pid(self, handle: HANDLE, pid: int):
|
||||
status = self._lib.CP210x_SetPid(handle, WORD(pid))
|
||||
self._check_status(status, "SetPid")
|
||||
|
||||
def set_manufacturer_string(self, handle: HANDLE, manufacturer: str):
|
||||
data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN]
|
||||
status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetManufacturerString")
|
||||
|
||||
def set_product_string(self, handle: HANDLE, product: str):
|
||||
data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN]
|
||||
status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetProductString")
|
||||
|
||||
def set_serial_number(self, handle: HANDLE, serial: str):
|
||||
data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN]
|
||||
status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1))
|
||||
self._check_status(status, "SetSerialNumber")
|
||||
|
||||
def set_interface_string(self, handle: HANDLE, interface_number: int, value: str):
|
||||
data = value.encode('utf-8')[:CP2108_MAX_INTERFACE_STRLEN]
|
||||
status = self._lib.CP210x_SetInterfaceString(
|
||||
handle, BYTE(interface_number), data, BYTE(len(data)), BOOL(1)
|
||||
)
|
||||
self._check_status(status, f"SetInterfaceString(ifc={interface_number})")
|
||||
|
||||
def set_max_power(self, handle: HANDLE, power_2ma: int):
|
||||
if power_2ma > 250:
|
||||
raise ValueError("Max power cannot exceed 250 (500mA)")
|
||||
status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma))
|
||||
self._check_status(status, "SetMaxPower")
|
||||
|
||||
def set_self_power(self, handle: HANDLE, self_powered: bool):
|
||||
status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0))
|
||||
self._check_status(status, "SetSelfPower")
|
||||
|
||||
def set_device_version(self, handle: HANDLE, version: int):
|
||||
status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version))
|
||||
self._check_status(status, "SetDeviceVersion")
|
||||
|
||||
def set_flush_buffer_config(self, handle: HANDLE, config: int):
|
||||
status = self._lib.CP210x_SetFlushBufferConfig(handle, WORD(config))
|
||||
self._check_status(status, "SetFlushBufferConfig")
|
||||
|
||||
def set_device_mode(self, handle: HANDLE, eci_mode: int, sci_mode: int):
|
||||
status = self._lib.CP210x_SetDeviceMode(handle, BYTE(eci_mode), BYTE(sci_mode))
|
||||
self._check_status(status, "SetDeviceMode")
|
||||
|
||||
def lock_device(self, handle: HANDLE):
|
||||
status = self._lib.CP210x_SetLockValue(handle)
|
||||
self._check_status(status, "SetLockValue")
|
||||
|
||||
# --- Struct-based configs ---
|
||||
|
||||
def get_baud_rate_config(self, handle: HANDLE) -> list[dict]:
|
||||
data = BAUD_CONFIG_DATA()
|
||||
status = self._lib.CP210x_GetBaudRateConfig(handle, data)
|
||||
self._check_status(status, "GetBaudRateConfig")
|
||||
return [
|
||||
{
|
||||
"index": i,
|
||||
"baud_gen": data[i].BaudGen,
|
||||
"timer0_reload": data[i].Timer0Reload,
|
||||
"prescaler": data[i].Prescaler,
|
||||
"baud_rate": data[i].BaudRate,
|
||||
}
|
||||
for i in range(NUM_BAUD_CONFIGS)
|
||||
]
|
||||
|
||||
def set_baud_rate_config(self, handle: HANDLE, configs: list[dict]):
|
||||
if len(configs) != NUM_BAUD_CONFIGS:
|
||||
raise ValueError(f"Expected {NUM_BAUD_CONFIGS} baud configs, got {len(configs)}")
|
||||
data = BAUD_CONFIG_DATA()
|
||||
for i, cfg in enumerate(configs):
|
||||
data[i].BaudGen = cfg["baud_gen"]
|
||||
data[i].Timer0Reload = cfg["timer0_reload"]
|
||||
data[i].Prescaler = cfg["prescaler"]
|
||||
data[i]._pad = 0
|
||||
data[i].BaudRate = cfg["baud_rate"]
|
||||
status = self._lib.CP210x_SetBaudRateConfig(handle, data)
|
||||
self._check_status(status, "SetBaudRateConfig")
|
||||
|
||||
def get_port_config(self, handle: HANDLE) -> dict:
|
||||
cfg = PORT_CONFIG()
|
||||
status = self._lib.CP210x_GetPortConfig(handle, byref(cfg))
|
||||
self._check_status(status, "GetPortConfig")
|
||||
return {
|
||||
"mode": cfg.Mode,
|
||||
"reset_latch": cfg.Reset_Latch,
|
||||
"suspend_latch": cfg.Suspend_Latch,
|
||||
"enhanced_fxn": cfg.EnhancedFxn,
|
||||
}
|
||||
|
||||
def set_port_config(self, handle: HANDLE, mode: int, reset_latch: int, suspend_latch: int, enhanced_fxn: int):
|
||||
cfg = PORT_CONFIG()
|
||||
cfg.Mode = mode
|
||||
cfg.Reset_Latch = reset_latch
|
||||
cfg.Suspend_Latch = suspend_latch
|
||||
cfg.EnhancedFxn = enhanced_fxn
|
||||
status = self._lib.CP210x_SetPortConfig(handle, byref(cfg))
|
||||
self._check_status(status, "SetPortConfig")
|
||||
|
||||
def get_dual_port_config(self, handle: HANDLE) -> dict:
|
||||
cfg = DUAL_PORT_CONFIG()
|
||||
status = self._lib.CP210x_GetDualPortConfig(handle, byref(cfg))
|
||||
self._check_status(status, "GetDualPortConfig")
|
||||
return {
|
||||
"mode": cfg.Mode,
|
||||
"reset_latch": cfg.Reset_Latch,
|
||||
"suspend_latch": cfg.Suspend_Latch,
|
||||
"enhanced_fxn_eci": cfg.EnhancedFxn_ECI,
|
||||
"enhanced_fxn_sci": cfg.EnhancedFxn_SCI,
|
||||
"enhanced_fxn_device": cfg.EnhancedFxn_Device,
|
||||
}
|
||||
|
||||
def set_dual_port_config(self, handle: HANDLE, mode: int, reset_latch: int,
|
||||
suspend_latch: int, enhanced_fxn_eci: int,
|
||||
enhanced_fxn_sci: int, enhanced_fxn_device: int):
|
||||
cfg = DUAL_PORT_CONFIG()
|
||||
cfg.Mode = mode
|
||||
cfg.Reset_Latch = reset_latch
|
||||
cfg.Suspend_Latch = suspend_latch
|
||||
cfg.EnhancedFxn_ECI = enhanced_fxn_eci
|
||||
cfg.EnhancedFxn_SCI = enhanced_fxn_sci
|
||||
cfg.EnhancedFxn_Device = enhanced_fxn_device
|
||||
status = self._lib.CP210x_SetDualPortConfig(handle, byref(cfg))
|
||||
self._check_status(status, "SetDualPortConfig")
|
||||
|
||||
def _quad_port_state_to_dict(self, qps: QUAD_PORT_STATE) -> dict:
|
||||
return {
|
||||
f"pb{i}": {"mode": getattr(qps, f"Mode_PB{i}"),
|
||||
"low_power": getattr(qps, f"LowPower_PB{i}"),
|
||||
"latch": getattr(qps, f"Latch_PB{i}")}
|
||||
for i in range(5)
|
||||
}
|
||||
|
||||
def get_quad_port_config(self, handle: HANDLE) -> dict:
|
||||
cfg = QUAD_PORT_CONFIG()
|
||||
status = self._lib.CP210x_GetQuadPortConfig(handle, byref(cfg))
|
||||
self._check_status(status, "GetQuadPortConfig")
|
||||
return {
|
||||
"reset_latch": self._quad_port_state_to_dict(cfg.Reset_Latch),
|
||||
"suspend_latch": self._quad_port_state_to_dict(cfg.Suspend_Latch),
|
||||
"ip_delay": [cfg.IPDelay_IFC0, cfg.IPDelay_IFC1, cfg.IPDelay_IFC2, cfg.IPDelay_IFC3],
|
||||
"enhanced_fxn": [cfg.EnhancedFxn_IFC0, cfg.EnhancedFxn_IFC1,
|
||||
cfg.EnhancedFxn_IFC2, cfg.EnhancedFxn_IFC3],
|
||||
"enhanced_fxn_device": cfg.EnhancedFxn_Device,
|
||||
"ext_clk_freq": [cfg.ExtClk0Freq, cfg.ExtClk1Freq,
|
||||
cfg.ExtClk2Freq, cfg.ExtClk3Freq],
|
||||
}
|
||||
|
||||
def set_quad_port_config(self, handle: HANDLE, config_dict: dict):
|
||||
cfg = QUAD_PORT_CONFIG()
|
||||
|
||||
for state_name in ("reset_latch", "suspend_latch"):
|
||||
state = getattr(cfg, state_name.title().replace("_l", "_L"))
|
||||
src = config_dict[state_name]
|
||||
for i in range(5):
|
||||
pb = src[f"pb{i}"]
|
||||
setattr(state, f"Mode_PB{i}", pb["mode"])
|
||||
setattr(state, f"LowPower_PB{i}", pb["low_power"])
|
||||
setattr(state, f"Latch_PB{i}", pb["latch"])
|
||||
|
||||
for i in range(4):
|
||||
setattr(cfg, f"IPDelay_IFC{i}", config_dict["ip_delay"][i])
|
||||
setattr(cfg, f"EnhancedFxn_IFC{i}", config_dict["enhanced_fxn"][i])
|
||||
|
||||
cfg.EnhancedFxn_Device = config_dict["enhanced_fxn_device"]
|
||||
|
||||
for i in range(4):
|
||||
setattr(cfg, f"ExtClk{i}Freq", config_dict["ext_clk_freq"][i])
|
||||
|
||||
status = self._lib.CP210x_SetQuadPortConfig(handle, byref(cfg))
|
||||
self._check_status(status, "SetQuadPortConfig")
|
||||
|
||||
# --- Advanced / raw ---
|
||||
|
||||
def get_config(self, handle: HANDLE, size: int = 512) -> bytes:
|
||||
buf = (BYTE * size)()
|
||||
status = self._lib.CP210x_GetConfig(handle, buf, WORD(size))
|
||||
self._check_status(status, "GetConfig")
|
||||
return bytes(buf)
|
||||
|
||||
def set_config(self, handle: HANDLE, data: bytes):
|
||||
buf = (BYTE * len(data))(*data)
|
||||
status = self._lib.CP210x_SetConfig(handle, buf, WORD(len(data)))
|
||||
self._check_status(status, "SetConfig")
|
||||
|
||||
def get_generic(self, handle: HANDLE, size: int = 512) -> bytes:
|
||||
buf = (BYTE * size)()
|
||||
status = self._lib.CP210x_GetGeneric(handle, buf, WORD(size))
|
||||
self._check_status(status, "GetGeneric")
|
||||
return bytes(buf)
|
||||
|
||||
def set_generic(self, handle: HANDLE, data: bytes):
|
||||
buf = (BYTE * len(data))(*data)
|
||||
status = self._lib.CP210x_SetGeneric(handle, buf, WORD(len(data)))
|
||||
self._check_status(status, "SetGeneric")
|
||||
|
||||
def create_hex_file(self, handle: HANDLE, filename: str):
|
||||
status = self._lib.CP210x_CreateHexFile(handle, filename.encode('utf-8'))
|
||||
self._check_status(status, "CreateHexFile")
|
||||
|
||||
def update_firmware(self, handle: HANDLE):
|
||||
status = self._lib.CP210x_UpdateFirmware(handle)
|
||||
self._check_status(status, "UpdateFirmware")
|
||||
|
||||
def reset(self, handle: HANDLE):
|
||||
status = self._lib.CP210x_Reset(handle)
|
||||
self._check_status(status, "Reset")
|
||||
|
||||
|
||||
class CP210xDevice:
|
||||
"""Context manager for safe device access."""
|
||||
|
||||
def __init__(self, lib: CP210xLibrary, device_index: int):
|
||||
self.lib = lib
|
||||
self.device_index = device_index
|
||||
self.handle = None
|
||||
self._part_code: int | None = None
|
||||
|
||||
def __enter__(self):
|
||||
self.handle = self.lib.open(self.device_index)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if self.handle:
|
||||
self.lib.close(self.handle)
|
||||
self.handle = None
|
||||
return False
|
||||
|
||||
def _get_part_code(self) -> int:
|
||||
if self._part_code is None:
|
||||
self._part_code = self.lib.get_part_number(self.handle)[0]
|
||||
return self._part_code
|
||||
|
||||
def _require_part(self, allowed: set[int], feature: str):
|
||||
code = self._get_part_code()
|
||||
if code not in allowed:
|
||||
name = PART_NUMBERS.get(code, f"0x{code:02X}")
|
||||
allowed_names = ", ".join(PART_NUMBERS.get(c, f"0x{c:02X}") for c in sorted(allowed))
|
||||
raise CP210xError(
|
||||
CP210x_FUNCTION_NOT_SUPPORTED,
|
||||
f"{feature} not supported on {name} (requires {allowed_names})",
|
||||
)
|
||||
|
||||
@property
|
||||
def part_number(self) -> tuple[int, str]:
|
||||
return self.lib.get_part_number(self.handle)
|
||||
|
||||
@property
|
||||
def vid(self) -> int:
|
||||
return self.lib.get_device_vid(self.handle)
|
||||
|
||||
@property
|
||||
def pid(self) -> int:
|
||||
return self.lib.get_device_pid(self.handle)
|
||||
|
||||
@property
|
||||
def manufacturer(self) -> str:
|
||||
return self.lib.get_manufacturer_string(self.handle)
|
||||
|
||||
@manufacturer.setter
|
||||
def manufacturer(self, value: str):
|
||||
self.lib.set_manufacturer_string(self.handle, value)
|
||||
|
||||
@property
|
||||
def product(self) -> str:
|
||||
return self.lib.get_product_string_device(self.handle)
|
||||
|
||||
@product.setter
|
||||
def product(self, value: str):
|
||||
self.lib.set_product_string(self.handle, value)
|
||||
|
||||
@property
|
||||
def serial_number(self) -> str:
|
||||
return self.lib.get_serial_number(self.handle)
|
||||
|
||||
@serial_number.setter
|
||||
def serial_number(self, value: str):
|
||||
self.lib.set_serial_number(self.handle, value)
|
||||
|
||||
@property
|
||||
def max_power_ma(self) -> int:
|
||||
return self.lib.get_max_power(self.handle) * 2
|
||||
|
||||
@max_power_ma.setter
|
||||
def max_power_ma(self, value: int):
|
||||
self.lib.set_max_power(self.handle, value // 2)
|
||||
|
||||
@property
|
||||
def self_powered(self) -> bool:
|
||||
return self.lib.get_self_power(self.handle)
|
||||
|
||||
@self_powered.setter
|
||||
def self_powered(self, value: bool):
|
||||
self.lib.set_self_power(self.handle, value)
|
||||
|
||||
@property
|
||||
def device_version(self) -> str:
|
||||
v = self.lib.get_device_version(self.handle)
|
||||
return f"{(v >> 8) & 0xFF}.{v & 0xFF:02d}"
|
||||
|
||||
@device_version.setter
|
||||
def device_version(self, value: int):
|
||||
self.lib.set_device_version(self.handle, value)
|
||||
|
||||
@property
|
||||
def is_locked(self) -> bool:
|
||||
return self.lib.get_lock_value(self.handle) != 0
|
||||
|
||||
@property
|
||||
def firmware_version(self) -> str | None:
|
||||
"""Firmware version string (CP2102N only). Returns None if not supported."""
|
||||
try:
|
||||
major, minor, build = self.lib.get_firmware_version(self.handle)
|
||||
return f"{major}.{minor}.{build}"
|
||||
except CP210xError:
|
||||
return None
|
||||
|
||||
@property
|
||||
def flush_buffer_config(self) -> int | None:
|
||||
"""Raw flush buffer config word. Returns None if not supported."""
|
||||
try:
|
||||
return self.lib.get_flush_buffer_config(self.handle)
|
||||
except CP210xError:
|
||||
return None
|
||||
|
||||
@property
|
||||
def device_mode(self) -> tuple[int, int] | None:
|
||||
"""(ECI mode, SCI mode) for CP2105. Returns None if not supported."""
|
||||
try:
|
||||
return self.lib.get_device_mode(self.handle)
|
||||
except CP210xError:
|
||||
return None
|
||||
|
||||
def get_interface_string(self, interface_number: int) -> str:
|
||||
self._require_part(PARTS_WITH_INTERFACE_STRING, "Interface strings")
|
||||
return self.lib.get_interface_string(self.handle, interface_number)
|
||||
|
||||
def set_interface_string(self, interface_number: int, value: str):
|
||||
self._require_part(PARTS_WITH_INTERFACE_STRING, "Interface strings")
|
||||
self.lib.set_interface_string(self.handle, interface_number, value)
|
||||
|
||||
def reset(self):
|
||||
self.lib.reset(self.handle)
|
||||
957
src/mcp210x_uart/server.py
Normal file
957
src/mcp210x_uart/server.py
Normal file
@ -0,0 +1,957 @@
|
||||
"""FastMCP server for CP210x device customization."""
|
||||
|
||||
import subprocess
|
||||
|
||||
from fastmcp import Context, FastMCP
|
||||
from fastmcp.server.elicitation import AcceptedElicitation
|
||||
|
||||
from .bindings import (
|
||||
ENHANCED_FXN_FLAGS,
|
||||
FLUSH_FLAGS_CP2104,
|
||||
FLUSH_FLAGS_CP2105,
|
||||
PARTS_CP2102N,
|
||||
PARTS_WITH_DEVICE_MODE,
|
||||
PARTS_WITH_DUAL_PORT_CONFIG,
|
||||
PARTS_WITH_FLUSH_CONFIG,
|
||||
PARTS_WITH_PORT_CONFIG,
|
||||
PARTS_WITH_QUAD_PORT_CONFIG,
|
||||
PORT_PIN_FLAGS,
|
||||
CP210xDevice,
|
||||
CP210xError,
|
||||
CP210xLibrary,
|
||||
decode_bitmask,
|
||||
encode_bitmask,
|
||||
)
|
||||
|
||||
mcp = FastMCP(
|
||||
"cp210x",
|
||||
instructions="CP210x USB-UART bridge customization - read/write product strings, serial numbers, and device configuration",
|
||||
)
|
||||
|
||||
_lib: CP210xLibrary | None = None
|
||||
|
||||
|
||||
def get_lib() -> CP210xLibrary:
|
||||
global _lib
|
||||
if _lib is None:
|
||||
_lib = CP210xLibrary()
|
||||
return _lib
|
||||
|
||||
|
||||
async def confirm_write(ctx: Context, message: str) -> bool:
|
||||
"""Normal-tier confirmation: falls back to proceeding if elicitation unavailable."""
|
||||
try:
|
||||
result = await ctx.elicit(message, ["Confirm", "Cancel"])
|
||||
return isinstance(result, AcceptedElicitation) and result.data == "Confirm"
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
async def strict_confirm(ctx: Context, message: str) -> dict | None:
|
||||
"""Strict-tier confirmation: returns error dict if elicitation unavailable."""
|
||||
if not ctx:
|
||||
return {
|
||||
"error": "This operation requires interactive confirmation",
|
||||
"message": "Too dangerous to proceed without explicit user consent.",
|
||||
}
|
||||
try:
|
||||
result = await ctx.elicit(message, ["Confirm", "Cancel"])
|
||||
confirmed = isinstance(result, AcceptedElicitation) and result.data == "Confirm"
|
||||
except Exception:
|
||||
return {
|
||||
"error": "This operation requires elicitation support",
|
||||
"message": "Your MCP client does not support elicitation. "
|
||||
"This operation is too dangerous to proceed without explicit confirmation.",
|
||||
}
|
||||
if not confirmed:
|
||||
return {"cancelled": True, "message": "Operation cancelled by user"}
|
||||
return None
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Read-only tools (no confirmation needed)
|
||||
# ===========================================================================
|
||||
|
||||
@mcp.tool()
|
||||
def list_devices() -> list[dict]:
|
||||
"""List all connected CP210x devices.
|
||||
|
||||
Returns a list of devices with their index, description, and serial number.
|
||||
Use the index to reference a specific device in other tools.
|
||||
"""
|
||||
lib = get_lib()
|
||||
num_devices = lib.get_num_devices()
|
||||
|
||||
devices = []
|
||||
for i in range(num_devices):
|
||||
try:
|
||||
desc = lib.get_product_string(i, flag=1)
|
||||
serial = lib.get_product_string(i, flag=0)
|
||||
devices.append({
|
||||
"index": i,
|
||||
"description": desc,
|
||||
"serial_number": serial,
|
||||
})
|
||||
except CP210xError as e:
|
||||
devices.append({"index": i, "error": str(e)})
|
||||
|
||||
return devices
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_device_info(device_index: int = 0) -> dict:
|
||||
"""Get detailed information about a CP210x device.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based index of the device (default: 0 for first device)
|
||||
|
||||
Returns full device details including part number, VID/PID, strings, and power settings.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
part_code, part_name = dev.part_number
|
||||
info = {
|
||||
"part_number": part_name,
|
||||
"part_code": f"0x{part_code:02X}",
|
||||
"vid": f"0x{dev.vid:04X}",
|
||||
"pid": f"0x{dev.pid:04X}",
|
||||
"manufacturer": dev.manufacturer,
|
||||
"product": dev.product,
|
||||
"serial_number": dev.serial_number,
|
||||
"device_version": dev.device_version,
|
||||
"max_power_ma": dev.max_power_ma,
|
||||
"self_powered": dev.self_powered,
|
||||
"is_locked": dev.is_locked,
|
||||
}
|
||||
|
||||
# Conditionally add part-specific fields
|
||||
fw = dev.firmware_version
|
||||
if fw is not None:
|
||||
info["firmware_version"] = fw
|
||||
|
||||
flush = dev.flush_buffer_config
|
||||
if flush is not None:
|
||||
# Decode to named flags based on part
|
||||
if part_code in PARTS_WITH_FLUSH_CONFIG:
|
||||
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
|
||||
info["flush_buffer_config"] = decode_bitmask(flush, flag_map)
|
||||
else:
|
||||
info["flush_buffer_config_raw"] = f"0x{flush:04X}"
|
||||
|
||||
mode = dev.device_mode
|
||||
if mode is not None:
|
||||
info["device_mode"] = {"eci": mode[0], "sci": mode[1]}
|
||||
|
||||
return info
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_firmware_version(device_index: int = 0) -> dict:
|
||||
"""Get firmware version (CP2102N only).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
fw = dev.firmware_version
|
||||
if fw is None:
|
||||
part_code, part_name = dev.part_number
|
||||
return {"error": f"Firmware version not available on {part_name}",
|
||||
"note": "Only CP2102N variants support this"}
|
||||
return {"firmware_version": fw}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_flush_buffer_config(device_index: int = 0) -> dict:
|
||||
"""Get flush buffer configuration (CP2104/CP2105/CP2108).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Returns named flags showing which buffers are flushed on open/close.
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
part_code, part_name = dev.part_number
|
||||
if part_code not in PARTS_WITH_FLUSH_CONFIG:
|
||||
return {"error": f"Flush buffer config not available on {part_name}",
|
||||
"supported": "CP2104, CP2105, CP2108"}
|
||||
|
||||
raw = lib.get_flush_buffer_config(dev.handle)
|
||||
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
|
||||
return {
|
||||
"raw": f"0x{raw:04X}",
|
||||
"flags": decode_bitmask(raw, flag_map),
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_device_mode(device_index: int = 0) -> dict:
|
||||
"""Get device mode (CP2105 only).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Returns ECI and SCI interface modes.
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
part_code, part_name = dev.part_number
|
||||
if part_code not in PARTS_WITH_DEVICE_MODE:
|
||||
return {"error": f"Device mode not available on {part_name}",
|
||||
"supported": "CP2105"}
|
||||
|
||||
eci, sci = lib.get_device_mode(dev.handle)
|
||||
return {"eci_mode": eci, "sci_mode": sci}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_interface_string(interface_number: int, device_index: int = 0) -> dict:
|
||||
"""Get USB interface string (CP2105/CP2108 only).
|
||||
|
||||
Args:
|
||||
interface_number: Interface index (0 or 1 for CP2105, 0-3 for CP2108)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
try:
|
||||
value = dev.get_interface_string(interface_number)
|
||||
return {"interface": interface_number, "value": value}
|
||||
except CP210xError as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_baud_rate_config(device_index: int = 0) -> dict:
|
||||
"""Get the baud rate alias configuration table (32 entries).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Returns the full 32-entry baud rate alias table showing how standard
|
||||
baud rates map to timer register values.
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
configs = lib.get_baud_rate_config(dev.handle)
|
||||
return {"entries": configs}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_port_config(device_index: int = 0) -> dict:
|
||||
"""Get GPIO port configuration (auto-detects CP2103/4/5/8).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Returns pin modes, latch values, and enhanced function settings.
|
||||
Auto-dispatches to the correct struct based on part number.
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
part_code, part_name = dev.part_number
|
||||
|
||||
if part_code in PARTS_WITH_PORT_CONFIG:
|
||||
raw = lib.get_port_config(dev.handle)
|
||||
return {
|
||||
"type": "single",
|
||||
"part": part_name,
|
||||
"mode": decode_bitmask(raw["mode"], PORT_PIN_FLAGS),
|
||||
"reset_latch": decode_bitmask(raw["reset_latch"], PORT_PIN_FLAGS),
|
||||
"suspend_latch": decode_bitmask(raw["suspend_latch"], PORT_PIN_FLAGS),
|
||||
"enhanced_fxn": decode_bitmask(raw["enhanced_fxn"], ENHANCED_FXN_FLAGS),
|
||||
"raw": raw,
|
||||
}
|
||||
elif part_code in PARTS_WITH_DUAL_PORT_CONFIG:
|
||||
raw = lib.get_dual_port_config(dev.handle)
|
||||
return {"type": "dual", "part": part_name, **raw}
|
||||
elif part_code in PARTS_WITH_QUAD_PORT_CONFIG:
|
||||
raw = lib.get_quad_port_config(dev.handle)
|
||||
return {"type": "quad", "part": part_name, **raw}
|
||||
else:
|
||||
return {"error": f"Port config not available on {part_name}",
|
||||
"supported": "CP2103, CP2104, CP2105, CP2108"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_raw_config(device_index: int = 0, size: int = 512) -> dict:
|
||||
"""Read raw EPROM configuration blob.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
size: Number of bytes to read (default: 512)
|
||||
|
||||
Returns hex-encoded config data.
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
data = lib.get_config(dev.handle, size)
|
||||
return {"hex": data.hex(), "size": len(data)}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def create_hex_file(filename: str, device_index: int = 0) -> dict:
|
||||
"""Dump device config to Intel HEX file.
|
||||
|
||||
Args:
|
||||
filename: Output file path for the .hex file
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
lib.create_hex_file(dev.handle, filename)
|
||||
return {"success": True, "filename": filename}
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Normal-tier write tools (elicit → fallback to proceed)
|
||||
# ===========================================================================
|
||||
|
||||
@mcp.tool()
|
||||
async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the USB product string (device name shown to host).
|
||||
|
||||
Args:
|
||||
product: New product string (max 126 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Returns the updated device info. Device may need to be re-plugged for
|
||||
changes to appear on the USB host.
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
old_product = dev.product
|
||||
new_value = product[:126]
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write product string to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_product}\n New: {new_value}\n\n"
|
||||
f"OTP writes are limited — this cannot be undone easily.",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
dev.product = product
|
||||
return {"success": True, "old_value": old_product, "new_value": new_value,
|
||||
"note": "Re-plug device for changes to take effect on USB host"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the USB manufacturer string.
|
||||
|
||||
Args:
|
||||
manufacturer: New manufacturer string (max 45 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
old_mfr = dev.manufacturer
|
||||
new_value = manufacturer[:45]
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write manufacturer string to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_mfr}\n New: {new_value}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
dev.manufacturer = manufacturer
|
||||
return {"success": True, "old_value": old_mfr, "new_value": new_value,
|
||||
"note": "Re-plug device for changes to take effect on USB host"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the USB serial number.
|
||||
|
||||
Args:
|
||||
serial_number: New serial number (max 63 characters)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
Note: Changing serial number may affect udev rules that match on serial.
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
old_serial = dev.serial_number
|
||||
new_value = serial_number[:63]
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write serial number to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_serial}\n New: {new_value}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
dev.serial_number = serial_number
|
||||
return {"success": True, "old_value": old_serial, "new_value": new_value,
|
||||
"note": "Re-plug device for changes to take effect on USB host"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the maximum USB power draw in milliamps.
|
||||
|
||||
Args:
|
||||
max_power_ma: Max power in mA (0-500, will be rounded to nearest 2mA)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
if max_power_ma < 0 or max_power_ma > 500:
|
||||
return {"error": "max_power_ma must be 0-500"}
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
old_power = dev.max_power_ma
|
||||
actual_value = (max_power_ma // 2) * 2
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write max power to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_power} mA\n New: {actual_value} mA",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
dev.max_power_ma = max_power_ma
|
||||
return {"success": True, "old_value_ma": old_power, "new_value_ma": actual_value}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set whether device reports as self-powered or bus-powered.
|
||||
|
||||
Args:
|
||||
self_powered: True for self-powered, False for bus-powered
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
old_value = dev.self_powered
|
||||
mode = "self-powered" if self_powered else "bus-powered"
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write power mode to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {'self-powered' if old_value else 'bus-powered'}\n New: {mode}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
dev.self_powered = self_powered
|
||||
return {"success": True, "old_value": old_value, "new_value": self_powered}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_device_version(version: int, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set device version (bcdDevice field).
|
||||
|
||||
Args:
|
||||
version: BCD version number (e.g., 0x0100 for 1.00)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
old_version = dev.device_version
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write device version to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_version}\n New: 0x{version:04X}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
dev.device_version = version
|
||||
new_ver = f"{(version >> 8) & 0xFF}.{version & 0xFF:02d}"
|
||||
return {"success": True, "old_value": old_version, "new_value": new_ver}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_interface_string(
|
||||
interface_number: int, value: str, device_index: int = 0, ctx: Context = None,
|
||||
) -> dict:
|
||||
"""Set USB interface string (CP2105/CP2108 only).
|
||||
|
||||
Args:
|
||||
interface_number: Interface index (0 or 1 for CP2105, 0-3 for CP2108)
|
||||
value: New interface string
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
try:
|
||||
old_value = dev.get_interface_string(interface_number)
|
||||
except CP210xError as e:
|
||||
return {"error": str(e)}
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write interface {interface_number} string to CP210x OTP EPROM?\n\n"
|
||||
f" Old: {old_value}\n New: {value}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
dev.set_interface_string(interface_number, value)
|
||||
return {"success": True, "interface": interface_number,
|
||||
"old_value": old_value, "new_value": value}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_flush_buffer_config(
|
||||
flags: dict, device_index: int = 0, ctx: Context = None,
|
||||
) -> dict:
|
||||
"""Set flush buffer configuration (CP2104/CP2105/CP2108).
|
||||
|
||||
Args:
|
||||
flags: Dict of flag_name -> bool. Flag names depend on part:
|
||||
CP2104: open_tx, open_rx, close_tx, close_rx
|
||||
CP2105: open_tx_sci, open_rx_sci, close_tx_sci, close_rx_sci,
|
||||
open_tx_eci, open_rx_eci, close_tx_eci, close_rx_eci
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
part_code, part_name = dev.part_number
|
||||
if part_code not in PARTS_WITH_FLUSH_CONFIG:
|
||||
return {"error": f"Flush buffer config not available on {part_name}"}
|
||||
flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104
|
||||
new_value = encode_bitmask(flags, flag_map)
|
||||
old_raw = lib.get_flush_buffer_config(dev.handle)
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write flush buffer config to CP210x?\n\n"
|
||||
f" Old: 0x{old_raw:04X}\n New: 0x{new_value:04X}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
lib.set_flush_buffer_config(dev.handle, new_value)
|
||||
return {"success": True, "old_raw": f"0x{old_raw:04X}",
|
||||
"new_raw": f"0x{new_value:04X}", "flags": flags}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_device_mode(
|
||||
eci_mode: int, sci_mode: int, device_index: int = 0, ctx: Context = None,
|
||||
) -> dict:
|
||||
"""Set device mode (CP2105 only).
|
||||
|
||||
Args:
|
||||
eci_mode: Enhanced interface mode byte
|
||||
sci_mode: Standard interface mode byte
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
part_code, part_name = dev.part_number
|
||||
if part_code not in PARTS_WITH_DEVICE_MODE:
|
||||
return {"error": f"Device mode not available on {part_name}"}
|
||||
old_eci, old_sci = lib.get_device_mode(dev.handle)
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write device mode to CP210x?\n\n"
|
||||
f" Old ECI: 0x{old_eci:02X}, SCI: 0x{old_sci:02X}\n"
|
||||
f" New ECI: 0x{eci_mode:02X}, SCI: 0x{sci_mode:02X}",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
lib.set_device_mode(dev.handle, eci_mode, sci_mode)
|
||||
return {"success": True,
|
||||
"old": {"eci": old_eci, "sci": old_sci},
|
||||
"new": {"eci": eci_mode, "sci": sci_mode}}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_baud_rate_config(
|
||||
entries: list[dict], device_index: int = 0, ctx: Context = None,
|
||||
) -> dict:
|
||||
"""Set the full 32-entry baud rate alias table.
|
||||
|
||||
Args:
|
||||
entries: List of 32 dicts, each with baud_gen, timer0_reload, prescaler, baud_rate
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
if len(entries) != 32:
|
||||
return {"error": f"Expected 32 baud config entries, got {len(entries)}"}
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
"Write complete baud rate table (32 entries) to CP210x OTP EPROM?",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
lib.set_baud_rate_config(dev.handle, entries)
|
||||
return {"success": True, "entries_written": 32}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_baud_rate_alias(
|
||||
index: int, baud_rate: int, baud_gen: int, timer0_reload: int, prescaler: int,
|
||||
device_index: int = 0, ctx: Context = None,
|
||||
) -> dict:
|
||||
"""Modify a single baud rate alias entry (read-modify-write).
|
||||
|
||||
Args:
|
||||
index: Entry index (0-31)
|
||||
baud_rate: Target baud rate
|
||||
baud_gen: Baud rate generator register value
|
||||
timer0_reload: Timer0 reload register value
|
||||
prescaler: Prescaler value
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
if not 0 <= index < 32:
|
||||
return {"error": "Index must be 0-31"}
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
current = lib.get_baud_rate_config(dev.handle)
|
||||
old_entry = current[index].copy()
|
||||
current[index] = {
|
||||
"index": index,
|
||||
"baud_gen": baud_gen,
|
||||
"timer0_reload": timer0_reload,
|
||||
"prescaler": prescaler,
|
||||
"baud_rate": baud_rate,
|
||||
}
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Modify baud rate alias #{index}?\n\n"
|
||||
f" Old: {old_entry['baud_rate']} baud\n"
|
||||
f" New: {baud_rate} baud",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
lib.set_baud_rate_config(dev.handle, current)
|
||||
return {"success": True, "index": index,
|
||||
"old_entry": old_entry, "new_baud_rate": baud_rate}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_port_config(
|
||||
config: dict, device_index: int = 0, ctx: Context = None,
|
||||
) -> dict:
|
||||
"""Set GPIO port configuration (auto-detects CP2103/4/5/8).
|
||||
|
||||
Args:
|
||||
config: Port configuration dict. Structure depends on part type:
|
||||
Single (CP2103/4): {mode, reset_latch, suspend_latch, enhanced_fxn}
|
||||
- Values can be raw integers or named-flag dicts
|
||||
Dual (CP2105): {mode, reset_latch, suspend_latch,
|
||||
enhanced_fxn_eci, enhanced_fxn_sci, enhanced_fxn_device}
|
||||
Quad (CP2108): Full quad port config dict (see get_port_config output)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
part_code, part_name = dev.part_number
|
||||
|
||||
if ctx and not await confirm_write(
|
||||
ctx,
|
||||
f"Write port configuration to {part_name} OTP EPROM?",
|
||||
):
|
||||
return {"cancelled": True, "message": "Write cancelled by user"}
|
||||
|
||||
if part_code in PARTS_WITH_PORT_CONFIG:
|
||||
# Allow either raw ints or named-flag dicts
|
||||
mode = config.get("mode", 0)
|
||||
if isinstance(mode, dict):
|
||||
mode = encode_bitmask(mode, PORT_PIN_FLAGS)
|
||||
reset = config.get("reset_latch", 0)
|
||||
if isinstance(reset, dict):
|
||||
reset = encode_bitmask(reset, PORT_PIN_FLAGS)
|
||||
suspend = config.get("suspend_latch", 0)
|
||||
if isinstance(suspend, dict):
|
||||
suspend = encode_bitmask(suspend, PORT_PIN_FLAGS)
|
||||
ef = config.get("enhanced_fxn", 0)
|
||||
if isinstance(ef, dict):
|
||||
ef = encode_bitmask(ef, ENHANCED_FXN_FLAGS)
|
||||
lib.set_port_config(dev.handle, mode, reset, suspend, ef)
|
||||
return {"success": True, "type": "single", "part": part_name}
|
||||
|
||||
elif part_code in PARTS_WITH_DUAL_PORT_CONFIG:
|
||||
lib.set_dual_port_config(
|
||||
dev.handle,
|
||||
config["mode"], config["reset_latch"], config["suspend_latch"],
|
||||
config["enhanced_fxn_eci"], config["enhanced_fxn_sci"],
|
||||
config["enhanced_fxn_device"],
|
||||
)
|
||||
return {"success": True, "type": "dual", "part": part_name}
|
||||
|
||||
elif part_code in PARTS_WITH_QUAD_PORT_CONFIG:
|
||||
lib.set_quad_port_config(dev.handle, config)
|
||||
return {"success": True, "type": "quad", "part": part_name}
|
||||
|
||||
else:
|
||||
return {"error": f"Port config not available on {part_name}"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def reset_device(device_index: int = 0) -> dict:
|
||||
"""Reset the CP210x device (USB disconnect/reconnect).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
This triggers a USB re-enumeration, which will make any programmed
|
||||
changes visible to the USB host.
|
||||
"""
|
||||
lib = get_lib()
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
dev.reset()
|
||||
return {"success": True, "note": "Device has been reset. It may take a moment to re-enumerate."}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def setup_udev_rule(
|
||||
device_index: int = 0,
|
||||
symlink_name: str | None = None,
|
||||
ctx: Context = None,
|
||||
) -> dict:
|
||||
"""Create a udev rule for stable /dev/ symlink for a CP210x device.
|
||||
|
||||
Generates a rule that matches the device's product string and creates
|
||||
a persistent symlink (e.g., /dev/rylr998-0D27) that survives re-plugs
|
||||
and port reordering.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
symlink_name: Custom symlink name. If not provided, auto-generates
|
||||
from the product string (e.g., "RYLR998 ...0D27" -> "rylr998-0D27")
|
||||
|
||||
Requires sudo for rule installation.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
product = dev.product
|
||||
vid = dev.vid
|
||||
pid = dev.pid
|
||||
|
||||
if not product or product == "CP2102 USB to UART Bridge Controller":
|
||||
return {
|
||||
"error": "Device has default product string",
|
||||
"message": "Customize the product string first to create a unique match rule.",
|
||||
}
|
||||
|
||||
if not symlink_name:
|
||||
parts = product.split()
|
||||
if len(parts) >= 2:
|
||||
prefix = parts[0].lower()
|
||||
suffix = parts[-1][-4:]
|
||||
symlink_name = f"{prefix}-{suffix}"
|
||||
else:
|
||||
symlink_name = product.lower().replace(" ", "-")[:32]
|
||||
|
||||
match_suffix = product[-4:]
|
||||
rule = (
|
||||
f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", '
|
||||
f'ATTRS{{idProduct}}=="{pid:04x}", '
|
||||
f'ATTRS{{product}}=="*{match_suffix}", '
|
||||
f'SYMLINK+="{symlink_name}"'
|
||||
)
|
||||
|
||||
rules_path = f"/usr/lib/udev/rules.d/99-cp210x-{symlink_name}.rules"
|
||||
rule_content = (
|
||||
f"# Auto-generated by mcp210x-uart for: {product}\n"
|
||||
f"# Creates /dev/{symlink_name} symlink\n"
|
||||
f"{rule}\n"
|
||||
)
|
||||
|
||||
install_msg = (
|
||||
f"Install udev rule for stable device symlink?\n\n"
|
||||
f" Device: {product}\n"
|
||||
f" Symlink: /dev/{symlink_name}\n"
|
||||
f" File: {rules_path}\n\n"
|
||||
f"Requires sudo to install."
|
||||
)
|
||||
|
||||
if ctx and not await confirm_write(ctx, install_msg):
|
||||
return {"cancelled": True, "rule": rule_content,
|
||||
"message": "Rule not installed. You can install it manually."}
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["sudo", "tee", rules_path],
|
||||
input=rule_content, capture_output=True, text=True, timeout=30,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return {"error": f"Failed to install rule: {result.stderr.strip()}",
|
||||
"rule": rule_content, "message": "Install manually with: sudo tee " + rules_path}
|
||||
subprocess.run(["sudo", "udevadm", "control", "--reload-rules"],
|
||||
capture_output=True, timeout=10)
|
||||
subprocess.run(["sudo", "udevadm", "trigger"],
|
||||
capture_output=True, timeout=10)
|
||||
except subprocess.TimeoutExpired:
|
||||
return {"error": "Sudo timed out — may need interactive password",
|
||||
"rule": rule_content, "message": "Install manually with: sudo tee " + rules_path}
|
||||
|
||||
return {"success": True, "symlink": f"/dev/{symlink_name}",
|
||||
"rules_file": rules_path, "rule": rule_content,
|
||||
"note": "Symlink will appear after device re-plug or udevadm trigger"}
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Strict-tier write tools (hard-refuse without elicitation)
|
||||
# ===========================================================================
|
||||
|
||||
@mcp.tool()
|
||||
async def set_vid(vid: int, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the USB Vendor ID (DANGEROUS — can break driver matching).
|
||||
|
||||
Args:
|
||||
vid: New USB Vendor ID (16-bit, e.g., 0x10C4 for Silicon Labs)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
WARNING: Changing VID can prevent the device from being recognized by
|
||||
standard CP210x drivers. Only use if you have custom drivers.
|
||||
"""
|
||||
lib = get_lib()
|
||||
err = await strict_confirm(
|
||||
ctx,
|
||||
f"Write USB Vendor ID to CP210x OTP EPROM?\n\n"
|
||||
f" New VID: 0x{vid:04X}\n\n"
|
||||
f"WARNING: Changing VID can prevent the device from being recognized\n"
|
||||
f"by standard CP210x drivers. This is PERMANENT.",
|
||||
)
|
||||
if err:
|
||||
return err
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
old_vid = dev.vid
|
||||
lib.set_vid(dev.handle, vid)
|
||||
return {"success": True, "old_vid": f"0x{old_vid:04X}", "new_vid": f"0x{vid:04X}"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_pid(pid: int, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Set the USB Product ID (DANGEROUS — can break driver matching).
|
||||
|
||||
Args:
|
||||
pid: New USB Product ID (16-bit, e.g., 0xEA60)
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
WARNING: Changing PID can prevent the device from being recognized by
|
||||
standard CP210x drivers.
|
||||
"""
|
||||
lib = get_lib()
|
||||
err = await strict_confirm(
|
||||
ctx,
|
||||
f"Write USB Product ID to CP210x OTP EPROM?\n\n"
|
||||
f" New PID: 0x{pid:04X}\n\n"
|
||||
f"WARNING: Changing PID can prevent the device from being recognized\n"
|
||||
f"by standard CP210x drivers. This is PERMANENT.",
|
||||
)
|
||||
if err:
|
||||
return err
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
old_pid = dev.pid
|
||||
lib.set_pid(dev.handle, pid)
|
||||
return {"success": True, "old_pid": f"0x{old_pid:04X}", "new_pid": f"0x{pid:04X}"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_raw_config(hex_data: str, device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Write raw configuration blob to device EPROM (DANGEROUS).
|
||||
|
||||
Args:
|
||||
hex_data: Hex-encoded config data
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
WARNING: Writing invalid config data can brick the device.
|
||||
Use get_raw_config to read current config first.
|
||||
"""
|
||||
lib = get_lib()
|
||||
try:
|
||||
data = bytes.fromhex(hex_data)
|
||||
except ValueError:
|
||||
return {"error": "Invalid hex string"}
|
||||
err = await strict_confirm(
|
||||
ctx,
|
||||
f"Write {len(data)} bytes of raw config to CP210x EPROM?\n\n"
|
||||
f"WARNING: Writing invalid config data can brick the device.\n"
|
||||
f"This is PERMANENT and cannot be undone.",
|
||||
)
|
||||
if err:
|
||||
return err
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is locked and cannot be modified"}
|
||||
lib.set_config(dev.handle, data)
|
||||
return {"success": True, "bytes_written": len(data)}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def update_firmware(device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""Update device firmware (CP2102N only, DANGEROUS).
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
WARNING: Firmware update failure can brick the device.
|
||||
"""
|
||||
lib = get_lib()
|
||||
err = await strict_confirm(
|
||||
ctx,
|
||||
"Update CP2102N firmware?\n\n"
|
||||
"WARNING: Firmware update failure can BRICK the device.\n"
|
||||
"Ensure stable USB connection and power during update.",
|
||||
)
|
||||
if err:
|
||||
return err
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
part_code, part_name = dev.part_number
|
||||
if part_code not in PARTS_CP2102N:
|
||||
return {"error": f"Firmware update not available on {part_name}",
|
||||
"supported": "CP2102N variants only"}
|
||||
lib.update_firmware(dev.handle)
|
||||
return {"success": True, "note": "Firmware updated. Device may need reset."}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def lock_device(device_index: int = 0, ctx: Context = None) -> dict:
|
||||
"""PERMANENTLY lock the device to prevent further customization.
|
||||
|
||||
Args:
|
||||
device_index: Zero-based device index (default: 0)
|
||||
|
||||
WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's
|
||||
configuration cannot be changed. The device will still function normally,
|
||||
but strings, power settings, etc. cannot be modified.
|
||||
"""
|
||||
lib = get_lib()
|
||||
|
||||
with CP210xDevice(lib, device_index) as dev:
|
||||
if dev.is_locked:
|
||||
return {"error": "Device is already locked"}
|
||||
|
||||
part_code, part_name = dev.part_number
|
||||
err = await strict_confirm(
|
||||
ctx,
|
||||
f"PERMANENTLY lock this {part_name}?\n\n"
|
||||
f" Product: {dev.product}\n"
|
||||
f" Serial: {dev.serial_number}\n\n"
|
||||
f"THIS CANNOT BE UNDONE. The device configuration\n"
|
||||
f"will be frozen forever.",
|
||||
)
|
||||
if err:
|
||||
return err
|
||||
|
||||
lib.lock_device(dev.handle)
|
||||
return {"success": True,
|
||||
"warning": "Device is now PERMANENTLY locked. Configuration cannot be changed."}
|
||||
|
||||
|
||||
def main():
|
||||
"""Entry point for the MCP server."""
|
||||
mcp.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
10
uv.lock
generated
10
uv.lock
generated
@ -780,8 +780,8 @@ wheels = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mcp210x"
|
||||
version = "0.1.0"
|
||||
name = "mcp210x-uart"
|
||||
version = "0.2.0"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "fastmcp" },
|
||||
@ -1191,11 +1191,11 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "pyjwt"
|
||||
version = "2.10.1"
|
||||
version = "2.11.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/e7/46/bd74733ff231675599650d3e47f361794b22ef3e3770998dda30d3b63726/pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953", size = 87785, upload-time = "2024-11-28T03:43:29.933Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/5c/5a/b46fa56bf322901eee5b0454a34343cdbdae202cd421775a8ee4e42fd519/pyjwt-2.11.0.tar.gz", hash = "sha256:35f95c1f0fbe5d5ba6e43f00271c275f7a1a4db1dab27bf708073b75318ea623", size = 98019, upload-time = "2026-01-30T19:59:55.694Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997, upload-time = "2024-11-28T03:43:27.893Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/6f/01/c26ce75ba460d5cd503da9e13b21a33804d38c2165dec7b716d06b13010c/pyjwt-2.11.0-py3-none-any.whl", hash = "sha256:94a6bde30eb5c8e04fee991062b534071fd1439ef58d2adc9ccb823e7bcd0469", size = 28224, upload-time = "2026-01-30T19:59:54.539Z" },
|
||||
]
|
||||
|
||||
[package.optional-dependencies]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user