From acec4b29ba768ac8c4280c5af11bd961d8689976 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Fri, 30 Jan 2026 11:32:41 -0700 Subject: [PATCH] Add elicitation confirmations and udev rule tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Write tools now prompt the user via MCP elicitation before modifying OTP EPROM. Falls back to no confirmation if the client doesn't support elicitation. lock_device always requires confirmation — refuses without it. New tool: setup_udev_rule - Auto-generates symlink name from product string - Installs rule to /usr/lib/udev/rules.d/ - Elicits user permission before sudo operations --- src/cp210x_mcp/server.py | 238 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 220 insertions(+), 18 deletions(-) diff --git a/src/cp210x_mcp/server.py b/src/cp210x_mcp/server.py index c47b386..fde8c47 100644 --- a/src/cp210x_mcp/server.py +++ b/src/cp210x_mcp/server.py @@ -1,7 +1,10 @@ """FastMCP server for CP210x device customization.""" +import subprocess from typing import Optional -from fastmcp import FastMCP + +from fastmcp import FastMCP, Context +from fastmcp.server.elicitation import AcceptedElicitation from .bindings import CP210xLibrary, CP210xDevice, CP210xError, PART_NUMBERS @@ -22,6 +25,20 @@ def get_lib() -> CP210xLibrary: return _lib +async def confirm_write(ctx: Context, message: str) -> bool: + """Ask user to confirm a write operation via elicitation. + + Falls back to proceeding without confirmation if the client + doesn't support elicitation. + """ + try: + result = await ctx.elicit(message, ["Confirm", "Cancel"]) + return isinstance(result, AcceptedElicitation) and result.data == "Confirm" + except Exception: + # Client doesn't support elicitation — proceed as usual + return True + + @mcp.tool() def list_devices() -> list[dict]: """List all connected CP210x devices. @@ -80,7 +97,7 @@ def get_device_info(device_index: int = 0) -> dict: @mcp.tool() -def set_product_string(product: str, device_index: int = 0) -> dict: +async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict: """Set the USB product string (device name shown to host). Args: @@ -97,18 +114,29 @@ def set_product_string(product: str, device_index: int = 0) -> dict: return {"error": "Device is locked and cannot be modified"} old_product = dev.product + new_value = product[:126] + + if ctx and not await confirm_write( + ctx, + f"Write product string to CP210x OTP EPROM?\n\n" + f" Old: {old_product}\n" + f" New: {new_value}\n\n" + f"OTP writes are limited — this cannot be undone easily.", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + dev.product = product return { "success": True, "old_value": old_product, - "new_value": product[:126], + "new_value": new_value, "note": "Re-plug device for changes to take effect on USB host", } @mcp.tool() -def set_manufacturer_string(manufacturer: str, device_index: int = 0) -> dict: +async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: Context = None) -> dict: """Set the USB manufacturer string. Args: @@ -122,18 +150,28 @@ def set_manufacturer_string(manufacturer: str, device_index: int = 0) -> dict: return {"error": "Device is locked and cannot be modified"} old_mfr = dev.manufacturer + new_value = manufacturer[:45] + + if ctx and not await confirm_write( + ctx, + f"Write manufacturer string to CP210x OTP EPROM?\n\n" + f" Old: {old_mfr}\n" + f" New: {new_value}", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + dev.manufacturer = manufacturer return { "success": True, "old_value": old_mfr, - "new_value": manufacturer[:45], + "new_value": new_value, "note": "Re-plug device for changes to take effect on USB host", } @mcp.tool() -def set_serial_number(serial_number: str, device_index: int = 0) -> dict: +async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Context = None) -> dict: """Set the USB serial number. Args: @@ -149,18 +187,28 @@ def set_serial_number(serial_number: str, device_index: int = 0) -> dict: return {"error": "Device is locked and cannot be modified"} old_serial = dev.serial_number + new_value = serial_number[:63] + + if ctx and not await confirm_write( + ctx, + f"Write serial number to CP210x OTP EPROM?\n\n" + f" Old: {old_serial}\n" + f" New: {new_value}", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + dev.serial_number = serial_number return { "success": True, "old_value": old_serial, - "new_value": serial_number[:63], + "new_value": new_value, "note": "Re-plug device for changes to take effect on USB host", } @mcp.tool() -def set_max_power(max_power_ma: int, device_index: int = 0) -> dict: +async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = None) -> dict: """Set the maximum USB power draw in milliamps. Args: @@ -177,17 +225,27 @@ def set_max_power(max_power_ma: int, device_index: int = 0) -> dict: return {"error": "Device is locked and cannot be modified"} old_power = dev.max_power_ma + actual_value = (max_power_ma // 2) * 2 + + if ctx and not await confirm_write( + ctx, + f"Write max power to CP210x OTP EPROM?\n\n" + f" Old: {old_power} mA\n" + f" New: {actual_value} mA", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + dev.max_power_ma = max_power_ma return { "success": True, "old_value_ma": old_power, - "new_value_ma": (max_power_ma // 2) * 2, # Actual value after rounding + "new_value_ma": actual_value, } @mcp.tool() -def set_self_powered(self_powered: bool, device_index: int = 0) -> dict: +async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Context = None) -> dict: """Set whether device reports as self-powered or bus-powered. Args: @@ -201,6 +259,16 @@ def set_self_powered(self_powered: bool, device_index: int = 0) -> dict: return {"error": "Device is locked and cannot be modified"} old_value = dev.self_powered + mode = "self-powered" if self_powered else "bus-powered" + + if ctx and not await confirm_write( + ctx, + f"Write power mode to CP210x OTP EPROM?\n\n" + f" Old: {'self-powered' if old_value else 'bus-powered'}\n" + f" New: {mode}", + ): + return {"cancelled": True, "message": "Write cancelled by user"} + dev.self_powered = self_powered return { @@ -232,29 +300,43 @@ def reset_device(device_index: int = 0) -> dict: @mcp.tool() -def lock_device(device_index: int = 0, confirm: bool = False) -> dict: +async def lock_device(device_index: int = 0, ctx: Context = None) -> dict: """PERMANENTLY lock the device to prevent further customization. Args: device_index: Zero-based device index (default: 0) - confirm: Must be True to actually lock (safety check) WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's configuration cannot be changed. The device will still function normally, but strings, power settings, etc. cannot be modified. """ - if not confirm: - return { - "error": "Safety check failed", - "message": "Set confirm=True to actually lock the device. THIS IS PERMANENT!", - } - lib = get_lib() with CP210xDevice(lib, device_index) as dev: if dev.is_locked: return {"error": "Device is already locked"} + part_code, part_name = dev.part_number + info = ( + f"PERMANENTLY lock this {part_name}?\n\n" + f" Product: {dev.product}\n" + f" Serial: {dev.serial_number}\n\n" + f"THIS CANNOT BE UNDONE. The device configuration\n" + f"will be frozen forever." + ) + + if ctx: + confirmed = await confirm_write(ctx, info) + else: + # No context = no way to confirm, refuse + return { + "error": "Lock requires interactive confirmation", + "message": "This operation is too dangerous without user confirmation.", + } + + if not confirmed: + return {"cancelled": True, "message": "Lock cancelled by user"} + lib.lock_device(dev.handle) return { @@ -263,6 +345,126 @@ def lock_device(device_index: int = 0, confirm: bool = False) -> dict: } +@mcp.tool() +async def setup_udev_rule( + device_index: int = 0, + symlink_name: Optional[str] = None, + ctx: Context = None, +) -> dict: + """Create a udev rule for stable /dev/ symlink for a CP210x device. + + Generates a rule that matches the device's product string and creates + a persistent symlink (e.g., /dev/rylr998-0D27) that survives re-plugs + and port reordering. + + Args: + device_index: Zero-based device index (default: 0) + symlink_name: Custom symlink name. If not provided, auto-generates + from the product string (e.g., "RYLR998 ...0D27" -> "rylr998-0D27") + + Requires sudo for rule installation. + """ + lib = get_lib() + + with CP210xDevice(lib, device_index) as dev: + product = dev.product + vid = dev.vid + pid = dev.pid + + if not product or product == "CP2102 USB to UART Bridge Controller": + return { + "error": "Device has default product string", + "message": "Customize the product string first to create a unique match rule.", + } + + # Auto-generate symlink name from product string + if not symlink_name: + # "RYLR998 0033001104645C0B00000D27" -> "rylr998-0D27" + parts = product.split() + if len(parts) >= 2: + prefix = parts[0].lower() + suffix = parts[-1][-4:] # Last 4 chars of EUI/address + symlink_name = f"{prefix}-{suffix}" + else: + symlink_name = product.lower().replace(" ", "-")[:32] + + # Build the udev rule + # Use glob pattern on product string to match the unique suffix + match_suffix = product[-4:] + rule = ( + f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", ' + f'ATTRS{{idProduct}}=="{pid:04x}", ' + f'ATTRS{{product}}=="*{match_suffix}", ' + f'SYMLINK+="{symlink_name}"' + ) + + rules_path = f"/usr/lib/udev/rules.d/99-cp210x-{symlink_name}.rules" + rule_content = ( + f"# Auto-generated by cp210x-mcp for: {product}\n" + f"# Creates /dev/{symlink_name} symlink\n" + f"{rule}\n" + ) + + # Elicit confirmation before installing + install_msg = ( + f"Install udev rule for stable device symlink?\n\n" + f" Device: {product}\n" + f" Symlink: /dev/{symlink_name}\n" + f" File: {rules_path}\n\n" + f"Requires sudo to install." + ) + + if ctx and not await confirm_write(ctx, install_msg): + return { + "cancelled": True, + "rule": rule_content, + "message": "Rule not installed. You can install it manually.", + } + + # Install the rule + try: + result = subprocess.run( + ["sudo", "tee", rules_path], + input=rule_content, + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + return { + "error": f"Failed to install rule: {result.stderr.strip()}", + "rule": rule_content, + "message": "Install manually with: sudo tee " + rules_path, + } + + # Reload udev rules + subprocess.run( + ["sudo", "udevadm", "control", "--reload-rules"], + capture_output=True, + timeout=10, + ) + subprocess.run( + ["sudo", "udevadm", "trigger"], + capture_output=True, + timeout=10, + ) + + except subprocess.TimeoutExpired: + return { + "error": "Sudo timed out — may need interactive password", + "rule": rule_content, + "message": "Install manually with: sudo tee " + rules_path, + } + + return { + "success": True, + "symlink": f"/dev/{symlink_name}", + "rules_file": rules_path, + "rule": rule_content, + "note": "Symlink will appear after device re-plug or udevadm trigger", + } + + def main(): """Entry point for the MCP server.""" mcp.run()