Add elicitation confirmations and udev rule tool

Write tools now prompt the user via MCP elicitation before
modifying OTP EPROM. Falls back to no confirmation if the
client doesn't support elicitation.

lock_device always requires confirmation — refuses without it.

New tool: setup_udev_rule
- Auto-generates symlink name from product string
- Installs rule to /usr/lib/udev/rules.d/
- Elicits user permission before sudo operations
This commit is contained in:
Ryan Malloy 2026-01-30 11:32:41 -07:00
parent 1a5fb3b195
commit acec4b29ba

View File

@ -1,7 +1,10 @@
"""FastMCP server for CP210x device customization.""" """FastMCP server for CP210x device customization."""
import subprocess
from typing import Optional from typing import Optional
from fastmcp import FastMCP
from fastmcp import FastMCP, Context
from fastmcp.server.elicitation import AcceptedElicitation
from .bindings import CP210xLibrary, CP210xDevice, CP210xError, PART_NUMBERS from .bindings import CP210xLibrary, CP210xDevice, CP210xError, PART_NUMBERS
@ -22,6 +25,20 @@ def get_lib() -> CP210xLibrary:
return _lib return _lib
async def confirm_write(ctx: Context, message: str) -> bool:
"""Ask user to confirm a write operation via elicitation.
Falls back to proceeding without confirmation if the client
doesn't support elicitation.
"""
try:
result = await ctx.elicit(message, ["Confirm", "Cancel"])
return isinstance(result, AcceptedElicitation) and result.data == "Confirm"
except Exception:
# Client doesn't support elicitation — proceed as usual
return True
@mcp.tool() @mcp.tool()
def list_devices() -> list[dict]: def list_devices() -> list[dict]:
"""List all connected CP210x devices. """List all connected CP210x devices.
@ -80,7 +97,7 @@ def get_device_info(device_index: int = 0) -> dict:
@mcp.tool() @mcp.tool()
def set_product_string(product: str, device_index: int = 0) -> dict: async def set_product_string(product: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB product string (device name shown to host). """Set the USB product string (device name shown to host).
Args: Args:
@ -97,18 +114,29 @@ def set_product_string(product: str, device_index: int = 0) -> dict:
return {"error": "Device is locked and cannot be modified"} return {"error": "Device is locked and cannot be modified"}
old_product = dev.product old_product = dev.product
new_value = product[:126]
if ctx and not await confirm_write(
ctx,
f"Write product string to CP210x OTP EPROM?\n\n"
f" Old: {old_product}\n"
f" New: {new_value}\n\n"
f"OTP writes are limited — this cannot be undone easily.",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.product = product dev.product = product
return { return {
"success": True, "success": True,
"old_value": old_product, "old_value": old_product,
"new_value": product[:126], "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host", "note": "Re-plug device for changes to take effect on USB host",
} }
@mcp.tool() @mcp.tool()
def set_manufacturer_string(manufacturer: str, device_index: int = 0) -> dict: async def set_manufacturer_string(manufacturer: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB manufacturer string. """Set the USB manufacturer string.
Args: Args:
@ -122,18 +150,28 @@ def set_manufacturer_string(manufacturer: str, device_index: int = 0) -> dict:
return {"error": "Device is locked and cannot be modified"} return {"error": "Device is locked and cannot be modified"}
old_mfr = dev.manufacturer old_mfr = dev.manufacturer
new_value = manufacturer[:45]
if ctx and not await confirm_write(
ctx,
f"Write manufacturer string to CP210x OTP EPROM?\n\n"
f" Old: {old_mfr}\n"
f" New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.manufacturer = manufacturer dev.manufacturer = manufacturer
return { return {
"success": True, "success": True,
"old_value": old_mfr, "old_value": old_mfr,
"new_value": manufacturer[:45], "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host", "note": "Re-plug device for changes to take effect on USB host",
} }
@mcp.tool() @mcp.tool()
def set_serial_number(serial_number: str, device_index: int = 0) -> dict: async def set_serial_number(serial_number: str, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the USB serial number. """Set the USB serial number.
Args: Args:
@ -149,18 +187,28 @@ def set_serial_number(serial_number: str, device_index: int = 0) -> dict:
return {"error": "Device is locked and cannot be modified"} return {"error": "Device is locked and cannot be modified"}
old_serial = dev.serial_number old_serial = dev.serial_number
new_value = serial_number[:63]
if ctx and not await confirm_write(
ctx,
f"Write serial number to CP210x OTP EPROM?\n\n"
f" Old: {old_serial}\n"
f" New: {new_value}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.serial_number = serial_number dev.serial_number = serial_number
return { return {
"success": True, "success": True,
"old_value": old_serial, "old_value": old_serial,
"new_value": serial_number[:63], "new_value": new_value,
"note": "Re-plug device for changes to take effect on USB host", "note": "Re-plug device for changes to take effect on USB host",
} }
@mcp.tool() @mcp.tool()
def set_max_power(max_power_ma: int, device_index: int = 0) -> dict: async def set_max_power(max_power_ma: int, device_index: int = 0, ctx: Context = None) -> dict:
"""Set the maximum USB power draw in milliamps. """Set the maximum USB power draw in milliamps.
Args: Args:
@ -177,17 +225,27 @@ def set_max_power(max_power_ma: int, device_index: int = 0) -> dict:
return {"error": "Device is locked and cannot be modified"} return {"error": "Device is locked and cannot be modified"}
old_power = dev.max_power_ma old_power = dev.max_power_ma
actual_value = (max_power_ma // 2) * 2
if ctx and not await confirm_write(
ctx,
f"Write max power to CP210x OTP EPROM?\n\n"
f" Old: {old_power} mA\n"
f" New: {actual_value} mA",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.max_power_ma = max_power_ma dev.max_power_ma = max_power_ma
return { return {
"success": True, "success": True,
"old_value_ma": old_power, "old_value_ma": old_power,
"new_value_ma": (max_power_ma // 2) * 2, # Actual value after rounding "new_value_ma": actual_value,
} }
@mcp.tool() @mcp.tool()
def set_self_powered(self_powered: bool, device_index: int = 0) -> dict: async def set_self_powered(self_powered: bool, device_index: int = 0, ctx: Context = None) -> dict:
"""Set whether device reports as self-powered or bus-powered. """Set whether device reports as self-powered or bus-powered.
Args: Args:
@ -201,6 +259,16 @@ def set_self_powered(self_powered: bool, device_index: int = 0) -> dict:
return {"error": "Device is locked and cannot be modified"} return {"error": "Device is locked and cannot be modified"}
old_value = dev.self_powered old_value = dev.self_powered
mode = "self-powered" if self_powered else "bus-powered"
if ctx and not await confirm_write(
ctx,
f"Write power mode to CP210x OTP EPROM?\n\n"
f" Old: {'self-powered' if old_value else 'bus-powered'}\n"
f" New: {mode}",
):
return {"cancelled": True, "message": "Write cancelled by user"}
dev.self_powered = self_powered dev.self_powered = self_powered
return { return {
@ -232,29 +300,43 @@ def reset_device(device_index: int = 0) -> dict:
@mcp.tool() @mcp.tool()
def lock_device(device_index: int = 0, confirm: bool = False) -> dict: async def lock_device(device_index: int = 0, ctx: Context = None) -> dict:
"""PERMANENTLY lock the device to prevent further customization. """PERMANENTLY lock the device to prevent further customization.
Args: Args:
device_index: Zero-based device index (default: 0) device_index: Zero-based device index (default: 0)
confirm: Must be True to actually lock (safety check)
WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's WARNING: This is PERMANENT and IRREVERSIBLE! Once locked, the device's
configuration cannot be changed. The device will still function normally, configuration cannot be changed. The device will still function normally,
but strings, power settings, etc. cannot be modified. but strings, power settings, etc. cannot be modified.
""" """
if not confirm:
return {
"error": "Safety check failed",
"message": "Set confirm=True to actually lock the device. THIS IS PERMANENT!",
}
lib = get_lib() lib = get_lib()
with CP210xDevice(lib, device_index) as dev: with CP210xDevice(lib, device_index) as dev:
if dev.is_locked: if dev.is_locked:
return {"error": "Device is already locked"} return {"error": "Device is already locked"}
part_code, part_name = dev.part_number
info = (
f"PERMANENTLY lock this {part_name}?\n\n"
f" Product: {dev.product}\n"
f" Serial: {dev.serial_number}\n\n"
f"THIS CANNOT BE UNDONE. The device configuration\n"
f"will be frozen forever."
)
if ctx:
confirmed = await confirm_write(ctx, info)
else:
# No context = no way to confirm, refuse
return {
"error": "Lock requires interactive confirmation",
"message": "This operation is too dangerous without user confirmation.",
}
if not confirmed:
return {"cancelled": True, "message": "Lock cancelled by user"}
lib.lock_device(dev.handle) lib.lock_device(dev.handle)
return { return {
@ -263,6 +345,126 @@ def lock_device(device_index: int = 0, confirm: bool = False) -> dict:
} }
@mcp.tool()
async def setup_udev_rule(
device_index: int = 0,
symlink_name: Optional[str] = None,
ctx: Context = None,
) -> dict:
"""Create a udev rule for stable /dev/ symlink for a CP210x device.
Generates a rule that matches the device's product string and creates
a persistent symlink (e.g., /dev/rylr998-0D27) that survives re-plugs
and port reordering.
Args:
device_index: Zero-based device index (default: 0)
symlink_name: Custom symlink name. If not provided, auto-generates
from the product string (e.g., "RYLR998 ...0D27" -> "rylr998-0D27")
Requires sudo for rule installation.
"""
lib = get_lib()
with CP210xDevice(lib, device_index) as dev:
product = dev.product
vid = dev.vid
pid = dev.pid
if not product or product == "CP2102 USB to UART Bridge Controller":
return {
"error": "Device has default product string",
"message": "Customize the product string first to create a unique match rule.",
}
# Auto-generate symlink name from product string
if not symlink_name:
# "RYLR998 0033001104645C0B00000D27" -> "rylr998-0D27"
parts = product.split()
if len(parts) >= 2:
prefix = parts[0].lower()
suffix = parts[-1][-4:] # Last 4 chars of EUI/address
symlink_name = f"{prefix}-{suffix}"
else:
symlink_name = product.lower().replace(" ", "-")[:32]
# Build the udev rule
# Use glob pattern on product string to match the unique suffix
match_suffix = product[-4:]
rule = (
f'SUBSYSTEM=="tty", ATTRS{{idVendor}}=="{vid:04x}", '
f'ATTRS{{idProduct}}=="{pid:04x}", '
f'ATTRS{{product}}=="*{match_suffix}", '
f'SYMLINK+="{symlink_name}"'
)
rules_path = f"/usr/lib/udev/rules.d/99-cp210x-{symlink_name}.rules"
rule_content = (
f"# Auto-generated by cp210x-mcp for: {product}\n"
f"# Creates /dev/{symlink_name} symlink\n"
f"{rule}\n"
)
# Elicit confirmation before installing
install_msg = (
f"Install udev rule for stable device symlink?\n\n"
f" Device: {product}\n"
f" Symlink: /dev/{symlink_name}\n"
f" File: {rules_path}\n\n"
f"Requires sudo to install."
)
if ctx and not await confirm_write(ctx, install_msg):
return {
"cancelled": True,
"rule": rule_content,
"message": "Rule not installed. You can install it manually.",
}
# Install the rule
try:
result = subprocess.run(
["sudo", "tee", rules_path],
input=rule_content,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return {
"error": f"Failed to install rule: {result.stderr.strip()}",
"rule": rule_content,
"message": "Install manually with: sudo tee " + rules_path,
}
# Reload udev rules
subprocess.run(
["sudo", "udevadm", "control", "--reload-rules"],
capture_output=True,
timeout=10,
)
subprocess.run(
["sudo", "udevadm", "trigger"],
capture_output=True,
timeout=10,
)
except subprocess.TimeoutExpired:
return {
"error": "Sudo timed out — may need interactive password",
"rule": rule_content,
"message": "Install manually with: sudo tee " + rules_path,
}
return {
"success": True,
"symlink": f"/dev/{symlink_name}",
"rules_file": rules_path,
"rule": rule_content,
"note": "Symlink will appear after device re-plug or udevadm trigger",
}
def main(): def main():
"""Entry point for the MCP server.""" """Entry point for the MCP server."""
mcp.run() mcp.run()