Add BLE GATT server and ELM327 OBD-II emulator
Implement peripheral/GATT server support (14 tools) using BlueZ's GattManager1 D-Bus API, enabling mcbluetooth to act as a BLE device that advertises, accepts connections, and handles reads/writes/notifications. Add ELM327 emulator (7 tools) that builds on the GATT server to emulate a BLE OBD-II adapter over Nordic UART Service. Handles AT commands, Mode 01/03/04/09 queries, configurable PIDs, DTCs, and VIN with auto-response — no LLM round-trip needed for protocol handling. New files: - gatt_server.py: D-Bus ServiceInterface hierarchy + GattServerManager - tools/gatt_server.py: MCP tool wrappers for server lifecycle - tools/bt_elm327_emu.py: ELM327 protocol handler + NUS setup Also adds 2 MCP resources (bluetooth://gatt/server, .../writes).
This commit is contained in:
parent
b0aa6434ae
commit
d0f0565e62
@ -69,6 +69,7 @@ ignore = ["E501"]
|
|||||||
[tool.ruff.lint.per-file-ignores]
|
[tool.ruff.lint.per-file-ignores]
|
||||||
# dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations
|
# dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations
|
||||||
"src/mcbluetooth/hfp_ag.py" = ["F821", "F722"]
|
"src/mcbluetooth/hfp_ag.py" = ["F821", "F722"]
|
||||||
|
"src/mcbluetooth/gatt_server.py" = ["F821", "F722"]
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
asyncio_mode = "auto"
|
asyncio_mode = "auto"
|
||||||
|
|||||||
717
src/mcbluetooth/gatt_server.py
Normal file
717
src/mcbluetooth/gatt_server.py
Normal file
@ -0,0 +1,717 @@
|
|||||||
|
"""BLE GATT Server implementation for BlueZ.
|
||||||
|
|
||||||
|
Registers as a BLE peripheral via BlueZ's GattManager1 D-Bus API, allowing
|
||||||
|
mcbluetooth to act as a GATT server. Remote BLE central devices can discover
|
||||||
|
services, read/write characteristics, and subscribe to notifications.
|
||||||
|
|
||||||
|
D-Bus hierarchy exported to BlueZ:
|
||||||
|
/mcbluetooth/gatt ObjectManager
|
||||||
|
/mcbluetooth/gatt/serviceN GattService1
|
||||||
|
/mcbluetooth/gatt/serviceN/charM GattCharacteristic1
|
||||||
|
/mcbluetooth/gatt/serviceN/charM/descK GattDescriptor1
|
||||||
|
/mcbluetooth/gatt/adv0 LEAdvertisement1
|
||||||
|
|
||||||
|
Reference patterns: agent.py (ServiceInterface, pending requests),
|
||||||
|
hfp_ag.py (dedicated bus, module singleton, ProfileManager registration).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from dbus_fast import BusType, Message, MessageType, Variant
|
||||||
|
from dbus_fast.aio import MessageBus
|
||||||
|
from dbus_fast.service import ServiceInterface, dbus_property, method
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
BLUEZ_SERVICE = "org.bluez"
|
||||||
|
GATT_MANAGER_IFACE = "org.bluez.GattManager1"
|
||||||
|
LE_ADV_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1"
|
||||||
|
APP_BASE_PATH = "/mcbluetooth/gatt"
|
||||||
|
ADV_PATH = "/mcbluetooth/gatt/adv0"
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== Data Classes ====================
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class WriteEvent:
|
||||||
|
"""A write received from a remote BLE central device."""
|
||||||
|
|
||||||
|
index: int
|
||||||
|
timestamp: str
|
||||||
|
char_id: str
|
||||||
|
char_uuid: str
|
||||||
|
value: bytes
|
||||||
|
value_hex: str
|
||||||
|
value_string: str | None
|
||||||
|
device: str | None
|
||||||
|
|
||||||
|
def to_dict(self) -> dict[str, Any]:
|
||||||
|
d: dict[str, Any] = {
|
||||||
|
"index": self.index,
|
||||||
|
"timestamp": self.timestamp,
|
||||||
|
"char_id": self.char_id,
|
||||||
|
"char_uuid": self.char_uuid,
|
||||||
|
"value_hex": self.value_hex,
|
||||||
|
"length": len(self.value),
|
||||||
|
}
|
||||||
|
if self.value_string is not None:
|
||||||
|
d["value_string"] = self.value_string
|
||||||
|
if self.device:
|
||||||
|
d["device"] = self.device
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ServerCharacteristic:
|
||||||
|
"""Internal state for a server-side GATT characteristic."""
|
||||||
|
|
||||||
|
char_id: str
|
||||||
|
path: str
|
||||||
|
uuid: str
|
||||||
|
flags: list[str]
|
||||||
|
value: bytes = b""
|
||||||
|
notifying: bool = False
|
||||||
|
service_id: str = ""
|
||||||
|
desc_count: int = 0
|
||||||
|
dbus_obj: Any = field(default=None, repr=False)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ServerDescriptor:
|
||||||
|
"""Internal state for a server-side GATT descriptor."""
|
||||||
|
|
||||||
|
desc_id: str
|
||||||
|
path: str
|
||||||
|
uuid: str
|
||||||
|
flags: list[str]
|
||||||
|
value: bytes = b""
|
||||||
|
char_id: str = ""
|
||||||
|
dbus_obj: Any = field(default=None, repr=False)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ServerService:
|
||||||
|
"""Internal state for a server-side GATT service."""
|
||||||
|
|
||||||
|
service_id: str
|
||||||
|
path: str
|
||||||
|
uuid: str
|
||||||
|
primary: bool = True
|
||||||
|
char_count: int = 0
|
||||||
|
characteristics: dict[str, ServerCharacteristic] = field(default_factory=dict)
|
||||||
|
dbus_obj: Any = field(default=None, repr=False)
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== D-Bus Interfaces ====================
|
||||||
|
|
||||||
|
|
||||||
|
class GattApplication(ServiceInterface):
|
||||||
|
"""ObjectManager for the GATT application root.
|
||||||
|
|
||||||
|
BlueZ calls GetManagedObjects() on this path to discover all
|
||||||
|
services, characteristics, and descriptors we're hosting.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, manager: "GattServerManager"):
|
||||||
|
super().__init__("org.freedesktop.DBus.ObjectManager")
|
||||||
|
self._manager = manager
|
||||||
|
|
||||||
|
@method()
|
||||||
|
def GetManagedObjects(self) -> "a{oa{sa{sv}}}": # noqa: F821
|
||||||
|
return self._manager._build_managed_objects()
|
||||||
|
|
||||||
|
|
||||||
|
class GattServiceIface(ServiceInterface):
|
||||||
|
"""org.bluez.GattService1 — exported at each service path.
|
||||||
|
|
||||||
|
No D-Bus methods; properties served via GetManagedObjects only.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("org.bluez.GattService1")
|
||||||
|
|
||||||
|
|
||||||
|
class GattCharacteristicIface(ServiceInterface):
|
||||||
|
"""org.bluez.GattCharacteristic1 — handles reads/writes from remotes.
|
||||||
|
|
||||||
|
ReadValue returns the stored value. WriteValue stores the value AND
|
||||||
|
records a WriteEvent for LLM polling (like agent.py's pending_requests).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, char: ServerCharacteristic, manager: "GattServerManager"):
|
||||||
|
super().__init__("org.bluez.GattCharacteristic1")
|
||||||
|
self._char = char
|
||||||
|
self._manager = manager
|
||||||
|
|
||||||
|
@method()
|
||||||
|
def ReadValue(self, options: "a{sv}") -> "ay": # noqa: F821
|
||||||
|
offset = 0
|
||||||
|
opt_offset = options.get("offset")
|
||||||
|
if opt_offset is not None:
|
||||||
|
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
|
||||||
|
return list(self._char.value[offset:])
|
||||||
|
|
||||||
|
@method()
|
||||||
|
def WriteValue(self, value: "ay", options: "a{sv}") -> None: # noqa: F821
|
||||||
|
new_bytes = bytes(value)
|
||||||
|
offset = 0
|
||||||
|
opt_offset = options.get("offset")
|
||||||
|
if opt_offset is not None:
|
||||||
|
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
|
||||||
|
|
||||||
|
if offset > 0:
|
||||||
|
self._char.value = self._char.value[:offset] + new_bytes
|
||||||
|
else:
|
||||||
|
self._char.value = new_bytes
|
||||||
|
|
||||||
|
device = None
|
||||||
|
opt_device = options.get("device")
|
||||||
|
if opt_device is not None:
|
||||||
|
device = opt_device.value if hasattr(opt_device, "value") else str(opt_device)
|
||||||
|
|
||||||
|
self._manager._record_write(self._char.char_id, self._char.uuid, self._char.value, device)
|
||||||
|
|
||||||
|
@method()
|
||||||
|
def StartNotify(self) -> None:
|
||||||
|
self._char.notifying = True
|
||||||
|
log.info("GATT server: StartNotify on %s (%s)", self._char.char_id, self._char.uuid)
|
||||||
|
|
||||||
|
@method()
|
||||||
|
def StopNotify(self) -> None:
|
||||||
|
self._char.notifying = False
|
||||||
|
log.info("GATT server: StopNotify on %s (%s)", self._char.char_id, self._char.uuid)
|
||||||
|
|
||||||
|
|
||||||
|
class GattDescriptorIface(ServiceInterface):
|
||||||
|
"""org.bluez.GattDescriptor1 — handles reads/writes on descriptors."""
|
||||||
|
|
||||||
|
def __init__(self, desc: ServerDescriptor):
|
||||||
|
super().__init__("org.bluez.GattDescriptor1")
|
||||||
|
self._desc = desc
|
||||||
|
|
||||||
|
@method()
|
||||||
|
def ReadValue(self, options: "a{sv}") -> "ay": # noqa: F821
|
||||||
|
offset = 0
|
||||||
|
opt_offset = options.get("offset")
|
||||||
|
if opt_offset is not None:
|
||||||
|
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
|
||||||
|
return list(self._desc.value[offset:])
|
||||||
|
|
||||||
|
@method()
|
||||||
|
def WriteValue(self, value: "ay", options: "a{sv}") -> None: # noqa: F821
|
||||||
|
self._desc.value = bytes(value)
|
||||||
|
|
||||||
|
|
||||||
|
class LEAdvertisementIface(ServiceInterface):
|
||||||
|
"""org.bluez.LEAdvertisement1 — BLE advertising data.
|
||||||
|
|
||||||
|
BlueZ reads advertisement properties via Properties.GetAll, so these
|
||||||
|
must be exposed as @dbus_property (unlike GATT objects which use
|
||||||
|
GetManagedObjects).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, adv_type: str, local_name: str, service_uuids: list[str]):
|
||||||
|
super().__init__("org.bluez.LEAdvertisement1")
|
||||||
|
self._type = adv_type
|
||||||
|
self._local_name = local_name
|
||||||
|
self._service_uuids = service_uuids
|
||||||
|
|
||||||
|
@method()
|
||||||
|
def Release(self) -> None:
|
||||||
|
log.info("GATT server: advertisement released by BlueZ")
|
||||||
|
|
||||||
|
@dbus_property()
|
||||||
|
def Type(self) -> "s": # noqa: F821
|
||||||
|
return self._type
|
||||||
|
|
||||||
|
@dbus_property()
|
||||||
|
def LocalName(self) -> "s": # noqa: F821
|
||||||
|
return self._local_name
|
||||||
|
|
||||||
|
@dbus_property()
|
||||||
|
def ServiceUUIDs(self) -> "as": # noqa: F821
|
||||||
|
return self._service_uuids
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== GATT Server Manager ====================
|
||||||
|
|
||||||
|
|
||||||
|
class GattServerManager:
|
||||||
|
"""Manages the GATT server lifecycle and state.
|
||||||
|
|
||||||
|
Coordinates D-Bus object creation, BlueZ registration,
|
||||||
|
advertising, and write event buffering.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._bus: MessageBus | None = None
|
||||||
|
self._app: GattApplication | None = None
|
||||||
|
self._adv: LEAdvertisementIface | None = None
|
||||||
|
self._services: dict[str, ServerService] = {}
|
||||||
|
self._characteristics: dict[str, ServerCharacteristic] = {}
|
||||||
|
self._descriptors: dict[str, ServerDescriptor] = {}
|
||||||
|
self._write_events: deque[WriteEvent] = deque(maxlen=200)
|
||||||
|
self._write_index: int = 0
|
||||||
|
self._service_count: int = 0
|
||||||
|
self._registered: bool = False
|
||||||
|
self._advertising: bool = False
|
||||||
|
self._write_callback: Any = None # async callable(char_id, value, device)
|
||||||
|
|
||||||
|
# ---- Service construction ----
|
||||||
|
|
||||||
|
def add_service(self, uuid: str, primary: bool = True) -> str:
|
||||||
|
"""Add a GATT service. Returns the service_id (e.g. 'service0')."""
|
||||||
|
service_id = f"service{self._service_count}"
|
||||||
|
self._service_count += 1
|
||||||
|
path = f"{APP_BASE_PATH}/{service_id}"
|
||||||
|
|
||||||
|
svc = ServerService(
|
||||||
|
service_id=service_id,
|
||||||
|
path=path,
|
||||||
|
uuid=uuid,
|
||||||
|
primary=primary,
|
||||||
|
)
|
||||||
|
svc.dbus_obj = GattServiceIface()
|
||||||
|
self._services[service_id] = svc
|
||||||
|
|
||||||
|
log.info("GATT server: added service %s (UUID=%s)", service_id, uuid)
|
||||||
|
return service_id
|
||||||
|
|
||||||
|
def add_characteristic(
|
||||||
|
self,
|
||||||
|
service_id: str,
|
||||||
|
uuid: str,
|
||||||
|
flags: list[str],
|
||||||
|
value: bytes = b"",
|
||||||
|
) -> str:
|
||||||
|
"""Add a characteristic to a service. Returns the char_id."""
|
||||||
|
svc = self._services.get(service_id)
|
||||||
|
if not svc:
|
||||||
|
raise ValueError(f"Service '{service_id}' not found")
|
||||||
|
|
||||||
|
char_idx = svc.char_count
|
||||||
|
svc.char_count += 1
|
||||||
|
char_id = f"{service_id}/char{char_idx}"
|
||||||
|
path = f"{APP_BASE_PATH}/{char_id}"
|
||||||
|
|
||||||
|
char = ServerCharacteristic(
|
||||||
|
char_id=char_id,
|
||||||
|
path=path,
|
||||||
|
uuid=uuid,
|
||||||
|
flags=flags,
|
||||||
|
value=value,
|
||||||
|
service_id=service_id,
|
||||||
|
)
|
||||||
|
char.dbus_obj = GattCharacteristicIface(char, self)
|
||||||
|
svc.characteristics[char_id] = char
|
||||||
|
self._characteristics[char_id] = char
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"GATT server: added characteristic %s (UUID=%s, flags=%s)",
|
||||||
|
char_id,
|
||||||
|
uuid,
|
||||||
|
flags,
|
||||||
|
)
|
||||||
|
return char_id
|
||||||
|
|
||||||
|
def add_descriptor(
|
||||||
|
self,
|
||||||
|
char_id: str,
|
||||||
|
uuid: str,
|
||||||
|
flags: list[str],
|
||||||
|
value: bytes = b"",
|
||||||
|
) -> str:
|
||||||
|
"""Add a descriptor to a characteristic. Returns the desc_id."""
|
||||||
|
char = self._characteristics.get(char_id)
|
||||||
|
if not char:
|
||||||
|
raise ValueError(f"Characteristic '{char_id}' not found")
|
||||||
|
|
||||||
|
desc_idx = char.desc_count
|
||||||
|
char.desc_count += 1
|
||||||
|
desc_id = f"{char_id}/desc{desc_idx}"
|
||||||
|
path = f"{APP_BASE_PATH}/{desc_id}"
|
||||||
|
|
||||||
|
desc = ServerDescriptor(
|
||||||
|
desc_id=desc_id,
|
||||||
|
path=path,
|
||||||
|
uuid=uuid,
|
||||||
|
flags=flags,
|
||||||
|
value=value,
|
||||||
|
char_id=char_id,
|
||||||
|
)
|
||||||
|
desc.dbus_obj = GattDescriptorIface(desc)
|
||||||
|
self._descriptors[desc_id] = desc
|
||||||
|
|
||||||
|
log.info("GATT server: added descriptor %s (UUID=%s)", desc_id, uuid)
|
||||||
|
return desc_id
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
"""Remove all services, characteristics, and descriptors."""
|
||||||
|
if self._registered:
|
||||||
|
raise RuntimeError("Cannot clear while registered — unregister first")
|
||||||
|
|
||||||
|
self._services.clear()
|
||||||
|
self._characteristics.clear()
|
||||||
|
self._descriptors.clear()
|
||||||
|
self._service_count = 0
|
||||||
|
|
||||||
|
# ---- Registration with BlueZ ----
|
||||||
|
|
||||||
|
async def register(self, adapter: str) -> None:
|
||||||
|
"""Register the GATT application with BlueZ GattManager1."""
|
||||||
|
if self._registered:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._services:
|
||||||
|
raise ValueError("No services defined — add at least one service first")
|
||||||
|
|
||||||
|
# Dedicated bus connection (like hfp_ag.py)
|
||||||
|
if not self._bus:
|
||||||
|
self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
|
||||||
|
|
||||||
|
# Export ObjectManager at application root
|
||||||
|
self._app = GattApplication(self)
|
||||||
|
self._bus.export(APP_BASE_PATH, self._app)
|
||||||
|
|
||||||
|
# Export all service/characteristic/descriptor D-Bus objects
|
||||||
|
for svc in self._services.values():
|
||||||
|
self._bus.export(svc.path, svc.dbus_obj)
|
||||||
|
for char in svc.characteristics.values():
|
||||||
|
self._bus.export(char.path, char.dbus_obj)
|
||||||
|
for desc in self._descriptors.values():
|
||||||
|
self._bus.export(desc.path, desc.dbus_obj)
|
||||||
|
|
||||||
|
# Register with BlueZ
|
||||||
|
adapter_path = f"/org/bluez/{adapter}"
|
||||||
|
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
|
||||||
|
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
|
||||||
|
gatt_mgr = proxy.get_interface(GATT_MANAGER_IFACE)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await gatt_mgr.call_register_application(APP_BASE_PATH, {})
|
||||||
|
self._registered = True
|
||||||
|
log.info("GATT server: registered with BlueZ (%s)", adapter)
|
||||||
|
except Exception as e:
|
||||||
|
if "Already Exists" in str(e):
|
||||||
|
log.info("GATT server: stale registration — re-registering")
|
||||||
|
try:
|
||||||
|
await gatt_mgr.call_unregister_application(APP_BASE_PATH)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
await gatt_mgr.call_register_application(APP_BASE_PATH, {})
|
||||||
|
self._registered = True
|
||||||
|
log.info("GATT server: re-registered with BlueZ (%s)", adapter)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
async def unregister(self, adapter: str) -> None:
|
||||||
|
"""Unregister the GATT application from BlueZ."""
|
||||||
|
if self._advertising:
|
||||||
|
await self.set_advertising(adapter, False)
|
||||||
|
|
||||||
|
if self._bus and self._registered:
|
||||||
|
try:
|
||||||
|
adapter_path = f"/org/bluez/{adapter}"
|
||||||
|
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
|
||||||
|
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
|
||||||
|
gatt_mgr = proxy.get_interface(GATT_MANAGER_IFACE)
|
||||||
|
await gatt_mgr.call_unregister_application(APP_BASE_PATH)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._registered = False
|
||||||
|
|
||||||
|
# Disconnect bus (cleans up all exports)
|
||||||
|
if self._bus:
|
||||||
|
self._bus.disconnect()
|
||||||
|
self._bus = None
|
||||||
|
|
||||||
|
self._app = None
|
||||||
|
self._adv = None
|
||||||
|
log.info("GATT server: unregistered")
|
||||||
|
|
||||||
|
# ---- Advertising ----
|
||||||
|
|
||||||
|
async def set_advertising(
|
||||||
|
self,
|
||||||
|
adapter: str,
|
||||||
|
enable: bool,
|
||||||
|
name: str = "mcbluetooth",
|
||||||
|
service_uuids: list[str] | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Start or stop BLE advertising."""
|
||||||
|
if not self._bus:
|
||||||
|
raise RuntimeError("Bus not connected — register first")
|
||||||
|
|
||||||
|
adapter_path = f"/org/bluez/{adapter}"
|
||||||
|
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
|
||||||
|
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
|
||||||
|
adv_mgr = proxy.get_interface(LE_ADV_MANAGER_IFACE)
|
||||||
|
|
||||||
|
if enable:
|
||||||
|
if self._advertising:
|
||||||
|
return
|
||||||
|
|
||||||
|
uuids = service_uuids or [svc.uuid for svc in self._services.values()]
|
||||||
|
self._adv = LEAdvertisementIface("peripheral", name, uuids)
|
||||||
|
self._bus.export(ADV_PATH, self._adv)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await adv_mgr.call_register_advertisement(ADV_PATH, {})
|
||||||
|
self._advertising = True
|
||||||
|
log.info("GATT server: advertising started (name=%s)", name)
|
||||||
|
except Exception as e:
|
||||||
|
if "Already Exists" in str(e):
|
||||||
|
try:
|
||||||
|
await adv_mgr.call_unregister_advertisement(ADV_PATH)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
await adv_mgr.call_register_advertisement(ADV_PATH, {})
|
||||||
|
self._advertising = True
|
||||||
|
log.info("GATT server: advertising restarted (name=%s)", name)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
if not self._advertising:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
await adv_mgr.call_unregister_advertisement(ADV_PATH)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._advertising = False
|
||||||
|
self._adv = None
|
||||||
|
log.info("GATT server: advertising stopped")
|
||||||
|
|
||||||
|
# ---- Value management ----
|
||||||
|
|
||||||
|
def set_value(self, char_id: str, value: bytes) -> None:
|
||||||
|
"""Set characteristic value and auto-notify if subscribed."""
|
||||||
|
char = self._characteristics.get(char_id)
|
||||||
|
if not char:
|
||||||
|
raise ValueError(f"Characteristic '{char_id}' not found")
|
||||||
|
|
||||||
|
char.value = value
|
||||||
|
if char.notifying:
|
||||||
|
self._emit_notification(char)
|
||||||
|
|
||||||
|
def get_value(self, char_id: str) -> bytes:
|
||||||
|
"""Get current characteristic value."""
|
||||||
|
char = self._characteristics.get(char_id)
|
||||||
|
if not char:
|
||||||
|
raise ValueError(f"Characteristic '{char_id}' not found")
|
||||||
|
return char.value
|
||||||
|
|
||||||
|
def notify(self, char_id: str) -> bool:
|
||||||
|
"""Explicitly send notification for current value."""
|
||||||
|
char = self._characteristics.get(char_id)
|
||||||
|
if not char:
|
||||||
|
raise ValueError(f"Characteristic '{char_id}' not found")
|
||||||
|
if not char.notifying:
|
||||||
|
return False
|
||||||
|
self._emit_notification(char)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _emit_notification(self, char: ServerCharacteristic) -> None:
|
||||||
|
"""Emit PropertiesChanged signal to trigger BLE notification."""
|
||||||
|
if not self._bus:
|
||||||
|
return
|
||||||
|
|
||||||
|
msg = Message(
|
||||||
|
message_type=MessageType.SIGNAL,
|
||||||
|
path=char.path,
|
||||||
|
interface="org.freedesktop.DBus.Properties",
|
||||||
|
member="PropertiesChanged",
|
||||||
|
signature="sa{sv}as",
|
||||||
|
body=[
|
||||||
|
"org.bluez.GattCharacteristic1",
|
||||||
|
{"Value": Variant("ay", list(char.value))},
|
||||||
|
[],
|
||||||
|
],
|
||||||
|
)
|
||||||
|
self._bus.send_message(msg)
|
||||||
|
log.debug("GATT server: notification sent on %s (%d bytes)", char.char_id, len(char.value))
|
||||||
|
|
||||||
|
# ---- Write event monitoring ----
|
||||||
|
|
||||||
|
def _record_write(self, char_id: str, char_uuid: str, value: bytes, device: str | None) -> None:
|
||||||
|
"""Record a write event and fire callback if registered."""
|
||||||
|
event = WriteEvent(
|
||||||
|
index=self._write_index,
|
||||||
|
timestamp=datetime.now(UTC).isoformat(),
|
||||||
|
char_id=char_id,
|
||||||
|
char_uuid=char_uuid,
|
||||||
|
value=value,
|
||||||
|
value_hex=value.hex(),
|
||||||
|
value_string=_try_decode(value),
|
||||||
|
device=device,
|
||||||
|
)
|
||||||
|
self._write_events.append(event)
|
||||||
|
self._write_index += 1
|
||||||
|
|
||||||
|
log.debug(
|
||||||
|
"GATT server: write on %s from %s (%d bytes)",
|
||||||
|
char_id,
|
||||||
|
device or "unknown",
|
||||||
|
len(value),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fire async callback (e.g. ELM327 emulator auto-responder)
|
||||||
|
if self._write_callback:
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
loop.create_task(self._write_callback(char_id, value, device))
|
||||||
|
except RuntimeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_write_events(
|
||||||
|
self,
|
||||||
|
since_index: int = 0,
|
||||||
|
char_id: str | None = None,
|
||||||
|
limit: int = 50,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""Get write events, optionally filtered."""
|
||||||
|
events = [
|
||||||
|
e
|
||||||
|
for e in self._write_events
|
||||||
|
if e.index >= since_index and (char_id is None or e.char_id == char_id)
|
||||||
|
]
|
||||||
|
return [e.to_dict() for e in events[:limit]]
|
||||||
|
|
||||||
|
def clear_write_events(self) -> int:
|
||||||
|
"""Clear all write events. Returns count cleared."""
|
||||||
|
count = len(self._write_events)
|
||||||
|
self._write_events.clear()
|
||||||
|
return count
|
||||||
|
|
||||||
|
# ---- GetManagedObjects builder ----
|
||||||
|
|
||||||
|
def _build_managed_objects(self) -> dict[str, dict[str, dict[str, Variant]]]:
|
||||||
|
"""Build the response for ObjectManager.GetManagedObjects().
|
||||||
|
|
||||||
|
Returns the complete D-Bus object hierarchy that BlueZ uses
|
||||||
|
to discover our GATT services, characteristics, and descriptors.
|
||||||
|
"""
|
||||||
|
objects: dict[str, dict[str, dict[str, Variant]]] = {}
|
||||||
|
|
||||||
|
for svc in self._services.values():
|
||||||
|
char_paths = [c.path for c in svc.characteristics.values()]
|
||||||
|
objects[svc.path] = {
|
||||||
|
"org.bluez.GattService1": {
|
||||||
|
"UUID": Variant("s", svc.uuid),
|
||||||
|
"Primary": Variant("b", svc.primary),
|
||||||
|
"Characteristics": Variant("ao", char_paths),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for char in svc.characteristics.values():
|
||||||
|
desc_paths = [
|
||||||
|
d.path for d in self._descriptors.values() if d.char_id == char.char_id
|
||||||
|
]
|
||||||
|
objects[char.path] = {
|
||||||
|
"org.bluez.GattCharacteristic1": {
|
||||||
|
"UUID": Variant("s", char.uuid),
|
||||||
|
"Service": Variant("o", svc.path),
|
||||||
|
"Flags": Variant("as", char.flags),
|
||||||
|
"Descriptors": Variant("ao", desc_paths),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for desc in self._descriptors.values():
|
||||||
|
char = self._characteristics.get(desc.char_id)
|
||||||
|
if char:
|
||||||
|
objects[desc.path] = {
|
||||||
|
"org.bluez.GattDescriptor1": {
|
||||||
|
"UUID": Variant("s", desc.uuid),
|
||||||
|
"Characteristic": Variant("o", char.path),
|
||||||
|
"Flags": Variant("as", desc.flags),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return objects
|
||||||
|
|
||||||
|
# ---- Status ----
|
||||||
|
|
||||||
|
def get_status(self) -> dict[str, Any]:
|
||||||
|
"""Get overall GATT server status."""
|
||||||
|
services = []
|
||||||
|
for svc in self._services.values():
|
||||||
|
chars = []
|
||||||
|
for char in svc.characteristics.values():
|
||||||
|
descs = [
|
||||||
|
{
|
||||||
|
"desc_id": d.desc_id,
|
||||||
|
"uuid": d.uuid,
|
||||||
|
"flags": d.flags,
|
||||||
|
}
|
||||||
|
for d in self._descriptors.values()
|
||||||
|
if d.char_id == char.char_id
|
||||||
|
]
|
||||||
|
chars.append(
|
||||||
|
{
|
||||||
|
"char_id": char.char_id,
|
||||||
|
"uuid": char.uuid,
|
||||||
|
"flags": char.flags,
|
||||||
|
"value_length": len(char.value),
|
||||||
|
"notifying": char.notifying,
|
||||||
|
"descriptors": descs,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
services.append(
|
||||||
|
{
|
||||||
|
"service_id": svc.service_id,
|
||||||
|
"uuid": svc.uuid,
|
||||||
|
"primary": svc.primary,
|
||||||
|
"characteristics": chars,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"registered": self._registered,
|
||||||
|
"advertising": self._advertising,
|
||||||
|
"services": services,
|
||||||
|
"write_event_count": len(self._write_events),
|
||||||
|
"write_event_total": self._write_index,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== Module-level lifecycle ====================
|
||||||
|
|
||||||
|
_manager: GattServerManager | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_gatt_server() -> GattServerManager:
|
||||||
|
"""Get or create the global GATT server manager (singleton)."""
|
||||||
|
global _manager
|
||||||
|
if _manager is None:
|
||||||
|
_manager = GattServerManager()
|
||||||
|
return _manager
|
||||||
|
|
||||||
|
|
||||||
|
async def shutdown_gatt_server(adapter: str) -> None:
|
||||||
|
"""Shut down the GATT server completely."""
|
||||||
|
global _manager
|
||||||
|
if _manager:
|
||||||
|
try:
|
||||||
|
await _manager.unregister(adapter)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
_manager = None
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== Helpers ====================
|
||||||
|
|
||||||
|
|
||||||
|
def _try_decode(value: bytes) -> str | None:
|
||||||
|
"""Try to decode bytes as printable UTF-8."""
|
||||||
|
try:
|
||||||
|
decoded = value.decode("utf-8")
|
||||||
|
if all(c.isprintable() or c in "\r\n\t" for c in decoded):
|
||||||
|
return decoded
|
||||||
|
return None
|
||||||
|
except (UnicodeDecodeError, ValueError):
|
||||||
|
return None
|
||||||
@ -17,6 +17,7 @@ from dataclasses import asdict
|
|||||||
from fastmcp import FastMCP
|
from fastmcp import FastMCP
|
||||||
|
|
||||||
from mcbluetooth.dbus_client import get_client, get_notify_manager
|
from mcbluetooth.dbus_client import get_client, get_notify_manager
|
||||||
|
from mcbluetooth.gatt_server import get_gatt_server
|
||||||
|
|
||||||
|
|
||||||
def register_resources(mcp: FastMCP) -> None:
|
def register_resources(mcp: FastMCP) -> None:
|
||||||
@ -307,3 +308,43 @@ def register_resources(mcp: FastMCP) -> None:
|
|||||||
},
|
},
|
||||||
indent=2,
|
indent=2,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ==================== GATT Server Resources ====================
|
||||||
|
|
||||||
|
@mcp.resource(
|
||||||
|
"bluetooth://gatt/server",
|
||||||
|
name="GATT Server State",
|
||||||
|
description=(
|
||||||
|
"Current state of the BLE GATT server (peripheral mode). "
|
||||||
|
"Shows registered services, characteristics, advertising status, "
|
||||||
|
"and write event statistics."
|
||||||
|
),
|
||||||
|
mime_type="application/json",
|
||||||
|
)
|
||||||
|
async def resource_gatt_server() -> str:
|
||||||
|
"""Get GATT server state."""
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
return json.dumps(mgr.get_status(), indent=2)
|
||||||
|
|
||||||
|
@mcp.resource(
|
||||||
|
"bluetooth://gatt/server/writes",
|
||||||
|
name="GATT Server Write Events",
|
||||||
|
description=(
|
||||||
|
"Write events received from remote BLE central devices. "
|
||||||
|
"Shows the most recent writes to server characteristics "
|
||||||
|
"with timestamps, values, and source device info."
|
||||||
|
),
|
||||||
|
mime_type="application/json",
|
||||||
|
)
|
||||||
|
async def resource_gatt_server_writes() -> str:
|
||||||
|
"""Get recent write events from remote clients."""
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
events = mgr.get_write_events(since_index=0, limit=50)
|
||||||
|
return json.dumps(
|
||||||
|
{
|
||||||
|
"count": len(events),
|
||||||
|
"total": mgr._write_index,
|
||||||
|
"events": events,
|
||||||
|
},
|
||||||
|
indent=2,
|
||||||
|
)
|
||||||
|
|||||||
@ -3,7 +3,17 @@
|
|||||||
from fastmcp import FastMCP
|
from fastmcp import FastMCP
|
||||||
|
|
||||||
from mcbluetooth import resources
|
from mcbluetooth import resources
|
||||||
from mcbluetooth.tools import adapter, audio, ble, device, hfp, monitor, obex
|
from mcbluetooth.tools import (
|
||||||
|
adapter,
|
||||||
|
audio,
|
||||||
|
ble,
|
||||||
|
bt_elm327_emu,
|
||||||
|
device,
|
||||||
|
gatt_server,
|
||||||
|
hfp,
|
||||||
|
monitor,
|
||||||
|
obex,
|
||||||
|
)
|
||||||
|
|
||||||
mcp = FastMCP(
|
mcp = FastMCP(
|
||||||
name="mcbluetooth",
|
name="mcbluetooth",
|
||||||
@ -36,6 +46,25 @@ To capture BLE notifications:
|
|||||||
3. Read notifications: Use the bluetooth://ble/{address}/{uuid}/notifications resource
|
3. Read notifications: Use the bluetooth://ble/{address}/{uuid}/notifications resource
|
||||||
4. Subscribe to the resource for real-time updates (client-side)
|
4. Subscribe to the resource for real-time updates (client-side)
|
||||||
|
|
||||||
|
### GATT Server Resources (peripheral mode)
|
||||||
|
- bluetooth://gatt/server - GATT server state (registered, services, advertising)
|
||||||
|
- bluetooth://gatt/server/writes - Write events from remote clients
|
||||||
|
|
||||||
|
To act as a BLE peripheral (GATT server):
|
||||||
|
1. Add services: bt_gatt_server_add_service(uuid)
|
||||||
|
2. Add characteristics: bt_gatt_server_add_characteristic(service_id, uuid, flags)
|
||||||
|
3. Register: bt_gatt_server_register(adapter)
|
||||||
|
4. Advertise: bt_gatt_server_advertise(adapter, enable=True, name="MyDevice")
|
||||||
|
5. Monitor writes: bt_gatt_server_read_writes(since_index=0)
|
||||||
|
6. Respond: bt_gatt_server_set_value(char_id, value) — auto-notifies subscribers
|
||||||
|
|
||||||
|
### ELM327 OBD-II Emulator
|
||||||
|
For quick OBD-II BLE adapter emulation (Nordic UART Service):
|
||||||
|
1. bt_elm327_emu_start(adapter, name="OBDII") — sets up NUS, advertises
|
||||||
|
2. bt_elm327_emu_set_pid(0x0C, "0BB8", "hex") — set RPM to 750
|
||||||
|
3. bt_elm327_emu_read_commands() — see what the OBD app sent
|
||||||
|
4. bt_elm327_emu_stop(adapter) — clean up
|
||||||
|
|
||||||
## Tools
|
## Tools
|
||||||
All tools require an explicit 'adapter' parameter (e.g., "hci0").
|
All tools require an explicit 'adapter' parameter (e.g., "hci0").
|
||||||
Use bt_list_adapters() to discover available adapters.
|
Use bt_list_adapters() to discover available adapters.
|
||||||
@ -56,6 +85,8 @@ device.register_tools(mcp)
|
|||||||
audio.register_tools(mcp)
|
audio.register_tools(mcp)
|
||||||
hfp.register_tools(mcp)
|
hfp.register_tools(mcp)
|
||||||
ble.register_tools(mcp)
|
ble.register_tools(mcp)
|
||||||
|
gatt_server.register_tools(mcp)
|
||||||
|
bt_elm327_emu.register_tools(mcp)
|
||||||
monitor.register_tools(mcp)
|
monitor.register_tools(mcp)
|
||||||
obex.register_tools(mcp)
|
obex.register_tools(mcp)
|
||||||
|
|
||||||
|
|||||||
678
src/mcbluetooth/tools/bt_elm327_emu.py
Normal file
678
src/mcbluetooth/tools/bt_elm327_emu.py
Normal file
@ -0,0 +1,678 @@
|
|||||||
|
"""ELM327 OBD-II BLE Emulator.
|
||||||
|
|
||||||
|
Emulates an ELM327 v1.5 adapter over BLE using Nordic UART Service (NUS).
|
||||||
|
OBD-II apps (Torque, Car Scanner, etc.) connect via BLE, send AT commands
|
||||||
|
and OBD-II PID queries, and receive simulated responses.
|
||||||
|
|
||||||
|
The emulator handles the full ELM327 protocol:
|
||||||
|
- AT command set (ATZ, ATE0, ATH1, ATSP, etc.)
|
||||||
|
- Mode 01 (live data) with configurable PID values
|
||||||
|
- Mode 03 (stored DTCs)
|
||||||
|
- Mode 09 (VIN query)
|
||||||
|
- Supported PID bitmaps auto-generated from configured PIDs
|
||||||
|
|
||||||
|
NUS UUIDs (same as real BLE OBD adapters):
|
||||||
|
Service: 6E400001-B5A3-F393-E0A9-E50E24DCCA9E
|
||||||
|
RX (write): 6E400002-... (client writes commands here)
|
||||||
|
TX (notify): 6E400003-... (server sends responses here)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastmcp import FastMCP
|
||||||
|
|
||||||
|
from mcbluetooth.gatt_server import get_gatt_server
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Nordic UART Service UUIDs
|
||||||
|
NUS_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
|
||||||
|
NUS_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" # Client writes here
|
||||||
|
NUS_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" # Server notifies here
|
||||||
|
|
||||||
|
# Default ECU header (primary ECU response address)
|
||||||
|
ECU_HEADER = "7E8"
|
||||||
|
|
||||||
|
# Common OBD-II PIDs with default values
|
||||||
|
DEFAULT_PIDS: dict[int, tuple[bytes, str]] = {
|
||||||
|
0x05: (bytes([70]), "Coolant temp (°C, offset -40) → 30°C"),
|
||||||
|
0x0C: (bytes([0x0B, 0xB8]), "Engine RPM (× ¼) → 750 RPM"),
|
||||||
|
0x0D: (bytes([0]), "Vehicle speed (km/h) → 0"),
|
||||||
|
0x0F: (bytes([55]), "Intake air temp (°C, offset -40) → 15°C"),
|
||||||
|
0x11: (bytes([25]), "Throttle position (%) → ~10%"),
|
||||||
|
0x1F: (bytes([0x00, 0x00]), "Runtime since start (sec) → 0"),
|
||||||
|
0x2F: (bytes([51]), "Fuel tank level (%) → 20%"),
|
||||||
|
0x46: (bytes([60]), "Ambient air temp (°C, offset -40) → 20°C"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CommandRecord:
|
||||||
|
"""A command received from the OBD-II client."""
|
||||||
|
|
||||||
|
index: int
|
||||||
|
timestamp: str
|
||||||
|
raw: str
|
||||||
|
parsed: str
|
||||||
|
response: str
|
||||||
|
|
||||||
|
|
||||||
|
class ELM327Emulator:
|
||||||
|
"""ELM327 AT command and OBD-II protocol handler."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.device_id = "ELM327 v1.5"
|
||||||
|
self.protocol = "6" # ISO 15765-4 CAN (11/500)
|
||||||
|
self.echo = False
|
||||||
|
self.headers = True
|
||||||
|
self.spaces = False
|
||||||
|
self.linefeed = False
|
||||||
|
self.pid_values: dict[int, bytes] = {}
|
||||||
|
self.dtc_codes: list[str] = []
|
||||||
|
self.vin = "1MCBT00TESTING12345"
|
||||||
|
self._commands: deque[CommandRecord] = deque(maxlen=200)
|
||||||
|
self._cmd_index = 0
|
||||||
|
self._started = False
|
||||||
|
self._rx_char_id: str | None = None
|
||||||
|
self._tx_char_id: str | None = None
|
||||||
|
self._adapter: str | None = None
|
||||||
|
self._buf = "" # Accumulates partial writes
|
||||||
|
|
||||||
|
def set_defaults(self) -> None:
|
||||||
|
"""Load default PID values for a realistic idle vehicle."""
|
||||||
|
for pid, (value, _desc) in DEFAULT_PIDS.items():
|
||||||
|
self.pid_values[pid] = value
|
||||||
|
|
||||||
|
def handle_data(self, data: bytes) -> str | None:
|
||||||
|
"""Process incoming bytes, return response when a complete command is received.
|
||||||
|
|
||||||
|
ELM327 commands are terminated with CR (\\r). Accumulates partial
|
||||||
|
writes until a complete command is received.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
text = data.decode("utf-8", errors="replace")
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
self._buf += text
|
||||||
|
|
||||||
|
# Check for command terminator
|
||||||
|
if "\r" not in self._buf:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Split on CR, process first complete command
|
||||||
|
line, self._buf = self._buf.split("\r", 1)
|
||||||
|
line = line.strip()
|
||||||
|
if not line:
|
||||||
|
return None
|
||||||
|
|
||||||
|
response = self._process_command(line)
|
||||||
|
|
||||||
|
# Record command
|
||||||
|
record = CommandRecord(
|
||||||
|
index=self._cmd_index,
|
||||||
|
timestamp=datetime.now(UTC).isoformat(),
|
||||||
|
raw=line,
|
||||||
|
parsed=line.upper().strip(),
|
||||||
|
response=response.replace("\r", "\\r").replace("\n", "\\n"),
|
||||||
|
)
|
||||||
|
self._commands.append(record)
|
||||||
|
self._cmd_index += 1
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _process_command(self, line: str) -> str:
|
||||||
|
"""Parse and handle an AT or OBD command."""
|
||||||
|
cmd = line.strip().upper()
|
||||||
|
|
||||||
|
if cmd.startswith("AT"):
|
||||||
|
return self._handle_at(cmd)
|
||||||
|
|
||||||
|
# OBD-II query
|
||||||
|
return self._handle_obd(cmd)
|
||||||
|
|
||||||
|
def _handle_at(self, cmd: str) -> str:
|
||||||
|
"""Handle ELM327 AT commands."""
|
||||||
|
cr = "\r\n" if self.linefeed else "\r"
|
||||||
|
|
||||||
|
if cmd == "ATZ":
|
||||||
|
self.echo = False
|
||||||
|
self.headers = True
|
||||||
|
self.spaces = False
|
||||||
|
return f"{cr}{self.device_id}{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATI":
|
||||||
|
return f"{self.device_id}{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "AT@1":
|
||||||
|
return f"mcbluetooth OBD-II Emulator{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATE0":
|
||||||
|
self.echo = False
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATE1":
|
||||||
|
self.echo = True
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATH0":
|
||||||
|
self.headers = False
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATH1":
|
||||||
|
self.headers = True
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATS0":
|
||||||
|
self.spaces = False
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATS1":
|
||||||
|
self.spaces = True
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd.startswith("ATL"):
|
||||||
|
self.linefeed = cmd.endswith("1")
|
||||||
|
cr = "\r\n" if self.linefeed else "\r"
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATDPN":
|
||||||
|
return f"A{self.protocol}{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATDP":
|
||||||
|
protocols = {
|
||||||
|
"0": "AUTO",
|
||||||
|
"6": "ISO 15765-4 (CAN 11/500)",
|
||||||
|
"7": "ISO 15765-4 (CAN 29/500)",
|
||||||
|
"8": "ISO 15765-4 (CAN 11/250)",
|
||||||
|
"9": "ISO 15765-4 (CAN 29/250)",
|
||||||
|
}
|
||||||
|
name = protocols.get(self.protocol, "AUTO")
|
||||||
|
return f"{name}{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "ATRV":
|
||||||
|
return f"12.6V{cr}{cr}>"
|
||||||
|
|
||||||
|
# Commands that just get OK
|
||||||
|
ok_prefixes = (
|
||||||
|
"ATSP",
|
||||||
|
"ATST",
|
||||||
|
"ATAT",
|
||||||
|
"ATM",
|
||||||
|
"ATCAF",
|
||||||
|
"ATD",
|
||||||
|
"ATCE",
|
||||||
|
"ATCF",
|
||||||
|
"ATCM",
|
||||||
|
"ATSH",
|
||||||
|
"ATFC",
|
||||||
|
"ATWM",
|
||||||
|
"ATMA",
|
||||||
|
"ATAR",
|
||||||
|
"ATPC",
|
||||||
|
)
|
||||||
|
for prefix in ok_prefixes:
|
||||||
|
if cmd.startswith(prefix):
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
if cmd == "AT":
|
||||||
|
return f"OK{cr}{cr}>"
|
||||||
|
|
||||||
|
return f"?{cr}{cr}>"
|
||||||
|
|
||||||
|
def _handle_obd(self, cmd: str) -> str:
|
||||||
|
"""Handle OBD-II service queries."""
|
||||||
|
cr = "\r\n" if self.linefeed else "\r"
|
||||||
|
cmd = cmd.replace(" ", "")
|
||||||
|
|
||||||
|
if len(cmd) < 4:
|
||||||
|
return f"?{cr}{cr}>"
|
||||||
|
|
||||||
|
try:
|
||||||
|
mode = int(cmd[:2], 16)
|
||||||
|
pid = int(cmd[2:4], 16)
|
||||||
|
except ValueError:
|
||||||
|
return f"?{cr}{cr}>"
|
||||||
|
|
||||||
|
if mode == 0x01:
|
||||||
|
return self._handle_mode01(pid) + f"{cr}>"
|
||||||
|
if mode == 0x03:
|
||||||
|
return self._handle_mode03() + f"{cr}>"
|
||||||
|
if mode == 0x04:
|
||||||
|
self.dtc_codes.clear()
|
||||||
|
return f"44{cr}{cr}>"
|
||||||
|
if mode == 0x09:
|
||||||
|
return self._handle_mode09(pid) + f"{cr}>"
|
||||||
|
|
||||||
|
return f"NO DATA{cr}{cr}>"
|
||||||
|
|
||||||
|
def _handle_mode01(self, pid: int) -> str:
|
||||||
|
"""Handle Mode 01 (current data) queries."""
|
||||||
|
cr = "\r\n" if self.linefeed else "\r"
|
||||||
|
|
||||||
|
# Supported PID bitmaps
|
||||||
|
if pid in (0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0, 0xE0):
|
||||||
|
bitmap = self._build_pid_bitmap(pid)
|
||||||
|
return self._format_response(0x41, pid, bitmap)
|
||||||
|
|
||||||
|
# Configured PID value
|
||||||
|
if pid in self.pid_values:
|
||||||
|
return self._format_response(0x41, pid, self.pid_values[pid])
|
||||||
|
|
||||||
|
return f"NO DATA{cr}"
|
||||||
|
|
||||||
|
def _handle_mode03(self) -> str:
|
||||||
|
"""Handle Mode 03 (stored DTCs) query."""
|
||||||
|
if not self.dtc_codes:
|
||||||
|
return self._format_response(0x43, 0x00, bytes([0x00, 0x00, 0x00, 0x00]))
|
||||||
|
|
||||||
|
# Pack DTCs into response (2 bytes each)
|
||||||
|
dtc_bytes = bytearray()
|
||||||
|
for code in self.dtc_codes[:3]: # Max 3 per frame
|
||||||
|
dtc_bytes.extend(self._encode_dtc(code))
|
||||||
|
|
||||||
|
# Pad to even number of DTCs
|
||||||
|
while len(dtc_bytes) % 4 != 0:
|
||||||
|
dtc_bytes.extend(b"\x00\x00")
|
||||||
|
|
||||||
|
return self._format_response(0x43, len(self.dtc_codes), bytes(dtc_bytes))
|
||||||
|
|
||||||
|
def _handle_mode09(self, pid: int) -> str:
|
||||||
|
"""Handle Mode 09 (vehicle info) queries."""
|
||||||
|
cr = "\r\n" if self.linefeed else "\r"
|
||||||
|
|
||||||
|
if pid == 0x00:
|
||||||
|
# Supported Mode 09 PIDs: 02 (VIN)
|
||||||
|
return self._format_response(0x49, 0x00, bytes([0x40, 0x00, 0x00, 0x00]))
|
||||||
|
|
||||||
|
if pid == 0x02:
|
||||||
|
# VIN (17 chars, multi-frame)
|
||||||
|
vin_bytes = self.vin.encode("ascii")[:17].ljust(17, b"0")
|
||||||
|
if self.headers:
|
||||||
|
lines = []
|
||||||
|
# First frame: 10 14 49 02 01 VIN[0:3]
|
||||||
|
lines.append(
|
||||||
|
f"{ECU_HEADER} 10 14 49 02 01 " + " ".join(f"{b:02X}" for b in vin_bytes[:3])
|
||||||
|
)
|
||||||
|
# Consecutive frames
|
||||||
|
offset = 3
|
||||||
|
seq = 1
|
||||||
|
while offset < len(vin_bytes):
|
||||||
|
chunk = vin_bytes[offset : offset + 7]
|
||||||
|
frame = f"{ECU_HEADER} 2{seq:01X} " + " ".join(f"{b:02X}" for b in chunk)
|
||||||
|
lines.append(frame)
|
||||||
|
offset += 7
|
||||||
|
seq = (seq + 1) & 0x0F
|
||||||
|
return cr.join(lines) + cr
|
||||||
|
else:
|
||||||
|
return f"4902{vin_bytes.hex().upper()}{cr}"
|
||||||
|
|
||||||
|
return f"NO DATA{cr}"
|
||||||
|
|
||||||
|
def _format_response(self, mode_resp: int, pid: int, data: bytes) -> str:
|
||||||
|
"""Format an OBD-II response with optional headers."""
|
||||||
|
cr = "\r\n" if self.linefeed else "\r"
|
||||||
|
length = 2 + len(data) # mode + pid + data
|
||||||
|
|
||||||
|
if self.headers:
|
||||||
|
sep = " " if self.spaces else ""
|
||||||
|
data_hex = sep.join(f"{b:02X}" for b in data)
|
||||||
|
return f"{ECU_HEADER}{sep}{length:02X}{sep}{mode_resp:02X}{sep}{pid:02X}{sep}{data_hex}{cr}"
|
||||||
|
else:
|
||||||
|
data_hex = "".join(f"{b:02X}" for b in data)
|
||||||
|
return f"{mode_resp:02X}{pid:02X}{data_hex}{cr}"
|
||||||
|
|
||||||
|
def _build_pid_bitmap(self, base_pid: int) -> bytes:
|
||||||
|
"""Build a 4-byte bitmap of supported PIDs for a range."""
|
||||||
|
bitmap = 0
|
||||||
|
all_pids = set(self.pid_values.keys())
|
||||||
|
|
||||||
|
for i in range(32):
|
||||||
|
actual_pid = base_pid + i + 1
|
||||||
|
if actual_pid in all_pids:
|
||||||
|
bitmap |= 1 << (31 - i)
|
||||||
|
|
||||||
|
# Set bit 32 (= bit 0) if the next range has supported PIDs
|
||||||
|
next_base = base_pid + 0x20
|
||||||
|
if next_base <= 0xE0:
|
||||||
|
for p in all_pids:
|
||||||
|
if next_base < p <= next_base + 0x20:
|
||||||
|
bitmap |= 1
|
||||||
|
break
|
||||||
|
|
||||||
|
return bitmap.to_bytes(4, "big")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _encode_dtc(code: str) -> bytes:
|
||||||
|
"""Encode a DTC string like 'P0301' into 2 bytes."""
|
||||||
|
prefixes = {"P": 0, "C": 1, "B": 2, "U": 3}
|
||||||
|
prefix = prefixes.get(code[0].upper(), 0)
|
||||||
|
number = int(code[1:], 16)
|
||||||
|
value = (prefix << 14) | number
|
||||||
|
return value.to_bytes(2, "big")
|
||||||
|
|
||||||
|
def get_commands(
|
||||||
|
self,
|
||||||
|
since_index: int = 0,
|
||||||
|
limit: int = 50,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""Get recorded commands."""
|
||||||
|
records = [
|
||||||
|
{
|
||||||
|
"index": r.index,
|
||||||
|
"timestamp": r.timestamp,
|
||||||
|
"command": r.raw,
|
||||||
|
"response_preview": r.response[:80],
|
||||||
|
}
|
||||||
|
for r in self._commands
|
||||||
|
if r.index >= since_index
|
||||||
|
]
|
||||||
|
return records[:limit]
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== Module-level emulator ====================
|
||||||
|
|
||||||
|
_emulator: ELM327Emulator | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_emulator() -> ELM327Emulator:
|
||||||
|
"""Get the global emulator instance."""
|
||||||
|
global _emulator
|
||||||
|
if _emulator is None:
|
||||||
|
_emulator = ELM327Emulator()
|
||||||
|
return _emulator
|
||||||
|
|
||||||
|
|
||||||
|
async def _on_write(char_id: str, value: bytes, device: str | None) -> None:
|
||||||
|
"""Callback fired when a remote client writes to the RX characteristic."""
|
||||||
|
emu = _get_emulator()
|
||||||
|
if not emu._started or char_id != emu._rx_char_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
response = emu.handle_data(value)
|
||||||
|
if response and emu._tx_char_id:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
mgr.set_value(emu._tx_char_id, response.encode("utf-8"))
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== Tool registration ====================
|
||||||
|
|
||||||
|
|
||||||
|
def register_tools(mcp: FastMCP) -> None:
|
||||||
|
"""Register ELM327 emulator tools with the MCP server."""
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_elm327_emu_start(
|
||||||
|
adapter: str,
|
||||||
|
name: str = "OBDII",
|
||||||
|
load_defaults: bool = True,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Start the ELM327 OBD-II BLE emulator.
|
||||||
|
|
||||||
|
Sets up a Nordic UART Service (NUS) GATT server, registers with
|
||||||
|
BlueZ, and starts advertising. OBD-II apps can then discover and
|
||||||
|
connect to this emulated adapter.
|
||||||
|
|
||||||
|
The emulator auto-responds to AT commands and OBD-II PID queries.
|
||||||
|
Configure PID values with bt_elm327_emu_set_pid().
|
||||||
|
|
||||||
|
Args:
|
||||||
|
adapter: Bluetooth adapter (e.g., "hci0").
|
||||||
|
name: BLE device name shown to scanners (default "OBDII").
|
||||||
|
load_defaults: Pre-load default PID values for a realistic
|
||||||
|
idle vehicle (RPM 750, coolant 30°C, etc.).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status with NUS characteristic IDs and default PID info.
|
||||||
|
"""
|
||||||
|
emu = _get_emulator()
|
||||||
|
if emu._started:
|
||||||
|
return {"status": "error", "error": "Emulator already running"}
|
||||||
|
|
||||||
|
try:
|
||||||
|
if load_defaults:
|
||||||
|
emu.set_defaults()
|
||||||
|
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
|
||||||
|
# Build NUS GATT hierarchy
|
||||||
|
svc_id = mgr.add_service(NUS_SERVICE_UUID)
|
||||||
|
rx_id = mgr.add_characteristic(
|
||||||
|
svc_id,
|
||||||
|
NUS_RX_CHAR_UUID,
|
||||||
|
flags=["write", "write-without-response"],
|
||||||
|
)
|
||||||
|
tx_id = mgr.add_characteristic(
|
||||||
|
svc_id,
|
||||||
|
NUS_TX_CHAR_UUID,
|
||||||
|
flags=["notify"],
|
||||||
|
value=f"{emu.device_id}\r\n>".encode(),
|
||||||
|
)
|
||||||
|
|
||||||
|
emu._rx_char_id = rx_id
|
||||||
|
emu._tx_char_id = tx_id
|
||||||
|
emu._adapter = adapter
|
||||||
|
|
||||||
|
# Register write callback for auto-response
|
||||||
|
mgr._write_callback = _on_write
|
||||||
|
|
||||||
|
# Register + advertise
|
||||||
|
await mgr.register(adapter)
|
||||||
|
await mgr.set_advertising(
|
||||||
|
adapter,
|
||||||
|
True,
|
||||||
|
name=name,
|
||||||
|
service_uuids=[NUS_SERVICE_UUID],
|
||||||
|
)
|
||||||
|
|
||||||
|
emu._started = True
|
||||||
|
|
||||||
|
pid_info = (
|
||||||
|
{pid: desc for pid, (_val, desc) in DEFAULT_PIDS.items()} if load_defaults else {}
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"emulator": "ELM327 v1.5",
|
||||||
|
"adapter": adapter,
|
||||||
|
"name": name,
|
||||||
|
"rx_char_id": rx_id,
|
||||||
|
"tx_char_id": tx_id,
|
||||||
|
"default_pids": pid_info,
|
||||||
|
"vin": emu.vin,
|
||||||
|
}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_elm327_emu_stop(adapter: str) -> dict[str, Any]:
|
||||||
|
"""Stop the ELM327 OBD-II BLE emulator.
|
||||||
|
|
||||||
|
Stops advertising, unregisters GATT services, and cleans up.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
adapter: Bluetooth adapter (e.g., "hci0").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status confirming shutdown.
|
||||||
|
"""
|
||||||
|
global _emulator
|
||||||
|
emu = _get_emulator()
|
||||||
|
|
||||||
|
if not emu._started:
|
||||||
|
return {"status": "ok", "was_running": False}
|
||||||
|
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
mgr._write_callback = None
|
||||||
|
await mgr.unregister(adapter)
|
||||||
|
mgr.clear()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
emu._started = False
|
||||||
|
_emulator = None
|
||||||
|
return {"status": "ok", "was_running": True}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_elm327_emu_status() -> dict[str, Any]:
|
||||||
|
"""Get ELM327 emulator status.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
- running: Whether the emulator is active
|
||||||
|
- pids: Configured PID values with descriptions
|
||||||
|
- dtcs: Active diagnostic trouble codes
|
||||||
|
- command_count: Number of commands received
|
||||||
|
- settings: Current ELM327 settings (echo, headers, etc.)
|
||||||
|
"""
|
||||||
|
emu = _get_emulator()
|
||||||
|
|
||||||
|
pids = {}
|
||||||
|
for pid, value in emu.pid_values.items():
|
||||||
|
desc = ""
|
||||||
|
if pid in DEFAULT_PIDS:
|
||||||
|
_, desc = DEFAULT_PIDS[pid]
|
||||||
|
pids[f"0x{pid:02X}"] = {
|
||||||
|
"value_hex": value.hex(),
|
||||||
|
"value_bytes": list(value),
|
||||||
|
"description": desc,
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"running": emu._started,
|
||||||
|
"device_id": emu.device_id,
|
||||||
|
"adapter": emu._adapter,
|
||||||
|
"protocol": emu.protocol,
|
||||||
|
"pids": pids,
|
||||||
|
"dtcs": emu.dtc_codes,
|
||||||
|
"vin": emu.vin,
|
||||||
|
"command_count": emu._cmd_index,
|
||||||
|
"settings": {
|
||||||
|
"echo": emu.echo,
|
||||||
|
"headers": emu.headers,
|
||||||
|
"spaces": emu.spaces,
|
||||||
|
"linefeed": emu.linefeed,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_elm327_emu_set_pid(
|
||||||
|
pid: int,
|
||||||
|
value: str,
|
||||||
|
value_type: str = "hex",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Set an OBD-II PID response value.
|
||||||
|
|
||||||
|
When an OBD app queries this PID (Mode 01), the emulator returns
|
||||||
|
the configured value. Values follow OBD-II encoding conventions.
|
||||||
|
|
||||||
|
Common PIDs:
|
||||||
|
0x05: Coolant temp (1 byte, value - 40 = °C)
|
||||||
|
0x0C: Engine RPM (2 bytes, value / 4 = RPM)
|
||||||
|
0x0D: Vehicle speed (1 byte = km/h)
|
||||||
|
0x0F: Intake air temp (1 byte, value - 40 = °C)
|
||||||
|
0x11: Throttle position (1 byte, value / 2.55 = %)
|
||||||
|
0x2F: Fuel tank level (1 byte, value / 2.55 = %)
|
||||||
|
0x46: Ambient air temp (1 byte, value - 40 = °C)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pid: PID number (e.g., 0x0C for RPM).
|
||||||
|
value: Encoded value. For RPM of 3000: "0BB8" (hex) or "3000" (int).
|
||||||
|
value_type: "hex" (raw bytes), "int" (little-endian integer),
|
||||||
|
or "string" (UTF-8).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status with the configured PID value.
|
||||||
|
"""
|
||||||
|
emu = _get_emulator()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if value_type == "hex":
|
||||||
|
data = bytes.fromhex(value)
|
||||||
|
elif value_type == "int":
|
||||||
|
int_val = int(value)
|
||||||
|
byte_len = max(1, (int_val.bit_length() + 7) // 8)
|
||||||
|
data = int_val.to_bytes(byte_len, "big") # OBD uses big-endian
|
||||||
|
elif value_type == "string":
|
||||||
|
data = value.encode("utf-8")
|
||||||
|
else:
|
||||||
|
return {"status": "error", "error": f"Unknown value_type: {value_type}"}
|
||||||
|
|
||||||
|
emu.pid_values[pid] = data
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"pid": f"0x{pid:02X}",
|
||||||
|
"value_hex": data.hex(),
|
||||||
|
"value_bytes": list(data),
|
||||||
|
}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_elm327_emu_set_dtcs(dtc_codes: list[str]) -> dict[str, Any]:
|
||||||
|
"""Set diagnostic trouble codes for Mode 03 queries.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
dtc_codes: List of DTC strings (e.g., ["P0301", "P0420", "C0035"]).
|
||||||
|
P = Powertrain, C = Chassis, B = Body, U = Network.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status with configured DTCs.
|
||||||
|
"""
|
||||||
|
emu = _get_emulator()
|
||||||
|
emu.dtc_codes = [c.upper() for c in dtc_codes]
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"dtcs": emu.dtc_codes,
|
||||||
|
"count": len(emu.dtc_codes),
|
||||||
|
}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_elm327_emu_set_vin(vin: str) -> dict[str, Any]:
|
||||||
|
"""Set the Vehicle Identification Number for Mode 09 queries.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
vin: 17-character VIN string.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status with configured VIN.
|
||||||
|
"""
|
||||||
|
emu = _get_emulator()
|
||||||
|
if len(vin) != 17:
|
||||||
|
return {"status": "error", "error": f"VIN must be 17 characters (got {len(vin)})"}
|
||||||
|
emu.vin = vin.upper()
|
||||||
|
return {"status": "ok", "vin": emu.vin}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_elm327_emu_read_commands(
|
||||||
|
since_index: int = 0,
|
||||||
|
limit: int = 50,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Read commands received from the OBD-II client.
|
||||||
|
|
||||||
|
Monitor what the connected app is requesting. Useful for debugging
|
||||||
|
protocol issues or understanding app behavior.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
since_index: Only return commands with index >= this value.
|
||||||
|
limit: Maximum commands to return.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of received commands with timestamps and responses.
|
||||||
|
"""
|
||||||
|
emu = _get_emulator()
|
||||||
|
cmds = emu.get_commands(since_index, limit)
|
||||||
|
next_idx = max((c["index"] for c in cmds), default=since_index) + 1 if cmds else since_index
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"commands": cmds,
|
||||||
|
"count": len(cmds),
|
||||||
|
"next_index": next_idx,
|
||||||
|
}
|
||||||
392
src/mcbluetooth/tools/gatt_server.py
Normal file
392
src/mcbluetooth/tools/gatt_server.py
Normal file
@ -0,0 +1,392 @@
|
|||||||
|
"""GATT Server tools for Bluetooth MCP server.
|
||||||
|
|
||||||
|
These tools let Linux act as a BLE peripheral (GATT server) that
|
||||||
|
advertises services, accepts connections, and handles reads/writes
|
||||||
|
from remote central devices. Use cases include: emulating sensors,
|
||||||
|
building test harnesses, mocking BLE devices, protocol fuzzing.
|
||||||
|
|
||||||
|
Typical flow:
|
||||||
|
1. bt_gatt_server_add_service(uuid) → service0
|
||||||
|
2. bt_gatt_server_add_characteristic(service0, ...) → service0/char0
|
||||||
|
3. bt_gatt_server_register(adapter="hci0") → registered
|
||||||
|
4. bt_gatt_server_advertise(adapter="hci0", enable=True, name="MyDevice")
|
||||||
|
5. Remote connects, writes → bt_gatt_server_read_writes()
|
||||||
|
6. Respond via bt_gatt_server_set_value() → auto-notifies
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastmcp import FastMCP
|
||||||
|
|
||||||
|
from mcbluetooth.gatt_server import get_gatt_server, shutdown_gatt_server
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_value(value: str, value_type: str) -> bytes:
|
||||||
|
"""Convert a string value to bytes based on value_type."""
|
||||||
|
if value_type == "hex":
|
||||||
|
return bytes.fromhex(value)
|
||||||
|
elif value_type == "string":
|
||||||
|
return value.encode("utf-8")
|
||||||
|
elif value_type == "int":
|
||||||
|
int_val = int(value)
|
||||||
|
byte_len = max(1, (int_val.bit_length() + 7) // 8)
|
||||||
|
return int_val.to_bytes(byte_len, "little")
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Unknown value_type: {value_type}")
|
||||||
|
|
||||||
|
|
||||||
|
def register_tools(mcp: FastMCP) -> None:
|
||||||
|
"""Register GATT server tools with the MCP server."""
|
||||||
|
|
||||||
|
# ---- Lifecycle ----
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_register(adapter: str) -> dict[str, Any]:
|
||||||
|
"""Register the GATT application with BlueZ.
|
||||||
|
|
||||||
|
Exports all added services/characteristics to D-Bus and calls
|
||||||
|
GattManager1.RegisterApplication. Must add at least one service
|
||||||
|
before registering.
|
||||||
|
|
||||||
|
After registration, remote BLE centrals can discover and interact
|
||||||
|
with the hosted services (once advertising is started).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
adapter: Adapter name (e.g., "hci0").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Registration status with service count.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
await mgr.register(adapter)
|
||||||
|
status = mgr.get_status()
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"registered": True,
|
||||||
|
"service_count": len(status["services"]),
|
||||||
|
}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_unregister(adapter: str) -> dict[str, Any]:
|
||||||
|
"""Unregister the GATT application from BlueZ.
|
||||||
|
|
||||||
|
Stops advertising (if active) and removes the application
|
||||||
|
registration. Services remain in memory and can be re-registered.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
adapter: Adapter name (e.g., "hci0").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status confirming unregistration.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
await shutdown_gatt_server(adapter)
|
||||||
|
return {"status": "ok", "registered": False}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_status() -> dict[str, Any]:
|
||||||
|
"""Get GATT server status.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
- registered: Whether the application is registered with BlueZ
|
||||||
|
- advertising: Whether BLE advertising is active
|
||||||
|
- services: List of services with their characteristics
|
||||||
|
- write_event_count: Number of buffered write events
|
||||||
|
"""
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
return {"status": "ok", **mgr.get_status()}
|
||||||
|
|
||||||
|
# ---- Service construction ----
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_add_service(
|
||||||
|
uuid: str,
|
||||||
|
primary: bool = True,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Add a GATT service to the server.
|
||||||
|
|
||||||
|
Services must be added BEFORE registering with BlueZ.
|
||||||
|
If already registered, unregister first, add services, then re-register.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uuid: Service UUID (e.g., "180F" for Battery Service, or full
|
||||||
|
128-bit "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" for custom).
|
||||||
|
primary: Whether this is a primary service (default True).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The assigned service_id (e.g., "service0") for adding characteristics.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
service_id = mgr.add_service(uuid, primary)
|
||||||
|
return {"status": "ok", "service_id": service_id, "uuid": uuid}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_add_characteristic(
|
||||||
|
service_id: str,
|
||||||
|
uuid: str,
|
||||||
|
flags: list[str],
|
||||||
|
value: str | None = None,
|
||||||
|
value_type: str = "hex",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Add a characteristic to a GATT service.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
service_id: Service to add to (e.g., "service0").
|
||||||
|
uuid: Characteristic UUID.
|
||||||
|
flags: List of flags/permissions. Common values:
|
||||||
|
"read", "write", "write-without-response", "notify", "indicate",
|
||||||
|
"encrypt-read", "encrypt-write".
|
||||||
|
value: Initial value (optional). Format depends on value_type.
|
||||||
|
value_type: How to interpret value — "hex", "string", or "int".
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The assigned char_id (e.g., "service0/char0") for setting values.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
initial = _parse_value(value, value_type) if value else b""
|
||||||
|
char_id = mgr.add_characteristic(service_id, uuid, flags, initial)
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"char_id": char_id,
|
||||||
|
"uuid": uuid,
|
||||||
|
"flags": flags,
|
||||||
|
}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_add_descriptor(
|
||||||
|
char_id: str,
|
||||||
|
uuid: str,
|
||||||
|
flags: list[str],
|
||||||
|
value: str | None = None,
|
||||||
|
value_type: str = "hex",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Add a descriptor to a GATT characteristic.
|
||||||
|
|
||||||
|
Note: CCCD (0x2902) for notify/indicate is auto-managed by BlueZ —
|
||||||
|
don't add it manually.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
char_id: Characteristic to add to (e.g., "service0/char0").
|
||||||
|
uuid: Descriptor UUID (e.g., "2901" for Characteristic User Description).
|
||||||
|
flags: List of flags (e.g., ["read"], ["read", "write"]).
|
||||||
|
value: Initial value (optional).
|
||||||
|
value_type: How to interpret value — "hex", "string", or "int".
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The assigned desc_id.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
initial = _parse_value(value, value_type) if value else b""
|
||||||
|
desc_id = mgr.add_descriptor(char_id, uuid, flags, initial)
|
||||||
|
return {"status": "ok", "desc_id": desc_id, "uuid": uuid}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_clear() -> dict[str, Any]:
|
||||||
|
"""Remove all services, characteristics, and descriptors.
|
||||||
|
|
||||||
|
Must unregister first. Use this to rebuild the service hierarchy
|
||||||
|
from scratch.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status confirming services were cleared.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
mgr.clear()
|
||||||
|
return {"status": "ok", "cleared": True}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
# ---- Value management ----
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_set_value(
|
||||||
|
char_id: str,
|
||||||
|
value: str,
|
||||||
|
value_type: str = "hex",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Set a characteristic value on the GATT server.
|
||||||
|
|
||||||
|
If a remote client has subscribed to notifications on this
|
||||||
|
characteristic, the new value is automatically pushed via
|
||||||
|
BLE notification.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
char_id: Characteristic ID (e.g., "service0/char0").
|
||||||
|
value: Value to set.
|
||||||
|
value_type: How to interpret value — "hex" (e.g., "0102ff"),
|
||||||
|
"string" (UTF-8 text), or "int" (decimal number).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status with value details and whether notification was sent.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
data = _parse_value(value, value_type)
|
||||||
|
char = mgr._characteristics.get(char_id)
|
||||||
|
was_notifying = char.notifying if char else False
|
||||||
|
mgr.set_value(char_id, data)
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"char_id": char_id,
|
||||||
|
"value_hex": data.hex(),
|
||||||
|
"length": len(data),
|
||||||
|
"notified": was_notifying,
|
||||||
|
}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_get_value(char_id: str) -> dict[str, Any]:
|
||||||
|
"""Read a characteristic value from the server side.
|
||||||
|
|
||||||
|
This reads the value stored on OUR server, not from a remote device.
|
||||||
|
Use bt_ble_read() for reading from remote GATT servers.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
char_id: Characteristic ID (e.g., "service0/char0").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The current value as hex, bytes, and string (if decodable).
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
value = mgr.get_value(char_id)
|
||||||
|
result: dict[str, Any] = {
|
||||||
|
"status": "ok",
|
||||||
|
"char_id": char_id,
|
||||||
|
"value_hex": value.hex(),
|
||||||
|
"value_bytes": list(value),
|
||||||
|
"length": len(value),
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
result["value_string"] = value.decode("utf-8")
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
pass
|
||||||
|
return result
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_notify(char_id: str) -> dict[str, Any]:
|
||||||
|
"""Explicitly send a BLE notification for the current value.
|
||||||
|
|
||||||
|
Normally, bt_gatt_server_set_value auto-notifies. Use this to
|
||||||
|
re-send the current value without changing it.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
char_id: Characteristic ID (e.g., "service0/char0").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Status indicating whether notification was sent.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
sent = mgr.notify(char_id)
|
||||||
|
if sent:
|
||||||
|
return {"status": "ok", "notified": True}
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"notified": False,
|
||||||
|
"reason": "No client subscribed to notifications",
|
||||||
|
}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
# ---- Advertising ----
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_advertise(
|
||||||
|
adapter: str,
|
||||||
|
enable: bool,
|
||||||
|
name: str = "mcbluetooth",
|
||||||
|
service_uuids: list[str] | None = None,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Start or stop BLE advertising.
|
||||||
|
|
||||||
|
When advertising, the device becomes discoverable to BLE scanners
|
||||||
|
and can accept connections from central devices.
|
||||||
|
|
||||||
|
Must register the GATT application first.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
adapter: Adapter name (e.g., "hci0").
|
||||||
|
enable: True to start advertising, False to stop.
|
||||||
|
name: Local name shown to scanners (default "mcbluetooth").
|
||||||
|
service_uuids: UUIDs to include in advertisement. If not specified,
|
||||||
|
uses all registered service UUIDs.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Advertising status.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
await mgr.set_advertising(adapter, enable, name=name, service_uuids=service_uuids)
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"advertising": enable,
|
||||||
|
"name": name if enable else None,
|
||||||
|
}
|
||||||
|
except Exception as exc:
|
||||||
|
return {"status": "error", "error": str(exc)}
|
||||||
|
|
||||||
|
# ---- Write event monitoring ----
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_read_writes(
|
||||||
|
since_index: int = 0,
|
||||||
|
char_id: str | None = None,
|
||||||
|
limit: int = 50,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Read write events from remote BLE clients.
|
||||||
|
|
||||||
|
When a remote central device writes to a server characteristic,
|
||||||
|
the write is buffered here. Poll this to see incoming commands/data.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
since_index: Only return events with index >= this value.
|
||||||
|
Start with 0, then use the highest returned index + 1.
|
||||||
|
char_id: Filter to writes on this characteristic (optional).
|
||||||
|
limit: Maximum events to return (default 50).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of write events with value, timestamp, and source device.
|
||||||
|
"""
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
events = mgr.get_write_events(since_index, char_id, limit)
|
||||||
|
next_index = (
|
||||||
|
max((e["index"] for e in events), default=since_index) + 1 if events else since_index
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"events": events,
|
||||||
|
"count": len(events),
|
||||||
|
"next_index": next_index,
|
||||||
|
}
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def bt_gatt_server_clear_writes() -> dict[str, Any]:
|
||||||
|
"""Clear the write event buffer.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of events cleared.
|
||||||
|
"""
|
||||||
|
mgr = get_gatt_server()
|
||||||
|
count = mgr.clear_write_events()
|
||||||
|
return {"status": "ok", "cleared_count": count}
|
||||||
Loading…
x
Reference in New Issue
Block a user