diff --git a/src/mcserial/server.py b/src/mcserial/server.py index b92d374..499ee4f 100644 --- a/src/mcserial/server.py +++ b/src/mcserial/server.py @@ -334,6 +334,8 @@ def get_connection_status() -> dict: "out_waiting": conn.out_waiting, "cts": conn.cts, "dsr": conn.dsr, + "ri": conn.ri, + "cd": conn.cd, "rts": conn.rts, "dtr": conn.dtr, "resource_uri": f"serial://{port}/data", @@ -369,6 +371,180 @@ def flush_serial(port: str, input_buffer: bool = True, output_buffer: bool = Tru return {"error": str(e), "success": False} +@mcp.tool() +def get_modem_lines(port: str) -> dict: + """Get all RS-232 modem control/status line states. + + Input lines (directly readable from device): + - CTS: Clear To Send (device ready to receive) + - DSR: Data Set Ready (device is present/powered) + - RI: Ring Indicator (incoming call on modem) + - CD: Carrier Detect (connection established) + + Output lines (set by us): + - RTS: Request To Send (we're ready to receive) + - DTR: Data Terminal Ready (we're present/active) + + Args: + port: Device path of the port to query + + Returns: + All modem line states + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + return { + "success": True, + "port": port, + "input_lines": { + "cts": conn.cts, + "dsr": conn.dsr, + "ri": conn.ri, + "cd": conn.cd, + }, + "output_lines": { + "rts": conn.rts, + "dtr": conn.dtr, + }, + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def set_modem_lines( + port: str, + rts: bool | None = None, + dtr: bool | None = None, +) -> dict: + """Set RS-232 output control lines (RTS and DTR). + + These lines can be used for: + - Hardware flow control + - Device reset sequences (many boards use DTR for reset) + - Power control on some devices + - Custom signaling protocols + + Args: + port: Device path of the port + rts: Set RTS (Request To Send) state, None = no change + dtr: Set DTR (Data Terminal Ready) state, None = no change + + Returns: + Updated line states + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + if rts is not None: + conn.rts = rts + if dtr is not None: + conn.dtr = dtr + + return { + "success": True, + "port": port, + "rts": conn.rts, + "dtr": conn.dtr, + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def pulse_line( + port: str, + line: Literal["rts", "dtr"], + duration_ms: int = 100, + active_low: bool = True, +) -> dict: + """Pulse an RS-232 control line (useful for reset sequences). + + Many devices use DTR or RTS for reset: + - ESP32/ESP8266: DTR low + RTS sequence for bootloader + - Arduino: DTR pulse for reset + - Custom hardware: Various reset/trigger signals + + Args: + port: Device path of the port + line: Which line to pulse ("rts" or "dtr") + duration_ms: Pulse duration in milliseconds (default 100) + active_low: If True, pulse LOW then HIGH. If False, pulse HIGH then LOW. + + Returns: + Pulse operation status + """ + import time + + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + if duration_ms < 1 or duration_ms > 5000: + return {"error": "duration_ms must be between 1 and 5000", "success": False} + + try: + conn = _connections[port].connection + + # Get current state + if line == "rts": + original = conn.rts + conn.rts = not active_low # Start state + time.sleep(duration_ms / 1000.0) + conn.rts = active_low # Pulse state + time.sleep(duration_ms / 1000.0) + conn.rts = original # Restore + else: # dtr + original = conn.dtr + conn.dtr = not active_low + time.sleep(duration_ms / 1000.0) + conn.dtr = active_low + time.sleep(duration_ms / 1000.0) + conn.dtr = original + + return { + "success": True, + "port": port, + "line": line, + "duration_ms": duration_ms, + "active_low": active_low, + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def send_break(port: str, duration_ms: int = 250) -> dict: + """Send a serial break signal. + + A break is a sustained low signal longer than a character frame, + used to get attention of remote device or trigger special modes. + + Args: + port: Device path of the port + duration_ms: Break duration in milliseconds (default 250) + + Returns: + Break operation status + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + if duration_ms < 1 or duration_ms > 5000: + return {"error": "duration_ms must be between 1 and 5000", "success": False} + + try: + conn = _connections[port].connection + conn.send_break(duration=duration_ms / 1000.0) + return {"success": True, "port": port, "duration_ms": duration_ms} + except serial.SerialException as e: + return {"error": str(e), "success": False} + + # ============================================================================ # RESOURCES - Dynamic data access via URIs # ============================================================================ @@ -435,6 +611,8 @@ def resource_port_status(port: str) -> str: f"- Bytes waiting (out): {conn.out_waiting}", f"- CTS: {conn.cts}", f"- DSR: {conn.dsr}", + f"- RI: {conn.ri}", + f"- CD: {conn.cd}", f"- RTS: {conn.rts}", f"- DTR: {conn.dtr}", ]