Add SPP (Serial Port Profile) for classic Bluetooth serial over RFCOMM

Registers a Profile1 D-Bus endpoint with BlueZ for bidirectional raw
byte streams — same pattern as HFP AG minus the AT command layer.
Supports both server mode (accept inbound) and client mode (connect
outbound via ConnectProfile). Includes 8 MCP tools, 2 resources, and
cursor-based recv polling with deque(maxlen=500).
This commit is contained in:
Ryan Malloy 2026-02-12 18:27:49 -07:00
parent 5dc5f640c7
commit 45c1e57dfe
5 changed files with 733 additions and 0 deletions

View File

@ -70,6 +70,7 @@ ignore = ["E501"]
# dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations
"src/mcbluetooth/hfp_ag.py" = ["F821", "F722"]
"src/mcbluetooth/gatt_server.py" = ["F821", "F722"]
"src/mcbluetooth/spp.py" = ["F821", "F722"]
[tool.pytest.ini_options]
asyncio_mode = "auto"

View File

@ -309,6 +309,62 @@ def register_resources(mcp: FastMCP) -> None:
indent=2,
)
# ==================== SPP Resources ====================
@mcp.resource(
"bluetooth://spp/connections",
name="SPP Connections",
description=(
"Active SPP (Serial Port Profile) connections with role, "
"duration, and byte counters for each peer."
),
mime_type="application/json",
)
async def resource_spp_connections() -> str:
"""Get active SPP connections."""
from mcbluetooth.spp import get_spp
profile = await get_spp()
if not profile:
return json.dumps({"registered": False, "connections": []})
status = profile.get_status()
return json.dumps(
{
"registered": status["registered"],
"uuid": status["uuid"],
"connections": status["connections"],
},
indent=2,
)
@mcp.resource(
"bluetooth://spp/data",
name="SPP Received Data",
description=(
"Recent data received from SPP peers. Returns the last 50 "
"data events with timestamps, addresses, and hex/string values."
),
mime_type="application/json",
)
async def resource_spp_data() -> str:
"""Get recent SPP received data events."""
from mcbluetooth.spp import get_spp
profile = await get_spp()
if not profile:
return json.dumps({"count": 0, "events": [], "hint": "SPP not enabled"})
events = profile.get_recv_events(since_index=0, limit=50)
return json.dumps(
{
"count": len(events),
"total": profile._recv_index,
"events": events,
},
indent=2,
)
# ==================== GATT Server Resources ====================
@mcp.resource(

View File

@ -13,6 +13,7 @@ from mcbluetooth.tools import (
hfp,
monitor,
obex,
spp,
)
mcp = FastMCP(
@ -69,6 +70,18 @@ For quick OBD-II BLE adapter emulation (Nordic UART Service):
All tools require an explicit 'adapter' parameter (e.g., "hci0").
Use bt_list_adapters() to discover available adapters.
### SPP (Serial Port Profile) — Classic Bluetooth Serial
- bluetooth://spp/connections Active SPP connections
- bluetooth://spp/data Recent received data events
For classic Bluetooth serial (RFCOMM):
1. bt_spp_enable() register profile
2. Server: make discoverable, remote connects automatically
3. Client: bt_spp_connect(adapter, address)
4. Send: bt_spp_send(address, "ATZ\\r\\n", "string")
5. Receive: bt_spp_recv(since_index=0) cursor-based polling
6. Disconnect: bt_spp_disconnect(address)
For pairing, use pairing_mode parameter:
- "elicit": Use MCP elicitation to request PIN from user (preferred)
- "interactive": Return awaiting status, then call bt_pair_confirm
@ -89,6 +102,7 @@ gatt_server.register_tools(mcp)
bt_elm327_emu.register_tools(mcp)
monitor.register_tools(mcp)
obex.register_tools(mcp)
spp.register_tools(mcp)
def main():

419
src/mcbluetooth/spp.py Normal file
View File

@ -0,0 +1,419 @@
"""SPP (Serial Port Profile) implementation for BlueZ.
Registers as an SPP endpoint via BlueZ ProfileManager1, providing raw
bidirectional serial byte streams over RFCOMM. When a remote device
connects, BlueZ hands us the RFCOMM file descriptor through the Profile1
D-Bus interface the same pattern used by hfp_ag.py, minus the AT
command protocol layer.
Use cases: serial terminal to Arduino/ESP32, GPS receivers (NMEA),
legacy sensors, Bluetooth modems, any line-oriented protocol over
classic Bluetooth.
D-Bus flow:
1. RegisterProfile(SPP_UUID) with ProfileManager1
2. Remote connects BlueZ calls NewConnection(device, fd, props)
3. We dup the fd, wrap in async streams, read/write raw bytes
4. RequestDisconnection or EOF cleanup
"""
import asyncio
import logging
import os
import socket
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from dbus_fast import BusType, Variant
from dbus_fast.aio import MessageBus
from dbus_fast.service import ServiceInterface, method
log = logging.getLogger(__name__)
# Bluetooth socket constants — Python often compiled without bluetooth.h
_AF_BLUETOOTH = getattr(socket, "AF_BLUETOOTH", 31)
_BTPROTO_RFCOMM = getattr(socket, "BTPROTO_RFCOMM", 3)
# D-Bus constants
BLUEZ_SERVICE = "org.bluez"
PROFILE_MANAGER_IFACE = "org.bluez.ProfileManager1"
SPP_UUID = "00001101-0000-1000-8000-00805f9b34fb"
SPP_PROFILE_PATH = "/mcbluetooth/spp"
def _try_decode(value: bytes) -> str | None:
"""Try to decode bytes as printable UTF-8."""
try:
decoded = value.decode("utf-8")
if all(c.isprintable() or c in "\r\n\t" for c in decoded):
return decoded
return None
except (UnicodeDecodeError, ValueError):
return None
def _path_to_address(device_path: str) -> str:
parts = device_path.split("/")
if len(parts) >= 5 and parts[-1].startswith("dev_"):
return parts[-1][4:].replace("_", ":")
return device_path
@dataclass
class SPPDataEvent:
"""A chunk of data received from a remote SPP peer."""
index: int
timestamp: str
address: str
value: bytes
value_hex: str
value_string: str | None
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {
"index": self.index,
"timestamp": self.timestamp,
"address": self.address,
"value_hex": self.value_hex,
"length": len(self.value),
}
if self.value_string is not None:
d["value_string"] = self.value_string
return d
@dataclass
class SPPConnection:
"""Per-peer connection state for an SPP session."""
device_path: str
address: str
role: str # "server" (they connected to us) or "client" (we connected to them)
fd: int
connected_at: float = field(default_factory=time.monotonic)
sock: socket.socket | None = None
reader: asyncio.StreamReader | None = None
writer: asyncio.StreamWriter | None = None
bytes_sent: int = 0
bytes_received: int = 0
_read_task: asyncio.Task | None = field(default=None, repr=False)
class SPPProfile(ServiceInterface):
"""D-Bus Profile1 service for SPP (Serial Port Profile)."""
def __init__(self) -> None:
super().__init__("org.bluez.Profile1")
self.connections: dict[str, SPPConnection] = {} # keyed by device_path
self._recv_events: deque[SPPDataEvent] = deque(maxlen=500)
self._recv_index: int = 0
self._recv_callback: Any = None # async callable(address, data)
@method()
def Release(self) -> None:
log.info("SPP profile released")
for conn in list(self.connections.values()):
self._cleanup_connection(conn)
self.connections.clear()
@method()
def NewConnection(self, device: "o", fd: "h", properties: "a{sv}") -> None:
address = _path_to_address(device)
log.debug("SPP NewConnection: device=%s fd=%r props=%s", device, fd, properties)
if fd is None or (isinstance(fd, int) and fd < 0):
log.error("SPP: invalid fd received: %r", fd)
return
log.info("SPP: NewConnection from %s (fd=%d)", address, fd)
# Duplicate the fd so we own it independent of dbus-fast
try:
new_fd = os.dup(fd)
log.debug("os.dup(%d) -> %d", fd, new_fd)
except OSError:
log.exception("SPP: os.dup(%d) failed for %s", fd, address)
return
conn = SPPConnection(
device_path=device,
address=address,
role="server",
fd=new_fd,
)
self.connections[device] = conn
loop = asyncio.get_event_loop()
conn._read_task = loop.create_task(self._handle_connection(conn))
log.debug("SPP NewConnection done, read task created for %s", address)
@method()
def RequestDisconnection(self, device: "o") -> None:
address = _path_to_address(device)
log.info("SPP: disconnect requested for %s", address)
conn = self.connections.pop(device, None)
if conn:
self._cleanup_connection(conn)
def _cleanup_connection(self, conn: SPPConnection) -> None:
if conn._read_task and not conn._read_task.done():
conn._read_task.cancel()
if conn.writer:
try:
conn.writer.close()
except Exception:
pass
if conn.sock:
try:
conn.sock.close()
except Exception:
pass
elif conn.fd >= 0:
try:
os.close(conn.fd)
except Exception:
pass
async def _handle_connection(self, conn: SPPConnection) -> None:
"""Read loop for raw bytes from the remote SPP peer."""
try:
log.debug("SPP _handle_connection start: addr=%s fd=%d", conn.address, conn.fd)
# socket.fromfd() dups the fd internally — close our intermediate copy
conn.sock = socket.fromfd(
conn.fd, _AF_BLUETOOTH, socket.SOCK_STREAM,
_BTPROTO_RFCOMM,
)
log.debug("socket.fromfd OK: fileno=%d", conn.sock.fileno())
try:
os.close(conn.fd)
except OSError:
pass
conn.fd = -1 # transferred to socket
conn.sock.setblocking(False)
conn.reader, conn.writer = await asyncio.open_connection(sock=conn.sock)
log.debug("SPP async streams ready for %s, entering read loop", conn.address)
while True:
data = await conn.reader.read(4096)
if not data:
log.debug("SPP: EOF from %s (clean disconnect)", conn.address)
break
log.debug("SPP recv %d bytes from %s: %r", len(data), conn.address, data[:80])
conn.bytes_received += len(data)
event = SPPDataEvent(
index=self._recv_index,
timestamp=datetime.now(UTC).isoformat(),
address=conn.address,
value=data,
value_hex=data.hex(),
value_string=_try_decode(data),
)
self._recv_events.append(event)
self._recv_index += 1
# Fire async callback if registered (auto-responders)
if self._recv_callback:
try:
loop = asyncio.get_running_loop()
loop.create_task(self._recv_callback(conn.address, data))
except RuntimeError:
pass
except (ConnectionResetError, BrokenPipeError, OSError) as e:
log.debug("SPP connection error for %s: %s: %s", conn.address, type(e).__name__, e)
except asyncio.CancelledError:
log.debug("SPP task cancelled for %s", conn.address)
except Exception:
log.exception("SPP UNEXPECTED error for %s", conn.address)
finally:
log.debug("SPP cleanup for %s", conn.address)
self.connections.pop(conn.device_path, None)
self._cleanup_connection(conn)
async def send(self, address: str, data: bytes) -> bool:
"""Send raw bytes to a connected SPP peer."""
conn = self._get_connection(address)
if not conn or not conn.writer or conn.writer.is_closing():
return False
conn.writer.write(data)
await conn.writer.drain()
conn.bytes_sent += len(data)
log.debug("SPP sent %d bytes to %s", len(data), address)
return True
def add_client_connection(self, device_path: str, address: str, fd: int) -> None:
"""Register a client-initiated connection (we connected to them).
Called after Device1.ConnectProfile triggers NewConnection on our
Profile1 handler. In client mode, BlueZ still delivers the fd via
NewConnection we just tag the role as "client" before the read
loop starts. However, BlueZ may have already fired NewConnection
by the time ConnectProfile returns, so this is a fallback to
retag if needed.
"""
conn = self.connections.get(device_path)
if conn:
conn.role = "client"
def _get_connection(self, address: str) -> SPPConnection | None:
for conn in self.connections.values():
if conn.address.upper() == address.upper():
return conn
return None
def get_recv_events(
self,
since_index: int = 0,
address: str | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
"""Get received data events, optionally filtered."""
events = [
e
for e in self._recv_events
if e.index >= since_index
and (address is None or e.address.upper() == address.upper())
]
return [e.to_dict() for e in events[:limit]]
def clear_recv_events(self) -> int:
"""Clear all received data events. Returns count cleared."""
count = len(self._recv_events)
self._recv_events.clear()
return count
def get_status(self) -> dict[str, Any]:
"""Get overall SPP status."""
conns = []
now = time.monotonic()
for conn in self.connections.values():
conns.append({
"address": conn.address,
"role": conn.role,
"duration_seconds": round(now - conn.connected_at, 1),
"bytes_sent": conn.bytes_sent,
"bytes_received": conn.bytes_received,
})
return {
"registered": _profile_registered,
"uuid": _registered_uuid or SPP_UUID,
"connections": conns,
"recv_buffer_count": len(self._recv_events),
"recv_buffer_total": self._recv_index,
}
# ==================== Module-level lifecycle ====================
_profile: SPPProfile | None = None
_profile_bus: MessageBus | None = None
_profile_registered: bool = False
_registered_uuid: str | None = None
async def enable_spp(
uuid: str = SPP_UUID,
channel: int = 0,
name: str = "mcbluetooth SPP",
) -> SPPProfile:
"""Register the SPP profile with BlueZ.
Args:
uuid: Service UUID. Default is standard SPP UUID. Use custom UUIDs
for Arduino/ESP32 devices that advertise non-standard RFCOMM.
channel: RFCOMM channel number (0 = auto-assign).
name: Profile display name.
"""
global _profile, _profile_bus, _profile_registered, _registered_uuid
if _profile_registered and _profile:
return _profile
if _profile is None:
_profile = SPPProfile()
if _profile_bus is None:
_profile_bus = await MessageBus(
bus_type=BusType.SYSTEM,
negotiate_unix_fd=True, # Required: BlueZ passes RFCOMM fd via D-Bus
).connect()
log.debug("SPP D-Bus connected: negotiate_unix_fd=%s unique_name=%s",
_profile_bus._negotiate_unix_fd, _profile_bus.unique_name)
_profile_bus.export(SPP_PROFILE_PATH, _profile)
# Register with ProfileManager1
introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection)
profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE)
options: dict[str, Variant] = {
"Name": Variant("s", name),
"Role": Variant("s", "client-server"),
}
if channel > 0:
options["Channel"] = Variant("q", channel)
try:
await profile_mgr.call_register_profile(
SPP_PROFILE_PATH,
uuid,
options,
)
_profile_registered = True
_registered_uuid = uuid
log.info("SPP profile registered with BlueZ (uuid=%s, channel=%s)", uuid, channel or "auto")
except Exception as e:
if "Already Exists" in str(e):
log.info("SPP profile stale — unregistering and re-registering")
try:
await profile_mgr.call_unregister_profile(SPP_PROFILE_PATH)
except Exception:
pass
await profile_mgr.call_register_profile(
SPP_PROFILE_PATH,
uuid,
options,
)
_profile_registered = True
_registered_uuid = uuid
log.info("SPP profile re-registered with BlueZ")
else:
raise
return _profile
async def disable_spp() -> None:
"""Unregister the SPP profile and close all connections."""
global _profile, _profile_bus, _profile_registered, _registered_uuid
if _profile_bus and _profile_registered:
try:
introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection)
profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE)
await profile_mgr.call_unregister_profile(SPP_PROFILE_PATH)
except Exception:
pass
_profile_registered = False
_registered_uuid = None
if _profile:
_profile.Release()
_profile = None
if _profile_bus:
_profile_bus.disconnect()
_profile_bus = None
async def get_spp() -> SPPProfile | None:
"""Get the current SPP profile instance (None if not enabled)."""
return _profile

View File

@ -0,0 +1,243 @@
"""SPP (Serial Port Profile) tools for Bluetooth MCP server.
Classic Bluetooth serial communication over RFCOMM. SPP provides raw
bidirectional byte streams the classic BT equivalent of BLE's Nordic
UART Service.
Typical flow:
Server mode:
1. bt_spp_enable() Register SPP profile
2. Make adapter discoverable Remote device connects
3. bt_spp_send(address, "Hello\r\n") Send data
4. bt_spp_recv() Poll received data
Client mode:
1. bt_spp_enable() Register SPP profile
2. bt_spp_connect(adapter, address) Connect to remote SPP
3. bt_spp_send / bt_spp_recv Bidirectional I/O
"""
from __future__ import annotations
from typing import Any, Literal
from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client
from mcbluetooth.spp import (
SPP_UUID,
disable_spp,
enable_spp,
get_spp,
)
def register_tools(mcp: FastMCP) -> None:
"""Register SPP tools with the MCP server."""
@mcp.tool()
async def bt_spp_enable(
uuid: str = SPP_UUID,
channel: int = 0,
name: str = "mcbluetooth SPP",
) -> dict[str, Any]:
"""Enable SPP (Serial Port Profile) for classic Bluetooth serial.
Registers an RFCOMM serial profile with BlueZ. After enabling,
remote devices can connect for bidirectional serial communication,
or use bt_spp_connect() to initiate outbound connections.
Args:
uuid: Service UUID. Default is standard SPP (0x1101).
Use custom UUIDs for Arduino/ESP32 with non-standard RFCOMM.
channel: RFCOMM channel (0 = auto-assign, recommended).
name: Profile display name visible during service discovery.
Returns:
Registration status with active UUID.
"""
try:
await enable_spp(uuid=uuid, channel=channel, name=name)
return {"status": "ok", "uuid": uuid, "channel": channel or "auto", "name": name}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_spp_disable() -> dict[str, Any]:
"""Disable SPP and close all serial connections.
Unregisters the SPP profile from BlueZ and terminates any active
RFCOMM sessions.
Returns:
Status confirming profile removal.
"""
try:
await disable_spp()
return {"status": "ok", "disabled": True}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_spp_status() -> dict[str, Any]:
"""Get SPP status: registration state, connections, and buffer stats.
Returns:
- registered: Whether the SPP profile is active
- uuid: The UUID the profile was registered with
- connections: Active peers with role, duration, byte counters
- recv_buffer_count: Buffered received data events
- recv_buffer_total: Total events received since enable
"""
profile = await get_spp()
if not profile:
return {"status": "ok", "registered": False, "connections": []}
return {"status": "ok", **profile.get_status()}
@mcp.tool()
async def bt_spp_connect(
adapter: str,
address: str,
uuid: str = SPP_UUID,
) -> dict[str, Any]:
"""Connect to a remote device's SPP service (client mode).
Initiates an outbound RFCOMM connection. The SPP profile must be
enabled first (bt_spp_enable). BlueZ will deliver the RFCOMM fd
through our Profile1 handler automatically.
The device should already be paired and trusted.
Args:
adapter: Bluetooth adapter (e.g. "hci0").
address: Remote device Bluetooth address.
uuid: SPP service UUID on the remote device.
Returns:
Connection status.
"""
profile = await get_spp()
if not profile:
return {"status": "error", "error": "SPP not enabled — call bt_spp_enable() first"}
try:
client = await get_client()
await client.connect_profile(adapter, address, uuid)
# BlueZ fires NewConnection on our Profile1 handler — tag as client
device_path = f"/org/bluez/{adapter}/dev_{address.upper().replace(':', '_')}"
profile.add_client_connection(device_path, address, -1)
return {"status": "ok", "address": address, "role": "client"}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_spp_disconnect(address: str) -> dict[str, Any]:
"""Disconnect a specific SPP peer.
Closes the RFCOMM socket and removes the connection.
Args:
address: Bluetooth address of the peer to disconnect.
Returns:
Disconnect status.
"""
profile = await get_spp()
if not profile:
return {"status": "error", "error": "SPP not enabled"}
conn = profile._get_connection(address)
if not conn:
return {"status": "error", "error": f"No SPP connection to {address}"}
profile.connections.pop(conn.device_path, None)
profile._cleanup_connection(conn)
return {"status": "ok", "address": address, "disconnected": True}
@mcp.tool()
async def bt_spp_send(
address: str,
data: str,
data_type: Literal["string", "hex", "line"] = "string",
) -> dict[str, Any]:
"""Send data to a connected SPP peer.
Args:
address: Bluetooth address of the peer.
data: The data to send.
data_type: How to interpret the data parameter:
- "string": Send as-is (UTF-8 encoded)
- "hex": Parse as hex string (e.g. "48656c6c6f")
- "line": Append CR+LF for line-oriented protocols
(AT commands, NMEA, etc.)
Returns:
Send status with byte count.
"""
profile = await get_spp()
if not profile:
return {"status": "error", "error": "SPP not enabled"}
if data_type == "hex":
try:
raw = bytes.fromhex(data)
except ValueError as exc:
return {"status": "error", "error": f"Invalid hex: {exc}"}
elif data_type == "line":
raw = (data + "\r\n").encode("utf-8")
else:
raw = data.encode("utf-8")
ok = await profile.send(address, raw)
if ok:
return {"status": "ok", "bytes_sent": len(raw)}
return {"status": "error", "error": f"No active SPP connection to {address}"}
@mcp.tool()
async def bt_spp_recv(
since_index: int = 0,
address: str | None = None,
limit: int = 50,
) -> dict[str, Any]:
"""Read received data from SPP connections (cursor-based polling).
Returns buffered data events since the given index. Use the highest
returned index + 1 as since_index for the next poll to avoid
duplicates.
Args:
since_index: Return events with index >= this value.
Start at 0 for first call, then use last index + 1.
address: Filter by peer address (optional).
limit: Maximum number of events to return.
Returns:
List of data events with index, timestamp, address, and values.
"""
profile = await get_spp()
if not profile:
return {"status": "ok", "events": [], "hint": "SPP not enabled"}
events = profile.get_recv_events(since_index=since_index, address=address, limit=limit)
return {
"status": "ok",
"count": len(events),
"events": events,
"next_index": events[-1]["index"] + 1 if events else since_index,
}
@mcp.tool()
async def bt_spp_clear_recv() -> dict[str, Any]:
"""Clear the SPP receive buffer.
Removes all buffered received data events. The index counter
continues from where it left off (not reset to 0).
Returns:
Count of events cleared.
"""
profile = await get_spp()
if not profile:
return {"status": "ok", "cleared": 0}
count = profile.clear_recv_events()
return {"status": "ok", "cleared": count}