diff --git a/src/mcserial/server.py b/src/mcserial/server.py index 499ee4f..2be675c 100644 --- a/src/mcserial/server.py +++ b/src/mcserial/server.py @@ -79,6 +79,12 @@ def open_serial_port( parity: Literal["N", "E", "O", "M", "S"] = "N", stopbits: Literal[1, 1.5, 2] = 1, timeout: float = DEFAULT_TIMEOUT, + write_timeout: float | None = None, + inter_byte_timeout: float | None = None, + xonxoff: bool = False, + rtscts: bool = False, + dsrdtr: bool = False, + exclusive: bool = False, ) -> dict: """Open a serial port connection. @@ -89,6 +95,12 @@ def open_serial_port( parity: Parity checking (N=None, E=Even, O=Odd, M=Mark, S=Space) stopbits: Stop bits (1, 1.5, or 2) timeout: Read timeout in seconds + write_timeout: Write timeout in seconds (None = blocking) + inter_byte_timeout: Timeout between bytes during read (None = disabled) + xonxoff: Enable software flow control (XON/XOFF) + rtscts: Enable hardware RTS/CTS flow control + dsrdtr: Enable hardware DSR/DTR flow control + exclusive: Request exclusive access (lock port from other processes) Returns: Connection status and details @@ -107,6 +119,12 @@ def open_serial_port( parity=parity, stopbits=stopbits, timeout=timeout, + write_timeout=write_timeout, + inter_byte_timeout=inter_byte_timeout, + xonxoff=xonxoff, + rtscts=rtscts, + dsrdtr=dsrdtr, + exclusive=exclusive, ) _connections[port] = SerialConnection(port=port, connection=conn) return { @@ -116,6 +134,10 @@ def open_serial_port( "bytesize": bytesize, "parity": parity, "stopbits": stopbits, + "xonxoff": xonxoff, + "rtscts": rtscts, + "dsrdtr": dsrdtr, + "exclusive": exclusive, "resource_uri": f"serial://{port}/data", } except serial.SerialException as e: @@ -270,6 +292,11 @@ def configure_serial( port: str, baudrate: int | None = None, timeout: float | None = None, + write_timeout: float | None = None, + inter_byte_timeout: float | None = None, + xonxoff: bool | None = None, + rtscts: bool | None = None, + dsrdtr: bool | None = None, rts: bool | None = None, dtr: bool | None = None, ) -> dict: @@ -279,6 +306,11 @@ def configure_serial( port: Device path of the port to configure baudrate: New baud rate (None = no change) timeout: New read timeout in seconds (None = no change) + write_timeout: New write timeout in seconds (None = no change) + inter_byte_timeout: Timeout between bytes (None = no change) + xonxoff: Software flow control XON/XOFF (None = no change) + rtscts: Hardware RTS/CTS flow control (None = no change) + dsrdtr: Hardware DSR/DTR flow control (None = no change) rts: Set RTS line state (None = no change) dtr: Set DTR line state (None = no change) @@ -295,6 +327,16 @@ def configure_serial( conn.baudrate = baudrate if timeout is not None: conn.timeout = timeout + if write_timeout is not None: + conn.write_timeout = write_timeout + if inter_byte_timeout is not None: + conn.inter_byte_timeout = inter_byte_timeout + if xonxoff is not None: + conn.xonxoff = xonxoff + if rtscts is not None: + conn.rtscts = rtscts + if dsrdtr is not None: + conn.dsrdtr = dsrdtr if rts is not None: conn.rts = rts if dtr is not None: @@ -305,6 +347,11 @@ def configure_serial( "port": port, "baudrate": conn.baudrate, "timeout": conn.timeout, + "write_timeout": conn.write_timeout, + "inter_byte_timeout": conn.inter_byte_timeout, + "xonxoff": conn.xonxoff, + "rtscts": conn.rtscts, + "dsrdtr": conn.dsrdtr, "rts": conn.rts, "dtr": conn.dtr, } @@ -330,6 +377,11 @@ def get_connection_status() -> dict: "parity": conn.parity, "stopbits": conn.stopbits, "timeout": conn.timeout, + "write_timeout": conn.write_timeout, + "inter_byte_timeout": conn.inter_byte_timeout, + "xonxoff": conn.xonxoff, + "rtscts": conn.rtscts, + "dsrdtr": conn.dsrdtr, "in_waiting": conn.in_waiting, "out_waiting": conn.out_waiting, "cts": conn.cts, @@ -545,6 +597,159 @@ def send_break(port: str, duration_ms: int = 250) -> dict: return {"error": str(e), "success": False} +@mcp.tool() +def read_until( + port: str, + terminator: str = "\n", + size: int | None = None, + encoding: str = "utf-8", +) -> dict: + """Read data until a specific terminator sequence is received. + + More flexible than read_serial_line - supports any terminator, not just newline. + + Args: + port: Device path of the port to read from + terminator: Byte sequence to stop at (default: newline) + size: Maximum bytes to read (None = no limit, use timeout) + encoding: Character encoding for terminator and result + + Returns: + Data read up to and including the terminator + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + term_bytes = terminator.encode(encoding) + raw = conn.read_until(term_bytes, size) + return { + "success": True, + "data": raw.decode(encoding, errors="replace"), + "bytes_read": len(raw), + "raw_hex": raw.hex(), + "port": port, + "terminator_found": raw.endswith(term_bytes), + } + except serial.SerialException as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def set_rs485_mode( + port: str, + enabled: bool = True, + delay_before_tx: float = 0.0, + delay_before_rx: float = 0.0, + rts_level_for_tx: bool = True, + rts_level_for_rx: bool = False, + loopback: bool = False, +) -> dict: + """Configure RS-485 mode for half-duplex communication. + + RS-485 is commonly used in industrial applications (Modbus, etc.) + where multiple devices share a bus. The driver must control the + TX enable line (typically RTS) to switch between transmit and receive. + + Args: + port: Device path of the port + enabled: Enable or disable RS-485 mode + delay_before_tx: Delay in seconds before enabling TX + delay_before_rx: Delay in seconds before enabling RX after TX + rts_level_for_tx: RTS level when transmitting (True=high) + rts_level_for_rx: RTS level when receiving (True=high) + loopback: Enable RS-485 loopback mode + + Returns: + RS-485 configuration status + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + + if enabled: + # Create RS485Settings + rs485_settings = serial.rs485.RS485Settings( + rts_level_for_tx=rts_level_for_tx, + rts_level_for_rx=rts_level_for_rx, + loopback=loopback, + delay_before_tx=delay_before_tx, + delay_before_rx=delay_before_rx, + ) + conn.rs485_mode = rs485_settings + else: + conn.rs485_mode = None + + return { + "success": True, + "port": port, + "rs485_enabled": enabled, + "rts_level_for_tx": rts_level_for_tx if enabled else None, + "rts_level_for_rx": rts_level_for_rx if enabled else None, + "delay_before_tx": delay_before_tx if enabled else None, + "delay_before_rx": delay_before_rx if enabled else None, + "loopback": loopback if enabled else None, + } + except (serial.SerialException, AttributeError) as e: + return {"error": str(e), "success": False} + + +@mcp.tool() +def set_low_latency_mode(port: str, enabled: bool = True) -> dict: + """Enable or disable low-latency mode (Linux only). + + Reduces latency by changing how the kernel buffers data. + Useful for time-critical applications. + + Note: This is a Linux-specific feature and may not work on all systems. + + Args: + port: Device path of the port + enabled: Enable (True) or disable (False) low latency mode + + Returns: + Operation status + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + conn.set_low_latency_mode(enabled) + return {"success": True, "port": port, "low_latency": enabled} + except (serial.SerialException, AttributeError, OSError) as e: + return {"error": f"Low latency mode not supported: {e}", "success": False} + + +@mcp.tool() +def set_break_condition(port: str, enabled: bool) -> dict: + """Set or clear the break condition on a serial port. + + Unlike send_break() which sends a timed pulse, this holds the + break condition until explicitly cleared. Useful for protocols + that require sustained break states. + + Args: + port: Device path of the port + enabled: True to assert break (hold TX low), False to release + + Returns: + Break condition status + """ + if port not in _connections: + return {"error": f"Port {port} is not open", "success": False} + + try: + conn = _connections[port].connection + conn.break_condition = enabled + return {"success": True, "port": port, "break_condition": enabled} + except serial.SerialException as e: + return {"error": str(e), "success": False} + + # ============================================================================ # RESOURCES - Dynamic data access via URIs # ============================================================================