Compare commits

..

4 Commits

Author SHA1 Message Date
795010e495 Update docs site for mcbluetooth.warehack.ing deployment
Point site URL, social link, and edit links to warehack.ing Gitea.
2026-02-13 09:16:52 -07:00
45c1e57dfe Add SPP (Serial Port Profile) for classic Bluetooth serial over RFCOMM
Registers a Profile1 D-Bus endpoint with BlueZ for bidirectional raw
byte streams — same pattern as HFP AG minus the AT command layer.
Supports both server mode (accept inbound) and client mode (connect
outbound via ConnectProfile). Includes 8 MCP tools, 2 resources, and
cursor-based recv polling with deque(maxlen=500).
2026-02-12 18:27:49 -07:00
5dc5f640c7 Fix BlueZ GATT server registration and OBD-II mode 03/04 parsing
Add @dbus_property(PropertyAccess.READ) to GattServiceIface,
GattCharacteristicIface, and GattDescriptorIface so BlueZ can
validate objects via the standard Properties interface — fixes
"No valid service object found" error on RegisterApplication.

Also fix ELM327 emulator rejecting 2-char OBD commands (mode 03
for DTCs and mode 04 for clear) by padding to 4 chars.
2026-02-12 13:01:28 -07:00
d0f0565e62 Add BLE GATT server and ELM327 OBD-II emulator
Implement peripheral/GATT server support (14 tools) using BlueZ's
GattManager1 D-Bus API, enabling mcbluetooth to act as a BLE device
that advertises, accepts connections, and handles reads/writes/notifications.

Add ELM327 emulator (7 tools) that builds on the GATT server to emulate
a BLE OBD-II adapter over Nordic UART Service. Handles AT commands,
Mode 01/03/04/09 queries, configurable PIDs, DTCs, and VIN with
auto-response — no LLM round-trip needed for protocol handling.

New files:
- gatt_server.py: D-Bus ServiceInterface hierarchy + GattServerManager
- tools/gatt_server.py: MCP tool wrappers for server lifecycle
- tools/bt_elm327_emu.py: ELM327 protocol handler + NUS setup

Also adds 2 MCP resources (bluetooth://gatt/server, .../writes).
2026-02-12 12:50:58 -07:00
9 changed files with 2639 additions and 4 deletions

View File

@ -6,7 +6,7 @@ import icon from 'astro-icon';
// https://astro.build/config // https://astro.build/config
export default defineConfig({ export default defineConfig({
// Site URL for sitemap generation // Site URL for sitemap generation
site: 'https://mcbluetooth.supported.systems', site: 'https://mcbluetooth.warehack.ing',
// Disable telemetry // Disable telemetry
telemetry: false, telemetry: false,
@ -25,7 +25,7 @@ export default defineConfig({
replacesTitle: false, replacesTitle: false,
}, },
social: [ social: [
{ icon: 'github', label: 'GitHub', href: 'https://github.com/yourusername/mcbluetooth' }, { icon: 'github', label: 'Gitea', href: 'https://git.supported.systems/warehack.ing/mcbluetooth' },
], ],
customCss: [ customCss: [
'./src/styles/custom.css', './src/styles/custom.css',
@ -80,7 +80,7 @@ export default defineConfig({
}, },
], ],
editLink: { editLink: {
baseUrl: 'https://github.com/yourusername/mcbluetooth/edit/main/docs-site/', baseUrl: 'https://git.supported.systems/warehack.ing/mcbluetooth/_edit/main/docs-site/',
}, },
}), }),
], ],

View File

@ -69,6 +69,8 @@ ignore = ["E501"]
[tool.ruff.lint.per-file-ignores] [tool.ruff.lint.per-file-ignores]
# dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations # dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations
"src/mcbluetooth/hfp_ag.py" = ["F821", "F722"] "src/mcbluetooth/hfp_ag.py" = ["F821", "F722"]
"src/mcbluetooth/gatt_server.py" = ["F821", "F722"]
"src/mcbluetooth/spp.py" = ["F821", "F722"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_mode = "auto" asyncio_mode = "auto"

View File

@ -0,0 +1,756 @@
"""BLE GATT Server implementation for BlueZ.
Registers as a BLE peripheral via BlueZ's GattManager1 D-Bus API, allowing
mcbluetooth to act as a GATT server. Remote BLE central devices can discover
services, read/write characteristics, and subscribe to notifications.
D-Bus hierarchy exported to BlueZ:
/mcbluetooth/gatt ObjectManager
/mcbluetooth/gatt/serviceN GattService1
/mcbluetooth/gatt/serviceN/charM GattCharacteristic1
/mcbluetooth/gatt/serviceN/charM/descK GattDescriptor1
/mcbluetooth/gatt/adv0 LEAdvertisement1
Reference patterns: agent.py (ServiceInterface, pending requests),
hfp_ag.py (dedicated bus, module singleton, ProfileManager registration).
"""
import asyncio
import logging
from collections import deque
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from dbus_fast import BusType, Message, MessageType, Variant
from dbus_fast.aio import MessageBus
from dbus_fast.service import PropertyAccess, ServiceInterface, dbus_property, method
log = logging.getLogger(__name__)
BLUEZ_SERVICE = "org.bluez"
GATT_MANAGER_IFACE = "org.bluez.GattManager1"
LE_ADV_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1"
APP_BASE_PATH = "/mcbluetooth/gatt"
ADV_PATH = "/mcbluetooth/gatt/adv0"
# ==================== Data Classes ====================
@dataclass
class WriteEvent:
"""A write received from a remote BLE central device."""
index: int
timestamp: str
char_id: str
char_uuid: str
value: bytes
value_hex: str
value_string: str | None
device: str | None
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {
"index": self.index,
"timestamp": self.timestamp,
"char_id": self.char_id,
"char_uuid": self.char_uuid,
"value_hex": self.value_hex,
"length": len(self.value),
}
if self.value_string is not None:
d["value_string"] = self.value_string
if self.device:
d["device"] = self.device
return d
@dataclass
class ServerCharacteristic:
"""Internal state for a server-side GATT characteristic."""
char_id: str
path: str
uuid: str
flags: list[str]
value: bytes = b""
notifying: bool = False
service_id: str = ""
desc_count: int = 0
dbus_obj: Any = field(default=None, repr=False)
@dataclass
class ServerDescriptor:
"""Internal state for a server-side GATT descriptor."""
desc_id: str
path: str
uuid: str
flags: list[str]
value: bytes = b""
char_id: str = ""
dbus_obj: Any = field(default=None, repr=False)
@dataclass
class ServerService:
"""Internal state for a server-side GATT service."""
service_id: str
path: str
uuid: str
primary: bool = True
char_count: int = 0
characteristics: dict[str, ServerCharacteristic] = field(default_factory=dict)
dbus_obj: Any = field(default=None, repr=False)
# ==================== D-Bus Interfaces ====================
class GattApplication(ServiceInterface):
"""ObjectManager for the GATT application root.
BlueZ calls GetManagedObjects() on this path to discover all
services, characteristics, and descriptors we're hosting.
"""
def __init__(self, manager: "GattServerManager"):
super().__init__("org.freedesktop.DBus.ObjectManager")
self._manager = manager
@method()
def GetManagedObjects(self) -> "a{oa{sa{sv}}}": # noqa: F821
return self._manager._build_managed_objects()
class GattServiceIface(ServiceInterface):
"""org.bluez.GattService1 — exported at each service path.
Properties must be exposed via @dbus_property so BlueZ can validate
the service object through the standard Properties interface.
"""
def __init__(self, uuid: str, primary: bool):
super().__init__("org.bluez.GattService1")
self._uuid = uuid
self._primary = primary
@dbus_property(PropertyAccess.READ)
def UUID(self) -> "s": # noqa: F821
return self._uuid
@dbus_property(PropertyAccess.READ)
def Primary(self) -> "b": # noqa: F821
return self._primary
class GattCharacteristicIface(ServiceInterface):
"""org.bluez.GattCharacteristic1 — handles reads/writes from remotes.
ReadValue returns the stored value. WriteValue stores the value AND
records a WriteEvent for LLM polling (like agent.py's pending_requests).
Properties exposed via @dbus_property for BlueZ introspection.
"""
def __init__(self, char: ServerCharacteristic, manager: "GattServerManager"):
super().__init__("org.bluez.GattCharacteristic1")
self._char = char
self._manager = manager
@dbus_property(PropertyAccess.READ)
def UUID(self) -> "s": # noqa: F821
return self._char.uuid
@dbus_property(PropertyAccess.READ)
def Service(self) -> "o": # noqa: F821
return f"{APP_BASE_PATH}/{self._char.service_id}"
@dbus_property(PropertyAccess.READ)
def Flags(self) -> "as": # noqa: F821
return self._char.flags
@method()
def ReadValue(self, options: "a{sv}") -> "ay": # noqa: F821
offset = 0
opt_offset = options.get("offset")
if opt_offset is not None:
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
return list(self._char.value[offset:])
@method()
def WriteValue(self, value: "ay", options: "a{sv}") -> None: # noqa: F821
new_bytes = bytes(value)
offset = 0
opt_offset = options.get("offset")
if opt_offset is not None:
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
if offset > 0:
self._char.value = self._char.value[:offset] + new_bytes
else:
self._char.value = new_bytes
device = None
opt_device = options.get("device")
if opt_device is not None:
device = opt_device.value if hasattr(opt_device, "value") else str(opt_device)
self._manager._record_write(self._char.char_id, self._char.uuid, self._char.value, device)
@method()
def StartNotify(self) -> None:
self._char.notifying = True
log.info("GATT server: StartNotify on %s (%s)", self._char.char_id, self._char.uuid)
@method()
def StopNotify(self) -> None:
self._char.notifying = False
log.info("GATT server: StopNotify on %s (%s)", self._char.char_id, self._char.uuid)
class GattDescriptorIface(ServiceInterface):
"""org.bluez.GattDescriptor1 — handles reads/writes on descriptors.
Properties exposed via @dbus_property for BlueZ introspection.
"""
def __init__(self, desc: ServerDescriptor):
super().__init__("org.bluez.GattDescriptor1")
self._desc = desc
@dbus_property(PropertyAccess.READ)
def UUID(self) -> "s": # noqa: F821
return self._desc.uuid
@dbus_property(PropertyAccess.READ)
def Characteristic(self) -> "o": # noqa: F821
return f"{APP_BASE_PATH}/{self._desc.char_id}"
@dbus_property(PropertyAccess.READ)
def Flags(self) -> "as": # noqa: F821
return self._desc.flags
@method()
def ReadValue(self, options: "a{sv}") -> "ay": # noqa: F821
offset = 0
opt_offset = options.get("offset")
if opt_offset is not None:
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
return list(self._desc.value[offset:])
@method()
def WriteValue(self, value: "ay", options: "a{sv}") -> None: # noqa: F821
self._desc.value = bytes(value)
class LEAdvertisementIface(ServiceInterface):
"""org.bluez.LEAdvertisement1 — BLE advertising data.
BlueZ reads advertisement properties via Properties.GetAll, so these
must be exposed as @dbus_property (unlike GATT objects which use
GetManagedObjects).
"""
def __init__(self, adv_type: str, local_name: str, service_uuids: list[str]):
super().__init__("org.bluez.LEAdvertisement1")
self._type = adv_type
self._local_name = local_name
self._service_uuids = service_uuids
@method()
def Release(self) -> None:
log.info("GATT server: advertisement released by BlueZ")
@dbus_property(PropertyAccess.READ)
def Type(self) -> "s": # noqa: F821
return self._type
@dbus_property(PropertyAccess.READ)
def LocalName(self) -> "s": # noqa: F821
return self._local_name
@dbus_property(PropertyAccess.READ)
def ServiceUUIDs(self) -> "as": # noqa: F821
return self._service_uuids
# ==================== GATT Server Manager ====================
class GattServerManager:
"""Manages the GATT server lifecycle and state.
Coordinates D-Bus object creation, BlueZ registration,
advertising, and write event buffering.
"""
def __init__(self):
self._bus: MessageBus | None = None
self._app: GattApplication | None = None
self._adv: LEAdvertisementIface | None = None
self._services: dict[str, ServerService] = {}
self._characteristics: dict[str, ServerCharacteristic] = {}
self._descriptors: dict[str, ServerDescriptor] = {}
self._write_events: deque[WriteEvent] = deque(maxlen=200)
self._write_index: int = 0
self._service_count: int = 0
self._registered: bool = False
self._advertising: bool = False
self._write_callback: Any = None # async callable(char_id, value, device)
# ---- Service construction ----
def add_service(self, uuid: str, primary: bool = True) -> str:
"""Add a GATT service. Returns the service_id (e.g. 'service0')."""
service_id = f"service{self._service_count}"
self._service_count += 1
path = f"{APP_BASE_PATH}/{service_id}"
svc = ServerService(
service_id=service_id,
path=path,
uuid=uuid,
primary=primary,
)
svc.dbus_obj = GattServiceIface(uuid, primary)
self._services[service_id] = svc
log.info("GATT server: added service %s (UUID=%s)", service_id, uuid)
return service_id
def add_characteristic(
self,
service_id: str,
uuid: str,
flags: list[str],
value: bytes = b"",
) -> str:
"""Add a characteristic to a service. Returns the char_id."""
svc = self._services.get(service_id)
if not svc:
raise ValueError(f"Service '{service_id}' not found")
char_idx = svc.char_count
svc.char_count += 1
char_id = f"{service_id}/char{char_idx}"
path = f"{APP_BASE_PATH}/{char_id}"
char = ServerCharacteristic(
char_id=char_id,
path=path,
uuid=uuid,
flags=flags,
value=value,
service_id=service_id,
)
char.dbus_obj = GattCharacteristicIface(char, self)
svc.characteristics[char_id] = char
self._characteristics[char_id] = char
log.info(
"GATT server: added characteristic %s (UUID=%s, flags=%s)",
char_id,
uuid,
flags,
)
return char_id
def add_descriptor(
self,
char_id: str,
uuid: str,
flags: list[str],
value: bytes = b"",
) -> str:
"""Add a descriptor to a characteristic. Returns the desc_id."""
char = self._characteristics.get(char_id)
if not char:
raise ValueError(f"Characteristic '{char_id}' not found")
desc_idx = char.desc_count
char.desc_count += 1
desc_id = f"{char_id}/desc{desc_idx}"
path = f"{APP_BASE_PATH}/{desc_id}"
desc = ServerDescriptor(
desc_id=desc_id,
path=path,
uuid=uuid,
flags=flags,
value=value,
char_id=char_id,
)
desc.dbus_obj = GattDescriptorIface(desc)
self._descriptors[desc_id] = desc
log.info("GATT server: added descriptor %s (UUID=%s)", desc_id, uuid)
return desc_id
def clear(self) -> None:
"""Remove all services, characteristics, and descriptors."""
if self._registered:
raise RuntimeError("Cannot clear while registered — unregister first")
self._services.clear()
self._characteristics.clear()
self._descriptors.clear()
self._service_count = 0
# ---- Registration with BlueZ ----
async def register(self, adapter: str) -> None:
"""Register the GATT application with BlueZ GattManager1."""
if self._registered:
return
if not self._services:
raise ValueError("No services defined — add at least one service first")
# Dedicated bus connection (like hfp_ag.py)
if not self._bus:
self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
# Export ObjectManager at application root
self._app = GattApplication(self)
self._bus.export(APP_BASE_PATH, self._app)
# Export all service/characteristic/descriptor D-Bus objects
for svc in self._services.values():
self._bus.export(svc.path, svc.dbus_obj)
for char in svc.characteristics.values():
self._bus.export(char.path, char.dbus_obj)
for desc in self._descriptors.values():
self._bus.export(desc.path, desc.dbus_obj)
# Register with BlueZ
adapter_path = f"/org/bluez/{adapter}"
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
gatt_mgr = proxy.get_interface(GATT_MANAGER_IFACE)
try:
await gatt_mgr.call_register_application(APP_BASE_PATH, {})
self._registered = True
log.info("GATT server: registered with BlueZ (%s)", adapter)
except Exception as e:
if "Already Exists" in str(e):
log.info("GATT server: stale registration — re-registering")
try:
await gatt_mgr.call_unregister_application(APP_BASE_PATH)
except Exception:
pass
await gatt_mgr.call_register_application(APP_BASE_PATH, {})
self._registered = True
log.info("GATT server: re-registered with BlueZ (%s)", adapter)
else:
raise
async def unregister(self, adapter: str) -> None:
"""Unregister the GATT application from BlueZ."""
if self._advertising:
await self.set_advertising(adapter, False)
if self._bus and self._registered:
try:
adapter_path = f"/org/bluez/{adapter}"
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
gatt_mgr = proxy.get_interface(GATT_MANAGER_IFACE)
await gatt_mgr.call_unregister_application(APP_BASE_PATH)
except Exception:
pass
self._registered = False
# Disconnect bus (cleans up all exports)
if self._bus:
self._bus.disconnect()
self._bus = None
self._app = None
self._adv = None
log.info("GATT server: unregistered")
# ---- Advertising ----
async def set_advertising(
self,
adapter: str,
enable: bool,
name: str = "mcbluetooth",
service_uuids: list[str] | None = None,
) -> None:
"""Start or stop BLE advertising."""
if not self._bus:
raise RuntimeError("Bus not connected — register first")
adapter_path = f"/org/bluez/{adapter}"
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
adv_mgr = proxy.get_interface(LE_ADV_MANAGER_IFACE)
if enable:
if self._advertising:
return
uuids = service_uuids or [svc.uuid for svc in self._services.values()]
self._adv = LEAdvertisementIface("peripheral", name, uuids)
self._bus.export(ADV_PATH, self._adv)
try:
await adv_mgr.call_register_advertisement(ADV_PATH, {})
self._advertising = True
log.info("GATT server: advertising started (name=%s)", name)
except Exception as e:
if "Already Exists" in str(e):
try:
await adv_mgr.call_unregister_advertisement(ADV_PATH)
except Exception:
pass
await adv_mgr.call_register_advertisement(ADV_PATH, {})
self._advertising = True
log.info("GATT server: advertising restarted (name=%s)", name)
else:
raise
else:
if not self._advertising:
return
try:
await adv_mgr.call_unregister_advertisement(ADV_PATH)
except Exception:
pass
self._advertising = False
self._adv = None
log.info("GATT server: advertising stopped")
# ---- Value management ----
def set_value(self, char_id: str, value: bytes) -> None:
"""Set characteristic value and auto-notify if subscribed."""
char = self._characteristics.get(char_id)
if not char:
raise ValueError(f"Characteristic '{char_id}' not found")
char.value = value
if char.notifying:
self._emit_notification(char)
def get_value(self, char_id: str) -> bytes:
"""Get current characteristic value."""
char = self._characteristics.get(char_id)
if not char:
raise ValueError(f"Characteristic '{char_id}' not found")
return char.value
def notify(self, char_id: str) -> bool:
"""Explicitly send notification for current value."""
char = self._characteristics.get(char_id)
if not char:
raise ValueError(f"Characteristic '{char_id}' not found")
if not char.notifying:
return False
self._emit_notification(char)
return True
def _emit_notification(self, char: ServerCharacteristic) -> None:
"""Emit PropertiesChanged signal to trigger BLE notification."""
if not self._bus:
return
msg = Message(
message_type=MessageType.SIGNAL,
path=char.path,
interface="org.freedesktop.DBus.Properties",
member="PropertiesChanged",
signature="sa{sv}as",
body=[
"org.bluez.GattCharacteristic1",
{"Value": Variant("ay", list(char.value))},
[],
],
)
self._bus.send_message(msg)
log.debug("GATT server: notification sent on %s (%d bytes)", char.char_id, len(char.value))
# ---- Write event monitoring ----
def _record_write(self, char_id: str, char_uuid: str, value: bytes, device: str | None) -> None:
"""Record a write event and fire callback if registered."""
event = WriteEvent(
index=self._write_index,
timestamp=datetime.now(UTC).isoformat(),
char_id=char_id,
char_uuid=char_uuid,
value=value,
value_hex=value.hex(),
value_string=_try_decode(value),
device=device,
)
self._write_events.append(event)
self._write_index += 1
log.debug(
"GATT server: write on %s from %s (%d bytes)",
char_id,
device or "unknown",
len(value),
)
# Fire async callback (e.g. ELM327 emulator auto-responder)
if self._write_callback:
try:
loop = asyncio.get_running_loop()
loop.create_task(self._write_callback(char_id, value, device))
except RuntimeError:
pass
def get_write_events(
self,
since_index: int = 0,
char_id: str | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
"""Get write events, optionally filtered."""
events = [
e
for e in self._write_events
if e.index >= since_index and (char_id is None or e.char_id == char_id)
]
return [e.to_dict() for e in events[:limit]]
def clear_write_events(self) -> int:
"""Clear all write events. Returns count cleared."""
count = len(self._write_events)
self._write_events.clear()
return count
# ---- GetManagedObjects builder ----
def _build_managed_objects(self) -> dict[str, dict[str, dict[str, Variant]]]:
"""Build the response for ObjectManager.GetManagedObjects().
Returns the complete D-Bus object hierarchy that BlueZ uses
to discover our GATT services, characteristics, and descriptors.
"""
objects: dict[str, dict[str, dict[str, Variant]]] = {}
for svc in self._services.values():
char_paths = [c.path for c in svc.characteristics.values()]
objects[svc.path] = {
"org.bluez.GattService1": {
"UUID": Variant("s", svc.uuid),
"Primary": Variant("b", svc.primary),
"Characteristics": Variant("ao", char_paths),
}
}
for char in svc.characteristics.values():
desc_paths = [
d.path for d in self._descriptors.values() if d.char_id == char.char_id
]
objects[char.path] = {
"org.bluez.GattCharacteristic1": {
"UUID": Variant("s", char.uuid),
"Service": Variant("o", svc.path),
"Flags": Variant("as", char.flags),
"Descriptors": Variant("ao", desc_paths),
}
}
for desc in self._descriptors.values():
char = self._characteristics.get(desc.char_id)
if char:
objects[desc.path] = {
"org.bluez.GattDescriptor1": {
"UUID": Variant("s", desc.uuid),
"Characteristic": Variant("o", char.path),
"Flags": Variant("as", desc.flags),
}
}
return objects
# ---- Status ----
def get_status(self) -> dict[str, Any]:
"""Get overall GATT server status."""
services = []
for svc in self._services.values():
chars = []
for char in svc.characteristics.values():
descs = [
{
"desc_id": d.desc_id,
"uuid": d.uuid,
"flags": d.flags,
}
for d in self._descriptors.values()
if d.char_id == char.char_id
]
chars.append(
{
"char_id": char.char_id,
"uuid": char.uuid,
"flags": char.flags,
"value_length": len(char.value),
"notifying": char.notifying,
"descriptors": descs,
}
)
services.append(
{
"service_id": svc.service_id,
"uuid": svc.uuid,
"primary": svc.primary,
"characteristics": chars,
}
)
return {
"registered": self._registered,
"advertising": self._advertising,
"services": services,
"write_event_count": len(self._write_events),
"write_event_total": self._write_index,
}
# ==================== Module-level lifecycle ====================
_manager: GattServerManager | None = None
def get_gatt_server() -> GattServerManager:
"""Get or create the global GATT server manager (singleton)."""
global _manager
if _manager is None:
_manager = GattServerManager()
return _manager
async def shutdown_gatt_server(adapter: str) -> None:
"""Shut down the GATT server completely."""
global _manager
if _manager:
try:
await _manager.unregister(adapter)
except Exception:
pass
_manager = None
# ==================== Helpers ====================
def _try_decode(value: bytes) -> str | None:
"""Try to decode bytes as printable UTF-8."""
try:
decoded = value.decode("utf-8")
if all(c.isprintable() or c in "\r\n\t" for c in decoded):
return decoded
return None
except (UnicodeDecodeError, ValueError):
return None

View File

@ -17,6 +17,7 @@ from dataclasses import asdict
from fastmcp import FastMCP from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client, get_notify_manager from mcbluetooth.dbus_client import get_client, get_notify_manager
from mcbluetooth.gatt_server import get_gatt_server
def register_resources(mcp: FastMCP) -> None: def register_resources(mcp: FastMCP) -> None:
@ -307,3 +308,99 @@ def register_resources(mcp: FastMCP) -> None:
}, },
indent=2, indent=2,
) )
# ==================== SPP Resources ====================
@mcp.resource(
"bluetooth://spp/connections",
name="SPP Connections",
description=(
"Active SPP (Serial Port Profile) connections with role, "
"duration, and byte counters for each peer."
),
mime_type="application/json",
)
async def resource_spp_connections() -> str:
"""Get active SPP connections."""
from mcbluetooth.spp import get_spp
profile = await get_spp()
if not profile:
return json.dumps({"registered": False, "connections": []})
status = profile.get_status()
return json.dumps(
{
"registered": status["registered"],
"uuid": status["uuid"],
"connections": status["connections"],
},
indent=2,
)
@mcp.resource(
"bluetooth://spp/data",
name="SPP Received Data",
description=(
"Recent data received from SPP peers. Returns the last 50 "
"data events with timestamps, addresses, and hex/string values."
),
mime_type="application/json",
)
async def resource_spp_data() -> str:
"""Get recent SPP received data events."""
from mcbluetooth.spp import get_spp
profile = await get_spp()
if not profile:
return json.dumps({"count": 0, "events": [], "hint": "SPP not enabled"})
events = profile.get_recv_events(since_index=0, limit=50)
return json.dumps(
{
"count": len(events),
"total": profile._recv_index,
"events": events,
},
indent=2,
)
# ==================== GATT Server Resources ====================
@mcp.resource(
"bluetooth://gatt/server",
name="GATT Server State",
description=(
"Current state of the BLE GATT server (peripheral mode). "
"Shows registered services, characteristics, advertising status, "
"and write event statistics."
),
mime_type="application/json",
)
async def resource_gatt_server() -> str:
"""Get GATT server state."""
mgr = get_gatt_server()
return json.dumps(mgr.get_status(), indent=2)
@mcp.resource(
"bluetooth://gatt/server/writes",
name="GATT Server Write Events",
description=(
"Write events received from remote BLE central devices. "
"Shows the most recent writes to server characteristics "
"with timestamps, values, and source device info."
),
mime_type="application/json",
)
async def resource_gatt_server_writes() -> str:
"""Get recent write events from remote clients."""
mgr = get_gatt_server()
events = mgr.get_write_events(since_index=0, limit=50)
return json.dumps(
{
"count": len(events),
"total": mgr._write_index,
"events": events,
},
indent=2,
)

View File

@ -3,7 +3,18 @@
from fastmcp import FastMCP from fastmcp import FastMCP
from mcbluetooth import resources from mcbluetooth import resources
from mcbluetooth.tools import adapter, audio, ble, device, hfp, monitor, obex from mcbluetooth.tools import (
adapter,
audio,
ble,
bt_elm327_emu,
device,
gatt_server,
hfp,
monitor,
obex,
spp,
)
mcp = FastMCP( mcp = FastMCP(
name="mcbluetooth", name="mcbluetooth",
@ -36,10 +47,41 @@ To capture BLE notifications:
3. Read notifications: Use the bluetooth://ble/{address}/{uuid}/notifications resource 3. Read notifications: Use the bluetooth://ble/{address}/{uuid}/notifications resource
4. Subscribe to the resource for real-time updates (client-side) 4. Subscribe to the resource for real-time updates (client-side)
### GATT Server Resources (peripheral mode)
- bluetooth://gatt/server - GATT server state (registered, services, advertising)
- bluetooth://gatt/server/writes - Write events from remote clients
To act as a BLE peripheral (GATT server):
1. Add services: bt_gatt_server_add_service(uuid)
2. Add characteristics: bt_gatt_server_add_characteristic(service_id, uuid, flags)
3. Register: bt_gatt_server_register(adapter)
4. Advertise: bt_gatt_server_advertise(adapter, enable=True, name="MyDevice")
5. Monitor writes: bt_gatt_server_read_writes(since_index=0)
6. Respond: bt_gatt_server_set_value(char_id, value) auto-notifies subscribers
### ELM327 OBD-II Emulator
For quick OBD-II BLE adapter emulation (Nordic UART Service):
1. bt_elm327_emu_start(adapter, name="OBDII") sets up NUS, advertises
2. bt_elm327_emu_set_pid(0x0C, "0BB8", "hex") set RPM to 750
3. bt_elm327_emu_read_commands() see what the OBD app sent
4. bt_elm327_emu_stop(adapter) clean up
## Tools ## Tools
All tools require an explicit 'adapter' parameter (e.g., "hci0"). All tools require an explicit 'adapter' parameter (e.g., "hci0").
Use bt_list_adapters() to discover available adapters. Use bt_list_adapters() to discover available adapters.
### SPP (Serial Port Profile) — Classic Bluetooth Serial
- bluetooth://spp/connections Active SPP connections
- bluetooth://spp/data Recent received data events
For classic Bluetooth serial (RFCOMM):
1. bt_spp_enable() register profile
2. Server: make discoverable, remote connects automatically
3. Client: bt_spp_connect(adapter, address)
4. Send: bt_spp_send(address, "ATZ\\r\\n", "string")
5. Receive: bt_spp_recv(since_index=0) cursor-based polling
6. Disconnect: bt_spp_disconnect(address)
For pairing, use pairing_mode parameter: For pairing, use pairing_mode parameter:
- "elicit": Use MCP elicitation to request PIN from user (preferred) - "elicit": Use MCP elicitation to request PIN from user (preferred)
- "interactive": Return awaiting status, then call bt_pair_confirm - "interactive": Return awaiting status, then call bt_pair_confirm
@ -56,8 +98,11 @@ device.register_tools(mcp)
audio.register_tools(mcp) audio.register_tools(mcp)
hfp.register_tools(mcp) hfp.register_tools(mcp)
ble.register_tools(mcp) ble.register_tools(mcp)
gatt_server.register_tools(mcp)
bt_elm327_emu.register_tools(mcp)
monitor.register_tools(mcp) monitor.register_tools(mcp)
obex.register_tools(mcp) obex.register_tools(mcp)
spp.register_tools(mcp)
def main(): def main():

419
src/mcbluetooth/spp.py Normal file
View File

@ -0,0 +1,419 @@
"""SPP (Serial Port Profile) implementation for BlueZ.
Registers as an SPP endpoint via BlueZ ProfileManager1, providing raw
bidirectional serial byte streams over RFCOMM. When a remote device
connects, BlueZ hands us the RFCOMM file descriptor through the Profile1
D-Bus interface the same pattern used by hfp_ag.py, minus the AT
command protocol layer.
Use cases: serial terminal to Arduino/ESP32, GPS receivers (NMEA),
legacy sensors, Bluetooth modems, any line-oriented protocol over
classic Bluetooth.
D-Bus flow:
1. RegisterProfile(SPP_UUID) with ProfileManager1
2. Remote connects BlueZ calls NewConnection(device, fd, props)
3. We dup the fd, wrap in async streams, read/write raw bytes
4. RequestDisconnection or EOF cleanup
"""
import asyncio
import logging
import os
import socket
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from dbus_fast import BusType, Variant
from dbus_fast.aio import MessageBus
from dbus_fast.service import ServiceInterface, method
log = logging.getLogger(__name__)
# Bluetooth socket constants — Python often compiled without bluetooth.h
_AF_BLUETOOTH = getattr(socket, "AF_BLUETOOTH", 31)
_BTPROTO_RFCOMM = getattr(socket, "BTPROTO_RFCOMM", 3)
# D-Bus constants
BLUEZ_SERVICE = "org.bluez"
PROFILE_MANAGER_IFACE = "org.bluez.ProfileManager1"
SPP_UUID = "00001101-0000-1000-8000-00805f9b34fb"
SPP_PROFILE_PATH = "/mcbluetooth/spp"
def _try_decode(value: bytes) -> str | None:
"""Try to decode bytes as printable UTF-8."""
try:
decoded = value.decode("utf-8")
if all(c.isprintable() or c in "\r\n\t" for c in decoded):
return decoded
return None
except (UnicodeDecodeError, ValueError):
return None
def _path_to_address(device_path: str) -> str:
parts = device_path.split("/")
if len(parts) >= 5 and parts[-1].startswith("dev_"):
return parts[-1][4:].replace("_", ":")
return device_path
@dataclass
class SPPDataEvent:
"""A chunk of data received from a remote SPP peer."""
index: int
timestamp: str
address: str
value: bytes
value_hex: str
value_string: str | None
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {
"index": self.index,
"timestamp": self.timestamp,
"address": self.address,
"value_hex": self.value_hex,
"length": len(self.value),
}
if self.value_string is not None:
d["value_string"] = self.value_string
return d
@dataclass
class SPPConnection:
"""Per-peer connection state for an SPP session."""
device_path: str
address: str
role: str # "server" (they connected to us) or "client" (we connected to them)
fd: int
connected_at: float = field(default_factory=time.monotonic)
sock: socket.socket | None = None
reader: asyncio.StreamReader | None = None
writer: asyncio.StreamWriter | None = None
bytes_sent: int = 0
bytes_received: int = 0
_read_task: asyncio.Task | None = field(default=None, repr=False)
class SPPProfile(ServiceInterface):
"""D-Bus Profile1 service for SPP (Serial Port Profile)."""
def __init__(self) -> None:
super().__init__("org.bluez.Profile1")
self.connections: dict[str, SPPConnection] = {} # keyed by device_path
self._recv_events: deque[SPPDataEvent] = deque(maxlen=500)
self._recv_index: int = 0
self._recv_callback: Any = None # async callable(address, data)
@method()
def Release(self) -> None:
log.info("SPP profile released")
for conn in list(self.connections.values()):
self._cleanup_connection(conn)
self.connections.clear()
@method()
def NewConnection(self, device: "o", fd: "h", properties: "a{sv}") -> None:
address = _path_to_address(device)
log.debug("SPP NewConnection: device=%s fd=%r props=%s", device, fd, properties)
if fd is None or (isinstance(fd, int) and fd < 0):
log.error("SPP: invalid fd received: %r", fd)
return
log.info("SPP: NewConnection from %s (fd=%d)", address, fd)
# Duplicate the fd so we own it independent of dbus-fast
try:
new_fd = os.dup(fd)
log.debug("os.dup(%d) -> %d", fd, new_fd)
except OSError:
log.exception("SPP: os.dup(%d) failed for %s", fd, address)
return
conn = SPPConnection(
device_path=device,
address=address,
role="server",
fd=new_fd,
)
self.connections[device] = conn
loop = asyncio.get_event_loop()
conn._read_task = loop.create_task(self._handle_connection(conn))
log.debug("SPP NewConnection done, read task created for %s", address)
@method()
def RequestDisconnection(self, device: "o") -> None:
address = _path_to_address(device)
log.info("SPP: disconnect requested for %s", address)
conn = self.connections.pop(device, None)
if conn:
self._cleanup_connection(conn)
def _cleanup_connection(self, conn: SPPConnection) -> None:
if conn._read_task and not conn._read_task.done():
conn._read_task.cancel()
if conn.writer:
try:
conn.writer.close()
except Exception:
pass
if conn.sock:
try:
conn.sock.close()
except Exception:
pass
elif conn.fd >= 0:
try:
os.close(conn.fd)
except Exception:
pass
async def _handle_connection(self, conn: SPPConnection) -> None:
"""Read loop for raw bytes from the remote SPP peer."""
try:
log.debug("SPP _handle_connection start: addr=%s fd=%d", conn.address, conn.fd)
# socket.fromfd() dups the fd internally — close our intermediate copy
conn.sock = socket.fromfd(
conn.fd, _AF_BLUETOOTH, socket.SOCK_STREAM,
_BTPROTO_RFCOMM,
)
log.debug("socket.fromfd OK: fileno=%d", conn.sock.fileno())
try:
os.close(conn.fd)
except OSError:
pass
conn.fd = -1 # transferred to socket
conn.sock.setblocking(False)
conn.reader, conn.writer = await asyncio.open_connection(sock=conn.sock)
log.debug("SPP async streams ready for %s, entering read loop", conn.address)
while True:
data = await conn.reader.read(4096)
if not data:
log.debug("SPP: EOF from %s (clean disconnect)", conn.address)
break
log.debug("SPP recv %d bytes from %s: %r", len(data), conn.address, data[:80])
conn.bytes_received += len(data)
event = SPPDataEvent(
index=self._recv_index,
timestamp=datetime.now(UTC).isoformat(),
address=conn.address,
value=data,
value_hex=data.hex(),
value_string=_try_decode(data),
)
self._recv_events.append(event)
self._recv_index += 1
# Fire async callback if registered (auto-responders)
if self._recv_callback:
try:
loop = asyncio.get_running_loop()
loop.create_task(self._recv_callback(conn.address, data))
except RuntimeError:
pass
except (ConnectionResetError, BrokenPipeError, OSError) as e:
log.debug("SPP connection error for %s: %s: %s", conn.address, type(e).__name__, e)
except asyncio.CancelledError:
log.debug("SPP task cancelled for %s", conn.address)
except Exception:
log.exception("SPP UNEXPECTED error for %s", conn.address)
finally:
log.debug("SPP cleanup for %s", conn.address)
self.connections.pop(conn.device_path, None)
self._cleanup_connection(conn)
async def send(self, address: str, data: bytes) -> bool:
"""Send raw bytes to a connected SPP peer."""
conn = self._get_connection(address)
if not conn or not conn.writer or conn.writer.is_closing():
return False
conn.writer.write(data)
await conn.writer.drain()
conn.bytes_sent += len(data)
log.debug("SPP sent %d bytes to %s", len(data), address)
return True
def add_client_connection(self, device_path: str, address: str, fd: int) -> None:
"""Register a client-initiated connection (we connected to them).
Called after Device1.ConnectProfile triggers NewConnection on our
Profile1 handler. In client mode, BlueZ still delivers the fd via
NewConnection we just tag the role as "client" before the read
loop starts. However, BlueZ may have already fired NewConnection
by the time ConnectProfile returns, so this is a fallback to
retag if needed.
"""
conn = self.connections.get(device_path)
if conn:
conn.role = "client"
def _get_connection(self, address: str) -> SPPConnection | None:
for conn in self.connections.values():
if conn.address.upper() == address.upper():
return conn
return None
def get_recv_events(
self,
since_index: int = 0,
address: str | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
"""Get received data events, optionally filtered."""
events = [
e
for e in self._recv_events
if e.index >= since_index
and (address is None or e.address.upper() == address.upper())
]
return [e.to_dict() for e in events[:limit]]
def clear_recv_events(self) -> int:
"""Clear all received data events. Returns count cleared."""
count = len(self._recv_events)
self._recv_events.clear()
return count
def get_status(self) -> dict[str, Any]:
"""Get overall SPP status."""
conns = []
now = time.monotonic()
for conn in self.connections.values():
conns.append({
"address": conn.address,
"role": conn.role,
"duration_seconds": round(now - conn.connected_at, 1),
"bytes_sent": conn.bytes_sent,
"bytes_received": conn.bytes_received,
})
return {
"registered": _profile_registered,
"uuid": _registered_uuid or SPP_UUID,
"connections": conns,
"recv_buffer_count": len(self._recv_events),
"recv_buffer_total": self._recv_index,
}
# ==================== Module-level lifecycle ====================
_profile: SPPProfile | None = None
_profile_bus: MessageBus | None = None
_profile_registered: bool = False
_registered_uuid: str | None = None
async def enable_spp(
uuid: str = SPP_UUID,
channel: int = 0,
name: str = "mcbluetooth SPP",
) -> SPPProfile:
"""Register the SPP profile with BlueZ.
Args:
uuid: Service UUID. Default is standard SPP UUID. Use custom UUIDs
for Arduino/ESP32 devices that advertise non-standard RFCOMM.
channel: RFCOMM channel number (0 = auto-assign).
name: Profile display name.
"""
global _profile, _profile_bus, _profile_registered, _registered_uuid
if _profile_registered and _profile:
return _profile
if _profile is None:
_profile = SPPProfile()
if _profile_bus is None:
_profile_bus = await MessageBus(
bus_type=BusType.SYSTEM,
negotiate_unix_fd=True, # Required: BlueZ passes RFCOMM fd via D-Bus
).connect()
log.debug("SPP D-Bus connected: negotiate_unix_fd=%s unique_name=%s",
_profile_bus._negotiate_unix_fd, _profile_bus.unique_name)
_profile_bus.export(SPP_PROFILE_PATH, _profile)
# Register with ProfileManager1
introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection)
profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE)
options: dict[str, Variant] = {
"Name": Variant("s", name),
"Role": Variant("s", "client-server"),
}
if channel > 0:
options["Channel"] = Variant("q", channel)
try:
await profile_mgr.call_register_profile(
SPP_PROFILE_PATH,
uuid,
options,
)
_profile_registered = True
_registered_uuid = uuid
log.info("SPP profile registered with BlueZ (uuid=%s, channel=%s)", uuid, channel or "auto")
except Exception as e:
if "Already Exists" in str(e):
log.info("SPP profile stale — unregistering and re-registering")
try:
await profile_mgr.call_unregister_profile(SPP_PROFILE_PATH)
except Exception:
pass
await profile_mgr.call_register_profile(
SPP_PROFILE_PATH,
uuid,
options,
)
_profile_registered = True
_registered_uuid = uuid
log.info("SPP profile re-registered with BlueZ")
else:
raise
return _profile
async def disable_spp() -> None:
"""Unregister the SPP profile and close all connections."""
global _profile, _profile_bus, _profile_registered, _registered_uuid
if _profile_bus and _profile_registered:
try:
introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection)
profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE)
await profile_mgr.call_unregister_profile(SPP_PROFILE_PATH)
except Exception:
pass
_profile_registered = False
_registered_uuid = None
if _profile:
_profile.Release()
_profile = None
if _profile_bus:
_profile_bus.disconnect()
_profile_bus = None
async def get_spp() -> SPPProfile | None:
"""Get the current SPP profile instance (None if not enabled)."""
return _profile

View File

@ -0,0 +1,681 @@
"""ELM327 OBD-II BLE Emulator.
Emulates an ELM327 v1.5 adapter over BLE using Nordic UART Service (NUS).
OBD-II apps (Torque, Car Scanner, etc.) connect via BLE, send AT commands
and OBD-II PID queries, and receive simulated responses.
The emulator handles the full ELM327 protocol:
- AT command set (ATZ, ATE0, ATH1, ATSP, etc.)
- Mode 01 (live data) with configurable PID values
- Mode 03 (stored DTCs)
- Mode 09 (VIN query)
- Supported PID bitmaps auto-generated from configured PIDs
NUS UUIDs (same as real BLE OBD adapters):
Service: 6E400001-B5A3-F393-E0A9-E50E24DCCA9E
RX (write): 6E400002-... (client writes commands here)
TX (notify): 6E400003-... (server sends responses here)
"""
from __future__ import annotations
import logging
from collections import deque
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
from fastmcp import FastMCP
from mcbluetooth.gatt_server import get_gatt_server
log = logging.getLogger(__name__)
# Nordic UART Service UUIDs
NUS_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
NUS_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" # Client writes here
NUS_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" # Server notifies here
# Default ECU header (primary ECU response address)
ECU_HEADER = "7E8"
# Common OBD-II PIDs with default values
DEFAULT_PIDS: dict[int, tuple[bytes, str]] = {
0x05: (bytes([70]), "Coolant temp (°C, offset -40) → 30°C"),
0x0C: (bytes([0x0B, 0xB8]), "Engine RPM (× ¼) → 750 RPM"),
0x0D: (bytes([0]), "Vehicle speed (km/h) → 0"),
0x0F: (bytes([55]), "Intake air temp (°C, offset -40) → 15°C"),
0x11: (bytes([25]), "Throttle position (%) → ~10%"),
0x1F: (bytes([0x00, 0x00]), "Runtime since start (sec) → 0"),
0x2F: (bytes([51]), "Fuel tank level (%) → 20%"),
0x46: (bytes([60]), "Ambient air temp (°C, offset -40) → 20°C"),
}
@dataclass
class CommandRecord:
"""A command received from the OBD-II client."""
index: int
timestamp: str
raw: str
parsed: str
response: str
class ELM327Emulator:
"""ELM327 AT command and OBD-II protocol handler."""
def __init__(self):
self.device_id = "ELM327 v1.5"
self.protocol = "6" # ISO 15765-4 CAN (11/500)
self.echo = False
self.headers = True
self.spaces = False
self.linefeed = False
self.pid_values: dict[int, bytes] = {}
self.dtc_codes: list[str] = []
self.vin = "1MCBT00TESTING12345"
self._commands: deque[CommandRecord] = deque(maxlen=200)
self._cmd_index = 0
self._started = False
self._rx_char_id: str | None = None
self._tx_char_id: str | None = None
self._adapter: str | None = None
self._buf = "" # Accumulates partial writes
def set_defaults(self) -> None:
"""Load default PID values for a realistic idle vehicle."""
for pid, (value, _desc) in DEFAULT_PIDS.items():
self.pid_values[pid] = value
def handle_data(self, data: bytes) -> str | None:
"""Process incoming bytes, return response when a complete command is received.
ELM327 commands are terminated with CR (\\r). Accumulates partial
writes until a complete command is received.
"""
try:
text = data.decode("utf-8", errors="replace")
except Exception:
return None
self._buf += text
# Check for command terminator
if "\r" not in self._buf:
return None
# Split on CR, process first complete command
line, self._buf = self._buf.split("\r", 1)
line = line.strip()
if not line:
return None
response = self._process_command(line)
# Record command
record = CommandRecord(
index=self._cmd_index,
timestamp=datetime.now(UTC).isoformat(),
raw=line,
parsed=line.upper().strip(),
response=response.replace("\r", "\\r").replace("\n", "\\n"),
)
self._commands.append(record)
self._cmd_index += 1
return response
def _process_command(self, line: str) -> str:
"""Parse and handle an AT or OBD command."""
cmd = line.strip().upper()
if cmd.startswith("AT"):
return self._handle_at(cmd)
# OBD-II query
return self._handle_obd(cmd)
def _handle_at(self, cmd: str) -> str:
"""Handle ELM327 AT commands."""
cr = "\r\n" if self.linefeed else "\r"
if cmd == "ATZ":
self.echo = False
self.headers = True
self.spaces = False
return f"{cr}{self.device_id}{cr}{cr}>"
if cmd == "ATI":
return f"{self.device_id}{cr}{cr}>"
if cmd == "AT@1":
return f"mcbluetooth OBD-II Emulator{cr}{cr}>"
if cmd == "ATE0":
self.echo = False
return f"OK{cr}{cr}>"
if cmd == "ATE1":
self.echo = True
return f"OK{cr}{cr}>"
if cmd == "ATH0":
self.headers = False
return f"OK{cr}{cr}>"
if cmd == "ATH1":
self.headers = True
return f"OK{cr}{cr}>"
if cmd == "ATS0":
self.spaces = False
return f"OK{cr}{cr}>"
if cmd == "ATS1":
self.spaces = True
return f"OK{cr}{cr}>"
if cmd.startswith("ATL"):
self.linefeed = cmd.endswith("1")
cr = "\r\n" if self.linefeed else "\r"
return f"OK{cr}{cr}>"
if cmd == "ATDPN":
return f"A{self.protocol}{cr}{cr}>"
if cmd == "ATDP":
protocols = {
"0": "AUTO",
"6": "ISO 15765-4 (CAN 11/500)",
"7": "ISO 15765-4 (CAN 29/500)",
"8": "ISO 15765-4 (CAN 11/250)",
"9": "ISO 15765-4 (CAN 29/250)",
}
name = protocols.get(self.protocol, "AUTO")
return f"{name}{cr}{cr}>"
if cmd == "ATRV":
return f"12.6V{cr}{cr}>"
# Commands that just get OK
ok_prefixes = (
"ATSP",
"ATST",
"ATAT",
"ATM",
"ATCAF",
"ATD",
"ATCE",
"ATCF",
"ATCM",
"ATSH",
"ATFC",
"ATWM",
"ATMA",
"ATAR",
"ATPC",
)
for prefix in ok_prefixes:
if cmd.startswith(prefix):
return f"OK{cr}{cr}>"
if cmd == "AT":
return f"OK{cr}{cr}>"
return f"?{cr}{cr}>"
def _handle_obd(self, cmd: str) -> str:
"""Handle OBD-II service queries."""
cr = "\r\n" if self.linefeed else "\r"
cmd = cmd.replace(" ", "")
# Modes 03/04 don't require a PID — pad to 4 chars
if len(cmd) == 2:
cmd += "00"
elif len(cmd) < 4:
return f"?{cr}{cr}>"
try:
mode = int(cmd[:2], 16)
pid = int(cmd[2:4], 16)
except ValueError:
return f"?{cr}{cr}>"
if mode == 0x01:
return self._handle_mode01(pid) + f"{cr}>"
if mode == 0x03:
return self._handle_mode03() + f"{cr}>"
if mode == 0x04:
self.dtc_codes.clear()
return f"44{cr}{cr}>"
if mode == 0x09:
return self._handle_mode09(pid) + f"{cr}>"
return f"NO DATA{cr}{cr}>"
def _handle_mode01(self, pid: int) -> str:
"""Handle Mode 01 (current data) queries."""
cr = "\r\n" if self.linefeed else "\r"
# Supported PID bitmaps
if pid in (0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0, 0xE0):
bitmap = self._build_pid_bitmap(pid)
return self._format_response(0x41, pid, bitmap)
# Configured PID value
if pid in self.pid_values:
return self._format_response(0x41, pid, self.pid_values[pid])
return f"NO DATA{cr}"
def _handle_mode03(self) -> str:
"""Handle Mode 03 (stored DTCs) query."""
if not self.dtc_codes:
return self._format_response(0x43, 0x00, bytes([0x00, 0x00, 0x00, 0x00]))
# Pack DTCs into response (2 bytes each)
dtc_bytes = bytearray()
for code in self.dtc_codes[:3]: # Max 3 per frame
dtc_bytes.extend(self._encode_dtc(code))
# Pad to even number of DTCs
while len(dtc_bytes) % 4 != 0:
dtc_bytes.extend(b"\x00\x00")
return self._format_response(0x43, len(self.dtc_codes), bytes(dtc_bytes))
def _handle_mode09(self, pid: int) -> str:
"""Handle Mode 09 (vehicle info) queries."""
cr = "\r\n" if self.linefeed else "\r"
if pid == 0x00:
# Supported Mode 09 PIDs: 02 (VIN)
return self._format_response(0x49, 0x00, bytes([0x40, 0x00, 0x00, 0x00]))
if pid == 0x02:
# VIN (17 chars, multi-frame)
vin_bytes = self.vin.encode("ascii")[:17].ljust(17, b"0")
if self.headers:
lines = []
# First frame: 10 14 49 02 01 VIN[0:3]
lines.append(
f"{ECU_HEADER} 10 14 49 02 01 " + " ".join(f"{b:02X}" for b in vin_bytes[:3])
)
# Consecutive frames
offset = 3
seq = 1
while offset < len(vin_bytes):
chunk = vin_bytes[offset : offset + 7]
frame = f"{ECU_HEADER} 2{seq:01X} " + " ".join(f"{b:02X}" for b in chunk)
lines.append(frame)
offset += 7
seq = (seq + 1) & 0x0F
return cr.join(lines) + cr
else:
return f"4902{vin_bytes.hex().upper()}{cr}"
return f"NO DATA{cr}"
def _format_response(self, mode_resp: int, pid: int, data: bytes) -> str:
"""Format an OBD-II response with optional headers."""
cr = "\r\n" if self.linefeed else "\r"
length = 2 + len(data) # mode + pid + data
if self.headers:
sep = " " if self.spaces else ""
data_hex = sep.join(f"{b:02X}" for b in data)
return f"{ECU_HEADER}{sep}{length:02X}{sep}{mode_resp:02X}{sep}{pid:02X}{sep}{data_hex}{cr}"
else:
data_hex = "".join(f"{b:02X}" for b in data)
return f"{mode_resp:02X}{pid:02X}{data_hex}{cr}"
def _build_pid_bitmap(self, base_pid: int) -> bytes:
"""Build a 4-byte bitmap of supported PIDs for a range."""
bitmap = 0
all_pids = set(self.pid_values.keys())
for i in range(32):
actual_pid = base_pid + i + 1
if actual_pid in all_pids:
bitmap |= 1 << (31 - i)
# Set bit 32 (= bit 0) if the next range has supported PIDs
next_base = base_pid + 0x20
if next_base <= 0xE0:
for p in all_pids:
if next_base < p <= next_base + 0x20:
bitmap |= 1
break
return bitmap.to_bytes(4, "big")
@staticmethod
def _encode_dtc(code: str) -> bytes:
"""Encode a DTC string like 'P0301' into 2 bytes."""
prefixes = {"P": 0, "C": 1, "B": 2, "U": 3}
prefix = prefixes.get(code[0].upper(), 0)
number = int(code[1:], 16)
value = (prefix << 14) | number
return value.to_bytes(2, "big")
def get_commands(
self,
since_index: int = 0,
limit: int = 50,
) -> list[dict[str, Any]]:
"""Get recorded commands."""
records = [
{
"index": r.index,
"timestamp": r.timestamp,
"command": r.raw,
"response_preview": r.response[:80],
}
for r in self._commands
if r.index >= since_index
]
return records[:limit]
# ==================== Module-level emulator ====================
_emulator: ELM327Emulator | None = None
def _get_emulator() -> ELM327Emulator:
"""Get the global emulator instance."""
global _emulator
if _emulator is None:
_emulator = ELM327Emulator()
return _emulator
async def _on_write(char_id: str, value: bytes, device: str | None) -> None:
"""Callback fired when a remote client writes to the RX characteristic."""
emu = _get_emulator()
if not emu._started or char_id != emu._rx_char_id:
return
response = emu.handle_data(value)
if response and emu._tx_char_id:
mgr = get_gatt_server()
mgr.set_value(emu._tx_char_id, response.encode("utf-8"))
# ==================== Tool registration ====================
def register_tools(mcp: FastMCP) -> None:
"""Register ELM327 emulator tools with the MCP server."""
@mcp.tool()
async def bt_elm327_emu_start(
adapter: str,
name: str = "OBDII",
load_defaults: bool = True,
) -> dict[str, Any]:
"""Start the ELM327 OBD-II BLE emulator.
Sets up a Nordic UART Service (NUS) GATT server, registers with
BlueZ, and starts advertising. OBD-II apps can then discover and
connect to this emulated adapter.
The emulator auto-responds to AT commands and OBD-II PID queries.
Configure PID values with bt_elm327_emu_set_pid().
Args:
adapter: Bluetooth adapter (e.g., "hci0").
name: BLE device name shown to scanners (default "OBDII").
load_defaults: Pre-load default PID values for a realistic
idle vehicle (RPM 750, coolant 30°C, etc.).
Returns:
Status with NUS characteristic IDs and default PID info.
"""
emu = _get_emulator()
if emu._started:
return {"status": "error", "error": "Emulator already running"}
try:
if load_defaults:
emu.set_defaults()
mgr = get_gatt_server()
# Build NUS GATT hierarchy
svc_id = mgr.add_service(NUS_SERVICE_UUID)
rx_id = mgr.add_characteristic(
svc_id,
NUS_RX_CHAR_UUID,
flags=["write", "write-without-response"],
)
tx_id = mgr.add_characteristic(
svc_id,
NUS_TX_CHAR_UUID,
flags=["notify"],
value=f"{emu.device_id}\r\n>".encode(),
)
emu._rx_char_id = rx_id
emu._tx_char_id = tx_id
emu._adapter = adapter
# Register write callback for auto-response
mgr._write_callback = _on_write
# Register + advertise
await mgr.register(adapter)
await mgr.set_advertising(
adapter,
True,
name=name,
service_uuids=[NUS_SERVICE_UUID],
)
emu._started = True
pid_info = (
{pid: desc for pid, (_val, desc) in DEFAULT_PIDS.items()} if load_defaults else {}
)
return {
"status": "ok",
"emulator": "ELM327 v1.5",
"adapter": adapter,
"name": name,
"rx_char_id": rx_id,
"tx_char_id": tx_id,
"default_pids": pid_info,
"vin": emu.vin,
}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_elm327_emu_stop(adapter: str) -> dict[str, Any]:
"""Stop the ELM327 OBD-II BLE emulator.
Stops advertising, unregisters GATT services, and cleans up.
Args:
adapter: Bluetooth adapter (e.g., "hci0").
Returns:
Status confirming shutdown.
"""
global _emulator
emu = _get_emulator()
if not emu._started:
return {"status": "ok", "was_running": False}
try:
mgr = get_gatt_server()
mgr._write_callback = None
await mgr.unregister(adapter)
mgr.clear()
except Exception:
pass
emu._started = False
_emulator = None
return {"status": "ok", "was_running": True}
@mcp.tool()
async def bt_elm327_emu_status() -> dict[str, Any]:
"""Get ELM327 emulator status.
Returns:
- running: Whether the emulator is active
- pids: Configured PID values with descriptions
- dtcs: Active diagnostic trouble codes
- command_count: Number of commands received
- settings: Current ELM327 settings (echo, headers, etc.)
"""
emu = _get_emulator()
pids = {}
for pid, value in emu.pid_values.items():
desc = ""
if pid in DEFAULT_PIDS:
_, desc = DEFAULT_PIDS[pid]
pids[f"0x{pid:02X}"] = {
"value_hex": value.hex(),
"value_bytes": list(value),
"description": desc,
}
return {
"status": "ok",
"running": emu._started,
"device_id": emu.device_id,
"adapter": emu._adapter,
"protocol": emu.protocol,
"pids": pids,
"dtcs": emu.dtc_codes,
"vin": emu.vin,
"command_count": emu._cmd_index,
"settings": {
"echo": emu.echo,
"headers": emu.headers,
"spaces": emu.spaces,
"linefeed": emu.linefeed,
},
}
@mcp.tool()
async def bt_elm327_emu_set_pid(
pid: int,
value: str,
value_type: str = "hex",
) -> dict[str, Any]:
"""Set an OBD-II PID response value.
When an OBD app queries this PID (Mode 01), the emulator returns
the configured value. Values follow OBD-II encoding conventions.
Common PIDs:
0x05: Coolant temp (1 byte, value - 40 = °C)
0x0C: Engine RPM (2 bytes, value / 4 = RPM)
0x0D: Vehicle speed (1 byte = km/h)
0x0F: Intake air temp (1 byte, value - 40 = °C)
0x11: Throttle position (1 byte, value / 2.55 = %)
0x2F: Fuel tank level (1 byte, value / 2.55 = %)
0x46: Ambient air temp (1 byte, value - 40 = °C)
Args:
pid: PID number (e.g., 0x0C for RPM).
value: Encoded value. For RPM of 3000: "0BB8" (hex) or "3000" (int).
value_type: "hex" (raw bytes), "int" (little-endian integer),
or "string" (UTF-8).
Returns:
Status with the configured PID value.
"""
emu = _get_emulator()
try:
if value_type == "hex":
data = bytes.fromhex(value)
elif value_type == "int":
int_val = int(value)
byte_len = max(1, (int_val.bit_length() + 7) // 8)
data = int_val.to_bytes(byte_len, "big") # OBD uses big-endian
elif value_type == "string":
data = value.encode("utf-8")
else:
return {"status": "error", "error": f"Unknown value_type: {value_type}"}
emu.pid_values[pid] = data
return {
"status": "ok",
"pid": f"0x{pid:02X}",
"value_hex": data.hex(),
"value_bytes": list(data),
}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_elm327_emu_set_dtcs(dtc_codes: list[str]) -> dict[str, Any]:
"""Set diagnostic trouble codes for Mode 03 queries.
Args:
dtc_codes: List of DTC strings (e.g., ["P0301", "P0420", "C0035"]).
P = Powertrain, C = Chassis, B = Body, U = Network.
Returns:
Status with configured DTCs.
"""
emu = _get_emulator()
emu.dtc_codes = [c.upper() for c in dtc_codes]
return {
"status": "ok",
"dtcs": emu.dtc_codes,
"count": len(emu.dtc_codes),
}
@mcp.tool()
async def bt_elm327_emu_set_vin(vin: str) -> dict[str, Any]:
"""Set the Vehicle Identification Number for Mode 09 queries.
Args:
vin: 17-character VIN string.
Returns:
Status with configured VIN.
"""
emu = _get_emulator()
if len(vin) != 17:
return {"status": "error", "error": f"VIN must be 17 characters (got {len(vin)})"}
emu.vin = vin.upper()
return {"status": "ok", "vin": emu.vin}
@mcp.tool()
async def bt_elm327_emu_read_commands(
since_index: int = 0,
limit: int = 50,
) -> dict[str, Any]:
"""Read commands received from the OBD-II client.
Monitor what the connected app is requesting. Useful for debugging
protocol issues or understanding app behavior.
Args:
since_index: Only return commands with index >= this value.
limit: Maximum commands to return.
Returns:
List of received commands with timestamps and responses.
"""
emu = _get_emulator()
cmds = emu.get_commands(since_index, limit)
next_idx = max((c["index"] for c in cmds), default=since_index) + 1 if cmds else since_index
return {
"status": "ok",
"commands": cmds,
"count": len(cmds),
"next_index": next_idx,
}

View File

@ -0,0 +1,392 @@
"""GATT Server tools for Bluetooth MCP server.
These tools let Linux act as a BLE peripheral (GATT server) that
advertises services, accepts connections, and handles reads/writes
from remote central devices. Use cases include: emulating sensors,
building test harnesses, mocking BLE devices, protocol fuzzing.
Typical flow:
1. bt_gatt_server_add_service(uuid) service0
2. bt_gatt_server_add_characteristic(service0, ...) service0/char0
3. bt_gatt_server_register(adapter="hci0") registered
4. bt_gatt_server_advertise(adapter="hci0", enable=True, name="MyDevice")
5. Remote connects, writes bt_gatt_server_read_writes()
6. Respond via bt_gatt_server_set_value() auto-notifies
"""
from __future__ import annotations
from typing import Any
from fastmcp import FastMCP
from mcbluetooth.gatt_server import get_gatt_server, shutdown_gatt_server
def _parse_value(value: str, value_type: str) -> bytes:
"""Convert a string value to bytes based on value_type."""
if value_type == "hex":
return bytes.fromhex(value)
elif value_type == "string":
return value.encode("utf-8")
elif value_type == "int":
int_val = int(value)
byte_len = max(1, (int_val.bit_length() + 7) // 8)
return int_val.to_bytes(byte_len, "little")
else:
raise ValueError(f"Unknown value_type: {value_type}")
def register_tools(mcp: FastMCP) -> None:
"""Register GATT server tools with the MCP server."""
# ---- Lifecycle ----
@mcp.tool()
async def bt_gatt_server_register(adapter: str) -> dict[str, Any]:
"""Register the GATT application with BlueZ.
Exports all added services/characteristics to D-Bus and calls
GattManager1.RegisterApplication. Must add at least one service
before registering.
After registration, remote BLE centrals can discover and interact
with the hosted services (once advertising is started).
Args:
adapter: Adapter name (e.g., "hci0").
Returns:
Registration status with service count.
"""
try:
mgr = get_gatt_server()
await mgr.register(adapter)
status = mgr.get_status()
return {
"status": "ok",
"registered": True,
"service_count": len(status["services"]),
}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_gatt_server_unregister(adapter: str) -> dict[str, Any]:
"""Unregister the GATT application from BlueZ.
Stops advertising (if active) and removes the application
registration. Services remain in memory and can be re-registered.
Args:
adapter: Adapter name (e.g., "hci0").
Returns:
Status confirming unregistration.
"""
try:
await shutdown_gatt_server(adapter)
return {"status": "ok", "registered": False}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_gatt_server_status() -> dict[str, Any]:
"""Get GATT server status.
Returns:
- registered: Whether the application is registered with BlueZ
- advertising: Whether BLE advertising is active
- services: List of services with their characteristics
- write_event_count: Number of buffered write events
"""
mgr = get_gatt_server()
return {"status": "ok", **mgr.get_status()}
# ---- Service construction ----
@mcp.tool()
async def bt_gatt_server_add_service(
uuid: str,
primary: bool = True,
) -> dict[str, Any]:
"""Add a GATT service to the server.
Services must be added BEFORE registering with BlueZ.
If already registered, unregister first, add services, then re-register.
Args:
uuid: Service UUID (e.g., "180F" for Battery Service, or full
128-bit "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" for custom).
primary: Whether this is a primary service (default True).
Returns:
The assigned service_id (e.g., "service0") for adding characteristics.
"""
try:
mgr = get_gatt_server()
service_id = mgr.add_service(uuid, primary)
return {"status": "ok", "service_id": service_id, "uuid": uuid}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_gatt_server_add_characteristic(
service_id: str,
uuid: str,
flags: list[str],
value: str | None = None,
value_type: str = "hex",
) -> dict[str, Any]:
"""Add a characteristic to a GATT service.
Args:
service_id: Service to add to (e.g., "service0").
uuid: Characteristic UUID.
flags: List of flags/permissions. Common values:
"read", "write", "write-without-response", "notify", "indicate",
"encrypt-read", "encrypt-write".
value: Initial value (optional). Format depends on value_type.
value_type: How to interpret value "hex", "string", or "int".
Returns:
The assigned char_id (e.g., "service0/char0") for setting values.
"""
try:
mgr = get_gatt_server()
initial = _parse_value(value, value_type) if value else b""
char_id = mgr.add_characteristic(service_id, uuid, flags, initial)
return {
"status": "ok",
"char_id": char_id,
"uuid": uuid,
"flags": flags,
}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_gatt_server_add_descriptor(
char_id: str,
uuid: str,
flags: list[str],
value: str | None = None,
value_type: str = "hex",
) -> dict[str, Any]:
"""Add a descriptor to a GATT characteristic.
Note: CCCD (0x2902) for notify/indicate is auto-managed by BlueZ
don't add it manually.
Args:
char_id: Characteristic to add to (e.g., "service0/char0").
uuid: Descriptor UUID (e.g., "2901" for Characteristic User Description).
flags: List of flags (e.g., ["read"], ["read", "write"]).
value: Initial value (optional).
value_type: How to interpret value "hex", "string", or "int".
Returns:
The assigned desc_id.
"""
try:
mgr = get_gatt_server()
initial = _parse_value(value, value_type) if value else b""
desc_id = mgr.add_descriptor(char_id, uuid, flags, initial)
return {"status": "ok", "desc_id": desc_id, "uuid": uuid}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_gatt_server_clear() -> dict[str, Any]:
"""Remove all services, characteristics, and descriptors.
Must unregister first. Use this to rebuild the service hierarchy
from scratch.
Returns:
Status confirming services were cleared.
"""
try:
mgr = get_gatt_server()
mgr.clear()
return {"status": "ok", "cleared": True}
except Exception as exc:
return {"status": "error", "error": str(exc)}
# ---- Value management ----
@mcp.tool()
async def bt_gatt_server_set_value(
char_id: str,
value: str,
value_type: str = "hex",
) -> dict[str, Any]:
"""Set a characteristic value on the GATT server.
If a remote client has subscribed to notifications on this
characteristic, the new value is automatically pushed via
BLE notification.
Args:
char_id: Characteristic ID (e.g., "service0/char0").
value: Value to set.
value_type: How to interpret value "hex" (e.g., "0102ff"),
"string" (UTF-8 text), or "int" (decimal number).
Returns:
Status with value details and whether notification was sent.
"""
try:
mgr = get_gatt_server()
data = _parse_value(value, value_type)
char = mgr._characteristics.get(char_id)
was_notifying = char.notifying if char else False
mgr.set_value(char_id, data)
return {
"status": "ok",
"char_id": char_id,
"value_hex": data.hex(),
"length": len(data),
"notified": was_notifying,
}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_gatt_server_get_value(char_id: str) -> dict[str, Any]:
"""Read a characteristic value from the server side.
This reads the value stored on OUR server, not from a remote device.
Use bt_ble_read() for reading from remote GATT servers.
Args:
char_id: Characteristic ID (e.g., "service0/char0").
Returns:
The current value as hex, bytes, and string (if decodable).
"""
try:
mgr = get_gatt_server()
value = mgr.get_value(char_id)
result: dict[str, Any] = {
"status": "ok",
"char_id": char_id,
"value_hex": value.hex(),
"value_bytes": list(value),
"length": len(value),
}
try:
result["value_string"] = value.decode("utf-8")
except UnicodeDecodeError:
pass
return result
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_gatt_server_notify(char_id: str) -> dict[str, Any]:
"""Explicitly send a BLE notification for the current value.
Normally, bt_gatt_server_set_value auto-notifies. Use this to
re-send the current value without changing it.
Args:
char_id: Characteristic ID (e.g., "service0/char0").
Returns:
Status indicating whether notification was sent.
"""
try:
mgr = get_gatt_server()
sent = mgr.notify(char_id)
if sent:
return {"status": "ok", "notified": True}
return {
"status": "ok",
"notified": False,
"reason": "No client subscribed to notifications",
}
except Exception as exc:
return {"status": "error", "error": str(exc)}
# ---- Advertising ----
@mcp.tool()
async def bt_gatt_server_advertise(
adapter: str,
enable: bool,
name: str = "mcbluetooth",
service_uuids: list[str] | None = None,
) -> dict[str, Any]:
"""Start or stop BLE advertising.
When advertising, the device becomes discoverable to BLE scanners
and can accept connections from central devices.
Must register the GATT application first.
Args:
adapter: Adapter name (e.g., "hci0").
enable: True to start advertising, False to stop.
name: Local name shown to scanners (default "mcbluetooth").
service_uuids: UUIDs to include in advertisement. If not specified,
uses all registered service UUIDs.
Returns:
Advertising status.
"""
try:
mgr = get_gatt_server()
await mgr.set_advertising(adapter, enable, name=name, service_uuids=service_uuids)
return {
"status": "ok",
"advertising": enable,
"name": name if enable else None,
}
except Exception as exc:
return {"status": "error", "error": str(exc)}
# ---- Write event monitoring ----
@mcp.tool()
async def bt_gatt_server_read_writes(
since_index: int = 0,
char_id: str | None = None,
limit: int = 50,
) -> dict[str, Any]:
"""Read write events from remote BLE clients.
When a remote central device writes to a server characteristic,
the write is buffered here. Poll this to see incoming commands/data.
Args:
since_index: Only return events with index >= this value.
Start with 0, then use the highest returned index + 1.
char_id: Filter to writes on this characteristic (optional).
limit: Maximum events to return (default 50).
Returns:
List of write events with value, timestamp, and source device.
"""
mgr = get_gatt_server()
events = mgr.get_write_events(since_index, char_id, limit)
next_index = (
max((e["index"] for e in events), default=since_index) + 1 if events else since_index
)
return {
"status": "ok",
"events": events,
"count": len(events),
"next_index": next_index,
}
@mcp.tool()
async def bt_gatt_server_clear_writes() -> dict[str, Any]:
"""Clear the write event buffer.
Returns:
Number of events cleared.
"""
mgr = get_gatt_server()
count = mgr.clear_write_events()
return {"status": "ok", "cleared_count": count}

View File

@ -0,0 +1,243 @@
"""SPP (Serial Port Profile) tools for Bluetooth MCP server.
Classic Bluetooth serial communication over RFCOMM. SPP provides raw
bidirectional byte streams the classic BT equivalent of BLE's Nordic
UART Service.
Typical flow:
Server mode:
1. bt_spp_enable() Register SPP profile
2. Make adapter discoverable Remote device connects
3. bt_spp_send(address, "Hello\r\n") Send data
4. bt_spp_recv() Poll received data
Client mode:
1. bt_spp_enable() Register SPP profile
2. bt_spp_connect(adapter, address) Connect to remote SPP
3. bt_spp_send / bt_spp_recv Bidirectional I/O
"""
from __future__ import annotations
from typing import Any, Literal
from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client
from mcbluetooth.spp import (
SPP_UUID,
disable_spp,
enable_spp,
get_spp,
)
def register_tools(mcp: FastMCP) -> None:
"""Register SPP tools with the MCP server."""
@mcp.tool()
async def bt_spp_enable(
uuid: str = SPP_UUID,
channel: int = 0,
name: str = "mcbluetooth SPP",
) -> dict[str, Any]:
"""Enable SPP (Serial Port Profile) for classic Bluetooth serial.
Registers an RFCOMM serial profile with BlueZ. After enabling,
remote devices can connect for bidirectional serial communication,
or use bt_spp_connect() to initiate outbound connections.
Args:
uuid: Service UUID. Default is standard SPP (0x1101).
Use custom UUIDs for Arduino/ESP32 with non-standard RFCOMM.
channel: RFCOMM channel (0 = auto-assign, recommended).
name: Profile display name visible during service discovery.
Returns:
Registration status with active UUID.
"""
try:
await enable_spp(uuid=uuid, channel=channel, name=name)
return {"status": "ok", "uuid": uuid, "channel": channel or "auto", "name": name}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_spp_disable() -> dict[str, Any]:
"""Disable SPP and close all serial connections.
Unregisters the SPP profile from BlueZ and terminates any active
RFCOMM sessions.
Returns:
Status confirming profile removal.
"""
try:
await disable_spp()
return {"status": "ok", "disabled": True}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_spp_status() -> dict[str, Any]:
"""Get SPP status: registration state, connections, and buffer stats.
Returns:
- registered: Whether the SPP profile is active
- uuid: The UUID the profile was registered with
- connections: Active peers with role, duration, byte counters
- recv_buffer_count: Buffered received data events
- recv_buffer_total: Total events received since enable
"""
profile = await get_spp()
if not profile:
return {"status": "ok", "registered": False, "connections": []}
return {"status": "ok", **profile.get_status()}
@mcp.tool()
async def bt_spp_connect(
adapter: str,
address: str,
uuid: str = SPP_UUID,
) -> dict[str, Any]:
"""Connect to a remote device's SPP service (client mode).
Initiates an outbound RFCOMM connection. The SPP profile must be
enabled first (bt_spp_enable). BlueZ will deliver the RFCOMM fd
through our Profile1 handler automatically.
The device should already be paired and trusted.
Args:
adapter: Bluetooth adapter (e.g. "hci0").
address: Remote device Bluetooth address.
uuid: SPP service UUID on the remote device.
Returns:
Connection status.
"""
profile = await get_spp()
if not profile:
return {"status": "error", "error": "SPP not enabled — call bt_spp_enable() first"}
try:
client = await get_client()
await client.connect_profile(adapter, address, uuid)
# BlueZ fires NewConnection on our Profile1 handler — tag as client
device_path = f"/org/bluez/{adapter}/dev_{address.upper().replace(':', '_')}"
profile.add_client_connection(device_path, address, -1)
return {"status": "ok", "address": address, "role": "client"}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_spp_disconnect(address: str) -> dict[str, Any]:
"""Disconnect a specific SPP peer.
Closes the RFCOMM socket and removes the connection.
Args:
address: Bluetooth address of the peer to disconnect.
Returns:
Disconnect status.
"""
profile = await get_spp()
if not profile:
return {"status": "error", "error": "SPP not enabled"}
conn = profile._get_connection(address)
if not conn:
return {"status": "error", "error": f"No SPP connection to {address}"}
profile.connections.pop(conn.device_path, None)
profile._cleanup_connection(conn)
return {"status": "ok", "address": address, "disconnected": True}
@mcp.tool()
async def bt_spp_send(
address: str,
data: str,
data_type: Literal["string", "hex", "line"] = "string",
) -> dict[str, Any]:
"""Send data to a connected SPP peer.
Args:
address: Bluetooth address of the peer.
data: The data to send.
data_type: How to interpret the data parameter:
- "string": Send as-is (UTF-8 encoded)
- "hex": Parse as hex string (e.g. "48656c6c6f")
- "line": Append CR+LF for line-oriented protocols
(AT commands, NMEA, etc.)
Returns:
Send status with byte count.
"""
profile = await get_spp()
if not profile:
return {"status": "error", "error": "SPP not enabled"}
if data_type == "hex":
try:
raw = bytes.fromhex(data)
except ValueError as exc:
return {"status": "error", "error": f"Invalid hex: {exc}"}
elif data_type == "line":
raw = (data + "\r\n").encode("utf-8")
else:
raw = data.encode("utf-8")
ok = await profile.send(address, raw)
if ok:
return {"status": "ok", "bytes_sent": len(raw)}
return {"status": "error", "error": f"No active SPP connection to {address}"}
@mcp.tool()
async def bt_spp_recv(
since_index: int = 0,
address: str | None = None,
limit: int = 50,
) -> dict[str, Any]:
"""Read received data from SPP connections (cursor-based polling).
Returns buffered data events since the given index. Use the highest
returned index + 1 as since_index for the next poll to avoid
duplicates.
Args:
since_index: Return events with index >= this value.
Start at 0 for first call, then use last index + 1.
address: Filter by peer address (optional).
limit: Maximum number of events to return.
Returns:
List of data events with index, timestamp, address, and values.
"""
profile = await get_spp()
if not profile:
return {"status": "ok", "events": [], "hint": "SPP not enabled"}
events = profile.get_recv_events(since_index=since_index, address=address, limit=limit)
return {
"status": "ok",
"count": len(events),
"events": events,
"next_index": events[-1]["index"] + 1 if events else since_index,
}
@mcp.tool()
async def bt_spp_clear_recv() -> dict[str, Any]:
"""Clear the SPP receive buffer.
Removes all buffered received data events. The index counter
continues from where it left off (not reset to 0).
Returns:
Count of events cleared.
"""
profile = await get_spp()
if not profile:
return {"status": "ok", "cleared": 0}
count = profile.clear_recv_events()
return {"status": "ok", "cleared": count}