diff --git a/src/mcp210x_uart/__init__.py b/src/mcp210x_uart/__init__.py index 8c3519b..eb8d5f3 100644 --- a/src/mcp210x_uart/__init__.py +++ b/src/mcp210x_uart/__init__.py @@ -1,5 +1,5 @@ """mcp210x-uart - MCP server for Silicon Labs CP210x USB-UART bridge customization.""" -from .server import mcp, main +from .server import main, mcp __all__ = ["mcp", "main"] diff --git a/src/mcp210x_uart/bindings.py b/src/mcp210x_uart/bindings.py index 91dcd3d..1b68d91 100644 --- a/src/mcp210x_uart/bindings.py +++ b/src/mcp210x_uart/bindings.py @@ -1,9 +1,8 @@ """Low-level ctypes bindings for libcp210xmanufacturing.""" import ctypes -from ctypes import c_int, c_uint, c_ushort, c_ubyte, c_char_p, c_void_p, POINTER, byref +from ctypes import POINTER, Structure, byref, c_char_p, c_int, c_ubyte, c_uint, c_ushort, c_void_p from pathlib import Path -from typing import Optional # Type aliases matching the C library CP210x_STATUS = c_int @@ -39,17 +38,238 @@ PART_NUMBERS = { 0x22: "CP2102N-QFN20", } +# Part number groups for feature gating +PARTS_CP2102N = {0x20, 0x21, 0x22} +PARTS_WITH_FLUSH_CONFIG = {0x04, 0x05, 0x08} # CP2104, CP2105, CP2108 +PARTS_WITH_DEVICE_MODE = {0x05} # CP2105 only +PARTS_WITH_INTERFACE_STRING = {0x05, 0x08} # CP2105, CP2108 +PARTS_WITH_PORT_CONFIG = {0x03, 0x04} # CP2103, CP2104 +PARTS_WITH_DUAL_PORT_CONFIG = {0x05} # CP2105 +PARTS_WITH_QUAD_PORT_CONFIG = {0x08} # CP2108 + # Buffer sizes MAX_DEVICE_STRLEN = 256 MAX_MANUFACTURER_STRLEN = 45 MAX_PRODUCT_STRLEN = 126 MAX_SERIAL_STRLEN = 63 +CP2105_MAX_INTERFACE_STRLEN = 32 +CP2108_MAX_INTERFACE_STRLEN = 126 # GetProductString flags RETURN_SERIAL_NUMBER = 0x00 RETURN_DESCRIPTION = 0x01 RETURN_FULL_PATH = 0x02 +# Flush buffer config flags (CP2104) +FC_OPEN_TX = 0x01 +FC_OPEN_RX = 0x02 +FC_CLOSE_TX = 0x04 +FC_CLOSE_RX = 0x08 +# CP2105 standard port +FC_OPEN_TX_SCI = FC_OPEN_TX +FC_OPEN_RX_SCI = FC_OPEN_RX +FC_CLOSE_TX_SCI = FC_CLOSE_TX +FC_CLOSE_RX_SCI = FC_CLOSE_RX +# CP2105 enhanced port +FC_OPEN_TX_ECI = 0x10 +FC_OPEN_RX_ECI = 0x20 +FC_CLOSE_TX_ECI = 0x40 +FC_CLOSE_RX_ECI = 0x80 +# CP2108 per-interface +FC_OPEN_TX_IFC0 = 0x0001 +FC_OPEN_RX_IFC0 = 0x0002 +FC_CLOSE_TX_IFC0 = 0x0004 +FC_CLOSE_RX_IFC0 = 0x0008 +FC_OPEN_TX_IFC1 = 0x0010 +FC_OPEN_RX_IFC1 = 0x0020 +FC_CLOSE_TX_IFC1 = 0x0040 +FC_CLOSE_RX_IFC1 = 0x0080 +FC_OPEN_TX_IFC2 = 0x0100 +FC_OPEN_RX_IFC2 = 0x0200 +FC_CLOSE_TX_IFC2 = 0x0400 +FC_CLOSE_RX_IFC2 = 0x0800 +FC_OPEN_TX_IFC3 = 0x1000 +FC_OPEN_RX_IFC3 = 0x2000 +FC_CLOSE_TX_IFC3 = 0x4000 +FC_CLOSE_RX_IFC3 = 0x8000 + +# Baud rate config +NUM_BAUD_CONFIGS = 32 +BAUD_CONFIG_SIZE = 10 + +# Port config pin flags (CP2103/CP2104) +PORT_RI_ON = 0x0001 +PORT_DCD_ON = 0x0002 +PORT_DTR_ON = 0x0004 +PORT_DSR_ON = 0x0008 +PORT_TXD_ON = 0x0010 +PORT_RXD_ON = 0x0020 +PORT_RTS_ON = 0x0040 +PORT_CTS_ON = 0x0080 +PORT_GPIO_0_ON = 0x0100 +PORT_GPIO_1_ON = 0x0200 +PORT_GPIO_2_ON = 0x0400 +PORT_GPIO_3_ON = 0x0800 +PORT_SUSPEND_ON = 0x4000 +PORT_SUSPEND_BAR_ON = 0x8000 + +# Enhanced function flags (CP2103/CP2104) +EF_GPIO_0_TXLED = 0x01 +EF_GPIO_1_RXLED = 0x02 +EF_GPIO_2_RS485 = 0x04 +EF_RS485_INVERT = 0x08 +EF_WEAKPULLUP = 0x10 +EF_RESERVED_1 = 0x20 +EF_SERIAL_DYNAMIC_SUSPEND = 0x40 +EF_GPIO_DYNAMIC_SUSPEND = 0x80 + +# Flush buffer flag names for human-readable output +FLUSH_FLAGS_CP2104 = { + FC_OPEN_TX: "open_tx", + FC_OPEN_RX: "open_rx", + FC_CLOSE_TX: "close_tx", + FC_CLOSE_RX: "close_rx", +} + +FLUSH_FLAGS_CP2105 = { + FC_OPEN_TX_SCI: "open_tx_sci", + FC_OPEN_RX_SCI: "open_rx_sci", + FC_CLOSE_TX_SCI: "close_tx_sci", + FC_CLOSE_RX_SCI: "close_rx_sci", + FC_OPEN_TX_ECI: "open_tx_eci", + FC_OPEN_RX_ECI: "open_rx_eci", + FC_CLOSE_TX_ECI: "close_tx_eci", + FC_CLOSE_RX_ECI: "close_rx_eci", +} + +ENHANCED_FXN_FLAGS = { + EF_GPIO_0_TXLED: "gpio0_txled", + EF_GPIO_1_RXLED: "gpio1_rxled", + EF_GPIO_2_RS485: "gpio2_rs485", + EF_RS485_INVERT: "rs485_invert", + EF_WEAKPULLUP: "weak_pullup", + EF_SERIAL_DYNAMIC_SUSPEND: "serial_dynamic_suspend", + EF_GPIO_DYNAMIC_SUSPEND: "gpio_dynamic_suspend", +} + +PORT_PIN_FLAGS = { + PORT_RI_ON: "RI", + PORT_DCD_ON: "DCD", + PORT_DTR_ON: "DTR", + PORT_DSR_ON: "DSR", + PORT_TXD_ON: "TXD", + PORT_RXD_ON: "RXD", + PORT_RTS_ON: "RTS", + PORT_CTS_ON: "CTS", + PORT_GPIO_0_ON: "GPIO0", + PORT_GPIO_1_ON: "GPIO1", + PORT_GPIO_2_ON: "GPIO2", + PORT_GPIO_3_ON: "GPIO3", + PORT_SUSPEND_ON: "SUSPEND", + PORT_SUSPEND_BAR_ON: "SUSPEND_BAR", +} + + +def decode_bitmask(value: int, flag_map: dict) -> dict[str, bool]: + """Decode a bitmask into a dict of named flags.""" + return {name: bool(value & bit) for bit, name in flag_map.items()} + + +def encode_bitmask(flags: dict[str, bool], flag_map: dict) -> int: + """Encode a dict of named flags back into a bitmask.""" + name_to_bit = {name: bit for bit, name in flag_map.items()} + value = 0 + for name, enabled in flags.items(): + if name in name_to_bit and enabled: + value |= name_to_bit[name] + return value + + +# --- ctypes Structures --- + +class BAUD_CONFIG(Structure): + _pack_ = 1 + _fields_ = [ + ("BaudGen", WORD), + ("Timer0Reload", WORD), + ("Prescaler", BYTE), + ("_pad", BYTE), + ("BaudRate", DWORD), + ] + + +assert ctypes.sizeof(BAUD_CONFIG) == BAUD_CONFIG_SIZE + +BAUD_CONFIG_DATA = BAUD_CONFIG * NUM_BAUD_CONFIGS + + +class PORT_CONFIG(Structure): + _fields_ = [ + ("Mode", WORD), + ("Reset_Latch", WORD), + ("Suspend_Latch", WORD), + ("EnhancedFxn", BYTE), + ] + + +class DUAL_PORT_CONFIG(Structure): + _fields_ = [ + ("Mode", WORD), + ("Reset_Latch", WORD), + ("Suspend_Latch", WORD), + ("EnhancedFxn_ECI", BYTE), + ("EnhancedFxn_SCI", BYTE), + ("EnhancedFxn_Device", BYTE), + ] + + +class QUAD_PORT_STATE(Structure): + _fields_ = [ + ("Mode_PB0", WORD), + ("Mode_PB1", WORD), + ("Mode_PB2", WORD), + ("Mode_PB3", WORD), + ("Mode_PB4", WORD), + ("LowPower_PB0", WORD), + ("LowPower_PB1", WORD), + ("LowPower_PB2", WORD), + ("LowPower_PB3", WORD), + ("LowPower_PB4", WORD), + ("Latch_PB0", WORD), + ("Latch_PB1", WORD), + ("Latch_PB2", WORD), + ("Latch_PB3", WORD), + ("Latch_PB4", WORD), + ] + + +class QUAD_PORT_CONFIG(Structure): + _fields_ = [ + ("Reset_Latch", QUAD_PORT_STATE), + ("Suspend_Latch", QUAD_PORT_STATE), + ("IPDelay_IFC0", BYTE), + ("IPDelay_IFC1", BYTE), + ("IPDelay_IFC2", BYTE), + ("IPDelay_IFC3", BYTE), + ("EnhancedFxn_IFC0", BYTE), + ("EnhancedFxn_IFC1", BYTE), + ("EnhancedFxn_IFC2", BYTE), + ("EnhancedFxn_IFC3", BYTE), + ("EnhancedFxn_Device", BYTE), + ("ExtClk0Freq", BYTE), + ("ExtClk1Freq", BYTE), + ("ExtClk2Freq", BYTE), + ("ExtClk3Freq", BYTE), + ] + + +class FIRMWARE_T(Structure): + _fields_ = [ + ("major", BYTE), + ("minor", BYTE), + ("build", BYTE), + ] + class CP210xError(Exception): """Exception raised for CP210x library errors.""" @@ -77,16 +297,10 @@ class CP210xError(Exception): class CP210xLibrary: """Wrapper for the CP210x manufacturing library.""" - def __init__(self, lib_path: Optional[str] = None): - """Load the CP210x manufacturing library. - - Args: - lib_path: Path to libcp210xmanufacturing.so, or None to search default paths. - """ + def __init__(self, lib_path: str | None = None): if lib_path: self._lib = ctypes.CDLL(lib_path) else: - # Search common locations search_paths = [ "/usr/lib/libcp210xmanufacturing.so", "/usr/local/lib/libcp210xmanufacturing.so", @@ -105,158 +319,198 @@ class CP210xLibrary: self._setup_functions() def _setup_functions(self): - """Set up function prototypes.""" - # CP210x_GetNumDevices - self._lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)] - self._lib.CP210x_GetNumDevices.restype = CP210x_STATUS + """Set up function prototypes for all library functions.""" + lib = self._lib - # CP210x_GetProductString - self._lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD] - self._lib.CP210x_GetProductString.restype = CP210x_STATUS + # --- Device enumeration --- + lib.CP210x_GetNumDevices.argtypes = [POINTER(DWORD)] + lib.CP210x_GetNumDevices.restype = CP210x_STATUS - # CP210x_Open - self._lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)] - self._lib.CP210x_Open.restype = CP210x_STATUS + lib.CP210x_GetProductString.argtypes = [DWORD, c_void_p, DWORD] + lib.CP210x_GetProductString.restype = CP210x_STATUS - # CP210x_Close - self._lib.CP210x_Close.argtypes = [HANDLE] - self._lib.CP210x_Close.restype = CP210x_STATUS + lib.CP210x_Open.argtypes = [DWORD, POINTER(HANDLE)] + lib.CP210x_Open.restype = CP210x_STATUS - # CP210x_GetPartNumber - self._lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)] - self._lib.CP210x_GetPartNumber.restype = CP210x_STATUS + lib.CP210x_Close.argtypes = [HANDLE] + lib.CP210x_Close.restype = CP210x_STATUS - # CP210x_GetDeviceVid - self._lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)] - self._lib.CP210x_GetDeviceVid.restype = CP210x_STATUS + # --- Getters (scalars) --- + lib.CP210x_GetPartNumber.argtypes = [HANDLE, POINTER(BYTE)] + lib.CP210x_GetPartNumber.restype = CP210x_STATUS - # CP210x_GetDevicePid - self._lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)] - self._lib.CP210x_GetDevicePid.restype = CP210x_STATUS + lib.CP210x_GetDeviceVid.argtypes = [HANDLE, POINTER(WORD)] + lib.CP210x_GetDeviceVid.restype = CP210x_STATUS - # CP210x_GetDeviceManufacturerString - self._lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL] - self._lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS + lib.CP210x_GetDevicePid.argtypes = [HANDLE, POINTER(WORD)] + lib.CP210x_GetDevicePid.restype = CP210x_STATUS - # CP210x_GetDeviceProductString - self._lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL] - self._lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS + lib.CP210x_GetDeviceManufacturerString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL] + lib.CP210x_GetDeviceManufacturerString.restype = CP210x_STATUS - # CP210x_GetDeviceSerialNumber - self._lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL] - self._lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS + lib.CP210x_GetDeviceProductString.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL] + lib.CP210x_GetDeviceProductString.restype = CP210x_STATUS - # CP210x_SetManufacturerString - self._lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL] - self._lib.CP210x_SetManufacturerString.restype = CP210x_STATUS + lib.CP210x_GetDeviceSerialNumber.argtypes = [HANDLE, c_void_p, POINTER(BYTE), BOOL] + lib.CP210x_GetDeviceSerialNumber.restype = CP210x_STATUS - # CP210x_SetProductString - self._lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL] - self._lib.CP210x_SetProductString.restype = CP210x_STATUS + lib.CP210x_GetDeviceInterfaceString.argtypes = [HANDLE, BYTE, c_void_p, POINTER(BYTE), BOOL] + lib.CP210x_GetDeviceInterfaceString.restype = CP210x_STATUS - # CP210x_SetSerialNumber - self._lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL] - self._lib.CP210x_SetSerialNumber.restype = CP210x_STATUS + lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)] + lib.CP210x_GetMaxPower.restype = CP210x_STATUS - # CP210x_GetMaxPower - self._lib.CP210x_GetMaxPower.argtypes = [HANDLE, POINTER(BYTE)] - self._lib.CP210x_GetMaxPower.restype = CP210x_STATUS + lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)] + lib.CP210x_GetSelfPower.restype = CP210x_STATUS - # CP210x_SetMaxPower - self._lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE] - self._lib.CP210x_SetMaxPower.restype = CP210x_STATUS + lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)] + lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS - # CP210x_GetSelfPower - self._lib.CP210x_GetSelfPower.argtypes = [HANDLE, POINTER(BOOL)] - self._lib.CP210x_GetSelfPower.restype = CP210x_STATUS + lib.CP210x_GetFlushBufferConfig.argtypes = [HANDLE, POINTER(WORD)] + lib.CP210x_GetFlushBufferConfig.restype = CP210x_STATUS - # CP210x_SetSelfPower - self._lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL] - self._lib.CP210x_SetSelfPower.restype = CP210x_STATUS + lib.CP210x_GetDeviceMode.argtypes = [HANDLE, POINTER(BYTE), POINTER(BYTE)] + lib.CP210x_GetDeviceMode.restype = CP210x_STATUS - # CP210x_GetDeviceVersion - self._lib.CP210x_GetDeviceVersion.argtypes = [HANDLE, POINTER(WORD)] - self._lib.CP210x_GetDeviceVersion.restype = CP210x_STATUS + lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)] + lib.CP210x_GetLockValue.restype = CP210x_STATUS - # CP210x_SetDeviceVersion - self._lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD] - self._lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS + lib.CP210x_GetFirmwareVersion.argtypes = [HANDLE, POINTER(FIRMWARE_T)] + lib.CP210x_GetFirmwareVersion.restype = CP210x_STATUS - # CP210x_GetLockValue - self._lib.CP210x_GetLockValue.argtypes = [HANDLE, POINTER(BYTE)] - self._lib.CP210x_GetLockValue.restype = CP210x_STATUS + # --- Setters (scalars) --- + lib.CP210x_SetVid.argtypes = [HANDLE, WORD] + lib.CP210x_SetVid.restype = CP210x_STATUS - # CP210x_SetLockValue - self._lib.CP210x_SetLockValue.argtypes = [HANDLE] - self._lib.CP210x_SetLockValue.restype = CP210x_STATUS + lib.CP210x_SetPid.argtypes = [HANDLE, WORD] + lib.CP210x_SetPid.restype = CP210x_STATUS - # CP210x_Reset - self._lib.CP210x_Reset.argtypes = [HANDLE] - self._lib.CP210x_Reset.restype = CP210x_STATUS + lib.CP210x_SetManufacturerString.argtypes = [HANDLE, c_void_p, BYTE, BOOL] + lib.CP210x_SetManufacturerString.restype = CP210x_STATUS + + lib.CP210x_SetProductString.argtypes = [HANDLE, c_void_p, BYTE, BOOL] + lib.CP210x_SetProductString.restype = CP210x_STATUS + + lib.CP210x_SetSerialNumber.argtypes = [HANDLE, c_void_p, BYTE, BOOL] + lib.CP210x_SetSerialNumber.restype = CP210x_STATUS + + lib.CP210x_SetInterfaceString.argtypes = [HANDLE, BYTE, c_void_p, BYTE, BOOL] + lib.CP210x_SetInterfaceString.restype = CP210x_STATUS + + lib.CP210x_SetMaxPower.argtypes = [HANDLE, BYTE] + lib.CP210x_SetMaxPower.restype = CP210x_STATUS + + lib.CP210x_SetSelfPower.argtypes = [HANDLE, BOOL] + lib.CP210x_SetSelfPower.restype = CP210x_STATUS + + lib.CP210x_SetDeviceVersion.argtypes = [HANDLE, WORD] + lib.CP210x_SetDeviceVersion.restype = CP210x_STATUS + + lib.CP210x_SetFlushBufferConfig.argtypes = [HANDLE, WORD] + lib.CP210x_SetFlushBufferConfig.restype = CP210x_STATUS + + lib.CP210x_SetDeviceMode.argtypes = [HANDLE, BYTE, BYTE] + lib.CP210x_SetDeviceMode.restype = CP210x_STATUS + + lib.CP210x_SetLockValue.argtypes = [HANDLE] + lib.CP210x_SetLockValue.restype = CP210x_STATUS + + # --- Struct-based configs --- + lib.CP210x_GetBaudRateConfig.argtypes = [HANDLE, POINTER(BAUD_CONFIG)] + lib.CP210x_GetBaudRateConfig.restype = CP210x_STATUS + + lib.CP210x_SetBaudRateConfig.argtypes = [HANDLE, POINTER(BAUD_CONFIG)] + lib.CP210x_SetBaudRateConfig.restype = CP210x_STATUS + + lib.CP210x_GetPortConfig.argtypes = [HANDLE, POINTER(PORT_CONFIG)] + lib.CP210x_GetPortConfig.restype = CP210x_STATUS + + lib.CP210x_SetPortConfig.argtypes = [HANDLE, POINTER(PORT_CONFIG)] + lib.CP210x_SetPortConfig.restype = CP210x_STATUS + + lib.CP210x_GetDualPortConfig.argtypes = [HANDLE, POINTER(DUAL_PORT_CONFIG)] + lib.CP210x_GetDualPortConfig.restype = CP210x_STATUS + + lib.CP210x_SetDualPortConfig.argtypes = [HANDLE, POINTER(DUAL_PORT_CONFIG)] + lib.CP210x_SetDualPortConfig.restype = CP210x_STATUS + + lib.CP210x_GetQuadPortConfig.argtypes = [HANDLE, POINTER(QUAD_PORT_CONFIG)] + lib.CP210x_GetQuadPortConfig.restype = CP210x_STATUS + + lib.CP210x_SetQuadPortConfig.argtypes = [HANDLE, POINTER(QUAD_PORT_CONFIG)] + lib.CP210x_SetQuadPortConfig.restype = CP210x_STATUS + + # --- Advanced / raw --- + lib.CP210x_GetConfig.argtypes = [HANDLE, POINTER(BYTE), WORD] + lib.CP210x_GetConfig.restype = CP210x_STATUS + + lib.CP210x_SetConfig.argtypes = [HANDLE, POINTER(BYTE), WORD] + lib.CP210x_SetConfig.restype = CP210x_STATUS + + lib.CP210x_GetGeneric.argtypes = [HANDLE, POINTER(BYTE), WORD] + lib.CP210x_GetGeneric.restype = CP210x_STATUS + + lib.CP210x_SetGeneric.argtypes = [HANDLE, POINTER(BYTE), WORD] + lib.CP210x_SetGeneric.restype = CP210x_STATUS + + lib.CP210x_CreateHexFile.argtypes = [HANDLE, c_char_p] + lib.CP210x_CreateHexFile.restype = CP210x_STATUS + + lib.CP210x_UpdateFirmware.argtypes = [HANDLE] + lib.CP210x_UpdateFirmware.restype = CP210x_STATUS + + lib.CP210x_Reset.argtypes = [HANDLE] + lib.CP210x_Reset.restype = CP210x_STATUS def _check_status(self, status: int, context: str = ""): - """Raise exception if status indicates error.""" if status != CP210x_SUCCESS: raise CP210xError(status, context) + # --- Device enumeration --- + def get_num_devices(self) -> int: - """Get the number of connected CP210x devices.""" num = DWORD() status = self._lib.CP210x_GetNumDevices(byref(num)) self._check_status(status, "GetNumDevices") return num.value def get_product_string(self, device_index: int, flag: int = RETURN_DESCRIPTION) -> str: - """Get device string without opening the device. - - Args: - device_index: Zero-based device index - flag: What to return - RETURN_SERIAL_NUMBER, RETURN_DESCRIPTION, or RETURN_FULL_PATH - """ buf = ctypes.create_string_buffer(MAX_DEVICE_STRLEN) status = self._lib.CP210x_GetProductString(DWORD(device_index), buf, DWORD(flag)) self._check_status(status, "GetProductString") return buf.value.decode('utf-8', errors='replace') def open(self, device_index: int) -> HANDLE: - """Open a device by index.""" handle = HANDLE() status = self._lib.CP210x_Open(DWORD(device_index), byref(handle)) self._check_status(status, f"Open device {device_index}") return handle def close(self, handle: HANDLE): - """Close an open device.""" status = self._lib.CP210x_Close(handle) self._check_status(status, "Close") - def get_part_number(self, handle: HANDLE) -> tuple[int, str]: - """Get the part number of an open device. + # --- Scalar getters --- - Returns: - Tuple of (part_number_code, part_name) - """ + def get_part_number(self, handle: HANDLE) -> tuple[int, str]: part = BYTE() status = self._lib.CP210x_GetPartNumber(handle, byref(part)) self._check_status(status, "GetPartNumber") return part.value, PART_NUMBERS.get(part.value, f"Unknown (0x{part.value:02X})") def get_device_vid(self, handle: HANDLE) -> int: - """Get the USB Vendor ID.""" vid = WORD() status = self._lib.CP210x_GetDeviceVid(handle, byref(vid)) self._check_status(status, "GetDeviceVid") return vid.value def get_device_pid(self, handle: HANDLE) -> int: - """Get the USB Product ID.""" pid = WORD() status = self._lib.CP210x_GetDevicePid(handle, byref(pid)) self._check_status(status, "GetDevicePid") return pid.value def get_manufacturer_string(self, handle: HANDLE) -> str: - """Get the manufacturer string.""" buf = ctypes.create_string_buffer(MAX_MANUFACTURER_STRLEN) length = BYTE() status = self._lib.CP210x_GetDeviceManufacturerString(handle, buf, byref(length), BOOL(1)) @@ -264,7 +518,6 @@ class CP210xLibrary: return buf.value.decode('utf-8', errors='replace') def get_product_string_device(self, handle: HANDLE) -> str: - """Get the product string from an open device.""" buf = ctypes.create_string_buffer(MAX_PRODUCT_STRLEN) length = BYTE() status = self._lib.CP210x_GetDeviceProductString(handle, buf, byref(length), BOOL(1)) @@ -272,88 +525,282 @@ class CP210xLibrary: return buf.value.decode('utf-8', errors='replace') def get_serial_number(self, handle: HANDLE) -> str: - """Get the serial number.""" buf = ctypes.create_string_buffer(MAX_SERIAL_STRLEN) length = BYTE() status = self._lib.CP210x_GetDeviceSerialNumber(handle, buf, byref(length), BOOL(1)) self._check_status(status, "GetDeviceSerialNumber") return buf.value.decode('utf-8', errors='replace') - def set_manufacturer_string(self, handle: HANDLE, manufacturer: str): - """Set the manufacturer string (max 45 chars).""" - data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN] - status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1)) - self._check_status(status, "SetManufacturerString") - - def set_product_string(self, handle: HANDLE, product: str): - """Set the product string (max 126 chars).""" - data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN] - status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1)) - self._check_status(status, "SetProductString") - - def set_serial_number(self, handle: HANDLE, serial: str): - """Set the serial number (max 63 chars).""" - data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN] - status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1)) - self._check_status(status, "SetSerialNumber") + def get_interface_string(self, handle: HANDLE, interface_number: int) -> str: + buf = ctypes.create_string_buffer(CP2108_MAX_INTERFACE_STRLEN) + length = BYTE() + status = self._lib.CP210x_GetDeviceInterfaceString( + handle, BYTE(interface_number), buf, byref(length), BOOL(1) + ) + self._check_status(status, f"GetDeviceInterfaceString(ifc={interface_number})") + return buf.value.decode('utf-8', errors='replace') def get_max_power(self, handle: HANDLE) -> int: - """Get max power in units of 2mA (multiply by 2 for mA).""" power = BYTE() status = self._lib.CP210x_GetMaxPower(handle, byref(power)) self._check_status(status, "GetMaxPower") return power.value - def set_max_power(self, handle: HANDLE, power_2ma: int): - """Set max power in units of 2mA (e.g., 50 = 100mA).""" - if power_2ma > 250: - raise ValueError("Max power cannot exceed 250 (500mA)") - status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma)) - self._check_status(status, "SetMaxPower") - def get_self_power(self, handle: HANDLE) -> bool: - """Check if device is self-powered.""" self_power = BOOL() status = self._lib.CP210x_GetSelfPower(handle, byref(self_power)) self._check_status(status, "GetSelfPower") return bool(self_power.value) - def set_self_power(self, handle: HANDLE, self_powered: bool): - """Set whether device is self-powered.""" - status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0)) - self._check_status(status, "SetSelfPower") - def get_device_version(self, handle: HANDLE) -> int: - """Get device version (bcdDevice).""" version = WORD() status = self._lib.CP210x_GetDeviceVersion(handle, byref(version)) self._check_status(status, "GetDeviceVersion") return version.value - def set_device_version(self, handle: HANDLE, version: int): - """Set device version (bcdDevice).""" - status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version)) - self._check_status(status, "SetDeviceVersion") + def get_flush_buffer_config(self, handle: HANDLE) -> int: + config = WORD() + status = self._lib.CP210x_GetFlushBufferConfig(handle, byref(config)) + self._check_status(status, "GetFlushBufferConfig") + return config.value + + def get_device_mode(self, handle: HANDLE) -> tuple[int, int]: + eci = BYTE() + sci = BYTE() + status = self._lib.CP210x_GetDeviceMode(handle, byref(eci), byref(sci)) + self._check_status(status, "GetDeviceMode") + return eci.value, sci.value def get_lock_value(self, handle: HANDLE) -> int: - """Get lock value (0 = unlocked, 1-255 = locked).""" lock = BYTE() status = self._lib.CP210x_GetLockValue(handle, byref(lock)) self._check_status(status, "GetLockValue") return lock.value + def get_firmware_version(self, handle: HANDLE) -> tuple[int, int, int]: + fw = FIRMWARE_T() + status = self._lib.CP210x_GetFirmwareVersion(handle, byref(fw)) + self._check_status(status, "GetFirmwareVersion") + return fw.major, fw.minor, fw.build + + # --- Scalar setters --- + + def set_vid(self, handle: HANDLE, vid: int): + status = self._lib.CP210x_SetVid(handle, WORD(vid)) + self._check_status(status, "SetVid") + + def set_pid(self, handle: HANDLE, pid: int): + status = self._lib.CP210x_SetPid(handle, WORD(pid)) + self._check_status(status, "SetPid") + + def set_manufacturer_string(self, handle: HANDLE, manufacturer: str): + data = manufacturer.encode('utf-8')[:MAX_MANUFACTURER_STRLEN] + status = self._lib.CP210x_SetManufacturerString(handle, data, BYTE(len(data)), BOOL(1)) + self._check_status(status, "SetManufacturerString") + + def set_product_string(self, handle: HANDLE, product: str): + data = product.encode('utf-8')[:MAX_PRODUCT_STRLEN] + status = self._lib.CP210x_SetProductString(handle, data, BYTE(len(data)), BOOL(1)) + self._check_status(status, "SetProductString") + + def set_serial_number(self, handle: HANDLE, serial: str): + data = serial.encode('utf-8')[:MAX_SERIAL_STRLEN] + status = self._lib.CP210x_SetSerialNumber(handle, data, BYTE(len(data)), BOOL(1)) + self._check_status(status, "SetSerialNumber") + + def set_interface_string(self, handle: HANDLE, interface_number: int, value: str): + data = value.encode('utf-8')[:CP2108_MAX_INTERFACE_STRLEN] + status = self._lib.CP210x_SetInterfaceString( + handle, BYTE(interface_number), data, BYTE(len(data)), BOOL(1) + ) + self._check_status(status, f"SetInterfaceString(ifc={interface_number})") + + def set_max_power(self, handle: HANDLE, power_2ma: int): + if power_2ma > 250: + raise ValueError("Max power cannot exceed 250 (500mA)") + status = self._lib.CP210x_SetMaxPower(handle, BYTE(power_2ma)) + self._check_status(status, "SetMaxPower") + + def set_self_power(self, handle: HANDLE, self_powered: bool): + status = self._lib.CP210x_SetSelfPower(handle, BOOL(1 if self_powered else 0)) + self._check_status(status, "SetSelfPower") + + def set_device_version(self, handle: HANDLE, version: int): + status = self._lib.CP210x_SetDeviceVersion(handle, WORD(version)) + self._check_status(status, "SetDeviceVersion") + + def set_flush_buffer_config(self, handle: HANDLE, config: int): + status = self._lib.CP210x_SetFlushBufferConfig(handle, WORD(config)) + self._check_status(status, "SetFlushBufferConfig") + + def set_device_mode(self, handle: HANDLE, eci_mode: int, sci_mode: int): + status = self._lib.CP210x_SetDeviceMode(handle, BYTE(eci_mode), BYTE(sci_mode)) + self._check_status(status, "SetDeviceMode") + def lock_device(self, handle: HANDLE): - """Lock the device (PERMANENT - cannot be undone!).""" status = self._lib.CP210x_SetLockValue(handle) self._check_status(status, "SetLockValue") + # --- Struct-based configs --- + + def get_baud_rate_config(self, handle: HANDLE) -> list[dict]: + data = BAUD_CONFIG_DATA() + status = self._lib.CP210x_GetBaudRateConfig(handle, data) + self._check_status(status, "GetBaudRateConfig") + return [ + { + "index": i, + "baud_gen": data[i].BaudGen, + "timer0_reload": data[i].Timer0Reload, + "prescaler": data[i].Prescaler, + "baud_rate": data[i].BaudRate, + } + for i in range(NUM_BAUD_CONFIGS) + ] + + def set_baud_rate_config(self, handle: HANDLE, configs: list[dict]): + if len(configs) != NUM_BAUD_CONFIGS: + raise ValueError(f"Expected {NUM_BAUD_CONFIGS} baud configs, got {len(configs)}") + data = BAUD_CONFIG_DATA() + for i, cfg in enumerate(configs): + data[i].BaudGen = cfg["baud_gen"] + data[i].Timer0Reload = cfg["timer0_reload"] + data[i].Prescaler = cfg["prescaler"] + data[i]._pad = 0 + data[i].BaudRate = cfg["baud_rate"] + status = self._lib.CP210x_SetBaudRateConfig(handle, data) + self._check_status(status, "SetBaudRateConfig") + + def get_port_config(self, handle: HANDLE) -> dict: + cfg = PORT_CONFIG() + status = self._lib.CP210x_GetPortConfig(handle, byref(cfg)) + self._check_status(status, "GetPortConfig") + return { + "mode": cfg.Mode, + "reset_latch": cfg.Reset_Latch, + "suspend_latch": cfg.Suspend_Latch, + "enhanced_fxn": cfg.EnhancedFxn, + } + + def set_port_config(self, handle: HANDLE, mode: int, reset_latch: int, suspend_latch: int, enhanced_fxn: int): + cfg = PORT_CONFIG() + cfg.Mode = mode + cfg.Reset_Latch = reset_latch + cfg.Suspend_Latch = suspend_latch + cfg.EnhancedFxn = enhanced_fxn + status = self._lib.CP210x_SetPortConfig(handle, byref(cfg)) + self._check_status(status, "SetPortConfig") + + def get_dual_port_config(self, handle: HANDLE) -> dict: + cfg = DUAL_PORT_CONFIG() + status = self._lib.CP210x_GetDualPortConfig(handle, byref(cfg)) + self._check_status(status, "GetDualPortConfig") + return { + "mode": cfg.Mode, + "reset_latch": cfg.Reset_Latch, + "suspend_latch": cfg.Suspend_Latch, + "enhanced_fxn_eci": cfg.EnhancedFxn_ECI, + "enhanced_fxn_sci": cfg.EnhancedFxn_SCI, + "enhanced_fxn_device": cfg.EnhancedFxn_Device, + } + + def set_dual_port_config(self, handle: HANDLE, mode: int, reset_latch: int, + suspend_latch: int, enhanced_fxn_eci: int, + enhanced_fxn_sci: int, enhanced_fxn_device: int): + cfg = DUAL_PORT_CONFIG() + cfg.Mode = mode + cfg.Reset_Latch = reset_latch + cfg.Suspend_Latch = suspend_latch + cfg.EnhancedFxn_ECI = enhanced_fxn_eci + cfg.EnhancedFxn_SCI = enhanced_fxn_sci + cfg.EnhancedFxn_Device = enhanced_fxn_device + status = self._lib.CP210x_SetDualPortConfig(handle, byref(cfg)) + self._check_status(status, "SetDualPortConfig") + + def _quad_port_state_to_dict(self, qps: QUAD_PORT_STATE) -> dict: + return { + f"pb{i}": {"mode": getattr(qps, f"Mode_PB{i}"), + "low_power": getattr(qps, f"LowPower_PB{i}"), + "latch": getattr(qps, f"Latch_PB{i}")} + for i in range(5) + } + + def get_quad_port_config(self, handle: HANDLE) -> dict: + cfg = QUAD_PORT_CONFIG() + status = self._lib.CP210x_GetQuadPortConfig(handle, byref(cfg)) + self._check_status(status, "GetQuadPortConfig") + return { + "reset_latch": self._quad_port_state_to_dict(cfg.Reset_Latch), + "suspend_latch": self._quad_port_state_to_dict(cfg.Suspend_Latch), + "ip_delay": [cfg.IPDelay_IFC0, cfg.IPDelay_IFC1, cfg.IPDelay_IFC2, cfg.IPDelay_IFC3], + "enhanced_fxn": [cfg.EnhancedFxn_IFC0, cfg.EnhancedFxn_IFC1, + cfg.EnhancedFxn_IFC2, cfg.EnhancedFxn_IFC3], + "enhanced_fxn_device": cfg.EnhancedFxn_Device, + "ext_clk_freq": [cfg.ExtClk0Freq, cfg.ExtClk1Freq, + cfg.ExtClk2Freq, cfg.ExtClk3Freq], + } + + def set_quad_port_config(self, handle: HANDLE, config_dict: dict): + cfg = QUAD_PORT_CONFIG() + + for state_name in ("reset_latch", "suspend_latch"): + state = getattr(cfg, state_name.title().replace("_l", "_L")) + src = config_dict[state_name] + for i in range(5): + pb = src[f"pb{i}"] + setattr(state, f"Mode_PB{i}", pb["mode"]) + setattr(state, f"LowPower_PB{i}", pb["low_power"]) + setattr(state, f"Latch_PB{i}", pb["latch"]) + + for i in range(4): + setattr(cfg, f"IPDelay_IFC{i}", config_dict["ip_delay"][i]) + setattr(cfg, f"EnhancedFxn_IFC{i}", config_dict["enhanced_fxn"][i]) + + cfg.EnhancedFxn_Device = config_dict["enhanced_fxn_device"] + + for i in range(4): + setattr(cfg, f"ExtClk{i}Freq", config_dict["ext_clk_freq"][i]) + + status = self._lib.CP210x_SetQuadPortConfig(handle, byref(cfg)) + self._check_status(status, "SetQuadPortConfig") + + # --- Advanced / raw --- + + def get_config(self, handle: HANDLE, size: int = 512) -> bytes: + buf = (BYTE * size)() + status = self._lib.CP210x_GetConfig(handle, buf, WORD(size)) + self._check_status(status, "GetConfig") + return bytes(buf) + + def set_config(self, handle: HANDLE, data: bytes): + buf = (BYTE * len(data))(*data) + status = self._lib.CP210x_SetConfig(handle, buf, WORD(len(data))) + self._check_status(status, "SetConfig") + + def get_generic(self, handle: HANDLE, size: int = 512) -> bytes: + buf = (BYTE * size)() + status = self._lib.CP210x_GetGeneric(handle, buf, WORD(size)) + self._check_status(status, "GetGeneric") + return bytes(buf) + + def set_generic(self, handle: HANDLE, data: bytes): + buf = (BYTE * len(data))(*data) + status = self._lib.CP210x_SetGeneric(handle, buf, WORD(len(data))) + self._check_status(status, "SetGeneric") + + def create_hex_file(self, handle: HANDLE, filename: str): + status = self._lib.CP210x_CreateHexFile(handle, filename.encode('utf-8')) + self._check_status(status, "CreateHexFile") + + def update_firmware(self, handle: HANDLE): + status = self._lib.CP210x_UpdateFirmware(handle) + self._check_status(status, "UpdateFirmware") + def reset(self, handle: HANDLE): - """Reset the device (USB disconnect/reconnect).""" status = self._lib.CP210x_Reset(handle) self._check_status(status, "Reset") -# Convenience context manager for device access class CP210xDevice: """Context manager for safe device access.""" @@ -361,6 +808,7 @@ class CP210xDevice: self.lib = lib self.device_index = device_index self.handle = None + self._part_code: int | None = None def __enter__(self): self.handle = self.lib.open(self.device_index) @@ -372,6 +820,21 @@ class CP210xDevice: self.handle = None return False + def _get_part_code(self) -> int: + if self._part_code is None: + self._part_code = self.lib.get_part_number(self.handle)[0] + return self._part_code + + def _require_part(self, allowed: set[int], feature: str): + code = self._get_part_code() + if code not in allowed: + name = PART_NUMBERS.get(code, f"0x{code:02X}") + allowed_names = ", ".join(PART_NUMBERS.get(c, f"0x{c:02X}") for c in sorted(allowed)) + raise CP210xError( + CP210x_FUNCTION_NOT_SUPPORTED, + f"{feature} not supported on {name} (requires {allowed_names})", + ) + @property def part_number(self) -> tuple[int, str]: return self.lib.get_part_number(self.handle) @@ -410,12 +873,10 @@ class CP210xDevice: @property def max_power_ma(self) -> int: - """Max power in milliamps.""" return self.lib.get_max_power(self.handle) * 2 @max_power_ma.setter def max_power_ma(self, value: int): - """Set max power in milliamps (will be rounded down to nearest 2mA).""" self.lib.set_max_power(self.handle, value // 2) @property @@ -428,14 +889,49 @@ class CP210xDevice: @property def device_version(self) -> str: - """Device version as BCD string (e.g., '1.00').""" v = self.lib.get_device_version(self.handle) return f"{(v >> 8) & 0xFF}.{v & 0xFF:02d}" + @device_version.setter + def device_version(self, value: int): + self.lib.set_device_version(self.handle, value) + @property def is_locked(self) -> bool: return self.lib.get_lock_value(self.handle) != 0 + @property + def firmware_version(self) -> str | None: + """Firmware version string (CP2102N only). Returns None if not supported.""" + try: + major, minor, build = self.lib.get_firmware_version(self.handle) + return f"{major}.{minor}.{build}" + except CP210xError: + return None + + @property + def flush_buffer_config(self) -> int | None: + """Raw flush buffer config word. Returns None if not supported.""" + try: + return self.lib.get_flush_buffer_config(self.handle) + except CP210xError: + return None + + @property + def device_mode(self) -> tuple[int, int] | None: + """(ECI mode, SCI mode) for CP2105. Returns None if not supported.""" + try: + return self.lib.get_device_mode(self.handle) + except CP210xError: + return None + + def get_interface_string(self, interface_number: int) -> str: + self._require_part(PARTS_WITH_INTERFACE_STRING, "Interface strings") + return self.lib.get_interface_string(self.handle, interface_number) + + def set_interface_string(self, interface_number: int, value: str): + self._require_part(PARTS_WITH_INTERFACE_STRING, "Interface strings") + self.lib.set_interface_string(self.handle, interface_number, value) + def reset(self): - """Reset the device.""" self.lib.reset(self.handle) diff --git a/src/mcp210x_uart/server.py b/src/mcp210x_uart/server.py index 425f966..ef2941b 100644 --- a/src/mcp210x_uart/server.py +++ b/src/mcp210x_uart/server.py @@ -1,24 +1,37 @@ """FastMCP server for CP210x device customization.""" import subprocess -from typing import Optional -from fastmcp import FastMCP, Context +from fastmcp import Context, FastMCP from fastmcp.server.elicitation import AcceptedElicitation -from .bindings import CP210xLibrary, CP210xDevice, CP210xError, PART_NUMBERS +from .bindings import ( + ENHANCED_FXN_FLAGS, + FLUSH_FLAGS_CP2104, + FLUSH_FLAGS_CP2105, + PARTS_CP2102N, + PARTS_WITH_DEVICE_MODE, + PARTS_WITH_DUAL_PORT_CONFIG, + PARTS_WITH_FLUSH_CONFIG, + PARTS_WITH_PORT_CONFIG, + PARTS_WITH_QUAD_PORT_CONFIG, + PORT_PIN_FLAGS, + CP210xDevice, + CP210xError, + CP210xLibrary, + decode_bitmask, + encode_bitmask, +) mcp = FastMCP( "cp210x", instructions="CP210x USB-UART bridge customization - read/write product strings, serial numbers, and device configuration", ) -# Lazy-loaded library instance -_lib: Optional[CP210xLibrary] = None +_lib: CP210xLibrary | None = None def get_lib() -> CP210xLibrary: - """Get or create the library instance.""" global _lib if _lib is None: _lib = CP210xLibrary() @@ -26,19 +39,39 @@ def get_lib() -> CP210xLibrary: async def confirm_write(ctx: Context, message: str) -> bool: - """Ask user to confirm a write operation via elicitation. - - Falls back to proceeding without confirmation if the client - doesn't support elicitation. - """ + """Normal-tier confirmation: falls back to proceeding if elicitation unavailable.""" try: result = await ctx.elicit(message, ["Confirm", "Cancel"]) return isinstance(result, AcceptedElicitation) and result.data == "Confirm" except Exception: - # Client doesn't support elicitation — proceed as usual return True +async def strict_confirm(ctx: Context, message: str) -> dict | None: + """Strict-tier confirmation: returns error dict if elicitation unavailable.""" + if not ctx: + return { + "error": "This operation requires interactive confirmation", + "message": "Too dangerous to proceed without explicit user consent.", + } + try: + result = await ctx.elicit(message, ["Confirm", "Cancel"]) + confirmed = isinstance(result, AcceptedElicitation) and result.data == "Confirm" + except Exception: + return { + "error": "This operation requires elicitation support", + "message": "Your MCP client does not support elicitation. " + "This operation is too dangerous to proceed without explicit confirmation.", + } + if not confirmed: + return {"cancelled": True, "message": "Operation cancelled by user"} + return None + + +# =========================================================================== +# Read-only tools (no confirmation needed) +# =========================================================================== + @mcp.tool() def list_devices() -> list[dict]: """List all connected CP210x devices. @@ -52,18 +85,15 @@ def list_devices() -> list[dict]: devices = [] for i in range(num_devices): try: - desc = lib.get_product_string(i, flag=1) # RETURN_DESCRIPTION - serial = lib.get_product_string(i, flag=0) # RETURN_SERIAL_NUMBER + desc = lib.get_product_string(i, flag=1) + serial = lib.get_product_string(i, flag=0) devices.append({ "index": i, "description": desc, "serial_number": serial, }) except CP210xError as e: - devices.append({ - "index": i, - "error": str(e), - }) + devices.append({"index": i, "error": str(e)}) return devices @@ -81,7 +111,7 @@ def get_device_info(device_index: int = 0) -> dict: with CP210xDevice(lib, device_index) as dev: part_code, part_name = dev.part_number - return { + info = { "part_number": part_name, "part_code": f"0x{part_code:02X}", "vid": f"0x{dev.vid:04X}", @@ -95,6 +125,190 @@ def get_device_info(device_index: int = 0) -> dict: "is_locked": dev.is_locked, } + # Conditionally add part-specific fields + fw = dev.firmware_version + if fw is not None: + info["firmware_version"] = fw + + flush = dev.flush_buffer_config + if flush is not None: + # Decode to named flags based on part + if part_code in PARTS_WITH_FLUSH_CONFIG: + flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104 + info["flush_buffer_config"] = decode_bitmask(flush, flag_map) + else: + info["flush_buffer_config_raw"] = f"0x{flush:04X}" + + mode = dev.device_mode + if mode is not None: + info["device_mode"] = {"eci": mode[0], "sci": mode[1]} + + return info + + +@mcp.tool() +def get_firmware_version(device_index: int = 0) -> dict: + """Get firmware version (CP2102N only). + + Args: + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + fw = dev.firmware_version + if fw is None: + part_code, part_name = dev.part_number + return {"error": f"Firmware version not available on {part_name}", + "note": "Only CP2102N variants support this"} + return {"firmware_version": fw} + + +@mcp.tool() +def get_flush_buffer_config(device_index: int = 0) -> dict: + """Get flush buffer configuration (CP2104/CP2105/CP2108). + + Args: + device_index: Zero-based device index (default: 0) + + Returns named flags showing which buffers are flushed on open/close. + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + part_code, part_name = dev.part_number + if part_code not in PARTS_WITH_FLUSH_CONFIG: + return {"error": f"Flush buffer config not available on {part_name}", + "supported": "CP2104, CP2105, CP2108"} + + raw = lib.get_flush_buffer_config(dev.handle) + flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104 + return { + "raw": f"0x{raw:04X}", + "flags": decode_bitmask(raw, flag_map), + } + + +@mcp.tool() +def get_device_mode(device_index: int = 0) -> dict: + """Get device mode (CP2105 only). + + Args: + device_index: Zero-based device index (default: 0) + + Returns ECI and SCI interface modes. + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + part_code, part_name = dev.part_number + if part_code not in PARTS_WITH_DEVICE_MODE: + return {"error": f"Device mode not available on {part_name}", + "supported": "CP2105"} + + eci, sci = lib.get_device_mode(dev.handle) + return {"eci_mode": eci, "sci_mode": sci} + + +@mcp.tool() +def get_interface_string(interface_number: int, device_index: int = 0) -> dict: + """Get USB interface string (CP2105/CP2108 only). + + Args: + interface_number: Interface index (0 or 1 for CP2105, 0-3 for CP2108) + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + try: + value = dev.get_interface_string(interface_number) + return {"interface": interface_number, "value": value} + except CP210xError as e: + return {"error": str(e)} + + +@mcp.tool() +def get_baud_rate_config(device_index: int = 0) -> dict: + """Get the baud rate alias configuration table (32 entries). + + Args: + device_index: Zero-based device index (default: 0) + + Returns the full 32-entry baud rate alias table showing how standard + baud rates map to timer register values. + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + configs = lib.get_baud_rate_config(dev.handle) + return {"entries": configs} + + +@mcp.tool() +def get_port_config(device_index: int = 0) -> dict: + """Get GPIO port configuration (auto-detects CP2103/4/5/8). + + Args: + device_index: Zero-based device index (default: 0) + + Returns pin modes, latch values, and enhanced function settings. + Auto-dispatches to the correct struct based on part number. + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + part_code, part_name = dev.part_number + + if part_code in PARTS_WITH_PORT_CONFIG: + raw = lib.get_port_config(dev.handle) + return { + "type": "single", + "part": part_name, + "mode": decode_bitmask(raw["mode"], PORT_PIN_FLAGS), + "reset_latch": decode_bitmask(raw["reset_latch"], PORT_PIN_FLAGS), + "suspend_latch": decode_bitmask(raw["suspend_latch"], PORT_PIN_FLAGS), + "enhanced_fxn": decode_bitmask(raw["enhanced_fxn"], ENHANCED_FXN_FLAGS), + "raw": raw, + } + elif part_code in PARTS_WITH_DUAL_PORT_CONFIG: + raw = lib.get_dual_port_config(dev.handle) + return {"type": "dual", "part": part_name, **raw} + elif part_code in PARTS_WITH_QUAD_PORT_CONFIG: + raw = lib.get_quad_port_config(dev.handle) + return {"type": "quad", "part": part_name, **raw} + else: + return {"error": f"Port config not available on {part_name}", + "supported": "CP2103, CP2104, CP2105, CP2108"} + + +@mcp.tool() +def get_raw_config(device_index: int = 0, size: int = 512) -> dict: + """Read raw EPROM configuration blob. + + Args: + device_index: Zero-based device index (default: 0) + size: Number of bytes to read (default: 512) + + Returns hex-encoded config data. + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + data = lib.get_config(dev.handle, size) + return {"hex": data.hex(), "size": len(data)} + + +@mcp.tool() +def create_hex_file(filename: str, device_index: int = 0) -> dict: + """Dump device config to Intel HEX file. + + Args: + filename: Output file path for the .hex file + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + lib.create_hex_file(dev.handle, filename) + return {"success": True, "filename": filename} + + +# =========================================================================== +# Normal-tier write tools (elicit → fallback to proceed) +# =========================================================================== @mcp.tool() async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict: @@ -108,31 +322,21 @@ async def set_product_string(product: str, device_index: int = 0, ctx: Context = changes to appear on the USB host. """ lib = get_lib() - with CP210xDevice(lib, device_index) as dev: if dev.is_locked: return {"error": "Device is locked and cannot be modified"} - old_product = dev.product new_value = product[:126] - if ctx and not await confirm_write( ctx, f"Write product string to CP210x OTP EPROM?\n\n" - f" Old: {old_product}\n" - f" New: {new_value}\n\n" + f" Old: {old_product}\n New: {new_value}\n\n" f"OTP writes are limited — this cannot be undone easily.", ): return {"cancelled": True, "message": "Write cancelled by user"} - dev.product = product - - return { - "success": True, - "old_value": old_product, - "new_value": new_value, - "note": "Re-plug device for changes to take effect on USB host", - } + return {"success": True, "old_value": old_product, "new_value": new_value, + "note": "Re-plug device for changes to take effect on USB host"} @mcp.tool() @@ -144,30 +348,20 @@ async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: device_index: Zero-based device index (default: 0) """ lib = get_lib() - with CP210xDevice(lib, device_index) as dev: if dev.is_locked: return {"error": "Device is locked and cannot be modified"} - old_mfr = dev.manufacturer new_value = manufacturer[:45] - if ctx and not await confirm_write( ctx, f"Write manufacturer string to CP210x OTP EPROM?\n\n" - f" Old: {old_mfr}\n" - f" New: {new_value}", + f" Old: {old_mfr}\n New: {new_value}", ): return {"cancelled": True, "message": "Write cancelled by user"} - dev.manufacturer = manufacturer - - return { - "success": True, - "old_value": old_mfr, - "new_value": new_value, - "note": "Re-plug device for changes to take effect on USB host", - } + return {"success": True, "old_value": old_mfr, "new_value": new_value, + "note": "Re-plug device for changes to take effect on USB host"} @mcp.tool() @@ -181,30 +375,20 @@ async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Cont Note: Changing serial number may affect udev rules that match on serial. """ lib = get_lib() - with CP210xDevice(lib, device_index) as dev: if dev.is_locked: return {"error": "Device is locked and cannot be modified"} - old_serial = dev.serial_number new_value = serial_number[:63] - if ctx and not await confirm_write( ctx, f"Write serial number to CP210x OTP EPROM?\n\n" - f" Old: {old_serial}\n" - f" New: {new_value}", + f" Old: {old_serial}\n New: {new_value}", ): return {"cancelled": True, "message": "Write cancelled by user"} - dev.serial_number = serial_number - - return { - "success": True, - "old_value": old_serial, - "new_value": new_value, - "note": "Re-plug device for changes to take effect on USB host", - } + return {"success": True, "old_value": old_serial, "new_value": new_value, + "note": "Re-plug device for changes to take effect on USB host"} @mcp.tool() @@ -216,32 +400,21 @@ async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = device_index: Zero-based device index (default: 0) """ lib = get_lib() - if max_power_ma < 0 or max_power_ma > 500: return {"error": "max_power_ma must be 0-500"} - with CP210xDevice(lib, device_index) as dev: if dev.is_locked: return {"error": "Device is locked and cannot be modified"} - old_power = dev.max_power_ma actual_value = (max_power_ma // 2) * 2 - if ctx and not await confirm_write( ctx, f"Write max power to CP210x OTP EPROM?\n\n" - f" Old: {old_power} mA\n" - f" New: {actual_value} mA", + f" Old: {old_power} mA\n New: {actual_value} mA", ): return {"cancelled": True, "message": "Write cancelled by user"} - dev.max_power_ma = max_power_ma - - return { - "success": True, - "old_value_ma": old_power, - "new_value_ma": actual_value, - } + return {"success": True, "old_value_ma": old_power, "new_value_ma": actual_value} @mcp.tool() @@ -253,29 +426,267 @@ async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Conte device_index: Zero-based device index (default: 0) """ lib = get_lib() - with CP210xDevice(lib, device_index) as dev: if dev.is_locked: return {"error": "Device is locked and cannot be modified"} - old_value = dev.self_powered mode = "self-powered" if self_powered else "bus-powered" - if ctx and not await confirm_write( ctx, f"Write power mode to CP210x OTP EPROM?\n\n" - f" Old: {'self-powered' if old_value else 'bus-powered'}\n" - f" New: {mode}", + f" Old: {'self-powered' if old_value else 'bus-powered'}\n New: {mode}", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + dev.self_powered = self_powered + return {"success": True, "old_value": old_value, "new_value": self_powered} + + +@mcp.tool() +async def set_device_version(version: int, device_index: int = 0, ctx: Context = None) -> dict: + """Set device version (bcdDevice field). + + Args: + version: BCD version number (e.g., 0x0100 for 1.00) + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + old_version = dev.device_version + if ctx and not await confirm_write( + ctx, + f"Write device version to CP210x OTP EPROM?\n\n" + f" Old: {old_version}\n New: 0x{version:04X}", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + dev.device_version = version + new_ver = f"{(version >> 8) & 0xFF}.{version & 0xFF:02d}" + return {"success": True, "old_value": old_version, "new_value": new_ver} + + +@mcp.tool() +async def set_interface_string( + interface_number: int, value: str, device_index: int = 0, ctx: Context = None, +) -> dict: + """Set USB interface string (CP2105/CP2108 only). + + Args: + interface_number: Interface index (0 or 1 for CP2105, 0-3 for CP2108) + value: New interface string + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + try: + old_value = dev.get_interface_string(interface_number) + except CP210xError as e: + return {"error": str(e)} + if ctx and not await confirm_write( + ctx, + f"Write interface {interface_number} string to CP210x OTP EPROM?\n\n" + f" Old: {old_value}\n New: {value}", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + dev.set_interface_string(interface_number, value) + return {"success": True, "interface": interface_number, + "old_value": old_value, "new_value": value} + + +@mcp.tool() +async def set_flush_buffer_config( + flags: dict, device_index: int = 0, ctx: Context = None, +) -> dict: + """Set flush buffer configuration (CP2104/CP2105/CP2108). + + Args: + flags: Dict of flag_name -> bool. Flag names depend on part: + CP2104: open_tx, open_rx, close_tx, close_rx + CP2105: open_tx_sci, open_rx_sci, close_tx_sci, close_rx_sci, + open_tx_eci, open_rx_eci, close_tx_eci, close_rx_eci + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + part_code, part_name = dev.part_number + if part_code not in PARTS_WITH_FLUSH_CONFIG: + return {"error": f"Flush buffer config not available on {part_name}"} + flag_map = FLUSH_FLAGS_CP2105 if part_code == 0x05 else FLUSH_FLAGS_CP2104 + new_value = encode_bitmask(flags, flag_map) + old_raw = lib.get_flush_buffer_config(dev.handle) + if ctx and not await confirm_write( + ctx, + f"Write flush buffer config to CP210x?\n\n" + f" Old: 0x{old_raw:04X}\n New: 0x{new_value:04X}", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + lib.set_flush_buffer_config(dev.handle, new_value) + return {"success": True, "old_raw": f"0x{old_raw:04X}", + "new_raw": f"0x{new_value:04X}", "flags": flags} + + +@mcp.tool() +async def set_device_mode( + eci_mode: int, sci_mode: int, device_index: int = 0, ctx: Context = None, +) -> dict: + """Set device mode (CP2105 only). + + Args: + eci_mode: Enhanced interface mode byte + sci_mode: Standard interface mode byte + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + part_code, part_name = dev.part_number + if part_code not in PARTS_WITH_DEVICE_MODE: + return {"error": f"Device mode not available on {part_name}"} + old_eci, old_sci = lib.get_device_mode(dev.handle) + if ctx and not await confirm_write( + ctx, + f"Write device mode to CP210x?\n\n" + f" Old ECI: 0x{old_eci:02X}, SCI: 0x{old_sci:02X}\n" + f" New ECI: 0x{eci_mode:02X}, SCI: 0x{sci_mode:02X}", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + lib.set_device_mode(dev.handle, eci_mode, sci_mode) + return {"success": True, + "old": {"eci": old_eci, "sci": old_sci}, + "new": {"eci": eci_mode, "sci": sci_mode}} + + +@mcp.tool() +async def set_baud_rate_config( + entries: list[dict], device_index: int = 0, ctx: Context = None, +) -> dict: + """Set the full 32-entry baud rate alias table. + + Args: + entries: List of 32 dicts, each with baud_gen, timer0_reload, prescaler, baud_rate + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + if len(entries) != 32: + return {"error": f"Expected 32 baud config entries, got {len(entries)}"} + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + if ctx and not await confirm_write( + ctx, + "Write complete baud rate table (32 entries) to CP210x OTP EPROM?", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + lib.set_baud_rate_config(dev.handle, entries) + return {"success": True, "entries_written": 32} + + +@mcp.tool() +async def set_baud_rate_alias( + index: int, baud_rate: int, baud_gen: int, timer0_reload: int, prescaler: int, + device_index: int = 0, ctx: Context = None, +) -> dict: + """Modify a single baud rate alias entry (read-modify-write). + + Args: + index: Entry index (0-31) + baud_rate: Target baud rate + baud_gen: Baud rate generator register value + timer0_reload: Timer0 reload register value + prescaler: Prescaler value + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + if not 0 <= index < 32: + return {"error": "Index must be 0-31"} + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + current = lib.get_baud_rate_config(dev.handle) + old_entry = current[index].copy() + current[index] = { + "index": index, + "baud_gen": baud_gen, + "timer0_reload": timer0_reload, + "prescaler": prescaler, + "baud_rate": baud_rate, + } + if ctx and not await confirm_write( + ctx, + f"Modify baud rate alias #{index}?\n\n" + f" Old: {old_entry['baud_rate']} baud\n" + f" New: {baud_rate} baud", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + lib.set_baud_rate_config(dev.handle, current) + return {"success": True, "index": index, + "old_entry": old_entry, "new_baud_rate": baud_rate} + + +@mcp.tool() +async def set_port_config( + config: dict, device_index: int = 0, ctx: Context = None, +) -> dict: + """Set GPIO port configuration (auto-detects CP2103/4/5/8). + + Args: + config: Port configuration dict. Structure depends on part type: + Single (CP2103/4): {mode, reset_latch, suspend_latch, enhanced_fxn} + - Values can be raw integers or named-flag dicts + Dual (CP2105): {mode, reset_latch, suspend_latch, + enhanced_fxn_eci, enhanced_fxn_sci, enhanced_fxn_device} + Quad (CP2108): Full quad port config dict (see get_port_config output) + device_index: Zero-based device index (default: 0) + """ + lib = get_lib() + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + part_code, part_name = dev.part_number + + if ctx and not await confirm_write( + ctx, + f"Write port configuration to {part_name} OTP EPROM?", ): return {"cancelled": True, "message": "Write cancelled by user"} - dev.self_powered = self_powered + if part_code in PARTS_WITH_PORT_CONFIG: + # Allow either raw ints or named-flag dicts + mode = config.get("mode", 0) + if isinstance(mode, dict): + mode = encode_bitmask(mode, PORT_PIN_FLAGS) + reset = config.get("reset_latch", 0) + if isinstance(reset, dict): + reset = encode_bitmask(reset, PORT_PIN_FLAGS) + suspend = config.get("suspend_latch", 0) + if isinstance(suspend, dict): + suspend = encode_bitmask(suspend, PORT_PIN_FLAGS) + ef = config.get("enhanced_fxn", 0) + if isinstance(ef, dict): + ef = encode_bitmask(ef, ENHANCED_FXN_FLAGS) + lib.set_port_config(dev.handle, mode, reset, suspend, ef) + return {"success": True, "type": "single", "part": part_name} - return { - "success": True, - "old_value": old_value, - "new_value": self_powered, - } + elif part_code in PARTS_WITH_DUAL_PORT_CONFIG: + lib.set_dual_port_config( + dev.handle, + config["mode"], config["reset_latch"], config["suspend_latch"], + config["enhanced_fxn_eci"], config["enhanced_fxn_sci"], + config["enhanced_fxn_device"], + ) + return {"success": True, "type": "dual", "part": part_name} + + elif part_code in PARTS_WITH_QUAD_PORT_CONFIG: + lib.set_quad_port_config(dev.handle, config) + return {"success": True, "type": "quad", "part": part_name} + + else: + return {"error": f"Port config not available on {part_name}"} @mcp.tool() @@ -289,75 +700,15 @@ def reset_device(device_index: int = 0) -> dict: changes visible to the USB host. """ lib = get_lib() - with CP210xDevice(lib, device_index) as dev: dev.reset() - - return { - "success": True, - "note": "Device has been reset. It may take a moment to re-enumerate.", - } - - -@mcp.tool() -async def lock_device(device_index: int = 0, ctx: Context = None) -> dict: - """PERMANENTLY lock the device to prevent further customization. - - Args: - device_index: Zero-based device index (default: 0) - - WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's - configuration cannot be changed. The device will still function normally, - but strings, power settings, etc. cannot be modified. - """ - lib = get_lib() - - with CP210xDevice(lib, device_index) as dev: - if dev.is_locked: - return {"error": "Device is already locked"} - - part_code, part_name = dev.part_number - info = ( - f"PERMANENTLY lock this {part_name}?\n\n" - f" Product: {dev.product}\n" - f" Serial: {dev.serial_number}\n\n" - f"THIS CANNOT BE UNDONE. The device configuration\n" - f"will be frozen forever." - ) - - if not ctx: - return { - "error": "Lock requires interactive confirmation", - "message": "This operation is too dangerous without user confirmation.", - } - - # Strict confirmation — do NOT fall back to True on failure. - # Unlike regular writes, an irreversible lock must get explicit consent. - try: - result = await ctx.elicit(info, ["Confirm", "Cancel"]) - confirmed = isinstance(result, AcceptedElicitation) and result.data == "Confirm" - except Exception: - return { - "error": "Lock requires elicitation support", - "message": "Your MCP client does not support elicitation. " - "Lock is too dangerous to proceed without explicit confirmation.", - } - - if not confirmed: - return {"cancelled": True, "message": "Lock cancelled by user"} - - lib.lock_device(dev.handle) - - return { - "success": True, - "warning": "Device is now PERMANENTLY locked. Configuration cannot be changed.", - } + return {"success": True, "note": "Device has been reset. It may take a moment to re-enumerate."} @mcp.tool() async def setup_udev_rule( device_index: int = 0, - symlink_name: Optional[str] = None, + symlink_name: str | None = None, ctx: Context = None, ) -> dict: """Create a udev rule for stable /dev/ symlink for a CP210x device. @@ -386,19 +737,15 @@ async def setup_udev_rule( "message": "Customize the product string first to create a unique match rule.", } - # Auto-generate symlink name from product string if not symlink_name: - # "RYLR998 0033001104645C0B00000D27" -> "rylr998-0D27" parts = product.split() if len(parts) >= 2: prefix = parts[0].lower() - suffix = parts[-1][-4:] # Last 4 chars of EUI/address + suffix = parts[-1][-4:] symlink_name = f"{prefix}-{suffix}" else: symlink_name = product.lower().replace(" ", "-")[:32] - # Build the udev rule - # Use glob pattern on product string to match the unique suffix match_suffix = product[-4:] rule = ( f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", ' @@ -414,7 +761,6 @@ async def setup_udev_rule( f"{rule}\n" ) - # Elicit confirmation before installing install_msg = ( f"Install udev rule for stable device symlink?\n\n" f" Device: {product}\n" @@ -424,54 +770,182 @@ async def setup_udev_rule( ) if ctx and not await confirm_write(ctx, install_msg): - return { - "cancelled": True, - "rule": rule_content, - "message": "Rule not installed. You can install it manually.", - } + return {"cancelled": True, "rule": rule_content, + "message": "Rule not installed. You can install it manually."} - # Install the rule try: result = subprocess.run( ["sudo", "tee", rules_path], - input=rule_content, - capture_output=True, - text=True, - timeout=30, + input=rule_content, capture_output=True, text=True, timeout=30, ) if result.returncode != 0: - return { - "error": f"Failed to install rule: {result.stderr.strip()}", - "rule": rule_content, - "message": "Install manually with: sudo tee " + rules_path, - } - - # Reload udev rules - subprocess.run( - ["sudo", "udevadm", "control", "--reload-rules"], - capture_output=True, - timeout=10, - ) - subprocess.run( - ["sudo", "udevadm", "trigger"], - capture_output=True, - timeout=10, - ) - + return {"error": f"Failed to install rule: {result.stderr.strip()}", + "rule": rule_content, "message": "Install manually with: sudo tee " + rules_path} + subprocess.run(["sudo", "udevadm", "control", "--reload-rules"], + capture_output=True, timeout=10) + subprocess.run(["sudo", "udevadm", "trigger"], + capture_output=True, timeout=10) except subprocess.TimeoutExpired: - return { - "error": "Sudo timed out — may need interactive password", - "rule": rule_content, - "message": "Install manually with: sudo tee " + rules_path, - } + return {"error": "Sudo timed out — may need interactive password", + "rule": rule_content, "message": "Install manually with: sudo tee " + rules_path} - return { - "success": True, - "symlink": f"/dev/{symlink_name}", - "rules_file": rules_path, - "rule": rule_content, - "note": "Symlink will appear after device re-plug or udevadm trigger", - } + return {"success": True, "symlink": f"/dev/{symlink_name}", + "rules_file": rules_path, "rule": rule_content, + "note": "Symlink will appear after device re-plug or udevadm trigger"} + + +# =========================================================================== +# Strict-tier write tools (hard-refuse without elicitation) +# =========================================================================== + +@mcp.tool() +async def set_vid(vid: int, device_index: int = 0, ctx: Context = None) -> dict: + """Set the USB Vendor ID (DANGEROUS — can break driver matching). + + Args: + vid: New USB Vendor ID (16-bit, e.g., 0x10C4 for Silicon Labs) + device_index: Zero-based device index (default: 0) + + WARNING: Changing VID can prevent the device from being recognized by + standard CP210x drivers. Only use if you have custom drivers. + """ + lib = get_lib() + err = await strict_confirm( + ctx, + f"Write USB Vendor ID to CP210x OTP EPROM?\n\n" + f" New VID: 0x{vid:04X}\n\n" + f"WARNING: Changing VID can prevent the device from being recognized\n" + f"by standard CP210x drivers. This is PERMANENT.", + ) + if err: + return err + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + old_vid = dev.vid + lib.set_vid(dev.handle, vid) + return {"success": True, "old_vid": f"0x{old_vid:04X}", "new_vid": f"0x{vid:04X}"} + + +@mcp.tool() +async def set_pid(pid: int, device_index: int = 0, ctx: Context = None) -> dict: + """Set the USB Product ID (DANGEROUS — can break driver matching). + + Args: + pid: New USB Product ID (16-bit, e.g., 0xEA60) + device_index: Zero-based device index (default: 0) + + WARNING: Changing PID can prevent the device from being recognized by + standard CP210x drivers. + """ + lib = get_lib() + err = await strict_confirm( + ctx, + f"Write USB Product ID to CP210x OTP EPROM?\n\n" + f" New PID: 0x{pid:04X}\n\n" + f"WARNING: Changing PID can prevent the device from being recognized\n" + f"by standard CP210x drivers. This is PERMANENT.", + ) + if err: + return err + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + old_pid = dev.pid + lib.set_pid(dev.handle, pid) + return {"success": True, "old_pid": f"0x{old_pid:04X}", "new_pid": f"0x{pid:04X}"} + + +@mcp.tool() +async def set_raw_config(hex_data: str, device_index: int = 0, ctx: Context = None) -> dict: + """Write raw configuration blob to device EPROM (DANGEROUS). + + Args: + hex_data: Hex-encoded config data + device_index: Zero-based device index (default: 0) + + WARNING: Writing invalid config data can brick the device. + Use get_raw_config to read current config first. + """ + lib = get_lib() + try: + data = bytes.fromhex(hex_data) + except ValueError: + return {"error": "Invalid hex string"} + err = await strict_confirm( + ctx, + f"Write {len(data)} bytes of raw config to CP210x EPROM?\n\n" + f"WARNING: Writing invalid config data can brick the device.\n" + f"This is PERMANENT and cannot be undone.", + ) + if err: + return err + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is locked and cannot be modified"} + lib.set_config(dev.handle, data) + return {"success": True, "bytes_written": len(data)} + + +@mcp.tool() +async def update_firmware(device_index: int = 0, ctx: Context = None) -> dict: + """Update device firmware (CP2102N only, DANGEROUS). + + Args: + device_index: Zero-based device index (default: 0) + + WARNING: Firmware update failure can brick the device. + """ + lib = get_lib() + err = await strict_confirm( + ctx, + "Update CP2102N firmware?\n\n" + "WARNING: Firmware update failure can BRICK the device.\n" + "Ensure stable USB connection and power during update.", + ) + if err: + return err + with CP210xDevice(lib, device_index) as dev: + part_code, part_name = dev.part_number + if part_code not in PARTS_CP2102N: + return {"error": f"Firmware update not available on {part_name}", + "supported": "CP2102N variants only"} + lib.update_firmware(dev.handle) + return {"success": True, "note": "Firmware updated. Device may need reset."} + + +@mcp.tool() +async def lock_device(device_index: int = 0, ctx: Context = None) -> dict: + """PERMANENTLY lock the device to prevent further customization. + + Args: + device_index: Zero-based device index (default: 0) + + WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's + configuration cannot be changed. The device will still function normally, + but strings, power settings, etc. cannot be modified. + """ + lib = get_lib() + + with CP210xDevice(lib, device_index) as dev: + if dev.is_locked: + return {"error": "Device is already locked"} + + part_code, part_name = dev.part_number + err = await strict_confirm( + ctx, + f"PERMANENTLY lock this {part_name}?\n\n" + f" Product: {dev.product}\n" + f" Serial: {dev.serial_number}\n\n" + f"THIS CANNOT BE UNDONE. The device configuration\n" + f"will be frozen forever.", + ) + if err: + return err + + lib.lock_device(dev.handle) + return {"success": True, + "warning": "Device is now PERMANENTLY locked. Configuration cannot be changed."} def main():