diff --git a/pyproject.toml b/pyproject.toml index aab8249..794f3ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,7 @@ ignore = ["E501"] [tool.ruff.lint.per-file-ignores] # dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations "src/mcbluetooth/hfp_ag.py" = ["F821", "F722"] +"src/mcbluetooth/gatt_server.py" = ["F821", "F722"] [tool.pytest.ini_options] asyncio_mode = "auto" diff --git a/src/mcbluetooth/gatt_server.py b/src/mcbluetooth/gatt_server.py new file mode 100644 index 0000000..dab4851 --- /dev/null +++ b/src/mcbluetooth/gatt_server.py @@ -0,0 +1,717 @@ +"""BLE GATT Server implementation for BlueZ. + +Registers as a BLE peripheral via BlueZ's GattManager1 D-Bus API, allowing +mcbluetooth to act as a GATT server. Remote BLE central devices can discover +services, read/write characteristics, and subscribe to notifications. + +D-Bus hierarchy exported to BlueZ: + /mcbluetooth/gatt ObjectManager + /mcbluetooth/gatt/serviceN GattService1 + /mcbluetooth/gatt/serviceN/charM GattCharacteristic1 + /mcbluetooth/gatt/serviceN/charM/descK GattDescriptor1 + /mcbluetooth/gatt/adv0 LEAdvertisement1 + +Reference patterns: agent.py (ServiceInterface, pending requests), +hfp_ag.py (dedicated bus, module singleton, ProfileManager registration). +""" + +import asyncio +import logging +from collections import deque +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any + +from dbus_fast import BusType, Message, MessageType, Variant +from dbus_fast.aio import MessageBus +from dbus_fast.service import ServiceInterface, dbus_property, method + +log = logging.getLogger(__name__) + +BLUEZ_SERVICE = "org.bluez" +GATT_MANAGER_IFACE = "org.bluez.GattManager1" +LE_ADV_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1" +APP_BASE_PATH = "/mcbluetooth/gatt" +ADV_PATH = "/mcbluetooth/gatt/adv0" + + +# ==================== Data Classes ==================== + + +@dataclass +class WriteEvent: + """A write received from a remote BLE central device.""" + + index: int + timestamp: str + char_id: str + char_uuid: str + value: bytes + value_hex: str + value_string: str | None + device: str | None + + def to_dict(self) -> dict[str, Any]: + d: dict[str, Any] = { + "index": self.index, + "timestamp": self.timestamp, + "char_id": self.char_id, + "char_uuid": self.char_uuid, + "value_hex": self.value_hex, + "length": len(self.value), + } + if self.value_string is not None: + d["value_string"] = self.value_string + if self.device: + d["device"] = self.device + return d + + +@dataclass +class ServerCharacteristic: + """Internal state for a server-side GATT characteristic.""" + + char_id: str + path: str + uuid: str + flags: list[str] + value: bytes = b"" + notifying: bool = False + service_id: str = "" + desc_count: int = 0 + dbus_obj: Any = field(default=None, repr=False) + + +@dataclass +class ServerDescriptor: + """Internal state for a server-side GATT descriptor.""" + + desc_id: str + path: str + uuid: str + flags: list[str] + value: bytes = b"" + char_id: str = "" + dbus_obj: Any = field(default=None, repr=False) + + +@dataclass +class ServerService: + """Internal state for a server-side GATT service.""" + + service_id: str + path: str + uuid: str + primary: bool = True + char_count: int = 0 + characteristics: dict[str, ServerCharacteristic] = field(default_factory=dict) + dbus_obj: Any = field(default=None, repr=False) + + +# ==================== D-Bus Interfaces ==================== + + +class GattApplication(ServiceInterface): + """ObjectManager for the GATT application root. + + BlueZ calls GetManagedObjects() on this path to discover all + services, characteristics, and descriptors we're hosting. + """ + + def __init__(self, manager: "GattServerManager"): + super().__init__("org.freedesktop.DBus.ObjectManager") + self._manager = manager + + @method() + def GetManagedObjects(self) -> "a{oa{sa{sv}}}": # noqa: F821 + return self._manager._build_managed_objects() + + +class GattServiceIface(ServiceInterface): + """org.bluez.GattService1 — exported at each service path. + + No D-Bus methods; properties served via GetManagedObjects only. + """ + + def __init__(self): + super().__init__("org.bluez.GattService1") + + +class GattCharacteristicIface(ServiceInterface): + """org.bluez.GattCharacteristic1 — handles reads/writes from remotes. + + ReadValue returns the stored value. WriteValue stores the value AND + records a WriteEvent for LLM polling (like agent.py's pending_requests). + """ + + def __init__(self, char: ServerCharacteristic, manager: "GattServerManager"): + super().__init__("org.bluez.GattCharacteristic1") + self._char = char + self._manager = manager + + @method() + def ReadValue(self, options: "a{sv}") -> "ay": # noqa: F821 + offset = 0 + opt_offset = options.get("offset") + if opt_offset is not None: + offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset) + return list(self._char.value[offset:]) + + @method() + def WriteValue(self, value: "ay", options: "a{sv}") -> None: # noqa: F821 + new_bytes = bytes(value) + offset = 0 + opt_offset = options.get("offset") + if opt_offset is not None: + offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset) + + if offset > 0: + self._char.value = self._char.value[:offset] + new_bytes + else: + self._char.value = new_bytes + + device = None + opt_device = options.get("device") + if opt_device is not None: + device = opt_device.value if hasattr(opt_device, "value") else str(opt_device) + + self._manager._record_write(self._char.char_id, self._char.uuid, self._char.value, device) + + @method() + def StartNotify(self) -> None: + self._char.notifying = True + log.info("GATT server: StartNotify on %s (%s)", self._char.char_id, self._char.uuid) + + @method() + def StopNotify(self) -> None: + self._char.notifying = False + log.info("GATT server: StopNotify on %s (%s)", self._char.char_id, self._char.uuid) + + +class GattDescriptorIface(ServiceInterface): + """org.bluez.GattDescriptor1 — handles reads/writes on descriptors.""" + + def __init__(self, desc: ServerDescriptor): + super().__init__("org.bluez.GattDescriptor1") + self._desc = desc + + @method() + def ReadValue(self, options: "a{sv}") -> "ay": # noqa: F821 + offset = 0 + opt_offset = options.get("offset") + if opt_offset is not None: + offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset) + return list(self._desc.value[offset:]) + + @method() + def WriteValue(self, value: "ay", options: "a{sv}") -> None: # noqa: F821 + self._desc.value = bytes(value) + + +class LEAdvertisementIface(ServiceInterface): + """org.bluez.LEAdvertisement1 — BLE advertising data. + + BlueZ reads advertisement properties via Properties.GetAll, so these + must be exposed as @dbus_property (unlike GATT objects which use + GetManagedObjects). + """ + + def __init__(self, adv_type: str, local_name: str, service_uuids: list[str]): + super().__init__("org.bluez.LEAdvertisement1") + self._type = adv_type + self._local_name = local_name + self._service_uuids = service_uuids + + @method() + def Release(self) -> None: + log.info("GATT server: advertisement released by BlueZ") + + @dbus_property() + def Type(self) -> "s": # noqa: F821 + return self._type + + @dbus_property() + def LocalName(self) -> "s": # noqa: F821 + return self._local_name + + @dbus_property() + def ServiceUUIDs(self) -> "as": # noqa: F821 + return self._service_uuids + + +# ==================== GATT Server Manager ==================== + + +class GattServerManager: + """Manages the GATT server lifecycle and state. + + Coordinates D-Bus object creation, BlueZ registration, + advertising, and write event buffering. + """ + + def __init__(self): + self._bus: MessageBus | None = None + self._app: GattApplication | None = None + self._adv: LEAdvertisementIface | None = None + self._services: dict[str, ServerService] = {} + self._characteristics: dict[str, ServerCharacteristic] = {} + self._descriptors: dict[str, ServerDescriptor] = {} + self._write_events: deque[WriteEvent] = deque(maxlen=200) + self._write_index: int = 0 + self._service_count: int = 0 + self._registered: bool = False + self._advertising: bool = False + self._write_callback: Any = None # async callable(char_id, value, device) + + # ---- Service construction ---- + + def add_service(self, uuid: str, primary: bool = True) -> str: + """Add a GATT service. Returns the service_id (e.g. 'service0').""" + service_id = f"service{self._service_count}" + self._service_count += 1 + path = f"{APP_BASE_PATH}/{service_id}" + + svc = ServerService( + service_id=service_id, + path=path, + uuid=uuid, + primary=primary, + ) + svc.dbus_obj = GattServiceIface() + self._services[service_id] = svc + + log.info("GATT server: added service %s (UUID=%s)", service_id, uuid) + return service_id + + def add_characteristic( + self, + service_id: str, + uuid: str, + flags: list[str], + value: bytes = b"", + ) -> str: + """Add a characteristic to a service. Returns the char_id.""" + svc = self._services.get(service_id) + if not svc: + raise ValueError(f"Service '{service_id}' not found") + + char_idx = svc.char_count + svc.char_count += 1 + char_id = f"{service_id}/char{char_idx}" + path = f"{APP_BASE_PATH}/{char_id}" + + char = ServerCharacteristic( + char_id=char_id, + path=path, + uuid=uuid, + flags=flags, + value=value, + service_id=service_id, + ) + char.dbus_obj = GattCharacteristicIface(char, self) + svc.characteristics[char_id] = char + self._characteristics[char_id] = char + + log.info( + "GATT server: added characteristic %s (UUID=%s, flags=%s)", + char_id, + uuid, + flags, + ) + return char_id + + def add_descriptor( + self, + char_id: str, + uuid: str, + flags: list[str], + value: bytes = b"", + ) -> str: + """Add a descriptor to a characteristic. Returns the desc_id.""" + char = self._characteristics.get(char_id) + if not char: + raise ValueError(f"Characteristic '{char_id}' not found") + + desc_idx = char.desc_count + char.desc_count += 1 + desc_id = f"{char_id}/desc{desc_idx}" + path = f"{APP_BASE_PATH}/{desc_id}" + + desc = ServerDescriptor( + desc_id=desc_id, + path=path, + uuid=uuid, + flags=flags, + value=value, + char_id=char_id, + ) + desc.dbus_obj = GattDescriptorIface(desc) + self._descriptors[desc_id] = desc + + log.info("GATT server: added descriptor %s (UUID=%s)", desc_id, uuid) + return desc_id + + def clear(self) -> None: + """Remove all services, characteristics, and descriptors.""" + if self._registered: + raise RuntimeError("Cannot clear while registered — unregister first") + + self._services.clear() + self._characteristics.clear() + self._descriptors.clear() + self._service_count = 0 + + # ---- Registration with BlueZ ---- + + async def register(self, adapter: str) -> None: + """Register the GATT application with BlueZ GattManager1.""" + if self._registered: + return + + if not self._services: + raise ValueError("No services defined — add at least one service first") + + # Dedicated bus connection (like hfp_ag.py) + if not self._bus: + self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + + # Export ObjectManager at application root + self._app = GattApplication(self) + self._bus.export(APP_BASE_PATH, self._app) + + # Export all service/characteristic/descriptor D-Bus objects + for svc in self._services.values(): + self._bus.export(svc.path, svc.dbus_obj) + for char in svc.characteristics.values(): + self._bus.export(char.path, char.dbus_obj) + for desc in self._descriptors.values(): + self._bus.export(desc.path, desc.dbus_obj) + + # Register with BlueZ + adapter_path = f"/org/bluez/{adapter}" + introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path) + proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection) + gatt_mgr = proxy.get_interface(GATT_MANAGER_IFACE) + + try: + await gatt_mgr.call_register_application(APP_BASE_PATH, {}) + self._registered = True + log.info("GATT server: registered with BlueZ (%s)", adapter) + except Exception as e: + if "Already Exists" in str(e): + log.info("GATT server: stale registration — re-registering") + try: + await gatt_mgr.call_unregister_application(APP_BASE_PATH) + except Exception: + pass + await gatt_mgr.call_register_application(APP_BASE_PATH, {}) + self._registered = True + log.info("GATT server: re-registered with BlueZ (%s)", adapter) + else: + raise + + async def unregister(self, adapter: str) -> None: + """Unregister the GATT application from BlueZ.""" + if self._advertising: + await self.set_advertising(adapter, False) + + if self._bus and self._registered: + try: + adapter_path = f"/org/bluez/{adapter}" + introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path) + proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection) + gatt_mgr = proxy.get_interface(GATT_MANAGER_IFACE) + await gatt_mgr.call_unregister_application(APP_BASE_PATH) + except Exception: + pass + self._registered = False + + # Disconnect bus (cleans up all exports) + if self._bus: + self._bus.disconnect() + self._bus = None + + self._app = None + self._adv = None + log.info("GATT server: unregistered") + + # ---- Advertising ---- + + async def set_advertising( + self, + adapter: str, + enable: bool, + name: str = "mcbluetooth", + service_uuids: list[str] | None = None, + ) -> None: + """Start or stop BLE advertising.""" + if not self._bus: + raise RuntimeError("Bus not connected — register first") + + adapter_path = f"/org/bluez/{adapter}" + introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path) + proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection) + adv_mgr = proxy.get_interface(LE_ADV_MANAGER_IFACE) + + if enable: + if self._advertising: + return + + uuids = service_uuids or [svc.uuid for svc in self._services.values()] + self._adv = LEAdvertisementIface("peripheral", name, uuids) + self._bus.export(ADV_PATH, self._adv) + + try: + await adv_mgr.call_register_advertisement(ADV_PATH, {}) + self._advertising = True + log.info("GATT server: advertising started (name=%s)", name) + except Exception as e: + if "Already Exists" in str(e): + try: + await adv_mgr.call_unregister_advertisement(ADV_PATH) + except Exception: + pass + await adv_mgr.call_register_advertisement(ADV_PATH, {}) + self._advertising = True + log.info("GATT server: advertising restarted (name=%s)", name) + else: + raise + else: + if not self._advertising: + return + try: + await adv_mgr.call_unregister_advertisement(ADV_PATH) + except Exception: + pass + self._advertising = False + self._adv = None + log.info("GATT server: advertising stopped") + + # ---- Value management ---- + + def set_value(self, char_id: str, value: bytes) -> None: + """Set characteristic value and auto-notify if subscribed.""" + char = self._characteristics.get(char_id) + if not char: + raise ValueError(f"Characteristic '{char_id}' not found") + + char.value = value + if char.notifying: + self._emit_notification(char) + + def get_value(self, char_id: str) -> bytes: + """Get current characteristic value.""" + char = self._characteristics.get(char_id) + if not char: + raise ValueError(f"Characteristic '{char_id}' not found") + return char.value + + def notify(self, char_id: str) -> bool: + """Explicitly send notification for current value.""" + char = self._characteristics.get(char_id) + if not char: + raise ValueError(f"Characteristic '{char_id}' not found") + if not char.notifying: + return False + self._emit_notification(char) + return True + + def _emit_notification(self, char: ServerCharacteristic) -> None: + """Emit PropertiesChanged signal to trigger BLE notification.""" + if not self._bus: + return + + msg = Message( + message_type=MessageType.SIGNAL, + path=char.path, + interface="org.freedesktop.DBus.Properties", + member="PropertiesChanged", + signature="sa{sv}as", + body=[ + "org.bluez.GattCharacteristic1", + {"Value": Variant("ay", list(char.value))}, + [], + ], + ) + self._bus.send_message(msg) + log.debug("GATT server: notification sent on %s (%d bytes)", char.char_id, len(char.value)) + + # ---- Write event monitoring ---- + + def _record_write(self, char_id: str, char_uuid: str, value: bytes, device: str | None) -> None: + """Record a write event and fire callback if registered.""" + event = WriteEvent( + index=self._write_index, + timestamp=datetime.now(UTC).isoformat(), + char_id=char_id, + char_uuid=char_uuid, + value=value, + value_hex=value.hex(), + value_string=_try_decode(value), + device=device, + ) + self._write_events.append(event) + self._write_index += 1 + + log.debug( + "GATT server: write on %s from %s (%d bytes)", + char_id, + device or "unknown", + len(value), + ) + + # Fire async callback (e.g. ELM327 emulator auto-responder) + if self._write_callback: + try: + loop = asyncio.get_running_loop() + loop.create_task(self._write_callback(char_id, value, device)) + except RuntimeError: + pass + + def get_write_events( + self, + since_index: int = 0, + char_id: str | None = None, + limit: int = 50, + ) -> list[dict[str, Any]]: + """Get write events, optionally filtered.""" + events = [ + e + for e in self._write_events + if e.index >= since_index and (char_id is None or e.char_id == char_id) + ] + return [e.to_dict() for e in events[:limit]] + + def clear_write_events(self) -> int: + """Clear all write events. Returns count cleared.""" + count = len(self._write_events) + self._write_events.clear() + return count + + # ---- GetManagedObjects builder ---- + + def _build_managed_objects(self) -> dict[str, dict[str, dict[str, Variant]]]: + """Build the response for ObjectManager.GetManagedObjects(). + + Returns the complete D-Bus object hierarchy that BlueZ uses + to discover our GATT services, characteristics, and descriptors. + """ + objects: dict[str, dict[str, dict[str, Variant]]] = {} + + for svc in self._services.values(): + char_paths = [c.path for c in svc.characteristics.values()] + objects[svc.path] = { + "org.bluez.GattService1": { + "UUID": Variant("s", svc.uuid), + "Primary": Variant("b", svc.primary), + "Characteristics": Variant("ao", char_paths), + } + } + + for char in svc.characteristics.values(): + desc_paths = [ + d.path for d in self._descriptors.values() if d.char_id == char.char_id + ] + objects[char.path] = { + "org.bluez.GattCharacteristic1": { + "UUID": Variant("s", char.uuid), + "Service": Variant("o", svc.path), + "Flags": Variant("as", char.flags), + "Descriptors": Variant("ao", desc_paths), + } + } + + for desc in self._descriptors.values(): + char = self._characteristics.get(desc.char_id) + if char: + objects[desc.path] = { + "org.bluez.GattDescriptor1": { + "UUID": Variant("s", desc.uuid), + "Characteristic": Variant("o", char.path), + "Flags": Variant("as", desc.flags), + } + } + + return objects + + # ---- Status ---- + + def get_status(self) -> dict[str, Any]: + """Get overall GATT server status.""" + services = [] + for svc in self._services.values(): + chars = [] + for char in svc.characteristics.values(): + descs = [ + { + "desc_id": d.desc_id, + "uuid": d.uuid, + "flags": d.flags, + } + for d in self._descriptors.values() + if d.char_id == char.char_id + ] + chars.append( + { + "char_id": char.char_id, + "uuid": char.uuid, + "flags": char.flags, + "value_length": len(char.value), + "notifying": char.notifying, + "descriptors": descs, + } + ) + services.append( + { + "service_id": svc.service_id, + "uuid": svc.uuid, + "primary": svc.primary, + "characteristics": chars, + } + ) + + return { + "registered": self._registered, + "advertising": self._advertising, + "services": services, + "write_event_count": len(self._write_events), + "write_event_total": self._write_index, + } + + +# ==================== Module-level lifecycle ==================== + +_manager: GattServerManager | None = None + + +def get_gatt_server() -> GattServerManager: + """Get or create the global GATT server manager (singleton).""" + global _manager + if _manager is None: + _manager = GattServerManager() + return _manager + + +async def shutdown_gatt_server(adapter: str) -> None: + """Shut down the GATT server completely.""" + global _manager + if _manager: + try: + await _manager.unregister(adapter) + except Exception: + pass + _manager = None + + +# ==================== Helpers ==================== + + +def _try_decode(value: bytes) -> str | None: + """Try to decode bytes as printable UTF-8.""" + try: + decoded = value.decode("utf-8") + if all(c.isprintable() or c in "\r\n\t" for c in decoded): + return decoded + return None + except (UnicodeDecodeError, ValueError): + return None diff --git a/src/mcbluetooth/resources.py b/src/mcbluetooth/resources.py index 4c11331..446b5c0 100644 --- a/src/mcbluetooth/resources.py +++ b/src/mcbluetooth/resources.py @@ -17,6 +17,7 @@ from dataclasses import asdict from fastmcp import FastMCP from mcbluetooth.dbus_client import get_client, get_notify_manager +from mcbluetooth.gatt_server import get_gatt_server def register_resources(mcp: FastMCP) -> None: @@ -307,3 +308,43 @@ def register_resources(mcp: FastMCP) -> None: }, indent=2, ) + + # ==================== GATT Server Resources ==================== + + @mcp.resource( + "bluetooth://gatt/server", + name="GATT Server State", + description=( + "Current state of the BLE GATT server (peripheral mode). " + "Shows registered services, characteristics, advertising status, " + "and write event statistics." + ), + mime_type="application/json", + ) + async def resource_gatt_server() -> str: + """Get GATT server state.""" + mgr = get_gatt_server() + return json.dumps(mgr.get_status(), indent=2) + + @mcp.resource( + "bluetooth://gatt/server/writes", + name="GATT Server Write Events", + description=( + "Write events received from remote BLE central devices. " + "Shows the most recent writes to server characteristics " + "with timestamps, values, and source device info." + ), + mime_type="application/json", + ) + async def resource_gatt_server_writes() -> str: + """Get recent write events from remote clients.""" + mgr = get_gatt_server() + events = mgr.get_write_events(since_index=0, limit=50) + return json.dumps( + { + "count": len(events), + "total": mgr._write_index, + "events": events, + }, + indent=2, + ) diff --git a/src/mcbluetooth/server.py b/src/mcbluetooth/server.py index 82511eb..8de8e50 100644 --- a/src/mcbluetooth/server.py +++ b/src/mcbluetooth/server.py @@ -3,7 +3,17 @@ from fastmcp import FastMCP from mcbluetooth import resources -from mcbluetooth.tools import adapter, audio, ble, device, hfp, monitor, obex +from mcbluetooth.tools import ( + adapter, + audio, + ble, + bt_elm327_emu, + device, + gatt_server, + hfp, + monitor, + obex, +) mcp = FastMCP( name="mcbluetooth", @@ -36,6 +46,25 @@ To capture BLE notifications: 3. Read notifications: Use the bluetooth://ble/{address}/{uuid}/notifications resource 4. Subscribe to the resource for real-time updates (client-side) +### GATT Server Resources (peripheral mode) +- bluetooth://gatt/server - GATT server state (registered, services, advertising) +- bluetooth://gatt/server/writes - Write events from remote clients + +To act as a BLE peripheral (GATT server): +1. Add services: bt_gatt_server_add_service(uuid) +2. Add characteristics: bt_gatt_server_add_characteristic(service_id, uuid, flags) +3. Register: bt_gatt_server_register(adapter) +4. Advertise: bt_gatt_server_advertise(adapter, enable=True, name="MyDevice") +5. Monitor writes: bt_gatt_server_read_writes(since_index=0) +6. Respond: bt_gatt_server_set_value(char_id, value) — auto-notifies subscribers + +### ELM327 OBD-II Emulator +For quick OBD-II BLE adapter emulation (Nordic UART Service): +1. bt_elm327_emu_start(adapter, name="OBDII") — sets up NUS, advertises +2. bt_elm327_emu_set_pid(0x0C, "0BB8", "hex") — set RPM to 750 +3. bt_elm327_emu_read_commands() — see what the OBD app sent +4. bt_elm327_emu_stop(adapter) — clean up + ## Tools All tools require an explicit 'adapter' parameter (e.g., "hci0"). Use bt_list_adapters() to discover available adapters. @@ -56,6 +85,8 @@ device.register_tools(mcp) audio.register_tools(mcp) hfp.register_tools(mcp) ble.register_tools(mcp) +gatt_server.register_tools(mcp) +bt_elm327_emu.register_tools(mcp) monitor.register_tools(mcp) obex.register_tools(mcp) diff --git a/src/mcbluetooth/tools/bt_elm327_emu.py b/src/mcbluetooth/tools/bt_elm327_emu.py new file mode 100644 index 0000000..07baf5c --- /dev/null +++ b/src/mcbluetooth/tools/bt_elm327_emu.py @@ -0,0 +1,678 @@ +"""ELM327 OBD-II BLE Emulator. + +Emulates an ELM327 v1.5 adapter over BLE using Nordic UART Service (NUS). +OBD-II apps (Torque, Car Scanner, etc.) connect via BLE, send AT commands +and OBD-II PID queries, and receive simulated responses. + +The emulator handles the full ELM327 protocol: + - AT command set (ATZ, ATE0, ATH1, ATSP, etc.) + - Mode 01 (live data) with configurable PID values + - Mode 03 (stored DTCs) + - Mode 09 (VIN query) + - Supported PID bitmaps auto-generated from configured PIDs + +NUS UUIDs (same as real BLE OBD adapters): + Service: 6E400001-B5A3-F393-E0A9-E50E24DCCA9E + RX (write): 6E400002-... (client writes commands here) + TX (notify): 6E400003-... (server sends responses here) +""" + +from __future__ import annotations + +import logging +from collections import deque +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Any + +from fastmcp import FastMCP + +from mcbluetooth.gatt_server import get_gatt_server + +log = logging.getLogger(__name__) + +# Nordic UART Service UUIDs +NUS_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e" +NUS_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" # Client writes here +NUS_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" # Server notifies here + +# Default ECU header (primary ECU response address) +ECU_HEADER = "7E8" + +# Common OBD-II PIDs with default values +DEFAULT_PIDS: dict[int, tuple[bytes, str]] = { + 0x05: (bytes([70]), "Coolant temp (°C, offset -40) → 30°C"), + 0x0C: (bytes([0x0B, 0xB8]), "Engine RPM (× ¼) → 750 RPM"), + 0x0D: (bytes([0]), "Vehicle speed (km/h) → 0"), + 0x0F: (bytes([55]), "Intake air temp (°C, offset -40) → 15°C"), + 0x11: (bytes([25]), "Throttle position (%) → ~10%"), + 0x1F: (bytes([0x00, 0x00]), "Runtime since start (sec) → 0"), + 0x2F: (bytes([51]), "Fuel tank level (%) → 20%"), + 0x46: (bytes([60]), "Ambient air temp (°C, offset -40) → 20°C"), +} + + +@dataclass +class CommandRecord: + """A command received from the OBD-II client.""" + + index: int + timestamp: str + raw: str + parsed: str + response: str + + +class ELM327Emulator: + """ELM327 AT command and OBD-II protocol handler.""" + + def __init__(self): + self.device_id = "ELM327 v1.5" + self.protocol = "6" # ISO 15765-4 CAN (11/500) + self.echo = False + self.headers = True + self.spaces = False + self.linefeed = False + self.pid_values: dict[int, bytes] = {} + self.dtc_codes: list[str] = [] + self.vin = "1MCBT00TESTING12345" + self._commands: deque[CommandRecord] = deque(maxlen=200) + self._cmd_index = 0 + self._started = False + self._rx_char_id: str | None = None + self._tx_char_id: str | None = None + self._adapter: str | None = None + self._buf = "" # Accumulates partial writes + + def set_defaults(self) -> None: + """Load default PID values for a realistic idle vehicle.""" + for pid, (value, _desc) in DEFAULT_PIDS.items(): + self.pid_values[pid] = value + + def handle_data(self, data: bytes) -> str | None: + """Process incoming bytes, return response when a complete command is received. + + ELM327 commands are terminated with CR (\\r). Accumulates partial + writes until a complete command is received. + """ + try: + text = data.decode("utf-8", errors="replace") + except Exception: + return None + + self._buf += text + + # Check for command terminator + if "\r" not in self._buf: + return None + + # Split on CR, process first complete command + line, self._buf = self._buf.split("\r", 1) + line = line.strip() + if not line: + return None + + response = self._process_command(line) + + # Record command + record = CommandRecord( + index=self._cmd_index, + timestamp=datetime.now(UTC).isoformat(), + raw=line, + parsed=line.upper().strip(), + response=response.replace("\r", "\\r").replace("\n", "\\n"), + ) + self._commands.append(record) + self._cmd_index += 1 + + return response + + def _process_command(self, line: str) -> str: + """Parse and handle an AT or OBD command.""" + cmd = line.strip().upper() + + if cmd.startswith("AT"): + return self._handle_at(cmd) + + # OBD-II query + return self._handle_obd(cmd) + + def _handle_at(self, cmd: str) -> str: + """Handle ELM327 AT commands.""" + cr = "\r\n" if self.linefeed else "\r" + + if cmd == "ATZ": + self.echo = False + self.headers = True + self.spaces = False + return f"{cr}{self.device_id}{cr}{cr}>" + + if cmd == "ATI": + return f"{self.device_id}{cr}{cr}>" + + if cmd == "AT@1": + return f"mcbluetooth OBD-II Emulator{cr}{cr}>" + + if cmd == "ATE0": + self.echo = False + return f"OK{cr}{cr}>" + + if cmd == "ATE1": + self.echo = True + return f"OK{cr}{cr}>" + + if cmd == "ATH0": + self.headers = False + return f"OK{cr}{cr}>" + + if cmd == "ATH1": + self.headers = True + return f"OK{cr}{cr}>" + + if cmd == "ATS0": + self.spaces = False + return f"OK{cr}{cr}>" + + if cmd == "ATS1": + self.spaces = True + return f"OK{cr}{cr}>" + + if cmd.startswith("ATL"): + self.linefeed = cmd.endswith("1") + cr = "\r\n" if self.linefeed else "\r" + return f"OK{cr}{cr}>" + + if cmd == "ATDPN": + return f"A{self.protocol}{cr}{cr}>" + + if cmd == "ATDP": + protocols = { + "0": "AUTO", + "6": "ISO 15765-4 (CAN 11/500)", + "7": "ISO 15765-4 (CAN 29/500)", + "8": "ISO 15765-4 (CAN 11/250)", + "9": "ISO 15765-4 (CAN 29/250)", + } + name = protocols.get(self.protocol, "AUTO") + return f"{name}{cr}{cr}>" + + if cmd == "ATRV": + return f"12.6V{cr}{cr}>" + + # Commands that just get OK + ok_prefixes = ( + "ATSP", + "ATST", + "ATAT", + "ATM", + "ATCAF", + "ATD", + "ATCE", + "ATCF", + "ATCM", + "ATSH", + "ATFC", + "ATWM", + "ATMA", + "ATAR", + "ATPC", + ) + for prefix in ok_prefixes: + if cmd.startswith(prefix): + return f"OK{cr}{cr}>" + + if cmd == "AT": + return f"OK{cr}{cr}>" + + return f"?{cr}{cr}>" + + def _handle_obd(self, cmd: str) -> str: + """Handle OBD-II service queries.""" + cr = "\r\n" if self.linefeed else "\r" + cmd = cmd.replace(" ", "") + + if len(cmd) < 4: + return f"?{cr}{cr}>" + + try: + mode = int(cmd[:2], 16) + pid = int(cmd[2:4], 16) + except ValueError: + return f"?{cr}{cr}>" + + if mode == 0x01: + return self._handle_mode01(pid) + f"{cr}>" + if mode == 0x03: + return self._handle_mode03() + f"{cr}>" + if mode == 0x04: + self.dtc_codes.clear() + return f"44{cr}{cr}>" + if mode == 0x09: + return self._handle_mode09(pid) + f"{cr}>" + + return f"NO DATA{cr}{cr}>" + + def _handle_mode01(self, pid: int) -> str: + """Handle Mode 01 (current data) queries.""" + cr = "\r\n" if self.linefeed else "\r" + + # Supported PID bitmaps + if pid in (0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0, 0xE0): + bitmap = self._build_pid_bitmap(pid) + return self._format_response(0x41, pid, bitmap) + + # Configured PID value + if pid in self.pid_values: + return self._format_response(0x41, pid, self.pid_values[pid]) + + return f"NO DATA{cr}" + + def _handle_mode03(self) -> str: + """Handle Mode 03 (stored DTCs) query.""" + if not self.dtc_codes: + return self._format_response(0x43, 0x00, bytes([0x00, 0x00, 0x00, 0x00])) + + # Pack DTCs into response (2 bytes each) + dtc_bytes = bytearray() + for code in self.dtc_codes[:3]: # Max 3 per frame + dtc_bytes.extend(self._encode_dtc(code)) + + # Pad to even number of DTCs + while len(dtc_bytes) % 4 != 0: + dtc_bytes.extend(b"\x00\x00") + + return self._format_response(0x43, len(self.dtc_codes), bytes(dtc_bytes)) + + def _handle_mode09(self, pid: int) -> str: + """Handle Mode 09 (vehicle info) queries.""" + cr = "\r\n" if self.linefeed else "\r" + + if pid == 0x00: + # Supported Mode 09 PIDs: 02 (VIN) + return self._format_response(0x49, 0x00, bytes([0x40, 0x00, 0x00, 0x00])) + + if pid == 0x02: + # VIN (17 chars, multi-frame) + vin_bytes = self.vin.encode("ascii")[:17].ljust(17, b"0") + if self.headers: + lines = [] + # First frame: 10 14 49 02 01 VIN[0:3] + lines.append( + f"{ECU_HEADER} 10 14 49 02 01 " + " ".join(f"{b:02X}" for b in vin_bytes[:3]) + ) + # Consecutive frames + offset = 3 + seq = 1 + while offset < len(vin_bytes): + chunk = vin_bytes[offset : offset + 7] + frame = f"{ECU_HEADER} 2{seq:01X} " + " ".join(f"{b:02X}" for b in chunk) + lines.append(frame) + offset += 7 + seq = (seq + 1) & 0x0F + return cr.join(lines) + cr + else: + return f"4902{vin_bytes.hex().upper()}{cr}" + + return f"NO DATA{cr}" + + def _format_response(self, mode_resp: int, pid: int, data: bytes) -> str: + """Format an OBD-II response with optional headers.""" + cr = "\r\n" if self.linefeed else "\r" + length = 2 + len(data) # mode + pid + data + + if self.headers: + sep = " " if self.spaces else "" + data_hex = sep.join(f"{b:02X}" for b in data) + return f"{ECU_HEADER}{sep}{length:02X}{sep}{mode_resp:02X}{sep}{pid:02X}{sep}{data_hex}{cr}" + else: + data_hex = "".join(f"{b:02X}" for b in data) + return f"{mode_resp:02X}{pid:02X}{data_hex}{cr}" + + def _build_pid_bitmap(self, base_pid: int) -> bytes: + """Build a 4-byte bitmap of supported PIDs for a range.""" + bitmap = 0 + all_pids = set(self.pid_values.keys()) + + for i in range(32): + actual_pid = base_pid + i + 1 + if actual_pid in all_pids: + bitmap |= 1 << (31 - i) + + # Set bit 32 (= bit 0) if the next range has supported PIDs + next_base = base_pid + 0x20 + if next_base <= 0xE0: + for p in all_pids: + if next_base < p <= next_base + 0x20: + bitmap |= 1 + break + + return bitmap.to_bytes(4, "big") + + @staticmethod + def _encode_dtc(code: str) -> bytes: + """Encode a DTC string like 'P0301' into 2 bytes.""" + prefixes = {"P": 0, "C": 1, "B": 2, "U": 3} + prefix = prefixes.get(code[0].upper(), 0) + number = int(code[1:], 16) + value = (prefix << 14) | number + return value.to_bytes(2, "big") + + def get_commands( + self, + since_index: int = 0, + limit: int = 50, + ) -> list[dict[str, Any]]: + """Get recorded commands.""" + records = [ + { + "index": r.index, + "timestamp": r.timestamp, + "command": r.raw, + "response_preview": r.response[:80], + } + for r in self._commands + if r.index >= since_index + ] + return records[:limit] + + +# ==================== Module-level emulator ==================== + +_emulator: ELM327Emulator | None = None + + +def _get_emulator() -> ELM327Emulator: + """Get the global emulator instance.""" + global _emulator + if _emulator is None: + _emulator = ELM327Emulator() + return _emulator + + +async def _on_write(char_id: str, value: bytes, device: str | None) -> None: + """Callback fired when a remote client writes to the RX characteristic.""" + emu = _get_emulator() + if not emu._started or char_id != emu._rx_char_id: + return + + response = emu.handle_data(value) + if response and emu._tx_char_id: + mgr = get_gatt_server() + mgr.set_value(emu._tx_char_id, response.encode("utf-8")) + + +# ==================== Tool registration ==================== + + +def register_tools(mcp: FastMCP) -> None: + """Register ELM327 emulator tools with the MCP server.""" + + @mcp.tool() + async def bt_elm327_emu_start( + adapter: str, + name: str = "OBDII", + load_defaults: bool = True, + ) -> dict[str, Any]: + """Start the ELM327 OBD-II BLE emulator. + + Sets up a Nordic UART Service (NUS) GATT server, registers with + BlueZ, and starts advertising. OBD-II apps can then discover and + connect to this emulated adapter. + + The emulator auto-responds to AT commands and OBD-II PID queries. + Configure PID values with bt_elm327_emu_set_pid(). + + Args: + adapter: Bluetooth adapter (e.g., "hci0"). + name: BLE device name shown to scanners (default "OBDII"). + load_defaults: Pre-load default PID values for a realistic + idle vehicle (RPM 750, coolant 30°C, etc.). + + Returns: + Status with NUS characteristic IDs and default PID info. + """ + emu = _get_emulator() + if emu._started: + return {"status": "error", "error": "Emulator already running"} + + try: + if load_defaults: + emu.set_defaults() + + mgr = get_gatt_server() + + # Build NUS GATT hierarchy + svc_id = mgr.add_service(NUS_SERVICE_UUID) + rx_id = mgr.add_characteristic( + svc_id, + NUS_RX_CHAR_UUID, + flags=["write", "write-without-response"], + ) + tx_id = mgr.add_characteristic( + svc_id, + NUS_TX_CHAR_UUID, + flags=["notify"], + value=f"{emu.device_id}\r\n>".encode(), + ) + + emu._rx_char_id = rx_id + emu._tx_char_id = tx_id + emu._adapter = adapter + + # Register write callback for auto-response + mgr._write_callback = _on_write + + # Register + advertise + await mgr.register(adapter) + await mgr.set_advertising( + adapter, + True, + name=name, + service_uuids=[NUS_SERVICE_UUID], + ) + + emu._started = True + + pid_info = ( + {pid: desc for pid, (_val, desc) in DEFAULT_PIDS.items()} if load_defaults else {} + ) + + return { + "status": "ok", + "emulator": "ELM327 v1.5", + "adapter": adapter, + "name": name, + "rx_char_id": rx_id, + "tx_char_id": tx_id, + "default_pids": pid_info, + "vin": emu.vin, + } + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_elm327_emu_stop(adapter: str) -> dict[str, Any]: + """Stop the ELM327 OBD-II BLE emulator. + + Stops advertising, unregisters GATT services, and cleans up. + + Args: + adapter: Bluetooth adapter (e.g., "hci0"). + + Returns: + Status confirming shutdown. + """ + global _emulator + emu = _get_emulator() + + if not emu._started: + return {"status": "ok", "was_running": False} + + try: + mgr = get_gatt_server() + mgr._write_callback = None + await mgr.unregister(adapter) + mgr.clear() + except Exception: + pass + + emu._started = False + _emulator = None + return {"status": "ok", "was_running": True} + + @mcp.tool() + async def bt_elm327_emu_status() -> dict[str, Any]: + """Get ELM327 emulator status. + + Returns: + - running: Whether the emulator is active + - pids: Configured PID values with descriptions + - dtcs: Active diagnostic trouble codes + - command_count: Number of commands received + - settings: Current ELM327 settings (echo, headers, etc.) + """ + emu = _get_emulator() + + pids = {} + for pid, value in emu.pid_values.items(): + desc = "" + if pid in DEFAULT_PIDS: + _, desc = DEFAULT_PIDS[pid] + pids[f"0x{pid:02X}"] = { + "value_hex": value.hex(), + "value_bytes": list(value), + "description": desc, + } + + return { + "status": "ok", + "running": emu._started, + "device_id": emu.device_id, + "adapter": emu._adapter, + "protocol": emu.protocol, + "pids": pids, + "dtcs": emu.dtc_codes, + "vin": emu.vin, + "command_count": emu._cmd_index, + "settings": { + "echo": emu.echo, + "headers": emu.headers, + "spaces": emu.spaces, + "linefeed": emu.linefeed, + }, + } + + @mcp.tool() + async def bt_elm327_emu_set_pid( + pid: int, + value: str, + value_type: str = "hex", + ) -> dict[str, Any]: + """Set an OBD-II PID response value. + + When an OBD app queries this PID (Mode 01), the emulator returns + the configured value. Values follow OBD-II encoding conventions. + + Common PIDs: + 0x05: Coolant temp (1 byte, value - 40 = °C) + 0x0C: Engine RPM (2 bytes, value / 4 = RPM) + 0x0D: Vehicle speed (1 byte = km/h) + 0x0F: Intake air temp (1 byte, value - 40 = °C) + 0x11: Throttle position (1 byte, value / 2.55 = %) + 0x2F: Fuel tank level (1 byte, value / 2.55 = %) + 0x46: Ambient air temp (1 byte, value - 40 = °C) + + Args: + pid: PID number (e.g., 0x0C for RPM). + value: Encoded value. For RPM of 3000: "0BB8" (hex) or "3000" (int). + value_type: "hex" (raw bytes), "int" (little-endian integer), + or "string" (UTF-8). + + Returns: + Status with the configured PID value. + """ + emu = _get_emulator() + + try: + if value_type == "hex": + data = bytes.fromhex(value) + elif value_type == "int": + int_val = int(value) + byte_len = max(1, (int_val.bit_length() + 7) // 8) + data = int_val.to_bytes(byte_len, "big") # OBD uses big-endian + elif value_type == "string": + data = value.encode("utf-8") + else: + return {"status": "error", "error": f"Unknown value_type: {value_type}"} + + emu.pid_values[pid] = data + return { + "status": "ok", + "pid": f"0x{pid:02X}", + "value_hex": data.hex(), + "value_bytes": list(data), + } + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_elm327_emu_set_dtcs(dtc_codes: list[str]) -> dict[str, Any]: + """Set diagnostic trouble codes for Mode 03 queries. + + Args: + dtc_codes: List of DTC strings (e.g., ["P0301", "P0420", "C0035"]). + P = Powertrain, C = Chassis, B = Body, U = Network. + + Returns: + Status with configured DTCs. + """ + emu = _get_emulator() + emu.dtc_codes = [c.upper() for c in dtc_codes] + return { + "status": "ok", + "dtcs": emu.dtc_codes, + "count": len(emu.dtc_codes), + } + + @mcp.tool() + async def bt_elm327_emu_set_vin(vin: str) -> dict[str, Any]: + """Set the Vehicle Identification Number for Mode 09 queries. + + Args: + vin: 17-character VIN string. + + Returns: + Status with configured VIN. + """ + emu = _get_emulator() + if len(vin) != 17: + return {"status": "error", "error": f"VIN must be 17 characters (got {len(vin)})"} + emu.vin = vin.upper() + return {"status": "ok", "vin": emu.vin} + + @mcp.tool() + async def bt_elm327_emu_read_commands( + since_index: int = 0, + limit: int = 50, + ) -> dict[str, Any]: + """Read commands received from the OBD-II client. + + Monitor what the connected app is requesting. Useful for debugging + protocol issues or understanding app behavior. + + Args: + since_index: Only return commands with index >= this value. + limit: Maximum commands to return. + + Returns: + List of received commands with timestamps and responses. + """ + emu = _get_emulator() + cmds = emu.get_commands(since_index, limit) + next_idx = max((c["index"] for c in cmds), default=since_index) + 1 if cmds else since_index + return { + "status": "ok", + "commands": cmds, + "count": len(cmds), + "next_index": next_idx, + } diff --git a/src/mcbluetooth/tools/gatt_server.py b/src/mcbluetooth/tools/gatt_server.py new file mode 100644 index 0000000..1e40b65 --- /dev/null +++ b/src/mcbluetooth/tools/gatt_server.py @@ -0,0 +1,392 @@ +"""GATT Server tools for Bluetooth MCP server. + +These tools let Linux act as a BLE peripheral (GATT server) that +advertises services, accepts connections, and handles reads/writes +from remote central devices. Use cases include: emulating sensors, +building test harnesses, mocking BLE devices, protocol fuzzing. + +Typical flow: + 1. bt_gatt_server_add_service(uuid) → service0 + 2. bt_gatt_server_add_characteristic(service0, ...) → service0/char0 + 3. bt_gatt_server_register(adapter="hci0") → registered + 4. bt_gatt_server_advertise(adapter="hci0", enable=True, name="MyDevice") + 5. Remote connects, writes → bt_gatt_server_read_writes() + 6. Respond via bt_gatt_server_set_value() → auto-notifies +""" + +from __future__ import annotations + +from typing import Any + +from fastmcp import FastMCP + +from mcbluetooth.gatt_server import get_gatt_server, shutdown_gatt_server + + +def _parse_value(value: str, value_type: str) -> bytes: + """Convert a string value to bytes based on value_type.""" + if value_type == "hex": + return bytes.fromhex(value) + elif value_type == "string": + return value.encode("utf-8") + elif value_type == "int": + int_val = int(value) + byte_len = max(1, (int_val.bit_length() + 7) // 8) + return int_val.to_bytes(byte_len, "little") + else: + raise ValueError(f"Unknown value_type: {value_type}") + + +def register_tools(mcp: FastMCP) -> None: + """Register GATT server tools with the MCP server.""" + + # ---- Lifecycle ---- + + @mcp.tool() + async def bt_gatt_server_register(adapter: str) -> dict[str, Any]: + """Register the GATT application with BlueZ. + + Exports all added services/characteristics to D-Bus and calls + GattManager1.RegisterApplication. Must add at least one service + before registering. + + After registration, remote BLE centrals can discover and interact + with the hosted services (once advertising is started). + + Args: + adapter: Adapter name (e.g., "hci0"). + + Returns: + Registration status with service count. + """ + try: + mgr = get_gatt_server() + await mgr.register(adapter) + status = mgr.get_status() + return { + "status": "ok", + "registered": True, + "service_count": len(status["services"]), + } + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_gatt_server_unregister(adapter: str) -> dict[str, Any]: + """Unregister the GATT application from BlueZ. + + Stops advertising (if active) and removes the application + registration. Services remain in memory and can be re-registered. + + Args: + adapter: Adapter name (e.g., "hci0"). + + Returns: + Status confirming unregistration. + """ + try: + await shutdown_gatt_server(adapter) + return {"status": "ok", "registered": False} + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_gatt_server_status() -> dict[str, Any]: + """Get GATT server status. + + Returns: + - registered: Whether the application is registered with BlueZ + - advertising: Whether BLE advertising is active + - services: List of services with their characteristics + - write_event_count: Number of buffered write events + """ + mgr = get_gatt_server() + return {"status": "ok", **mgr.get_status()} + + # ---- Service construction ---- + + @mcp.tool() + async def bt_gatt_server_add_service( + uuid: str, + primary: bool = True, + ) -> dict[str, Any]: + """Add a GATT service to the server. + + Services must be added BEFORE registering with BlueZ. + If already registered, unregister first, add services, then re-register. + + Args: + uuid: Service UUID (e.g., "180F" for Battery Service, or full + 128-bit "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" for custom). + primary: Whether this is a primary service (default True). + + Returns: + The assigned service_id (e.g., "service0") for adding characteristics. + """ + try: + mgr = get_gatt_server() + service_id = mgr.add_service(uuid, primary) + return {"status": "ok", "service_id": service_id, "uuid": uuid} + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_gatt_server_add_characteristic( + service_id: str, + uuid: str, + flags: list[str], + value: str | None = None, + value_type: str = "hex", + ) -> dict[str, Any]: + """Add a characteristic to a GATT service. + + Args: + service_id: Service to add to (e.g., "service0"). + uuid: Characteristic UUID. + flags: List of flags/permissions. Common values: + "read", "write", "write-without-response", "notify", "indicate", + "encrypt-read", "encrypt-write". + value: Initial value (optional). Format depends on value_type. + value_type: How to interpret value — "hex", "string", or "int". + + Returns: + The assigned char_id (e.g., "service0/char0") for setting values. + """ + try: + mgr = get_gatt_server() + initial = _parse_value(value, value_type) if value else b"" + char_id = mgr.add_characteristic(service_id, uuid, flags, initial) + return { + "status": "ok", + "char_id": char_id, + "uuid": uuid, + "flags": flags, + } + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_gatt_server_add_descriptor( + char_id: str, + uuid: str, + flags: list[str], + value: str | None = None, + value_type: str = "hex", + ) -> dict[str, Any]: + """Add a descriptor to a GATT characteristic. + + Note: CCCD (0x2902) for notify/indicate is auto-managed by BlueZ — + don't add it manually. + + Args: + char_id: Characteristic to add to (e.g., "service0/char0"). + uuid: Descriptor UUID (e.g., "2901" for Characteristic User Description). + flags: List of flags (e.g., ["read"], ["read", "write"]). + value: Initial value (optional). + value_type: How to interpret value — "hex", "string", or "int". + + Returns: + The assigned desc_id. + """ + try: + mgr = get_gatt_server() + initial = _parse_value(value, value_type) if value else b"" + desc_id = mgr.add_descriptor(char_id, uuid, flags, initial) + return {"status": "ok", "desc_id": desc_id, "uuid": uuid} + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_gatt_server_clear() -> dict[str, Any]: + """Remove all services, characteristics, and descriptors. + + Must unregister first. Use this to rebuild the service hierarchy + from scratch. + + Returns: + Status confirming services were cleared. + """ + try: + mgr = get_gatt_server() + mgr.clear() + return {"status": "ok", "cleared": True} + except Exception as exc: + return {"status": "error", "error": str(exc)} + + # ---- Value management ---- + + @mcp.tool() + async def bt_gatt_server_set_value( + char_id: str, + value: str, + value_type: str = "hex", + ) -> dict[str, Any]: + """Set a characteristic value on the GATT server. + + If a remote client has subscribed to notifications on this + characteristic, the new value is automatically pushed via + BLE notification. + + Args: + char_id: Characteristic ID (e.g., "service0/char0"). + value: Value to set. + value_type: How to interpret value — "hex" (e.g., "0102ff"), + "string" (UTF-8 text), or "int" (decimal number). + + Returns: + Status with value details and whether notification was sent. + """ + try: + mgr = get_gatt_server() + data = _parse_value(value, value_type) + char = mgr._characteristics.get(char_id) + was_notifying = char.notifying if char else False + mgr.set_value(char_id, data) + return { + "status": "ok", + "char_id": char_id, + "value_hex": data.hex(), + "length": len(data), + "notified": was_notifying, + } + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_gatt_server_get_value(char_id: str) -> dict[str, Any]: + """Read a characteristic value from the server side. + + This reads the value stored on OUR server, not from a remote device. + Use bt_ble_read() for reading from remote GATT servers. + + Args: + char_id: Characteristic ID (e.g., "service0/char0"). + + Returns: + The current value as hex, bytes, and string (if decodable). + """ + try: + mgr = get_gatt_server() + value = mgr.get_value(char_id) + result: dict[str, Any] = { + "status": "ok", + "char_id": char_id, + "value_hex": value.hex(), + "value_bytes": list(value), + "length": len(value), + } + try: + result["value_string"] = value.decode("utf-8") + except UnicodeDecodeError: + pass + return result + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_gatt_server_notify(char_id: str) -> dict[str, Any]: + """Explicitly send a BLE notification for the current value. + + Normally, bt_gatt_server_set_value auto-notifies. Use this to + re-send the current value without changing it. + + Args: + char_id: Characteristic ID (e.g., "service0/char0"). + + Returns: + Status indicating whether notification was sent. + """ + try: + mgr = get_gatt_server() + sent = mgr.notify(char_id) + if sent: + return {"status": "ok", "notified": True} + return { + "status": "ok", + "notified": False, + "reason": "No client subscribed to notifications", + } + except Exception as exc: + return {"status": "error", "error": str(exc)} + + # ---- Advertising ---- + + @mcp.tool() + async def bt_gatt_server_advertise( + adapter: str, + enable: bool, + name: str = "mcbluetooth", + service_uuids: list[str] | None = None, + ) -> dict[str, Any]: + """Start or stop BLE advertising. + + When advertising, the device becomes discoverable to BLE scanners + and can accept connections from central devices. + + Must register the GATT application first. + + Args: + adapter: Adapter name (e.g., "hci0"). + enable: True to start advertising, False to stop. + name: Local name shown to scanners (default "mcbluetooth"). + service_uuids: UUIDs to include in advertisement. If not specified, + uses all registered service UUIDs. + + Returns: + Advertising status. + """ + try: + mgr = get_gatt_server() + await mgr.set_advertising(adapter, enable, name=name, service_uuids=service_uuids) + return { + "status": "ok", + "advertising": enable, + "name": name if enable else None, + } + except Exception as exc: + return {"status": "error", "error": str(exc)} + + # ---- Write event monitoring ---- + + @mcp.tool() + async def bt_gatt_server_read_writes( + since_index: int = 0, + char_id: str | None = None, + limit: int = 50, + ) -> dict[str, Any]: + """Read write events from remote BLE clients. + + When a remote central device writes to a server characteristic, + the write is buffered here. Poll this to see incoming commands/data. + + Args: + since_index: Only return events with index >= this value. + Start with 0, then use the highest returned index + 1. + char_id: Filter to writes on this characteristic (optional). + limit: Maximum events to return (default 50). + + Returns: + List of write events with value, timestamp, and source device. + """ + mgr = get_gatt_server() + events = mgr.get_write_events(since_index, char_id, limit) + next_index = ( + max((e["index"] for e in events), default=since_index) + 1 if events else since_index + ) + return { + "status": "ok", + "events": events, + "count": len(events), + "next_index": next_index, + } + + @mcp.tool() + async def bt_gatt_server_clear_writes() -> dict[str, Any]: + """Clear the write event buffer. + + Returns: + Number of events cleared. + """ + mgr = get_gatt_server() + count = mgr.clear_write_events() + return {"status": "ok", "cleared_count": count}